Hi Kenn,

I see that my terminology seems not to be 100% aligned with Beam's. I'll work on that. :-)

I agree with what you say, and by "late" I mostly meant "droppable" (arriving too late after watermark).

I'm definitely not proposing to get back to something like "out of order" == "late" or anything like that. I'm also aware that a stateful operation is a windowed operation, but the semantics of the windowing differ from those of a GBK. The difference is in how time moves in a GBK and how it moves in a stateful DoFn. Leaving aside some details (early triggers, late data triggers), the main difference is that in the GBK case time hops just between window boundaries, while in a stateful DoFn time moves "smoothly" (with each watermark update). This difference raises the question of why the definition of "droppable" data is the same for both types of operations, when there is a difference in how users "perceive" time. As the more generic operation, stateful DoFn might deserve a more general definition of droppable data, which should degrade naturally to that of GBK in the presence of "discrete time hops".
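
To make the comparison concrete, here is a minimal sketch in plain Python (not the Beam API). It assumes integer event-time timestamps and treats data as droppable once the watermark is past "expiry + allowed lateness". The names `Window`, `fixed_window`, `droppable_by_window` and `droppable_by_element` are made up for illustration, and the per-element rule is only my hypothetical "smooth" definition, not anything Beam implements.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    start: int  # inclusive event-time bound
    end: int    # exclusive event-time bound

    @property
    def max_timestamp(self) -> int:
        return self.end - 1


def fixed_window(timestamp: int, size: int) -> Window:
    """Assign a timestamp to the fixed window of the given size that contains it."""
    start = timestamp - timestamp % size
    return Window(start, start + size)


def droppable_by_window(window: Window, allowed_lateness: int, watermark: int) -> bool:
    """Window-expiry rule: droppable once the watermark passes max timestamp + lateness."""
    return watermark > window.max_timestamp + allowed_lateness


def droppable_by_element(timestamp: int, allowed_lateness: int, watermark: int) -> bool:
    """Hypothetical 'smooth' rule: every element expires on its own timestamp."""
    return watermark > timestamp + allowed_lateness


ts, lateness, watermark = 12, 5, 25
print(droppable_by_window(fixed_window(ts, 60), lateness, watermark))  # False
print(droppable_by_element(ts, lateness, watermark))                   # True
# With "instant" windows of size 1 both rules agree.
print(droppable_by_window(fixed_window(ts, 1), lateness, watermark))   # True
```

With windows of size 1 the window's max timestamp equals the element timestamp, so the two rules coincide, which is the "degrades naturally" property I have in mind.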

This might have some consequences for how droppable data should be handled in the presence of (early) triggers, because triggering is actually what makes time "hop", so we might arrive at the conclusion that we could actually drop any data that has a timestamp less than "last trigger time + allowed lateness". This looks appealing to me, because IMO it has strong internal logical consistency. It is possible that it would drop more data, which is generally undesirable, I admit that.

I'm looking for an explanation of why the current approach was chosen instead of the other.

Jan

On 1/7/20 12:52 AM, Kenneth Knowles wrote:
This thread has a lot in it, so I am just top-posting.

 - Stateful DoFn is a windowed operation; state is per-window. When the window expires, any further inputs are dropped.
 - "Late" is not synonymous with out-of-order. It doesn't really have an independent meaning.
    - For a GBK/Combine "late" means "not included prior to the on-time output", and "droppable" means "arriving after window expiry".
    - For Stateful DoFn there is no real meaning to "late" except if one is talking about "droppable", which still means "arriving after window expiry". A user may have a special timer where they flip a flag and treat elements after the timer differently.

I think the definition of when data is droppable is very simple. We explicitly moved to this definition, away from the "out of order == late", because it is more robust and simpler to think about. Users saw lots of confusing behavior when we had "out of order by allowed lateness == droppable" logic.
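
The user-defined notion of lateness (a flag flipped by a timer) can be pictured with the toy model below. It is plain Python, not the Beam API: `BufferingFn`, `fire_at` and `advance_watermark` are hypothetical names, the list stands in for BagState, and the timer is assumed to fire once the watermark reaches `fire_at`.

```python
class BufferingFn:
    """Toy stand-in for a stateful DoFn on a single key and window (not Beam API)."""

    def __init__(self, fire_at):
        self.fire_at = fire_at    # timestamp of the (hypothetical) event-time timer
        self.buffer = []          # plays the role of BagState
        self.timer_fired = False  # the user-managed flag
        self.on_time = []         # elements emitted when the timer fired
        self.late = []            # elements that showed up after the timer fired

    def process(self, element):
        # The decision depends only on the flag, never on reading the watermark.
        if self.timer_fired:
            self.late.append(element)
        else:
            self.buffer.append(element)

    def advance_watermark(self, watermark):
        # Fire the timer at most once, flushing everything buffered so far.
        if not self.timer_fired and watermark >= self.fire_at:
            self.timer_fired = True
            self.on_time.extend(self.buffer)
            self.buffer.clear()


fn = BufferingFn(fire_at=10)
fn.process(("a", 4))
fn.process(("b", 9))
fn.advance_watermark(10)
fn.process(("c", 3))
print(fn.on_time)  # [('a', 4), ('b', 9)]
print(fn.late)     # [('c', 3)]
```

Whether an element counts as late here is fixed by whether it was processed before or after the timer callback, so the answer cannot change between two looks at the same element.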

Kenn

On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    > Generally the watermark update can overtake elements, because
    runners  can explicitly ignore late data in the watermark
    calculation (for good reason - those elements are already late, so
    no need to hold up the watermark advancing any more).

    This does not seem to affect the decision of _not late_ vs. _late_,
    does it? If an element is late and gets ignored in the watermark
    calculation (whatever that includes in this context), then the
    watermark cannot move past elements that were marked as _not late_
    and thus nothing can make them _late_.

    > For GBK on-time data simply means the first pane marked as on
    time. For state+timers I don't think it makes sense for Beam to
    define on-time v.s. late, rather I think the user can come up with
    their own definition depending on their use case. For example, if
    you are buffering data into BagState and setting a timer to
    process it, it would be logical to say that any element that was
    buffered before the timer expired is on time, and any data that
    showed up after the timer fired is late. This would roughly
    correspond to what GBK does, and the answer would be very similar
    to simply comparing against the watermark (as the timers fire when
    the watermark advances).

    Yes, I'd say that stateful DoFns don't have (well defined) concept
    of pane, because that is related to concept of trigger and this is
    a concept of GBK (or windowed operations in general). The only
    semantic meaning of window in stateful DoFn is that it "scopes" state.

    This discussion might have got a little off the original question,
    so I'll try to rephrase it:

    Should stateful DoFn drop *all* late data, not just data that
    arrive after window boundary + allowed lateness? Some arguments
    why I think it should:
     * in windowed operations (GBK), it is correct to drop data on
    window boundaries only, because time (as seen by user) effectively
    hops only on these discrete time points
     * in a stateful DoFn, on the other hand, time moves "smoothly" (yes,
    with some granularity, millisecond, nanosecond, whatever, and with
    watermark updates only, but still)
     * dropping late data immediately as time (again, from the user's
    perspective) moves, and not on some more or less artificial
    boundary having only little semantic meaning, can be viewed as
    consistent with both of the above properties

    The negative side effect of this would be, that more data could be
    dropped, but ... isn't this what defines allowed lateness? I don't
    want to discuss the implications on user pipelines of such a
    change (and if we can or cannot do it), just trying to build some
    theoretical understanding of the problem as a whole. The decision
    if any change could / should be made can be done afterwards.

    Thanks,
     Jan

    On 1/4/20 10:35 PM, Reuven Lax wrote:


    On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        > Yes, but invariants should hold. If I add a ParDo that
        drops late elements (or, more commonly, diverts the late
        elements to a different PCollection), then the result of
        that ParDo should _never_ introduce any more late data. This
        cannot be guaranteed simply with watermark checks. The ParDo
        may decide that the element was not late, but by the time it
        outputs the element the watermark may have advanced, causing
        the element to actually be late.

        This is actually very interesting. The question is - if I
        decide about lateness based on output watermark of a
        PTransform, is it still the case, that in downstream
        operator(s) the element could be changed from "not late" to
        "late"? Provided the output watermark is updated
        synchronously based on input data (which should be) and
        watermark update cannot "overtake" elements, I think that the
        downstream decision should not be changed, so the invariant
        should hold. Or am I missing something?


    Generally the watermark update can overtake elements, because
    runners  can explicitly ignore late data in the watermark
    calculation (for good reason - those elements are already late,
    so no need to hold up the watermark advancing any more).

    For GBK on-time data simply means the first pane marked as on
    time. For state+timers I don't think it makes sense for Beam to
    define on-time v.s. late, rather I think the user can come up
    with their own definition depending on their use case. For
    example, if you are buffering data into BagState and setting a
    timer to process it, it would be logical to say that any element
    that was buffered before the timer expired is on time, and any
    data that showed up after the timer fired is late. This would
    roughly correspond to what GBK does, and the answer would be very
    similar to simply comparing against the watermark (as the timers
    fire when the watermark advances).

    Reuven

        On 1/4/20 8:11 PM, Reuven Lax wrote:


        On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:

            On 1/4/20 6:14 PM, Reuven Lax wrote:
            There is a very good reason not to define lateness
            directly in terms of the watermark. The model does not
            make any guarantees that the watermark advances
            synchronously, and in fact for the Dataflow runner the
            watermark advances asynchronously (i.e. independent of
            element processing). This means that simply comparing
            an element timestamp against the watermark creates a
            race condition. There are cases where the answer could
            change depending on exactly when you examine the
            watermark, and if you examine again while processing
            the same bundle you might come to a different
            conclusion about lateness.
            Due to monotonicity of watermark, I don't think that the
            asynchronous updates of watermark can change the answer
            from "late" to "not late". That seems fine to me.


        It's the other way around. You check to see whether an
        element is late and the answer is "not late." An instant
        later the answer changes to "late." This does cause many
        problems, and is why this was changed.
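
The check-then-act problem described here can be sketched as a deterministic toy in plain Python (not Beam code). `Watermark`, `is_late` and `filter_then_check` are hypothetical names, and the asynchronous update is modelled simply as an optional watermark advance between the filter's check and the downstream check.

```python
class Watermark:
    """Monotonic watermark that may be advanced independently of element processing."""

    def __init__(self, value):
        self.value = value

    def advance(self, to):
        self.value = max(self.value, to)  # never moves backwards


def is_late(timestamp, watermark):
    """Naive lateness check: compare the timestamp with the current watermark."""
    return timestamp < watermark.value


def filter_then_check(timestamp, watermark, advance_between=None):
    """Return (passed the upstream late-data filter, looks late downstream)."""
    passed_filter = not is_late(timestamp, watermark)
    if advance_between is not None:
        watermark.advance(advance_between)  # stands in for the asynchronous update
    late_downstream = is_late(timestamp, watermark)
    return passed_filter, late_downstream


print(filter_then_check(5, Watermark(3)))                     # (True, False)
print(filter_then_check(5, Watermark(3), advance_between=8))  # (True, True)
print(filter_then_check(2, Watermark(3), advance_between=8))  # (False, True)
```

The second call is the troublesome case: the element passed the filter as "not late" and is nevertheless seen as late downstream. The third call shows the direction that monotonicity rules out, a late element never turns back into "not late".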


            This non determinism is undesirable when considering
            lateness, as it can break many invariants that users
            may rely on (e.g. if I could write a ParDo that
            filtered all late data, yet still find late data
            showing up downstream of the ParDo which would be very
            surprising). For that reason, the SDK always marks
            things as late based on deterministic signals. e.g. for
            a triggered GBK everything in the first post-watermark
            pane is marked as on time (no matter what the watermark
            is) and everything in subsequent panes is marked as late.
            Dropping latecomers will always be non-deterministic,
            that is certain. This is true even in case where
            watermark is updated synchronously with element
            processing, due to shuffling and varying (random)
            differences of processing and event time in upstream
            operator(s). The question was only whether a latecomer
            should be dropped only at window boundaries (which are a
            sort of artificial time boundary), or right away when
            spotted (in stateful DoFns only). Another question would
            be whether latecomers should be dropped based on the input
            or the output watermark. Dropping based on the output
            watermark even seems to be stable in the sense that all
            downstream operators should come to the same conclusion
            (this is a bit of speculation).


        Yes, but invariants should hold. If I add a ParDo that drops
        late elements (or, more commonly, diverts the late elements
        to a different PCollection), then the result of that ParDo
        should _never_ introduce any more late data. This cannot be
        guaranteed simply with watermark checks. The ParDo may
        decide that the element was not late, but by the time it
        outputs the element the watermark may have advanced, causing
        the element to actually be late.

        In practice this is important. An early version of Dataflow
        (pre-Beam) implemented lateness by comparing against the
        watermark, and it caused no end of trouble for users.


            FYI - this is also the reason why Beam does not
            currently provide users direct access to the watermark.
            The asynchronous nature of it  can be very confusing,
            and often results in users writing bugs in their
            pipelines. We decided instead to expose
            easier-to-reason-about signals such as timers
            (triggered by the watermark), windows, and lateness.

            Reuven

            On Sat, Jan 4, 2020 at 1:15 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                I realized the problem. I misinterpreted the
                LateDataDroppingDoFnRunner. It doesn't drop *all*
                late (arriving after watermark - allowed lateness)
                data, but only data, that arrive after maxTimestamp
                + allowedLateness of their respective windows.

                Stateful DoFn can run on global window (which was
                the case of my tests) and there is no dropping then.

                Two questions arise then:

                 a) does it mean that this is one more argument to
                move this logic to StatefulDoFnRunner?
                StatefulDoFnRunner performs state cleanup at window
                GC time, so without LateDataDroppingDoFnRunner late
                data will see empty state and will produce wrong
                results.

                 b) is this behavior generally intentional and
                correct? Windows and triggers are (in my point of
                view) features of GBK, not stateful DoFn. Stateful
                DoFn is a low level primitive, which can be viewed
                to operate on "instant" windows, which should then
                probably be defined as dropping every single
                element arriving after allowed lateness. This might
                probably relate to question if operations should be
                built bottom up from most primitive and generic
                ones to more specific ones - that is GBK be
                implemented on top of stateful DoFn and not vice versa.

                Thoughts?

                Jan

                On 1/4/20 1:03 AM, Steve Niemitz wrote:
                I do agree that the direct runner doesn't drop
                late data arriving at a stateful DoFn (I just
                tested as well).

                However, I believe this is consistent with other
                runners.  I'm fairly certain (at least last time I
                checked) that at least Dataflow will also only
                drop late data at GBK operations, and NOT stateful
                DoFns.  Whether or not this is intentional is
                debatable however, without being able to inspect
                the watermark inside the stateful DoFn, it'd be
                very difficult to do anything useful with late data.


                On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:

                    I did write a test that tested if data is
                    dropped in a plain stateful DoFn. I did this
                    as part of validating that PR [1] didn't drop
                    more data when using @RequiresTimeSortedInput
                    than it would without this annotation. This
                    test failed and I didn't commit it, yet.

                    The test was basically as follows:

                     - use TestStream to generate three elements
                    with timestamps 2, 1 and 0

                     - between elements with timestamp 1 and 0
                    move watermark to 1

                     - use allowed lateness of zero

                     - use stateful dofn that just emits arbitrary
                    data for each input element

                     - use Count.globally to count outputs

                    The outcome was that stateful dofn using
                    @RequiresTimeSortedInput output 2 elements,
                    without the annotation it was 3 elements. I
                    think the correct one would be 2 elements in
                    this case. The difference is caused by the
                    annotation having (currently) its own logic
                    for dropping data, which could be removed if
                    we agree, that the data should be dropped in
                    all cases.
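
The test scenario above can be replayed with a small simulation in plain Python (this is not TestStream or any Beam API). It assumes the stateful DoFn runs in the global window, which never expires in this toy model, and it uses a per-element rule as a stand-in for the annotation's own dropping logic. `run`, `window_expiry_rule` and `per_element_rule` are hypothetical names.

```python
import math

# Assumption: the global window never expires, modelled as an infinite max timestamp.
GLOBAL_WINDOW_MAX = math.inf


def window_expiry_rule(timestamp, allowed_lateness, watermark):
    """Drop only once the watermark is past window expiry (never, for the global window)."""
    return watermark > GLOBAL_WINDOW_MAX + allowed_lateness


def per_element_rule(timestamp, allowed_lateness, watermark):
    """Drop as soon as the watermark is past the element's own timestamp + lateness."""
    return watermark > timestamp + allowed_lateness


def run(events, allowed_lateness, drop_rule):
    """Replay elements and watermark updates in order; count the outputs."""
    watermark = -math.inf
    outputs = 0
    for kind, value in events:
        if kind == "watermark":
            watermark = max(watermark, value)
        elif not drop_rule(value, allowed_lateness, watermark):
            outputs += 1  # the DoFn emits one arbitrary output per accepted input
    return outputs


# Elements with timestamps 2, 1, 0; the watermark moves to 1 before element 0.
events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]
print(run(events, 0, window_expiry_rule))  # 3
print(run(events, 0, per_element_rule))    # 2
```

Under these assumptions the two rules reproduce the 3 vs. 2 outputs reported above: only the element with timestamp 0 is affected, because it arrives after the watermark has already moved to 1.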

                    On 1/3/20 11:23 PM, Kenneth Knowles wrote:
                    Did you write such a
                    @Category(ValidatesRunner.class) test? I
                    believe the Java direct runner does drop
                    late data, for both GBK and stateful ParDo.

                    Stateful ParDo is implemented on top of GBK:
                    
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

                    And GroupByKey, via DirectGroupByKey, via
                    DirectGroupAlsoByWindow, does drop late data:
                    
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

                    I'm not sure why it has its own code, since
                    ReduceFnRunner also drops late data, and it
                    does use ReduceFnRunner (the same code path
                    all Java-based runners use).

                    Kenn


                    On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský
                    <[email protected] <mailto:[email protected]>> wrote:

                        Yes, the non-reliability of late data
                        dropping in distributed runner is
                        understood. But this is even where
                        DirectRunner can play its role, because
                        only there it is actually possible to
                        emulate and test specific watermark
                        conditions. Question regarding this for
                        the java DirectRunner - should we
                        completely drop
                        LateDataDroppingDoFnRunner and delegate
                        the late data dropping to
                        StatefulDoFnRunner? Seems logical to me,
                        as if we agree that late data should
                        always be dropped, then there would be no
                        "valid" use of StatefulDoFnRunner without
                        the late data dropping functionality.

                        On 1/3/20 9:32 PM, Robert Bradshaw wrote:
                        I agree, in fact we just recently
                        enabled late data dropping in the direct
                        runner in Python to be able to develop
                        better tests for Dataflow.

                        It should be noted, however, that in a
                        distributed runner (absent the
                        quiescence of TestStream) one can't
                        *count* on late data being dropped at a
                        certain point, and in fact (due to
                        delays in fully propagating the
                        watermark) late data can even become
                        on-time, so the promises about what
                        happens behind the watermark are
                        necessarily a bit loose.

                        On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik
                        <[email protected]
                        <mailto:[email protected]>> wrote:

                            I agree that the DirectRunner should
                            drop late data. Late data dropping
                            is optional but the DirectRunner is
                            used by many for testing and we
                            should have the same behaviour they
                            would get on other runners or users
                            may be surprised.

                            On Fri, Jan 3, 2020 at 3:33 AM Jan
                            Lukavský <[email protected]
                            <mailto:[email protected]>> wrote:

                                Hi,

                                I just found out that DirectRunner is apparently not using
                                LateDataDroppingDoFnRunner, which means that it doesn't drop
                                late data in cases where there is no GBK operation involved
                                (dropping in GBK seems to be correct). There is apparently no
                                @Category(ValidatesRunner) test for that behavior (because
                                DirectRunner would fail it), so the question is - should late
                                data dropping be considered part of model (of which
                                DirectRunner should be a canonical implementation) and
                                therefore that should be fixed there, or is the late data
                                dropping an optional feature of a runner?

                                I'm strongly in favor of the first option, and I think it is
                                likely that all real-world runners would probably adhere to
                                that (I didn't check that, though).

                                Opinions?

                                  Jan
