Thanks, Ankur, for your reply.

By default, the allowed lateness for a global window is zero, but we can also
set it to a non-zero value (using allowedTimestampSkew for unbounded sources,
or for sources whose elements carry timestamps). Would that value then be used
in the downstream transforms where a GroupByKey or a Window.into with a
trigger happens?

In both of the scenarios I described earlier for *source transforms*, is it
possible that the pipeline will drop data if I do not specify
allowedTimestampSkew or allowedLateness at the source transforms (given that
I have late-arriving data)? Can I just set allowed lateness in the transform
where I do the GroupByKey or Window.into, rather than at the source?
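To make the question concrete, here is a simplified model (plain Java, not Beam code) of the rule applied where late data can be dropped, such as at a GroupByKey: an element becomes droppable once its window's max timestamp plus the allowed lateness is before the input watermark. The names `LatenessModel`, `END_OF_GLOBAL_WINDOW` and the helper methods are hypothetical, and the far-future value used for the end of the global window is an assumption, not Beam's actual constant.

```java
// Simplified model, not Beam code: when can an aggregation drop a late element?
final class LatenessModel {

    // Hypothetical stand-in for the end of the global window: a far-future
    // instant in epoch millis. Beam defines its own constant for this.
    static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE / 4;

    // Every element falls into the single global window.
    static long globalWindowMaxTimestamp() {
        return END_OF_GLOBAL_WINDOW;
    }

    // Fixed windows cover [start, start + size); the max timestamp is end - 1 ms.
    static long fixedWindowMaxTimestamp(long timestamp, long sizeMillis) {
        long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
        return start + sizeMillis - 1;
    }

    // Droppable once the window's garbage-collection time
    // (max timestamp + allowed lateness) is strictly before the input watermark.
    static boolean isDroppable(long windowMaxTimestamp, long allowedLatenessMillis, long inputWatermark) {
        long gcTime = windowMaxTimestamp + allowedLatenessMillis;
        return gcTime < inputWatermark;
    }

    public static void main(String[] args) {
        long watermark = 10_000L; // input watermark at the aggregation
        long eventTime = 2_000L;  // a late-arriving element
        long fixedMax = fixedWindowMaxTimestamp(eventTime, 5_000L); // 4_999

        System.out.println("fixed, lateness 0 ms:    dropped=" + isDroppable(fixedMax, 0L, watermark));
        System.out.println("fixed, lateness 6000 ms: dropped=" + isDroppable(fixedMax, 6_000L, watermark));
        System.out.println("global, lateness 0 ms:   dropped="
                + isDroppable(globalWindowMaxTimestamp(), 0L, watermark));
    }
}
```

Under this model, allowed lateness only changes the outcome for windows that can end before the watermark does, such as fixed windows; in the global window an element is not droppable until the watermark reaches the end of the global window.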

In the case of TextIO.read, which reads from a bounded source, where I assign
timestamps to all elements in the second transform: would it also be useful
to set allowedTimestampSkew after assigning timestamps? I am trying to
understand in what order the elements become available after timestamps are
assigned (given that all files are already present on the file system). Will
they be ordered by timestamp, or can some elements be read after the
watermark has already advanced past their event time?


TextIO.Read
     | Bounded source
     | Global window
     | Watermark at -infinity
apply
WithTimestamps (based on a timestamp attribute in the file)
   | Timestamped elements (the watermark starts at -infinity and follows
   |   the timestamp attribute)
   | Global window
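A minimal sketch, again in plain Java rather than Beam, of why read order matters here: it counts how many elements would already be behind the watermark under two assumed watermark policies, one that tracks the largest timestamp seen so far (hypothetical) and one that stays at -infinity until the whole bounded input has been read (an assumption about typical batch execution; check your runner). `WatermarkOrderModel` and `countBehindWatermark` are illustrative names.

```java
import java.util.List;

// Simplified model, not Beam code. Elements read from files arrive in file
// order, which need not match event-time order.
final class WatermarkOrderModel {

    // Counts elements whose event time is already behind the watermark at the
    // moment they are emitted.
    //   advanceDuringRead = true:  hypothetical watermark tracking the largest
    //                              timestamp seen so far.
    //   advanceDuringRead = false: watermark held at -infinity until the whole
    //                              bounded input is read (assumed; runner-dependent).
    static int countBehindWatermark(List<Long> timestampsInReadOrder, boolean advanceDuringRead) {
        long watermark = Long.MIN_VALUE; // stands in for -infinity
        int behind = 0;
        for (long ts : timestampsInReadOrder) {
            if (ts < watermark) {
                behind++;
            }
            if (advanceDuringRead) {
                watermark = Math.max(watermark, ts);
            }
        }
        return behind;
    }

    public static void main(String[] args) {
        // Event times (millis) in the order the lines happen to be read.
        List<Long> readOrder = List.of(5_000L, 1_000L, 9_000L, 3_000L);
        System.out.println("tracking max seen: " + countBehindWatermark(readOrder, true));  // 2
        System.out.println("held at -infinity: " + countBehindWatermark(readOrder, false)); // 0
    }
}
```

The point of the comparison is that timestamp order is not guaranteed by read order; whether that produces late elements depends entirely on how the runner advances the watermark for the bounded read.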


Regards,
Amit

On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <[email protected]> wrote:

> Hi Amit,
>
> As you don't have any GroupByKey or trigger in your pipeline, you don't
> need to set allowed lateness.
> For an unbounded source, the global window with the default trigger will
> never fire, so a GroupByKey would never emit output.
> In the code you linked, a trigger is used together with allowedLateness.
>
> Thanks,
> Ankur
>
> On Tue, Mar 31, 2020 at 11:20 AM amit kumar <[email protected]> wrote:
>
>> Thanks Jan!
>> Based on this, I have a question about the global window and allowed
>> lateness with the default trigger in the following scenarios:
>>
>> Case 1 -
>> TextIO.Read
>>      | Bounded source
>>      | Global window
>>      | Watermark at -infinity
>> apply
>> WithTimestamps (based on a timestamp attribute in the file)
>>    | Timestamped elements (the watermark starts at -infinity and follows
>>    |   the timestamp attribute)
>>    | Global window
>>    | Is it true that I will never need to set allowedLateness in this case
>>    |   with the default trigger? Would it bring any benefit, given that the
>>    |   window is global and the watermark will pass the end of the window
>>    |   once everything is processed?
>>
>>
>> Case 2 -
>> KinesisIO.read
>>     | Unbounded source
>>     | Default global window
>>     | Watermark based on arrival time
>> apply
>> WithTimestamps (based on a timestamp attribute from the stream)
>>    | Timestamped elements (the watermark follows the timestamp
>>    |   attribute)
>>    | Global window
>>    | Watermark based on event timestamp
>>    | Same question here: would there be any benefit in using
>>    |   allowedLateness, since the window is global?
>>
>> In the code example below, is allowedLateness used with the global window?
>>
>> https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
>>
>> Regards,
>> Amit
>>
>> On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Amit,
>>>
>>> The windowing strategy applied by default is
>>> WindowingStrategy.globalDefault() [1]: the global window with zero
>>> allowed lateness.
>>>
>>> Cheers,
>>>
>>>   Jan
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105
>>>
>>> On 3/31/20 10:22 AM, amit kumar wrote:
>>> > Hi All,
>>> >
>>> > Is there a default WindowFn that gets applied to elements of an
>>> > unbounded source?
>>> >
>>> > For example, if I have a Kinesis input source, for which all elements
>>> > are timestamped with their arrival time, what default windowing will be
>>> > applied to the output of the read transform?
>>> >
>>> > Is this runner-dependent?
>>> >
>>> > Regards,
>>> > Amit
>>>
>>
