On Wed, Apr 1, 2020 at 12:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Amit,
>
> answers inline.
> On 4/1/20 12:23 AM, amit kumar wrote:
>
> Thanks Ankur for your reply.
>
> By default the allowed lateness for a global window is zero, but can we
> also set it to a non-zero value that will be used in the downstream
> transforms where the group by or window into with a trigger happens
> (using allowedTimestampSkew for unbounded sources, or sources which have
> timestamped elements)?
>
> Setting allowedLateness for the global window has no semantic meaning,
> because the global window is triggered (using the default trigger) only at
> the end of input. Allowed lateness plays no role in that for the global
> window.
>
> allowedTimestampSkew is used for something different: it applies when you
> reassign timestamps to elements which already have timestamps (e.g.
> assigned by the source) and you want to move them into the past. The skew
> says how far into the past you can go.
>
>
> In both scenarios which I described earlier for *source transforms*, is it
> possible that the pipeline will drop data if I do not specify
> allowedTimestampSkew/allowedLateness at the source transforms (given I
> have late-arriving data)? Can I just set allowed lateness in the transform
> where I do groupBy or windowInto rather than at the source?
>
> AllowedLateness is a parameter of a stateful operation (e.g. GroupByKey),
> not of the source. The source emits _watermarks_, which mark progress in
> event time, but the data is then handled in the stateful operator. Each
> operator can have its own allowedLateness (although the model ensures that
> the lateness is by default inherited from one operator to the next). Sources
> should simply assign elements to global windows (with no allowed lateness,
> as allowed lateness has no meaning for global windows, as mentioned above).
>
>
> In the case of TextIO.read, which reads from a bounded source, where I
> assign timestamps to all elements in the second transform, will it be
> useful to set allowedTimestampSkew after assigning timestamps as well? I
> am trying to understand how the elements will be available after assigning
> timestamps (given all files are present on the file system): will they be
> ordered by timestamp, or can some elements be read after the watermark has
> progressed past an element's event time?
>
> When executing a batch pipeline, there is actually no watermark. Event time
> moves discretely from -inf (computation not finished yet) to +inf
> (computation finished). In the case you describe, you should not even need
> to set allowedTimestampSkew, because elements output from TextIO should
> (probably) be assigned the timestamp BoundedWindow.TIMESTAMP_MIN_VALUE (I'm
> not sure if the model guarantees this, but it seems reasonable). You can
> then reassign timestamps to the future as you wish. You don't have to worry
> about allowed lateness either, because that only applies to streaming
> pipelines, where event time moves more smoothly. By the definition of how
> event time progresses in batch pipelines, there is no "late" (after
> watermark) data in this case.
>

Clarification: sources should assign elements to their upstream window
(similar to DoFns), generally with the appropriate timestamp (unless they
are timestamp-aware). The upstream of a bounded source is typically
Impulse, which is in the global window with MIN_TIMESTAMP, but it could be
different. This better unifies the case of, for example, reading the
elements from a set of filenames published to Pub/Sub.
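
To make the lateness rule discussed above concrete, here is a minimal,
self-contained sketch in plain Java. It is a simplified model, not Beam's
actual implementation: it assumes a window counts as expired (so its data
is droppable) once the watermark has passed the window's maximum timestamp
plus the allowed lateness. The class name LatenessModel, its methods, and
the MIN_TS/MAX_TS constants are hypothetical stand-ins chosen for
illustration; they are not Beam APIs or Beam's real timestamp bounds.

```java
final class LatenessModel {
    // Hypothetical stand-ins for the smallest and largest event-time values.
    static final long MIN_TS = Long.MIN_VALUE / 2;
    static final long MAX_TS = Long.MAX_VALUE / 2;

    // Maximum timestamp of the fixed window [start, start + sizeMs) that
    // contains elementTs (assumed model of fixed windowing).
    static long fixedWindowMaxTs(long elementTs, long sizeMs) {
        long start = Math.floorDiv(elementTs, sizeMs) * sizeMs;
        return start + sizeMs - 1;
    }

    // A window is expired once the watermark is past its maximum timestamp
    // plus the allowed lateness. The sum is clamped to MAX_TS, so allowed
    // lateness is assumed to lie in [0, MAX_TS].
    static boolean isExpired(long windowMaxTs, long allowedLatenessMs, long watermark) {
        long expiry = Math.min(MAX_TS, windowMaxTs + allowedLatenessMs);
        return watermark > expiry;
    }

    public static void main(String[] args) {
        long windowMax = fixedWindowMaxTs(500, 1000);

        // Streaming, fixed window, zero lateness: watermark 2000 is past the window.
        System.out.println(isExpired(windowMax, 0, 2000));

        // Same element, but allowed lateness of 5000 keeps the window open.
        System.out.println(isExpired(windowMax, 5000, 2000));

        // Global window: its end is the maximum timestamp, so no watermark
        // can pass it, whatever the allowed lateness is.
        System.out.println(isExpired(MAX_TS, 0, MAX_TS));

        // Batch: the watermark sits at the minimum until the input is done,
        // so nothing is late while elements are being processed.
        System.out.println(isExpired(windowMax, 0, MIN_TS));
    }
}
```

In this model the allowed lateness only changes the outcome for the fixed
window in the streaming case, which matches the point made earlier in the
thread that it is a property of the windowed, stateful operation rather
than of the source.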

>
>
> TextIO.Read
>      |. Bounded source
>      |. Global Window
>      |. -infinity watermark
> apply
> WithTimestamps (based on a timestamp attribute in the file)
>      |. timestamped elements (watermark starts from -infinity and follows
>         the timestamp from the timestamp attribute)
>      |. Global Window
>
>
> Regards,
> Amit
>
> On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <goe...@google.com> wrote:
>
>> Hi Amit,
>>
>> As you don't have any GroupByKey or trigger in your pipeline, you don't
>> need to set allowed lateness.
>> For an unbounded source, the global window will never fire a trigger or
>> emit GroupByKey output.
>> In the code you linked, a trigger is used, and that trigger uses
>> allowedLateness.
>>
>> Thanks,
>> Ankur
>>
>> On Tue, Mar 31, 2020 at 11:20 AM amit kumar <akdata...@gmail.com> wrote:
>>
>>> Thanks Jan!
>>> I have a follow-up question about the global window and allowed lateness
>>> with the default trigger, for the following scenarios:
>>>
>>> Case 1 -
>>> TextIO.Read
>>>      |. Bounded source
>>>      |. Global Window
>>>      |. -infinity watermark
>>> apply
>>> WithTimestamps (based on a timestamp attribute in the file)
>>>      |. timestamped elements (watermark starts from -infinity and
>>>         follows the timestamp from the timestamp attribute)
>>>      |. Global Window
>>>      |. (Will I ever need to set allowedLateness in this case with the
>>>         default trigger? Will there be any benefit, since the window is
>>>         global and the watermark will pass the end of the window when
>>>         everything is processed?)
>>>
>>>
>>> Case 2 -
>>> KinesisIO.read
>>>      |. Unbounded source
>>>      |. Default Global Window
>>>      |. watermark based on arrival time
>>> apply
>>> WithTimestamps (based on a timestamp attribute from the stream)
>>>      |. timestamped elements (watermark follows the timestamp from the
>>>         timestamp attribute)
>>>      |. Global Window
>>>      |. watermark based on event timestamp
>>>      |. (Same question here: will there be any benefit of using
>>>         allowedLateness, since the window is global?)
>>>
>>> In the code example below, is allowedLateness used for a global window?
>>>
>>> https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
>>>
>>> Regards,
>>> Amit
>>>
>>> On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Amit,
>>>>
>>>> the window function applied by default is
>>>> WindowingStrategy.globalDefault() [1]: the global window with zero
>>>> allowed lateness.
>>>>
>>>> Cheers,
>>>>
>>>>   Jan
>>>>
>>>> [1]
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105
>>>>
>>>> On 3/31/20 10:22 AM, amit kumar wrote:
>>>> > Hi All,
>>>> >
>>>> > Is there a default WindowFn that gets applied to elements of an
>>>> > unbounded source?
>>>> >
>>>> > For example, if I have a Kinesis input source, for which all elements
>>>> > are timestamped with ArrivalTime, what will be the default windowing
>>>> > applied to the output of the read transform?
>>>> >
>>>> > Is this runner-dependent?
>>>> >
>>>> > Regards,
>>>> > Amit
>>>>
>>>
