This thread has a lot in it, so I am just top-posting.
- Stateful DoFn is a windowed operation; state is per-window. When the
window expires, any further inputs are dropped.
- "Late" is not synonymous with out-of-order. It doesn't really have an
independent meaning.
- For a GBK/Combine "late" means "not included prior to the on-time
output", and "droppable" means "arriving after window expiry".
- For Stateful DoFn there is no real meaning to "late" except if one is
talking about "droppable", which still means "arriving after window
expiry". A user may have a special timer where they flip a flag and treat
elements after the timer differently.
I think the definition of when data is droppable is very simple. We
explicitly moved to this definition, away from "out of order == late",
because it is more robust and simpler to think about. Users saw lots of
confusing behavior when we had "out of order by allowed lateness ==
droppable" logic.
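A minimal sketch of the droppability rule described above. This is a pure-Python model, not Beam code: `IntervalWindow` and `is_droppable` are hypothetical names, and the strict comparison is assumed to mirror the runner's "GC time is before the watermark" check.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalWindow:
    """Hypothetical stand-in for a Beam window covering [start, end) in millis."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        # The last timestamp that still belongs to the window.
        return self.end - 1


def is_droppable(window: IntervalWindow, allowed_lateness: int, watermark: int) -> bool:
    """An element is droppable only once its window has expired, i.e. the
    watermark is past max_timestamp + allowed_lateness. The element's own
    timestamp, and how far out of order it arrived, play no role."""
    return watermark > window.max_timestamp() + allowed_lateness


w = IntervalWindow(start=0, end=10)
print(is_droppable(w, allowed_lateness=5, watermark=14))  # False: 14 is not past 9 + 5
print(is_droppable(w, allowed_lateness=5, watermark=15))  # True: the window has expired
```

Note that nothing about the element appears in the check except which window it belongs to; that is what makes the rule simple to reason about.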
Kenn
On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected]> wrote:
> > Generally the watermark update can overtake elements, because runners
> can explicitly ignore late data in the watermark calculation (for good
> reason - those elements are already late, so no need to hold up the
> watermark advancing any more).
> This does not seem to affect the decision of _not late_ vs. _late_, does it? If
> an element is late and gets excluded from the watermark calculation (whatever that
> includes in this context), then the watermark cannot move past elements
> that were marked as _not late_, and thus nothing can make them _late_.
>
> > For GBK on-time data simply means the first pane marked as on time. For
> state+timers I don't think it makes sense for Beam to define on-time vs.
> late, rather I think the user can come up with their own definition
> depending on their use case. For example, if you are buffering data into
> BagState and setting a timer to process it, it would be logical to say that
> any element that was buffered before the timer expired is on time, and any
> data that showed up after the timer fired is late. This would roughly
> correspond to what GBK does, and the answer would be very similar to simply
> comparing against the watermark (as the timers fire when the watermark
> advances).
>
> Yes, I'd say that stateful DoFns don't have a (well-defined) concept of a
> pane, because panes are tied to triggers, which are a concept of GBK (or of
> windowed operations in general). The only semantic meaning of a
> window in a stateful DoFn is that it "scopes" state.
>
> This discussion might have drifted a little from the original question, so I'll
> try to rephrase it:
>
> Should a stateful DoFn drop *all* late data, not just data that arrives after
> the window boundary + allowed lateness? Some arguments why I think it should:
> * in windowed operations (GBK), it is correct to drop data only on window
> boundaries, because time (as seen by the user) effectively hops only at
> these discrete points
> * in a stateful DoFn, on the other hand, time moves "smoothly" (yes, with some
> granularity, millisecond, nanosecond, whatever, and with watermark updates
> only, but still)
> * so dropping late data immediately as time moves (again, from the user's
> perspective), rather than at some more or less artificial boundary with
> little semantic meaning, is consistent with both of the above properties
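The difference between the two dropping policies can be sketched as a pure-Python model (hypothetical function names, not Beam API). The proposed rule is modeled, as an assumption, by treating each element as its own "instant" window [ts, ts + 1), so it reuses the window-expiry check.

```python
HOUR_MS = 60 * 60 * 1000


def drop_at_window_expiry(window_end: int, allowed_lateness: int, watermark: int) -> bool:
    # Current rule: only the window matters (its max timestamp is window_end - 1).
    return watermark > (window_end - 1) + allowed_lateness


def drop_immediately(ts: int, allowed_lateness: int, watermark: int) -> bool:
    # Proposed rule: treat each element as an "instant" window [ts, ts + 1),
    # so it is dropped as soon as the watermark passes ts + allowed lateness.
    return drop_at_window_expiry(ts + 1, allowed_lateness, watermark)


# An element at ts=1000 in an hourly window [0, HOUR_MS), watermark at 2000.
print(drop_at_window_expiry(HOUR_MS, 0, 2000))  # False: the window is still open
print(drop_immediately(1000, 0, 2000))          # True: the element is behind the watermark
```

Under the proposal, allowed lateness becomes a per-element grace period instead of a per-window one, which is where the extra dropping comes from.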
>
> The negative side effect of this would be that more data could be
> dropped, but ... isn't that exactly what allowed lateness defines? I don't want to
> discuss the implications of such a change on user pipelines (and whether we can
> or cannot make it), I'm just trying to build some theoretical understanding of
> the problem as a whole. The decision whether any change could / should be made
> can come afterwards.
>
> Thanks,
> Jan
>
> On 1/4/20 10:35 PM, Reuven Lax wrote:
>
>
>
> On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]> wrote:
>
>> > Yes, but invariants should hold. If I add a ParDo that drops late
>> elements (or, more commonly, diverts the late elements to a different
>> PCollection), then the result of that ParDo should _never_ introduce any
>> more late data. This cannot be guaranteed simply with watermark checks. The
>> ParDo may decide that the element was not late, but by the time it outputs
>> the element the watermark may have advanced, causing the element to
>> actually be late.
>>
>> This is actually very interesting. The question is: if I decide on
>> lateness based on the output watermark of a PTransform, is it still possible
>> that in downstream operator(s) the element could change from "not late"
>> to "late"? Provided the output watermark is updated synchronously based on
>> input data (which it should be) and a watermark update cannot "overtake"
>> elements, I think the downstream decision should not change, so
>> the invariant should hold. Or am I missing something?
>>
>
> Generally the watermark update can overtake elements, because runners can
> explicitly ignore late data in the watermark calculation (for good reason -
> those elements are already late, so no need to hold up the watermark
> advancing any more).
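A rough model of how the watermark can overtake elements: on-time elements hold the output watermark while they are in flight, but elements that are already late do not. The `Stage` class and its methods are hypothetical and only approximate how a runner might compute watermark holds.

```python
class Stage:
    """Hypothetical model of one transform's output watermark with holds."""

    def __init__(self) -> None:
        self.input_wm = float("-inf")
        self.holds = {}  # in-flight element id -> timestamp

    def advance_input(self, wm: float) -> None:
        self.input_wm = max(self.input_wm, wm)  # watermarks only move forward

    def receive(self, elem_id: str, ts: int) -> bool:
        """Returns True if the element is already late on arrival."""
        late = ts < self.input_wm
        if not late:
            # On-time elements hold the output watermark at their timestamp...
            self.holds[elem_id] = ts
        # ...while late elements are ignored, so they cannot hold it back.
        return late

    def emit(self, elem_id: str) -> None:
        self.holds.pop(elem_id, None)  # emitting releases the hold

    def output_watermark(self) -> float:
        return min([self.input_wm, *self.holds.values()])


s = Stage()
s.advance_input(10)
print(s.receive("a", 5))      # True: "a" is late and places no hold
print(s.output_watermark())   # 10: already past "a" while "a" is still in flight
```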
>
> For GBK on-time data simply means the first pane marked as on time. For
> state+timers I don't think it makes sense for Beam to define on-time vs.
> late, rather I think the user can come up with their own definition
> depending on their use case. For example, if you are buffering data into
> BagState and setting a timer to process it, it would be logical to say that
> any element that was buffered before the timer expired is on time, and any
> data that showed up after the timer fired is late. This would roughly
> correspond to what GBK does, and the answer would be very similar to simply
> comparing against the watermark (as the timers fire when the watermark
> advances).
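The BagState + timer pattern described above can be sketched as a plain-Python model; `BufferingDoFnModel` is hypothetical and only stands in for a stateful DoFn with one event-time timer. The "late" label here is the user's own definition, not something Beam assigns, and the exact firing condition is an assumption.

```python
class BufferingDoFnModel:
    """Hypothetical model of a stateful DoFn that buffers into a bag and sets
    one event-time timer. 'Late' here is the user's own definition."""

    def __init__(self, timer_ts: int) -> None:
        self.bag = []             # stands in for BagState
        self.timer_ts = timer_ts  # stands in for an event-time timer
        self.timer_fired = False  # the flag the user flips when the timer fires
        self.outputs = []

    def process(self, value: int) -> None:
        if self.timer_fired:
            # Arrived after the timer fired: the user chooses to treat it as late.
            self.outputs.append(("late", value))
        else:
            self.bag.append(value)

    def on_watermark(self, wm: int) -> None:
        # Assumption: the timer fires once the watermark has passed its timestamp.
        if not self.timer_fired and wm > self.timer_ts:
            self.timer_fired = True
            self.outputs.append(("on_time", sorted(self.bag)))
            self.bag.clear()


fn = BufferingDoFnModel(timer_ts=10)
for v in (3, 1):
    fn.process(v)
fn.on_watermark(5)
fn.process(7)
fn.on_watermark(11)
fn.process(2)
print(fn.outputs)  # [('on_time', [1, 3, 7]), ('late', 2)]
```

Because the timer fires off the watermark, the resulting split is close to what a plain watermark comparison would give, but it is decided once, at a well-defined point.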
>
> Reuven
>
>> On 1/4/20 8:11 PM, Reuven Lax wrote:
>>
>>
>>
>> On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:
>>
>>> On 1/4/20 6:14 PM, Reuven Lax wrote:
>>>
>>> There is a very good reason not to define lateness directly in terms of
>>> the watermark. The model does not make any guarantees that the watermark
>>> advances synchronously, and in fact for the Dataflow runner the watermark
>>> advances asynchronously (i.e. independent of element processing). This
>>> means that simply comparing an element timestamp against the watermark
>>> creates a race condition. There are cases where the answer could change
>>> depending on exactly when you examine the watermark, and if you examine it
>>> again while processing the same bundle you might come to a different
>>> conclusion about lateness.
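The race can be shown deterministically with a hypothetical `AsyncWatermark` that returns a newer value on each read, standing in for a watermark advanced independently of element processing; none of these names are Beam APIs.

```python
class AsyncWatermark:
    """Hypothetical stand-in for a watermark that the runner advances
    independently of element processing: each read may see a newer value."""

    def __init__(self, readings):
        self._readings = iter(readings)
        self._current = next(self._readings)

    def read(self):
        value = self._current
        # Simulate the background advance that happens between two reads.
        self._current = next(self._readings, value)
        return value


def looks_late(ts, wm):
    # Naive lateness check against the current watermark (the racy approach).
    return ts < wm.read()


wm = AsyncWatermark([5, 8])
print(looks_late(6, wm))  # False: the first read sees 5
print(looks_late(6, wm))  # True: a second read, same element, sees 8
```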
>>>
>>> Due to the monotonicity of the watermark, I don't think that asynchronous
>>> watermark updates can change the answer from "late" to "not late". That
>>> seems fine to me.
>>>
>>
>> It's the other way around. You check to see whether an element is late
>> and the answer is "not late." An instant later the answer changes to
>> "late." This does cause many problems, and is why this was changed.
>>
>>>
>>> This nondeterminism is undesirable when considering lateness, as it can
>>> break many invariants that users may rely on (e.g. I could write a ParDo
>>> that filters out all late data, yet still find late data showing up downstream
>>> of the ParDo, which would be very surprising). For that reason, the SDK
>>> always marks things as late based on deterministic signals. e.g. for a
>>> triggered GBK everything in the first post-watermark pane is marked as on
>>> time (no matter what the watermark is) and everything in subsequent panes
>>> is marked as late.
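The deterministic labeling described above can be sketched as follows. The `Timing` enum borrows the EARLY / ON_TIME / LATE names of Beam's `PaneInfo.Timing`, but the enum and `label_panes` are hypothetical simplifications: labels depend only on pane order, never on a later watermark read.

```python
from enum import Enum


class Timing(Enum):
    # Names borrowed from Beam's pane timing labels; this enum itself is hypothetical.
    EARLY = "EARLY"
    ON_TIME = "ON_TIME"
    LATE = "LATE"


def label_panes(fired_by_watermark):
    """fired_by_watermark[i] is True if pane i was triggered by the watermark
    passing the end of the window. Labels depend only on pane order, so they
    do not change if the watermark is re-examined later."""
    labels = []
    seen_on_time = False
    for by_watermark in fired_by_watermark:
        if seen_on_time:
            labels.append(Timing.LATE)      # every pane after the on-time pane
        elif by_watermark:
            labels.append(Timing.ON_TIME)   # the first post-watermark pane
            seen_on_time = True
        else:
            labels.append(Timing.EARLY)     # speculative panes before it
    return labels


print([t.name for t in label_panes([False, True, False, False])])
# ['EARLY', 'ON_TIME', 'LATE', 'LATE']
```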
>>>
>>> Dropping latecomers will always be non-deterministic, that is certain.
>>> This is true even when the watermark is updated synchronously with
>>> element processing, due to shuffling and varying (random) differences between
>>> processing and event time in upstream operator(s). The question was only whether
>>> a latecomer should be dropped only at window boundaries (which are a
>>> sort of artificial time boundary), or right away when spotted (in stateful
>>> DoFns only). Another question would be whether latecomers should be dropped
>>> based on the input or the output watermark; dropping based on the output
>>> watermark even seems to be stable in the sense that all downstream operators
>>> should come to the same conclusion (this is a bit of speculation).
>>>
>>
>> Yes, but invariants should hold. If I add a ParDo that drops late
>> elements (or, more commonly, diverts the late elements to a different
>> PCollection), then the result of that ParDo should _never_ introduce any
>> more late data. This cannot be guaranteed simply with watermark checks. The
>> ParDo may decide that the element was not late, but by the time it outputs
>> the element the watermark may have advanced, causing the element to
>> actually be late.
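The broken invariant can be illustrated with a small hypothetical model contrasting a filter that compares against the watermark with one that relies on a label carried by the element (such as a pane timing). `Element` and the filter functions are illustrative names, not Beam APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Element:
    value: str
    ts: int
    timing: str  # deterministic label assigned upstream, e.g. from a GBK pane


def filter_by_watermark(elems, wm_at_filter):
    # Racy filter: keeps whatever is not behind the watermark *right now*.
    return [e for e in elems if e.ts >= wm_at_filter]


def filter_by_label(elems):
    # Deterministic filter: the label travels with the element.
    return [e for e in elems if e.timing != "LATE"]


def late_by_watermark(elems, wm):
    return [e for e in elems if e.ts < wm]


elems = [Element("x", 7, "ON_TIME"), Element("y", 3, "LATE")]

kept = filter_by_watermark(elems, wm_at_filter=5)
# By the time a downstream step looks, the watermark has moved to 9:
print([e.value for e in late_by_watermark(kept, wm=9)])  # ['x']: invariant broken

kept = filter_by_label(elems)
print([e.value for e in kept if e.timing == "LATE"])     # []: invariant holds
```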
>>
>> In practice this is important. An early version of Dataflow (pre-Beam)
>> implemented lateness by comparing against the watermark, and it caused no
>> end of trouble for users.
>>
>>>
>>> FYI - this is also the reason why Beam does not currently provide users
>>> direct access to the watermark. The asynchronous nature of it can be very
>>> confusing, and often results in users writing bugs in their pipelines. We
>>> decided instead to expose easier-to-reason-about signals such as timers
>>> (triggered by the watermark), windows, and lateness.
>>>
>>> Reuven
>>>
>>> On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> I realized the problem. I misinterpreted the
>>>> LateDataDroppingDoFnRunner. It doesn't drop *all* late data (i.e. data
>>>> behind watermark - allowed lateness), but only data that arrives after
>>>> maxTimestamp + allowedLateness of its respective window.
>>>>
>>>> A stateful DoFn can run in the global window (which was the case in my
>>>> tests), and then there is no dropping.
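Why the global window never drops anything mid-stream follows directly from the expiry rule. In this sketch `GLOBAL_WINDOW_MAX_TS` is a hypothetical stand-in for the global window's maximum timestamp, which lies far beyond any watermark a running stream will reach; `expired` is likewise an illustrative name.

```python
# Hypothetical stand-in for the global window's maximum timestamp, which sits
# near the end of representable time, far beyond any real watermark.
GLOBAL_WINDOW_MAX_TS = 2**62


def expired(window_max_ts: int, allowed_lateness: int, watermark: int) -> bool:
    # Drop only when the watermark is past max_timestamp + allowed_lateness.
    return watermark > window_max_ts + allowed_lateness


# An element at ts=0, seen long after the watermark moved on to 1_000_000.
print(expired(9, 0, 1_000_000))                     # True: fixed window [0, 10) has expired
print(expired(GLOBAL_WINDOW_MAX_TS, 0, 1_000_000))  # False: the global window has not
```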
>>>>
>>>> Two questions arise then:
>>>>
>>>> a) is this one more argument for moving this logic into
>>>> StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup at window GC
>>>> time, so without LateDataDroppingDoFnRunner, late data will see empty
>>>> state and will produce wrong results.
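The "empty state" problem from a) can be sketched with a hypothetical per-window counter whose state is cleared at GC time. `WindowedStateModel` and its `drop_expired` switch are illustrative only; the switch stands in for whether a LateDataDroppingDoFnRunner-style check runs before the user's DoFn.

```python
class WindowedStateModel:
    """Hypothetical model of a per-window counter whose state is
    garbage-collected once the watermark passes the window's GC time
    (max_timestamp + allowed_lateness)."""

    def __init__(self, gc_time: int, drop_expired: bool) -> None:
        self.gc_time = gc_time
        self.drop_expired = drop_expired  # whether late-data dropping runs first
        self.count = 0
        self.collected = False

    def on_watermark(self, wm: int) -> None:
        if wm > self.gc_time and not self.collected:
            self.count = 0          # window GC clears the state
            self.collected = True

    def process(self):
        if self.collected and self.drop_expired:
            return None             # expired element dropped before the DoFn sees it
        self.count += 1
        return self.count           # the user's DoFn emits a running count


without_drop = WindowedStateModel(gc_time=10, drop_expired=False)
print([without_drop.process() for _ in range(3)])  # [1, 2, 3]
without_drop.on_watermark(11)
print(without_drop.process())  # 1: the late element sees empty state

with_drop = WindowedStateModel(gc_time=10, drop_expired=True)
with_drop.process()
with_drop.on_watermark(11)
print(with_drop.process())  # None: the late element is dropped instead
```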
>>>>
>>>> b) is this behavior generally intentional and correct? Windows and
>>>> triggers are (from my point of view) features of GBK, not of stateful DoFn.
>>>> Stateful DoFn is a low-level primitive, which can be viewed as operating on
>>>> "instant" windows, so it should then probably be defined as dropping every
>>>> single element that arrives after allowed lateness. This probably relates to
>>>> the question of whether operations should be built bottom up, from the most
>>>> primitive and generic ones to more specific ones - that is, GBK implemented
>>>> on top of stateful DoFn and not vice versa.
>>>>
>>>> Thoughts?
>>>>
>>>> Jan
>>>> On 1/4/20 1:03 AM, Steve Niemitz wrote:
>>>>
>>>> I do agree that the direct runner doesn't drop late data arriving at a
>>>> stateful DoFn (I just tested as well).
>>>>
>>>> However, I believe this is consistent with other runners. I'm fairly
>>>> certain (at least as of the last time I checked) that Dataflow will also
>>>> only drop late data at GBK operations, and NOT at stateful DoFns. Whether or
>>>> not this is intentional is debatable; however, without being able to inspect
>>>> the watermark inside the stateful DoFn, it'd be very difficult to do
>>>> anything useful with late data.
>>>>
>>>>
>>>> On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> I did write a test that checked whether data is dropped in a plain stateful
>>>>> DoFn. I did this as part of validating that PR [1] didn't drop more data
>>>>> when using @RequiresTimeSortedInput than it would without this annotation.
>>>>> This test failed, and I haven't committed it yet.
>>>>>
>>>>> The test was basically as follows:
>>>>>
>>>>> - use TestStream to generate three elements with timestamps 2, 1 and 0
>>>>>
>>>>> - between elements with timestamp 1 and 0 move watermark to 1
>>>>>
>>>>> - use allowed lateness of zero
>>>>>
>>>>> - use a stateful DoFn that just emits arbitrary data for each input
>>>>> element
>>>>>
>>>>> - use Count.globally to count outputs
>>>>>
>>>>> The outcome was that the stateful DoFn using @RequiresTimeSortedInput
>>>>> output 2 elements; without the annotation it output 3 elements. I think the
>>>>> correct result in this case is 2 elements. The difference is caused by
>>>>> the annotation (currently) having its own logic for dropping data, which
>>>>> could be removed if we agree that the data should be dropped in all
>>>>> cases.
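The test scenario above can be replayed in a small pure-Python simulation. This is not the TestStream API: the event list, `run`, and both drop rules are hypothetical. It assumes the DoFn runs in the global window, as in the original tests, and models the annotation's dropping as a per-element comparison against the watermark, which is also an assumption.

```python
ALLOWED_LATENESS = 0
GLOBAL_WINDOW_MAX_TS = 2**62  # hypothetical stand-in for the end of the global window


def window_expiry_rule(ts, wm):
    # Drop only when the (global) window has expired.
    return wm > GLOBAL_WINDOW_MAX_TS + ALLOWED_LATENESS


def per_element_rule(ts, wm):
    # Drop any element already behind the watermark + allowed lateness.
    return wm > ts + ALLOWED_LATENESS


def run(events, drop_rule):
    """Replays ("element", ts) / ("watermark", wm) events in order, like a
    TestStream, and counts what a stateful DoFn emitting one output per
    input would produce (the Count.globally step)."""
    watermark = float("-inf")
    outputs = 0
    for kind, value in events:
        if kind == "watermark":
            watermark = max(watermark, value)
        elif not drop_rule(value, watermark):
            outputs += 1
    return outputs


events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]
print(run(events, window_expiry_rule))  # 3
print(run(events, per_element_rule))    # 2
```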
>>>>> On 1/3/20 11:23 PM, Kenneth Knowles wrote:
>>>>>
>>>>> Did you write such a @Category(ValidatesRunner.class) test? I believe
>>>>> the Java direct runner does drop late data, for both GBK and stateful
>>>>> ParDo.
>>>>>
>>>>> Stateful ParDo is implemented on top of GBK:
>>>>> https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172
>>>>>
>>>>> And GroupByKey, via DirectGroupByKey, via DirectGroupAlsoByWindow,
>>>>> does drop late data:
>>>>> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220
>>>>>
>>>>> I'm not sure why it has its own code, since ReduceFnRunner also drops
>>>>> late data, and it does use ReduceFnRunner (the same code path all
>>>>> Java-based runners use).
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>> On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> Yes, the unreliability of late data dropping in a distributed runner
>>>>>> is understood. But this is exactly where the DirectRunner can play its
>>>>>> role, because only there is it actually possible to emulate and test
>>>>>> specific watermark conditions. A question regarding this for the Java
>>>>>> DirectRunner: should we completely drop LateDataDroppingDoFnRunner and
>>>>>> delegate the late data dropping to StatefulDoFnRunner? It seems logical
>>>>>> to me, because if we agree that late data should always be dropped, then
>>>>>> there would be no "valid" use of StatefulDoFnRunner without the late data
>>>>>> dropping functionality.
>>>>>> On 1/3/20 9:32 PM, Robert Bradshaw wrote:
>>>>>>
>>>>>> I agree; in fact, we just recently enabled late data dropping in the
>>>>>> Python direct runner to be able to develop better tests for Dataflow.
>>>>>>
>>>>>> It should be noted, however, that in a distributed runner (absent the
>>>>>> quiescence of TestStream) one can't *count* on late data being dropped
>>>>>> at a certain point, and in fact (due to delays in fully propagating the
>>>>>> watermark) late data can even become on-time, so the promises about what
>>>>>> happens behind the watermark are necessarily a bit loose.
>>>>>>
>>>>>> On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> I agree that the DirectRunner should drop late data. Late data
>>>>>>> dropping is optional, but the DirectRunner is used by many for testing,
>>>>>>> and we should have the same behaviour they would get on other runners,
>>>>>>> or users may be surprised.
>>>>>>>
>>>>>>> On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I just found out that the DirectRunner is apparently not using
>>>>>>>> LateDataDroppingDoFnRunner, which means that it doesn't drop late
>>>>>>>> data in cases where there is no GBK operation involved (dropping in
>>>>>>>> GBK seems to be correct). There is apparently no
>>>>>>>> @Category(ValidatesRunner) test for that behavior (because the
>>>>>>>> DirectRunner would fail it), so the question is: should late data
>>>>>>>> dropping be considered part of the model (of which the DirectRunner
>>>>>>>> should be a canonical implementation), and therefore be fixed there,
>>>>>>>> or is late data dropping an optional feature of a runner?
>>>>>>>>
>>>>>>>> I'm strongly in favor of the first option, and I think it is likely
>>>>>>>> that all real-world runners adhere to it (I didn't check that,
>>>>>>>> though).
>>>>>>>>
>>>>>>>> Opinions?
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>>