On Wed, Apr 1, 2020 at 12:53 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi Amit,
>
> answers inline.
> On 4/1/20 12:23 AM, amit kumar wrote:
>
> Thanks Ankur for your reply.
>
> By default the allowed lateness for a global window is zero but we can
> also set it to be non-zero which will be used in the downstream transforms
> where group by or window into with trigger is happening?
> (using allowedTimeStampSkew for unbounded sources/ sources which have
> timestamped elements).
>
> Setting allowedLateness for global window has no semantic meaning, because
> global window will be triggered (using default trigger) only at the end of
> input. Allowed lateness plays no role in that for global window.
>
> allowedTimestampSkew is used for something different, it is used when you
> reassign timestamps to elements which already have timestamps (e.g.
> assigned by source) and you want to move them into past. The skew says how
> far in the past you can go.
>
>
> In both scenarios which I described earlier for *source transforms* is it
> possible that the pipeline will drop data if I do not
> specify allowedTimeStampSkew/ allowedLateness at the source
> transforms (given I have late arriving data)? Can I just set allowed
> lateness in the transform where I do groupBy or windowInto rather than
> source.
>
> AllowedLateness is parameter of stateful operation (e.g. GroupByKey) not
> the source. The source emits _watermarks_, which marks progress in event
> time, but the data is then handled in the stateful operator. Each operator
> can have its own allowedLateness (although the model ensures that the
> lateness is by default inherited from one operator to the other). Sources
> should simply assign elements to global windows (with no allowed lateness,
> as allowed lateness has no meaning for global windows as mentioned above).
>
>
> In case of TextIO.read which reads from a bounded source and I assign
> Timestamps to all elements in the second transform, will it be useful in
> this case as well to set allowedTimeStampSkew after assigning timestamps?
> I am trying to understand how the elements will be available after
> assigning timestamps (Given all files are present on file system), will
> they be ordered by timestamp, can some elements be read after watermark
> has progressed above an element's event time?
>
> When executing batch pipeline, there is actually no watermark. Event time
> moves discretely from -inf (computation not finished yet) to +inf
> (computation finished). In the case you describe, you should not even need
> to set allowedTimestampSkew, because elements output from TextIO should
> (probably) be assigned timestamp of BoundedWindow.TIMESTAMP_MIN_VALUE (I'm
> not sure if the model guarantees this, but it seems reasonable). You can
> then reassign timestamps to the future as you wish. You don't have to worry
> about allowed lateness either, because that only applies to streaming
> pipelines, where event time moves more smoothly. By the definition of how
> event time progresses in case of batch pipelines, there is no "late" (after
> watermark) data in this case.
>

Clarification: sources should assign elements to their upstream window
(similar to DoFns), generally with the appropriate timestamp (unless they
are timestamp aware). The upstream of a bounded source is typically Impulse,
which is in the global window with MIN_TIMESTAMP, but could be different.
This better unifies the case of reading the elements from a set of filenames
published to pubsub, for example.

>
>
> TextIO.Read.
>   |. Bounded source
>   |. Global Window
>   |. -infinity watermark
> apply
> WithTimeStamps (Based on a timestamp attribute in file)
>   |. timestamped elements (watermark starts from -infinity and follows
>      the timestamp from timestamp attribute)
>   |. Global Window
>
>
> Regards,
> Amit
>
> On Tue, Mar 31, 2020 at 11:26 AM Ankur Goenka <goe...@google.com> wrote:
>
>> Hi Amit,
>>
>> As you don't have any GroupByKey or trigger in your pipeline, you don't
>> need to do allowed lateness.
>> For unbounded source, Global window will never fire a trigger or emit
>> GroupByKey.
>> In the code you linked, a trigger is used which uses allowedLateness.
>>
>> Thanks,
>> Ankur
>>
>> On Tue, Mar 31, 2020 at 11:20 AM amit kumar <akdata...@gmail.com> wrote:
>>
>>> Thanks Jan!
>>> I have a question based on this on Global Window and allowed lateness,
>>> with default trigger for the following
>>> scenarios:
>>>
>>> Case 1-
>>> TextIO.Read.
>>>   |. Bounded source
>>>   |. Global Window
>>>   |. -infinity watermark
>>> apply
>>> WithTimeStamps (Based on a timestamp attribute in file)
>>>   |. timestamped elements (watermark starts from -infinity and
>>>      follows the timestamp from timestamp attribute)
>>>   |. Global Window
>>>   |. (Will I never need to do allowedLateness in this case with
>>>      default trigger? Will there be any benefit since the window is
>>>      global and watermark will pass the end of window when everything
>>>      is processed?)
>>>
>>>
>>> Case 2 -
>>> KinesisIO.read
>>>   |. Unbounded Source
>>>   |. Default Global Window
>>>   |. watermark based on arrival time
>>> apply
>>> WithTimeStamps (Based on a timestamp attribute from the stream)
>>>   |. timestamped elements (watermark follows the timestamp from
>>>      timestamp attribute)
>>>   |. Global Window
>>>   |. Watermark based on event timestamp.
>>>   |. Same question here will there be any benefit of using
>>>      allowedLateness since window is global?
>>>
>>> In the code example below allowedLateness is used for global window?
>>>
>>> https://github.com/apache/beam/blob/828b897a2439437d483b1bd7f2a04871f077bde0/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java#L307
>>>
>>> Regards,
>>> Amit
>>>
>>> On Tue, Mar 31, 2020 at 2:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Amit,
>>>>
>>>> the window function applied by default is
>>>> WindowingStrategy.globalDefault(), [1] - global window with zero
>>>> allowed lateness.
>>>>
>>>> Cheers,
>>>>
>>>> Jan
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java#L105
>>>>
>>>> On 3/31/20 10:22 AM, amit kumar wrote:
>>>> > Hi All,
>>>> >
>>>> > Is there a default WindowFn that gets applied to elements of an
>>>> > unbounded source.
>>>> >
>>>> > For example, if I have a Kinesis input source, for which all elements
>>>> > are timestamped with ArrivalTime, what will be the default windowing
>>>> > applied to the output of read transform?
>>>> >
>>>> > Is this runner dependent?
>>>> >
>>>> > Regards,
>>>> > Amit
>>>>
>>>
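
For anyone reading this thread later, the rules discussed above can be
modelled in a few lines of plain Java. This is only a rough sketch and
deliberately does not use the Beam SDK: the class LatenessSketch, its methods
and its two constants are hypothetical names made up for illustration, and
the model assumes that an element is dropped once the watermark has passed
the end of its window plus the allowed lateness, and that a reassigned
timestamp may go back at most by the allowed skew. Lateness and skew are
assumed to be non-negative.

```java
// Toy model of the rules discussed in this thread. This is NOT the Beam API:
// every class, method and constant name here is hypothetical.
final class LatenessSketch {
    // Stand-ins for the minimum and maximum event timestamps (assumed values).
    static final long MIN_TIMESTAMP = Long.MIN_VALUE;
    static final long MAX_TIMESTAMP = Long.MAX_VALUE;

    // A stateful operator (e.g. a GroupByKey) drops an element once the
    // watermark has passed the end of its window plus the allowed lateness.
    // Assumes allowedLateness >= 0.
    static boolean isDroppedAsLate(long windowEnd, long allowedLateness, long watermark) {
        // Saturating addition, so a window ending at MAX_TIMESTAMP cannot overflow.
        long expiry = (windowEnd > MAX_TIMESTAMP - allowedLateness)
                ? MAX_TIMESTAMP
                : windowEnd + allowedLateness;
        return watermark > expiry;
    }

    // When a timestamp is reassigned, the new value may lie at most allowedSkew
    // before the current one; moving it into the future is always accepted.
    // Assumes allowedSkew >= 0.
    static boolean isReassignmentAllowed(long currentTs, long newTs, long allowedSkew) {
        // Saturating subtraction, so a current timestamp of MIN_TIMESTAMP cannot underflow.
        long earliest = (currentTs < MIN_TIMESTAMP + allowedSkew)
                ? MIN_TIMESTAMP
                : currentTs - allowedSkew;
        return newTs >= earliest;
    }

    public static void main(String[] args) {
        // Bounded window ending at t=1000, watermark already at t=1001.
        System.out.println("no lateness:   dropped=" + isDroppedAsLate(1_000L, 0L, 1_001L));
        System.out.println("lateness 500:  dropped=" + isDroppedAsLate(1_000L, 500L, 1_001L));
        // Global window: no watermark value can pass its end.
        System.out.println("global window: dropped=" + isDroppedAsLate(MAX_TIMESTAMP, 0L, 1_001L));
        // Element at the minimum timestamp, moved forward without any skew.
        System.out.println("move forward:  allowed=" + isReassignmentAllowed(MIN_TIMESTAMP, 12_345L, 0L));
    }
}
```

In this model the global window case never drops anything whatever the
lateness is set to, which matches Jan's point that allowed lateness has no
meaning there. An element that starts at the minimum timestamp can only move
forward, so no skew is needed for it.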