> Generally the watermark update can overtake elements, because runners can explicitly ignore late data in the watermark calculation (for good reason - those elements are already late, so no need to hold up the watermark advancing any more).

This does not seem to affect the decision of _not late_ vs. _late_, though, does it? If an element is late and gets ignored in the watermark calculation (whatever that includes in this context), then the watermark still cannot move past elements that were marked as _not late_, and thus nothing can turn them into _late_ elements.

> For GBK, on-time data simply means the first pane marked as on time. For state+timers I don't think it makes sense for Beam to define on time vs. late; rather, I think the user can come up with their own definition depending on their use case. For example, if you are buffering data into BagState and setting a timer to process it, it would be logical to say that any element that was buffered before the timer expired is on time, and any data that showed up after the timer fired is late. This would roughly correspond to what GBK does, and the answer would be very similar to simply comparing against the watermark (as the timers fire when the watermark advances).

Yes, I'd say that stateful DoFns don't have a (well-defined) concept of a pane, because that is tied to the concept of a trigger, which is a concept of GBK (or windowed operations in general). The only semantic meaning of a window in a stateful DoFn is that it "scopes" state.

This discussion might have gotten a little off track from the original question, so I'll try to rephrase it:

Should a stateful DoFn drop *all* late data, not just data that arrives after window boundary + allowed lateness? Some arguments why I think it should (see the rough sketch of the two policies below):

 * in windowed operations (GBK), it is correct to drop data at window boundaries only, because time (as seen by the user) effectively hops only at these discrete points

 * in a stateful DoFn, on the other hand, time moves "smoothly" (yes, with some granularity - millisecond, nanosecond, whatever - and only on watermark updates, but still)

 * dropping late data immediately as time (again, from the user's perspective) moves - rather than at a more or less artificial boundary with only little semantic meaning - could therefore be viewed as consistent with both of the above properties
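
To make the two policies concrete, here is a rough paraphrase of the two checks as I understand them (a sketch only, not the actual code in LateDataDroppingDoFnRunner; class and parameter names are illustrative):

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

class DroppingPolicies {
  // Window-boundary dropping (roughly what GBK / LateDataDroppingDoFnRunner do):
  // an element is dropped only once its whole window has expired.
  static boolean dropAtWindowBoundary(
      Instant inputWatermark, BoundedWindow window, Duration allowedLateness) {
    return inputWatermark.isAfter(window.maxTimestamp().plus(allowedLateness));
  }

  // "Immediate" dropping proposed above for stateful DoFns: an element is dropped
  // as soon as its own timestamp falls behind (watermark - allowed lateness),
  // regardless of any window boundary.
  static boolean dropImmediately(
      Instant inputWatermark, Instant elementTimestamp, Duration allowedLateness) {
    return inputWatermark.isAfter(elementTimestamp.plus(allowedLateness));
  }
}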

The negative side effect of this would be that more data could be dropped, but... isn't that exactly what allowed lateness defines? I don't want to discuss the implications of such a change on user pipelines (or whether we can or cannot do it), just trying to build some theoretical understanding of the problem as a whole. The decision whether any change could or should be made can come afterwards.

Thanks,
 Jan

On 1/4/20 10:35 PM, Reuven Lax wrote:


On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]> wrote:

    > Yes, but invariants should hold. If I add a ParDo that drops
    late elements (or, more commonly, diverts the late elements to a
    different PCollection), then the result of that ParDo should
    _never_ introduce any more late data. This cannot be guaranteed
    simply with watermark checks. The ParDo may decide that the
    element was not late, but by the time it outputs the element the
    watermark may have advanced, causing the element to actually be late.

    This is actually very interesting. The question is - if I decide
    about lateness based on the output watermark of a PTransform, is
    it still the case that in downstream operator(s) the element could
    change from "not late" to "late"? Provided the output watermark is
    updated synchronously based on input data (which it should be) and
    the watermark update cannot "overtake" elements, I think the
    downstream decision should not change, so the invariant should
    hold. Or am I missing something?


Generally the watermark update can overtake elements, because runners can explicitly ignore late data in the watermark calculation (for good reason - those elements are already late, so no need to hold up the watermark advancing any more).

For GBK, on-time data simply means the first pane marked as on time. For state+timers I don't think it makes sense for Beam to define on time vs. late; rather, I think the user can come up with their own definition depending on their use case. For example, if you are buffering data into BagState and setting a timer to process it, it would be logical to say that any element that was buffered before the timer expired is on time, and any data that showed up after the timer fired is late. This would roughly correspond to what GBK does, and the answer would be very similar to simply comparing against the watermark (as the timers fire when the watermark advances).
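
For concreteness, such a DoFn could look roughly like the sketch below (class and state names are illustrative, and the "late" branch is one possible user-defined policy, not something Beam itself enforces):

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Buffer elements until an event-time timer set at the end of the window fires,
// then treat anything arriving afterwards as "late" under this user-chosen policy.
class BufferUntilTimerFn extends DoFn<KV<String, Integer>, Iterable<Integer>> {

  @StateId("buffer")
  private final StateSpec<BagState<Integer>> bufferSpec = StateSpecs.bag();

  @StateId("flushed")
  private final StateSpec<ValueState<Boolean>> flushedSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      BoundedWindow window,
      @StateId("buffer") BagState<Integer> buffer,
      @StateId("flushed") ValueState<Boolean> flushed,
      @TimerId("flush") Timer flush) {
    if (Boolean.TRUE.equals(flushed.read())) {
      // The timer has already fired, so under this policy the element is "late".
      // Here it is simply ignored; it could instead be diverted to a side output.
      return;
    }
    buffer.add(ctx.element().getValue());
    flush.set(window.maxTimestamp());  // fires when the watermark passes the window end
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext ctx,
      @StateId("buffer") BagState<Integer> buffer,
      @StateId("flushed") ValueState<Boolean> flushed) {
    ctx.output(buffer.read());
    buffer.clear();
    flushed.write(true);
  }
}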

Reuven

    On 1/4/20 8:11 PM, Reuven Lax wrote:


    On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected]> wrote:

        On 1/4/20 6:14 PM, Reuven Lax wrote:
        There is a very good reason not to define lateness directly
        in terms of the watermark. The model does not make any
        guarantees that the watermark advances synchronously, and in
        fact for the Dataflow runner the watermark advances
        asynchronously (i.e. independent of element processing).
        This means that simply comparing an element timestamp
        against the watermark creates a race condition. There are
        cases where the answer could change depending on exactly
        when you examine the watermark, and if you examine again
        while processing the same bundle you might come to a
        different conclusion about lateness.
        Due to the monotonicity of the watermark, I don't think that
        asynchronous updates of the watermark can change the answer
        from "late" to "not late". That seems fine to me.


    It's the other way around. You check to see whether an element is
    late and the answer is "not late." An instant later the answer
    changes to "late." This does cause many problems, and is why this
    was changed.


        This non-determinism is undesirable when considering
        lateness, as it can break many invariants that users may
        rely on (e.g. I could write a ParDo that filters all late
        data, yet still find late data showing up downstream of
        that ParDo, which would be very surprising). For that
        reason, the SDK always marks things as late based on
        deterministic signals, e.g. for a triggered GBK everything
        in the first post-watermark pane is marked as on time (no
        matter what the watermark is) and everything in subsequent
        panes is marked as late.
        Dropping latecomers will always be non-deterministic, that
        is certain. This is true even in the case where the
        watermark is updated synchronously with element processing,
        due to shuffling and varying (random) differences between
        processing and event time in upstream operator(s). The
        question was only whether a latecomer should be dropped
        only at window boundaries (which are a sort of artificial
        time boundary), or right away when spotted (in stateful
        DoFns only). Another question would be whether latecomers
        should be dropped based on the input or the output
        watermark; dropping based on the output watermark even
        seems to be stable in the sense that all downstream
        operators should come to the same conclusion (this is a bit
        of a speculation).


    Yes, but invariants should hold. If I add a ParDo that drops
    late elements (or, more commonly, diverts the late elements to a
    different PCollection), then the result of that ParDo should
    _never_ introduce any more late data. This cannot be guaranteed
    simply with watermark checks. The ParDo may decide that the
    element was not late, but by the time it outputs the element the
    watermark may have advanced, causing the element to actually be late.

    In practice this is important. An early version of Dataflow
    (pre-Beam) implemented lateness by comparing against the
    watermark, and it caused no end of trouble for users.
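
    For reference, the deterministic signal mentioned above is carried on each
    element as its PaneInfo, so a ParDo that diverts late panes can be written
    roughly like this (tag names and the wiring shown in the trailing comment
    are illustrative):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.values.TupleTag;

    class SplitLatePanes extends DoFn<String, String> {
      static final TupleTag<String> ON_TIME = new TupleTag<String>() {};
      static final TupleTag<String> LATE = new TupleTag<String>() {};

      @ProcessElement
      public void process(ProcessContext c) {
        // The pane timing is assigned deterministically by the GBK/triggering
        // machinery, so this split never depends on when the watermark is observed.
        if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
          c.output(LATE, c.element());
        } else {
          c.output(ON_TIME, c.element());
        }
      }
    }

    // Applied downstream of a triggered GBK, schematically:
    // PCollectionTuple split = input.apply(
    //     ParDo.of(new SplitLatePanes())
    //         .withOutputTags(SplitLatePanes.ON_TIME, TupleTagList.of(SplitLatePanes.LATE)));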


        FYI - this is also the reason why Beam does not currently
        provide users direct access to the watermark. The
        asynchronous nature of it can be very confusing, and often
        results in users writing bugs in their pipelines. We decided
        instead to expose easier-to-reason-about signals such as
        timers (triggered by the watermark), windows, and lateness.

        Reuven

        On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]> wrote:

            I realized the problem. I misinterpreted the
            LateDataDroppingDoFnRunner. It doesn't drop *all* late
            data (data arriving after watermark - allowed lateness),
            but only data that arrive after maxTimestamp +
            allowedLateness of their respective windows.

            A stateful DoFn can run on the global window (which was
            the case in my tests) and there is no dropping then.

            Two questions arise then:

             a) does it mean that this is one more argument to move
            this logic to StatefulDoFnRunner? StatefulDoFnRunner
            performs state cleanup at window GC time, so without
            LateDataDroppingDoFnRunner, late data will see empty
            state and will produce wrong results.

             b) is this behavior generally intentional and correct?
            Windows and triggers are (from my point of view) features
            of GBK, not of stateful DoFn. Stateful DoFn is a low-level
            primitive, which can be viewed as operating on "instant"
            windows, which should then probably be defined as
            dropping every single element that arrives after allowed
            lateness. This probably relates to the question of
            whether operations should be built bottom up from the
            most primitive and generic ones to more specific ones -
            that is, GBK implemented on top of stateful DoFn and not
            vice versa.

            Thoughts?

            Jan

            On 1/4/20 1:03 AM, Steve Niemitz wrote:
            I do agree that the direct runner doesn't drop late
            data arriving at a stateful DoFn (I just tested as well).

            However, I believe this is consistent with other
            runners. I'm fairly certain (at least last time I
            checked) that at least Dataflow will also only drop
            late data at GBK operations, and NOT stateful DoFns.
            Whether or not this is intentional is debatable;
            however, without being able to inspect the watermark
            inside the stateful DoFn, it'd be very difficult to do
            anything useful with late data.


            On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]> wrote:

                I wrote a test that checked whether data is dropped
                in a plain stateful DoFn. I did this as part of
                validating that PR [1] didn't drop more data when
                using @RequiresTimeSortedInput than it would
                without the annotation. This test failed, and I
                haven't committed it yet.

                The test was basically as follows:

                 - use TestStream to generate three elements with
                timestamps 2, 1 and 0

                 - between the elements with timestamps 1 and 0,
                move the watermark to 1

                 - use allowed lateness of zero

                 - use a stateful DoFn that just emits arbitrary
                data for each input element

                 - use Count.globally to count outputs

                The outcome was that the stateful DoFn using
                @RequiresTimeSortedInput output 2 elements, while
                without the annotation it was 3 elements. I think
                the correct result would be 2 elements in this case.
                The difference is caused by the annotation
                (currently) having its own logic for dropping data,
                which could be removed if we agree that the data
                should be dropped in all cases.
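
                For reference, a rough reconstruction of such a test
                (element values, the trivial stateful DoFn and the
                asserted count are illustrative - this is not the
                uncommitted test itself; allowed lateness is left at
                its default of zero):

                import org.apache.beam.sdk.coders.KvCoder;
                import org.apache.beam.sdk.coders.StringUtf8Coder;
                import org.apache.beam.sdk.coders.VarIntCoder;
                import org.apache.beam.sdk.state.StateSpec;
                import org.apache.beam.sdk.state.StateSpecs;
                import org.apache.beam.sdk.state.ValueState;
                import org.apache.beam.sdk.testing.PAssert;
                import org.apache.beam.sdk.testing.TestPipeline;
                import org.apache.beam.sdk.testing.TestStream;
                import org.apache.beam.sdk.transforms.Count;
                import org.apache.beam.sdk.transforms.DoFn;
                import org.apache.beam.sdk.transforms.ParDo;
                import org.apache.beam.sdk.values.KV;
                import org.apache.beam.sdk.values.PCollection;
                import org.apache.beam.sdk.values.TimestampedValue;
                import org.joda.time.Instant;
                import org.junit.Rule;
                import org.junit.Test;

                public class StatefulDoFnLateDataTest {

                  @Rule public final transient TestPipeline p = TestPipeline.create();

                  // Trivial stateful DoFn: touches state and emits one value per input element.
                  static class EmitPerElement extends DoFn<KV<String, Integer>, Integer> {
                    @StateId("count")
                    private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

                    @ProcessElement
                    public void process(ProcessContext ctx, @StateId("count") ValueState<Integer> count) {
                      int seen = count.read() == null ? 0 : count.read();
                      count.write(seen + 1);
                      ctx.output(ctx.element().getValue());
                    }
                  }

                  @Test
                  public void lateElementShouldBeDropped() {
                    TestStream<KV<String, Integer>> stream =
                        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
                            // elements with timestamps 2 and 1, then watermark to 1,
                            // then an element with timestamp 0 (behind the watermark)
                            .addElements(
                                TimestampedValue.of(KV.of("k", 1), new Instant(2L)),
                                TimestampedValue.of(KV.of("k", 2), new Instant(1L)))
                            .advanceWatermarkTo(new Instant(1L))
                            .addElements(TimestampedValue.of(KV.of("k", 3), new Instant(0L)))
                            .advanceWatermarkToInfinity();

                    PCollection<Long> count =
                        p.apply(stream)
                            .apply(ParDo.of(new EmitPerElement()))
                            .apply(Count.globally());

                    // 2 if the element behind the watermark is dropped; today the plain
                    // stateful DoFn outputs 3, so this assertion fails, as described above.
                    PAssert.that(count).containsInAnyOrder(2L);
                    p.run();
                  }
                }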

                On 1/3/20 11:23 PM, Kenneth Knowles wrote:
                Did you write such
                a @Category(ValidatesRunner.class) test? I believe
                the Java direct runner does drop late data, for
                both GBK and stateful ParDo.

                Stateful ParDo is implemented on top of GBK:
                
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

                And GroupByKey, via DirectGroupByKey, via
                DirectGroupAlsoByWindow, does drop late data:
                
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

                I'm not sure why it has its own code, since
                ReduceFnRunner also drops late data, and it does
                use ReduceFnRunner (the same code path all
                Java-based runners use).

                Kenn


                On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský <[email protected]> wrote:

                    Yes, the non-reliability of late data dropping
                    in a distributed runner is understood. But this
                    is exactly where the DirectRunner can play its
                    role, because only there is it actually possible
                    to emulate and test specific watermark
                    conditions. A question regarding this for the
                    Java DirectRunner - should we completely drop
                    LateDataDroppingDoFnRunner and delegate the
                    late data dropping to StatefulDoFnRunner?
                    Seems logical to me, as if we agree that late
                    data should always be dropped, then there
                    would be no "valid" use of StatefulDoFnRunner
                    without the late data dropping functionality.

                    On 1/3/20 9:32 PM, Robert Bradshaw wrote:
                    I agree; in fact, we just recently enabled
                    late data dropping in the direct runner in
                    Python to be able to develop better tests for
                    Dataflow.

                    It should be noted, however, that in a
                    distributed runner (absent the quiescence of
                    TestStream) one can't *count* on late data
                    being dropped at a certain point, and in fact
                    (due to delays in fully propagating the
                    watermark) late data can even become on-time,
                    so the promises about what happens behind the
                    watermark are necessarily a bit loose.

                    On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik <[email protected]> wrote:

                        I agree that the DirectRunner should drop
                        late data. Late data dropping is optional,
                        but the DirectRunner is used by many for
                        testing and we should have the same
                        behaviour they would get on other runners,
                        or users may be surprised.

                        On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský <[email protected]> wrote:

                            Hi,

                            I just found out that DirectRunner is
                            apparently not using
                            LateDataDroppingDoFnRunner, which
                            means that it doesn't drop late data
                            in cases where there is no GBK
                            operation involved (dropping in GBK
                            seems to be correct). There is
                            apparently no @Category(ValidatesRunner)
                            test for that behavior (because
                            DirectRunner would fail it), so the
                            question is - should late data dropping
                            be considered part of the model (of
                            which DirectRunner should be a canonical
                            implementation) and therefore be fixed
                            there, or is late data dropping an
                            optional feature of a runner?

                            I'm strongly in favor of the first
                            option, and I think it is likely that
                            all real-world runners adhere to that
                            (I didn't check, though).

                            Opinions?

                              Jan
