> Yes, but invariants should hold. If I add a ParDo that drops late elements (or, more commonly,diverts the late elements  to a different PCollection), then the result of that ParDo should _never_ introduce and more late data. This cannot be guaranteed simply with watermark checks. The ParDo may decide that the element was not late, but by the time it outputs the element the watermark may have advanced, causing the element to actually be late.

This is actually very interesting. The question is - if I decide about lateness based on output watermark of a PTransform, is it still the case, that in downstream operator(s) the element could be changed from "not late" to "late"? Provided the output watermark is updated synchronously based on input data (which should be) and watermark update cannot "overtake" elements, I think that the downstream decision should not be changed, so the invariant should hold. Or am I missing something?

On 1/4/20 8:11 PM, Reuven Lax wrote:


On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    On 1/4/20 6:14 PM, Reuven Lax wrote:
    There is a very good reason not to define lateness directly in
    terms of the watermark. The model does not make any guarantees
    that the watermark advances synchronously, and in fact for the
    Dataflow runner the watermark advances asynchronously (i.e.
    independent of element processing). This means that simply
    comparing an element timestamp against the watermark creates a
    race condition. There are cases where the answer could change
    depending on exactly when you examine the watermark, and if you
    examine again while processing the same bundle you might come to
    a different conclusion about lateness.
    Due to monotonicity of watermark, I don't think that the
    asynchronous updates of watermark can change the answer from
    "late" to "not late". That seems fine to me.


It's the other way around. You check to see whether an element is late and the answer is "not late." An instant later the answer changes to "late"  This does cause many problems, and is why this was changed.


    This non determinism is undesirable when considering lateness, as
    it can break many invariants that users may rely on (e.g. if I
    could write a ParDo that filtered all late data, yet still find
    late data showing up downstream of the ParDo which would be very
    surprising). For that reason, the SDK always marks things as late
    based on deterministic signals. e.g. for a triggered GBK
    everything in the first post-watermark pane is marked as on time
    (no matter what the watermark is) and everything in subsequent
    panes is marked as late.
    Dropping latecomers will always be non-deterministic, that is
    certain. This is true even in case where watermark is updated
    synchronously with element processing, due to shuffling and
    varying (random) differences of processing and event time in
    upstream operator(s). The question was only if a latecomer should
    be dropped only at a window boundaries only (which is a sort of
    artificial time boundary), or right away when spotted (in stateful
    dofns only). Another question would be if latecomers should be
    dropped based on input or output watermark, dropping based on
    output watermark seems even to be stable in the sense, that all
    downstream operators should come to the same conclusion (this is a
    bit of a speculation).


Yes, but invariants should hold. If I add a ParDo that drops late elements (or, more commonly,diverts the late elements  to a different PCollection), then the result of that ParDo should _never_ introduce and more late data. This cannot be guaranteed simply with watermark checks. The ParDo may decide that the element was not late, but by the time it outputs the element the watermark may have advanced, causing the element to actually be late.

In practice this is important. And early version of Dataflow (pre Beam) implemented lateness by comparing against the watermark, and it caused no end of trouble for users.


    FYI - this is also the reason why Beam does not currently provide
    users direct access to the watermark. The asynchronous nature of
    it  can be very confusing, and often results in users writing
    bugs in their pipelines. We decided instead to expose
    easier-to-reason-about signals such as timers (triggered by the
    watermark), windows, and lateness.

    Reuven

    On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        I realized the problem. I misinterpreted the
        LateDataDroppingDoFnRunner. It doesn't drop *all* late
        (arriving after watermark - allowed lateness) data, but only
        data, that arrive after maxTimestamp + allowedLateness of
        their respective windows.

        Stateful DoFn can run on global window (which was the case of
        my tests) and there is no dropping then.

        Two questions arise then:

         a) does it mean that this is one more argument to move this
        logic to StatefulDoFnRunner? StatefulDoFnRunner performs
        state cleanup on window GC time, so without
        LateDataDroppingDoFnRunner and late data will see empty state
        and will produce wrong results.

         b) is this behavior generally intentional and correct?
        Windows and triggers are (in my point of view) features of
        GBK, not stateful DoFn. Stateful DoFn is a low level
        primitive, which can be viewed to operate on "instant"
        windows, which should then probably be defined as dropping
        every single element arrive after allowed lateness. This
        might probably relate to question if operations should be
        built bottom up from most primitive and generic ones to more
        specific ones - that is GBK be implemented on top of stateful
        DoFn and not vice versa.

        Thoughts?

        Jan

        On 1/4/20 1:03 AM, Steve Niemitz wrote:
        I do agree that the direct runner doesn't drop late data
        arriving at a stateful DoFn (I just tested as well).

        However, I believe this is consistent with other runners. 
        I'm fairly certain (at least last time I checked) that at
        least Dataflow will also only drop late data at GBK
        operations, and NOT stateful DoFns. Whether or not this is
        intentional is debatable however, without being able to
        inspect the watermark inside the stateful DoFn, it'd be very
        difficult to do anything useful with late data.


        On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský <[email protected]
        <mailto:[email protected]>> wrote:

            I did write a test that tested if data is dropped in a
            plain stateful DoFn. I did this as part of validating
            that PR [1] didn't drop more data when using
            @RequiresTimeSortedInput than it would without this
            annotation. This test failed and I didn't commit it, yet.

            The test was basically as follows:

             - use TestStream to generate three elements with
            timestamps 2, 1 and 0

             - between elements with timestamp 1 and 0 move
            watermark to 1

             - use allowed lateness of zero

             - use stateful dofn that just emits arbitrary data for
            each input element

             - use Count.globally to count outputs

            The outcome was that stateful dofn using
            @RequiresTimeSortedInput output 2 elements, without the
            annotation it was 3 elements. I think the correct one
            would be 2 elements in this case. The difference is
            caused by the annotation having (currently) its own
            logic for dropping data, which could be removed if we
            agree, that the data should be dropped in all cases.

            On 1/3/20 11:23 PM, Kenneth Knowles wrote:
            Did you write such a @Category(ValidatesRunner.class)
            test? I believe the Java  direct runner does drop late
            data, for both GBK and stateful ParDo.

            Stateful ParDo is implemented on top of GBK:
            
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

            And GroupByKey, via DirectGroupByKey, via
            DirectGroupAlsoByWindow, does drop late data:
            
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

            I'm not sure why it has its own code, since
            ReduceFnRunner also drops late data, and it does use
            ReduceFnRunner (the same code path all Java-based
            runners use).

            Kenn


            On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                Yes, the non-reliability of late data dropping in
                distributed runner is understood. But this is even
                where DirectRunner can play its role, because only
                there it is actually possible to emulate and test
                specific watermark conditions. Question regarding
                this for the java DirectRunner - should we
                completely drop LataDataDroppingDoFnRunner and
                delegate the late data dropping to
                StatefulDoFnRunner? Seems logical to me, as if we
                agree that late data should always be dropped, then
                there would no "valid" use of StatefulDoFnRunner
                without the late data dropping functionality.

                On 1/3/20 9:32 PM, Robert Bradshaw wrote:
                I agree, in fact we just recently enabled late
                data dropping to the direct runner in Python to be
                able to develop better tests for Dataflow.

                It should be noted, however, that in a distributed
                runner (absent the quiessence of TestStream) that
                one can't *count* on late data being dropped at a
                certain point, and in fact (due to delays in fully
                propagating the watermark) late data can even
                become on-time, so the promises about what happens
                behind the watermark are necessarily a bit loose.

                On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik
                <[email protected] <mailto:[email protected]>> wrote:

                    I agree that the DirectRunner should drop late
                    data. Late data dropping is optional but the
                    DirectRunner is used by many for testing and
                    we should have the same behaviour they would
                    get on other runners or users may be surprised.

                    On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský
                    <[email protected] <mailto:[email protected]>> wrote:

                        Hi,

                        I just found out that DirectRunner is
                        apparently not using
                        LateDataDroppingDoFnRunner, which means
                        that it doesn't drop late data
                        in cases where there is no GBK operation
                        involved (dropping in GBK seems
                        to be correct). There is apparently no
                        @Category(ValidatesRunner) test
                        for that behavior (because DirectRunner
                        would fail it), so the question
                        is - should late data dropping be
                        considered part of model (of which
                        DirectRunner should be a canonical
                        implementation) and therefore that
                        should be fixed there, or is the late data
                        dropping an optional feature
                        of a runner?

                        I'm strongly in favor of the first option,
                        and I think it is likely that
                        all real-world runners would probably
                        adhere to that (I didn't check
                        that, though).

                        Opinions?

                          Jan

Reply via email to