That is true. As I wrote at the beginning, I came across this when comparing the behavior of an ordered (@RequiresTimeSortedInput) stateful DoFn to an unordered one. I think it might be fine for the two to have different dropping semantics (the ordered case simply has to drop every data element that arrives after the allowed lateness), but I wanted to be sure that these two cases cannot be unified, so that the difference does not confuse users.

The only "unexpected" behavior that comes to mind regarding the current dropping semantics is that a stateful DoFn on the global window never drops any data element, regardless of the allowed lateness setting. That _might_ surprise users, but yes, users can filter data on their own.

Thanks for the discussion.

Jan

On 1/8/20 11:29 PM, Reuven Lax wrote:
But there's no ordering inside a window. A stateful DoFn can see the input elements inside of a window in any order at all. This is another reason it's best to think of time spatially - as another data dimension - rather than like normal processing time.

On Wed, Jan 8, 2020 at 2:26 AM Jan Lukavský wrote:

    Hi Luke and Kenn,

    I agree, my mental model fits this as well. But still, even with
    all windows existing simultaneously, GBK and stateful DoFns
    differ in the way they handle time *inside* each window (I'm
    also leaving merging windows aside, partly because they are not
    currently supported in stateful DoFns). GBK discretizes time (as
    visible to the user) through triggers, while stateful DoFn
    doesn't. That is where the differences between the two come
    from.

    Jan

    On 1/7/20 10:16 PM, Luke Cwik wrote:
    That is a really good way to describe my mental model as well.

    On Tue, Jan 7, 2020 at 12:20 PM Kenneth Knowles wrote:



        On Tue, Jan 7, 2020 at 1:39 AM Jan Lukavský wrote:

            Hi Kenn,

            I see that my terminology seems not to be 100% aligned
            with Beam's. I'll work on that. :-)

            I agree with what you say, and by "late" I mostly meant
            "droppable" (arriving too late after watermark).

            I'm definitely not proposing to get back to something
            like "out of order" == "late" or anything like that. I'm
            also aware that a stateful operation is a windowed
            operation, but the semantics of the windowing differ
            from those of a GBK. The difference is in how time moves
            in GBK and how it moves in stateful DoFn. Leaving aside
            some details (early triggers, late data triggers), the
            main difference is that in the GBK case time hops just
            between window boundaries, while in stateful DoFn time
            moves "smoothly" (with each watermark update). This
            difference raises the question of why the definition of
            "droppable" data is the same for both types of
            operations, when there is a difference in how users
            "perceive" time. As the more generic operation, stateful
            DoFn might deserve a more general definition of
            droppable data, which should degrade naturally to that
            of GBK in the presence of "discrete time hops".


        I understand what you mean. On the other hand, I encourage
        thinking of event time spatially, not as time passing. That
        is a big part of unifying batch/streaming real-time/archival
        processing. The event time window is a secondary key to
        partition the data (merging windows are slightly more
        complex). All event time windows exist simultaneously. So for
        both stateful ParDo and GBK, I find it helpful to consider
        this perspective where all windows are processed
        simultaneously / in an arbitrary order not assuming windows
        are ordered at all. Then you see that GBK and stateful ParDo
        do not really treat windows / watermark differently: both of
        them process a stream of data for each (key, window) pair
        until the watermark informs them that the stream is expired,
        then they GC the state associated with that (key, window) pair.
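A minimal sketch of this "spatial" view, in plain Python rather than any Beam API. The class `KeyedWindowState`, its methods, and the strict comparison against the watermark are all hypothetical choices made for illustration. State is simply a map keyed by (key, window), windows may be fed in any order, and the watermark only decides when a pair's state is garbage collected and further inputs for it are dropped.

```python
from collections import defaultdict


class KeyedWindowState:
    """Toy model: one independent stream of data per (key, window) pair."""

    def __init__(self, allowed_lateness=0):
        self.allowed_lateness = allowed_lateness
        self.watermark = float("-inf")
        self.state = defaultdict(list)  # (key, window_max_ts) -> values
        self.dropped = []

    def _expired(self, window_max_ts):
        # Assumption: a window expires once the watermark is strictly
        # past its max timestamp plus the allowed lateness.
        return self.watermark > window_max_ts + self.allowed_lateness

    def process(self, key, window_max_ts, value):
        if self._expired(window_max_ts):
            self.dropped.append(value)  # stream for this pair has ended
        else:
            self.state[(key, window_max_ts)].append(value)

    def advance_watermark(self, watermark):
        self.watermark = max(self.watermark, watermark)  # monotonic
        for pair in [p for p in self.state if self._expired(p[1])]:
            del self.state[pair]  # GC state of expired (key, window) pairs


s = KeyedWindowState()
s.process("k", 20, "b")  # windows can be visited in any order
s.process("k", 10, "a")
s.advance_watermark(11)  # expires window 10, leaves window 20 alone
s.process("k", 10, "too late")
```

Nothing in the model orders the windows relative to each other, which is the point of treating the window as a secondary key.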

        Kenn

            This might have some consequences for how droppable
            data should be handled in the presence of (early)
            triggers, because triggering is actually what makes time
            "hop", so we might arrive at the conclusion that we
            could actually drop any data with a timestamp less than
            "last trigger time + allowed lateness". This looks
            appealing to me, because IMO it has strong internal
            logical consistency. Although it is possible that it
            would drop more data, which is generally undesirable, I
            admit that.

            I'm looking for an explanation of why the current
            approach was chosen instead of the other.

            Jan

            On 1/7/20 12:52 AM, Kenneth Knowles wrote:
            This thread has a lot in it, so I am just top-posting.

             - Stateful DoFn is a windowed operation; state is
            per-window. When the window expires, any further inputs
            are dropped.
             - "Late" is not synonymous with out-of-order. It
            doesn't really have an independent meaning.
                - For a GBK/Combine "late" means "not included prior
            to the on-time output", and "droppable" means "arriving
            after window expiry".
                - For Stateful DoFn there is no real meaning to
            "late" except if one is talking about "droppable", which
            still means "arriving after window expiry". A user may
            have a special timer where they flip a flag and treat
            elements after the timer differently.

            I think the definition of when data is droppable is very
            simple. We explicitly moved to this definition, away
            from the "out of order == late", because it is more
            robust and simpler to think about. Users saw lots of
            confusing behavior when we had "out of order by allowed
            lateness == droppable" logic.

            Kenn

            On Mon, Jan 6, 2020 at 1:42 AM Jan Lukavský wrote:

                > Generally the watermark update can overtake
                elements, because runners can explicitly ignore
                late data in the watermark calculation (for good
                reason - those elements are already late, so no need
                to hold up the watermark advancing any more).

                This does not seem to affect the decision of _not
                late_ vs. _late_, does it? If an element is late and
                gets ignored in the watermark calculation (whatever
                that includes in this context), then the watermark
                cannot move past elements that were not marked as
                _not late_ and thus nothing can make them _late_.

                > For GBK on-time data simply means the first pane
                marked as on time. For state+timers I don't think it
                makes sense for Beam to define on-time vs. late,
                rather I think the user can come up with their own
                definition depending on their use case. For example,
                if you are buffering data into BagState and setting
                a timer to process it, it would be logical to say
                that any element that was buffered before the timer
                expired is on time, and any data that showed up
                after the timer fired is late. This would roughly
                correspond to what GBK does, and the answer would be
                very similar to simply comparing against the
                watermark (as the timers fire when the watermark
                advances).

                Yes, I'd say that stateful DoFns don't have a
                (well-defined) concept of a pane, because that is
                related to the concept of a trigger, which is a
                concept of GBK (or windowed operations in general).
                The only semantic meaning of a window in a stateful
                DoFn is that it "scopes" state.

                This discussion might have got a little off the
                original question, so I'll try to rephrase it:

                Should stateful DoFn drop *all* late data, not just
                data that arrive after window boundary + allowed
                lateness? Some arguments why I think it should:
                 * in windowed operations (GBK), it is correct to
                drop data on window boundaries only, because time
                (as seen by the user) effectively hops only at these
                discrete points
                 * in a stateful DoFn, on the other hand, time moves
                "smoothly" (yes, with some granularity, millisecond,
                nanosecond, whatever, and with watermark updates
                only, but still)
                 * dropping late data immediately as time (again,
                from the user's perspective) moves, rather than on
                some more or less artificial boundary with little
                semantic meaning, could be viewed as consistent with
                both of the above properties

                The negative side effect of this would be that more
                data could be dropped, but ... isn't this what
                defines allowed lateness? I don't want to discuss
                the implications of such a change for user pipelines
                (or whether we can do it), I'm just trying to build
                some theoretical understanding of the problem as a
                whole. The decision whether any change could /
                should be made can come afterwards.

                Thanks,
                 Jan

                On 1/4/20 10:35 PM, Reuven Lax wrote:


                On Sat, Jan 4, 2020 at 12:13 PM Jan Lukavský wrote:

                    > Yes, but invariants should hold. If I add a
                    ParDo that drops late elements (or, more
                    commonly, diverts the late elements to a
                    different PCollection), then the result of that
                    ParDo should _never_ introduce any more late
                    data. This cannot be guaranteed simply with
                    watermark checks. The ParDo may decide that the
                    element was not late, but by the time it
                    outputs the element the watermark may have
                    advanced, causing the element to actually be late.

                    This is actually very interesting. The question
                    is - if I decide about lateness based on output
                    watermark of a PTransform, is it still the
                    case, that in downstream operator(s) the
                    element could be changed from "not late" to
                    "late"? Provided the output watermark is
                    updated synchronously based on input data
                    (which it should be) and a watermark update
                    cannot "overtake" elements, I think that the
                    downstream decision should not be changed, so
                    the invariant should hold. Or am I missing
                    something?


                Generally the watermark update can overtake
                elements, because runners can explicitly ignore
                late data in the watermark calculation (for good
                reason - those elements are already late, so no
                need to hold up the watermark advancing any more).

                For GBK on-time data simply means the first pane
                marked as on time. For state+timers I don't think
                it makes sense for Beam to define on-time vs.
                late, rather I think the user can come up with
                their own definition depending on their use case.
                For example, if you are buffering data into
                BagState and setting a timer to process it, it
                would be logical to say that any element that was
                buffered before the timer expired is on time, and
                any data that showed up after the timer fired is
                late. This would roughly correspond to what GBK
                does, and the answer would be very similar to
                simply comparing against the watermark (as the
                timers fire when the watermark advances).
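The buffering example can be sketched as a toy model in plain Python. This is not the Beam state and timers API: the class `BufferingFn`, its fields, and the method names are hypothetical stand-ins. A list plays the role of BagState, and the timer is modelled as a timestamp that fires once the watermark reaches it.

```python
class BufferingFn:
    """Toy stand-in for a stateful DoFn with a bag and an event-time timer."""

    def __init__(self, timer_timestamp):
        self.timer_timestamp = timer_timestamp
        self.timer_fired = False
        self.bag = []   # plays the role of BagState
        self.late = []  # elements that showed up after the timer fired

    def process(self, element):
        # User-defined lateness: anything seen after the timer fired is late.
        if self.timer_fired:
            self.late.append(element)
        else:
            self.bag.append(element)

    def advance_watermark(self, watermark):
        """Fire the timer once the watermark reaches it; return the flush."""
        if not self.timer_fired and watermark >= self.timer_timestamp:
            self.timer_fired = True
            flushed, self.bag = self.bag, []
            return flushed
        return []


fn = BufferingFn(timer_timestamp=10)
fn.process("a")
fn.process("b")
on_time = fn.advance_watermark(10)  # timer fires, buffer is flushed
fn.process("c")                     # arrives after the timer: "late"
```

The flag is flipped by the timer rather than by reading the watermark directly, so the on-time/late decision for an element cannot change after it has been made.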

                Reuven

                    On 1/4/20 8:11 PM, Reuven Lax wrote:


                    On Sat, Jan 4, 2020 at 11:03 AM Jan Lukavský wrote:

                        On 1/4/20 6:14 PM, Reuven Lax wrote:
                        There is a very good reason not to define
                        lateness directly in terms of the
                        watermark. The model does not make any
                        guarantees that the watermark advances
                        synchronously, and in fact for the
                        Dataflow runner the watermark advances
                        asynchronously (i.e. independent of
                        element processing). This means that
                        simply comparing an element timestamp
                        against the watermark creates a race
                        condition. There are cases where the
                        answer could change depending on exactly
                        when you examine the watermark, and if
                        you examine again while processing the
                        same bundle you might come to a different
                        conclusion about lateness.
                        Due to monotonicity of watermark, I don't
                        think that the asynchronous updates of
                        watermark can change the answer from
                        "late" to "not late". That seems fine to me.


                    It's the other way around. You check to see
                    whether an element is late and the answer is
                    "not late." An instant later the answer
                    changes to "late." This does cause many
                    problems, and is why this was changed.
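The race can be illustrated with a toy model in plain Python (no Beam API; `AsyncWatermark` and `is_late` are hypothetical names). The same element is checked twice against a watermark that the "runner" advances in between, independently of element processing.

```python
class AsyncWatermark:
    """Toy watermark that may be advanced at any moment by the runner."""

    def __init__(self, value=0):
        self.value = value

    def advance_to(self, value):
        self.value = max(self.value, value)  # watermarks never move back


def is_late(timestamp, watermark):
    # Naive definition: compare the element timestamp to the watermark.
    return timestamp < watermark.value


wm = AsyncWatermark(5)
element_ts = 7

first_check = is_late(element_ts, wm)   # False: 7 is not behind 5
wm.advance_to(10)                       # happens between check and output
second_check = is_late(element_ts, wm)  # True: 7 is now behind 10
```

Monotonicity rules out a flip from "late" to "not late", but not the flip shown here, which is the one that breaks a filter placed upstream.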


                        This nondeterminism is undesirable when
                        considering lateness, as it can break
                        many invariants that users may rely on
                        (e.g. I could write a ParDo that
                        filtered all late data, yet still find
                        late data showing up downstream of the
                        ParDo, which would be very surprising).
                        For that reason, the SDK always marks
                        things as late based on deterministic
                        signals, e.g. for a triggered GBK
                        everything in the first post-watermark
                        pane is marked as on time (no matter what
                        the watermark is) and everything in
                        subsequent panes is marked as late.

                        Dropping latecomers will always be
                        non-deterministic, that is certain. This
                        is true even in the case where the
                        watermark is updated synchronously with
                        element processing, due to shuffling and
                        varying (random) differences between
                        processing and event time in upstream
                        operator(s). The question was only
                        whether a latecomer should be dropped at
                        window boundaries only (which are a sort
                        of artificial time boundary), or right
                        away when spotted (in stateful DoFns
                        only). Another question would be whether
                        latecomers should be dropped based on the
                        input or the output watermark; dropping
                        based on the output watermark even seems
                        to be stable in the sense that all
                        downstream operators should come to the
                        same conclusion (this is a bit of
                        speculation).


                    Yes, but invariants should hold. If I add a
                    ParDo that drops late elements (or, more
                    commonly, diverts the late elements to a
                    different PCollection), then the result of
                    that ParDo should _never_ introduce any more
                    late data. This cannot be guaranteed simply
                    with watermark checks. The ParDo may decide
                    that the element was not late, but by the time
                    it outputs the element the watermark may have
                    advanced, causing the element to actually be late.

                    In practice this is important. An early
                    version of Dataflow (pre Beam) implemented
                    lateness by comparing against the watermark,
                    and it caused no end of trouble for users.


                        FYI - this is also the reason why Beam
                        does not currently provide users direct
                        access to the watermark. The asynchronous
                        nature of it can be very confusing, and
                        often results in users writing bugs in
                        their pipelines. We decided instead to
                        expose easier-to-reason-about signals
                        such as timers (triggered by the
                        watermark), windows, and lateness.

                        Reuven

                        On Sat, Jan 4, 2020 at 1:15 AM Jan
                        Lukavský wrote:

                            I realized the problem. I
                            misinterpreted the
                            LateDataDroppingDoFnRunner. It
                            doesn't drop *all* late data (arriving
                            after watermark - allowed lateness),
                            but only data that arrive after
                            maxTimestamp + allowedLateness
                            of their respective windows.

                            Stateful DoFn can run on global
                            window (which was the case of my
                            tests) and there is no dropping then.
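The rule just described can be sketched in plain Python (no Beam API involved). The names `Window`, `is_droppable` and `GLOBAL_WINDOW_MAX_TIMESTAMP` are hypothetical, infinity is only a stand-in for the end of the global window, and the strict comparison is an assumption about where exactly the boundary falls. The decision depends only on the element's window and the watermark, never on the element's own timestamp.

```python
from dataclasses import dataclass

# Hypothetical stand-in: all that matters here is that the global
# window effectively never expires.
GLOBAL_WINDOW_MAX_TIMESTAMP = float("inf")


@dataclass(frozen=True)
class Window:
    max_timestamp: float


def is_droppable(window, input_watermark, allowed_lateness):
    """True once the watermark has passed maxTimestamp + allowedLateness."""
    return input_watermark > window.max_timestamp + allowed_lateness


fixed = Window(max_timestamp=9)
global_window = Window(max_timestamp=GLOBAL_WINDOW_MAX_TIMESTAMP)

# An element with timestamp 0 in `fixed` is behind a watermark of 5,
# yet it is not droppable, because its window has not expired.
behind_watermark_but_kept = not is_droppable(fixed, 5, 0)

# In the global window nothing is dropped, whatever the watermark is.
never_dropped = not is_droppable(global_window, 10**12, 0)
```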

                            Two questions arise then:

                             a) does it mean that this is one
                            more argument to move this logic to
                            StatefulDoFnRunner?
                            StatefulDoFnRunner performs state
                            cleanup at window GC time, so without
                            LateDataDroppingDoFnRunner late data
                            will see empty state and will
                            produce wrong results.

                             b) is this behavior generally
                            intentional and correct? Windows and
                            triggers are (from my point of view)
                            features of GBK, not stateful DoFn.
                            Stateful DoFn is a low-level
                            primitive, which can be viewed as
                            operating on "instant" windows, which
                            should then probably be defined as
                            dropping every single element arriving
                            after the allowed lateness. This might
                            relate to the question of whether
                            operations should be built bottom-up
                            from the most primitive and generic
                            ones to more specific ones, that is,
                            GBK implemented on top of stateful
                            DoFn and not vice versa.

                            Thoughts?

                            Jan

                            On 1/4/20 1:03 AM, Steve Niemitz wrote:
                            I do agree that the direct runner
                            doesn't drop late data arriving at a
                            stateful DoFn (I just tested as well).

                            However, I believe this is
                            consistent with other runners.  I'm
                            fairly certain (at least last time I
                            checked) that at least Dataflow will
                            also only drop late data at GBK
                            operations, and NOT stateful DoFns.
                            Whether or not this is intentional
                            is debatable; however, without being
                            able to inspect the watermark inside
                            the stateful DoFn, it'd be very
                            difficult to do anything useful with
                            late data.


                            On Fri, Jan 3, 2020 at 5:47 PM Jan
                            Lukavský wrote:

                                I did write a test that tested
                                if data is dropped in a plain
                                stateful DoFn. I did this as
                                part of validating that PR [1]
                                didn't drop more data when using
                                @RequiresTimeSortedInput than it
                                would without this annotation.
                                This test failed and I didn't
                                commit it, yet.

                                The test was basically as follows:

                                 - use TestStream to generate
                                three elements with timestamps
                                2, 1 and 0

                                 - between elements with
                                timestamp 1 and 0 move watermark
                                to 1

                                 - use allowed lateness of zero

                                 - use stateful dofn that just
                                emits arbitrary data for each
                                input element

                                 - use Count.globally to count
                                outputs

                                The outcome was that the stateful
                                DoFn using
                                @RequiresTimeSortedInput output
                                2 elements, while without the
                                annotation it output 3 elements. I
                                think the correct result would be 2
                                elements in this case. The
                                difference is caused by the
                                annotation having (currently)
                                its own logic for dropping data,
                                which could be removed if we
                                agree that the data should be
                                dropped in all cases.
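The scenario can be replayed with a toy model in plain Python. This is not the actual test and uses no TestStream or Beam API; `count_outputs` and the event tuples are hypothetical. It assumes the ordered variant drops every element behind watermark minus allowed lateness, while the unordered variant in the global window drops nothing.

```python
def count_outputs(events, allowed_lateness, time_sorted):
    """Replay ("element", ts) and ("watermark", ts) events, count outputs."""
    watermark = float("-inf")
    outputs = 0
    for kind, ts in events:
        if kind == "watermark":
            watermark = max(watermark, ts)  # watermarks are monotonic
        elif time_sorted and ts < watermark - allowed_lateness:
            continue  # assumed drop rule of the ordered variant
        else:
            outputs += 1  # the DoFn emits one output per input element
    return outputs


# Elements with timestamps 2, 1 and 0; the watermark moves to 1
# between the elements with timestamps 1 and 0.
events = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]

ordered = count_outputs(events, allowed_lateness=0, time_sorted=True)
unordered = count_outputs(events, allowed_lateness=0, time_sorted=False)
```

Under these assumptions `ordered` is 2 and `unordered` is 3, matching the two outcomes reported above.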

                                On 1/3/20 11:23 PM, Kenneth
                                Knowles wrote:
                                Did you write such
                                a @Category(ValidatesRunner.class)
                                test? I believe the Java 
                                direct runner does drop late
                                data, for both GBK and stateful
                                ParDo.

                                Stateful ParDo is implemented
                                on top of GBK:
                                
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

                                And GroupByKey, via
                                DirectGroupByKey, via
                                DirectGroupAlsoByWindow, does
                                drop late data:
                                
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

                                I'm not sure why it has its own
                                code, since ReduceFnRunner also
                                drops late data, and it does
                                use ReduceFnRunner (the same
                                code path all Java-based
                                runners use).

                                Kenn


                                On Fri, Jan 3, 2020 at 1:02 PM
                                Jan Lukavský wrote:

                                    Yes, the unreliability of
                                    late data dropping in a
                                    distributed runner is
                                    understood. But this is
                                    precisely where DirectRunner
                                    can play its role, because only
                                    there is it actually
                                    possible to emulate and
                                    test specific watermark
                                    conditions. A question
                                    regarding this for the Java
                                    DirectRunner: should we
                                    completely drop
                                    LateDataDroppingDoFnRunner
                                    and delegate the late data
                                    dropping to
                                    StatefulDoFnRunner? That seems
                                    logical to me, because if we
                                    agree that late data should
                                    always be dropped, then
                                    there would be no "valid" use
                                    of StatefulDoFnRunner
                                    without the late data
                                    dropping functionality.

                                    On 1/3/20 9:32 PM, Robert
                                    Bradshaw wrote:
                                    I agree, in fact we just
                                    recently enabled late data
                                    dropping in the direct
                                    runner in Python to be
                                    able to develop better
                                    tests for Dataflow.

                                    It should be noted,
                                    however, that in a
                                    distributed runner (absent
                                    the quiescence of
                                    TestStream) one can't
                                    *count* on late data being
                                    dropped at a certain
                                    point, and in fact (due to
                                    delays in fully
                                    propagating the watermark)
                                    late data can even become
                                    on-time, so the promises
                                    about what happens behind
                                    the watermark are
                                    necessarily a bit loose.

                                    On Fri, Jan 3, 2020 at
                                    9:15 AM Luke Cwik wrote:

                                        I agree that the
                                        DirectRunner should
                                        drop late data. Late
                                        data dropping is
                                        optional but the
                                        DirectRunner is used
                                        by many for testing
                                        and we should have the
                                        same behaviour they
                                        would get on other
                                        runners or users may
                                        be surprised.

                                        On Fri, Jan 3, 2020 at
                                        3:33 AM Jan Lukavský
                                        wrote:

                                            Hi,

                                            I just found out
                                            that DirectRunner
                                            is apparently not
                                            using
                                            LateDataDroppingDoFnRunner,
                                            which means that
                                            it doesn't drop
                                            late data
                                            in cases where
                                            there is no GBK
                                            operation involved
                                            (dropping in GBK
                                            seems
                                            to be correct).
                                            There is
                                            apparently no
                                            @Category(ValidatesRunner)
                                            test
                                            for that behavior
                                            (because
                                            DirectRunner would
                                            fail it), so the
                                            question
                                            is - should late
                                            data dropping be
                                            considered part of
                                            model (of which
                                            DirectRunner
                                            should be a
                                            canonical
                                            implementation)
                                            and therefore that
                                            should be fixed
                                            there, or is the
                                            late data dropping
                                            an optional feature
                                            of a runner?

                                            I'm strongly in
                                            favor of the first
                                            option, and I
                                            think it is likely
                                            that
                                            all real-world
                                            runners would
                                            probably adhere to
                                            that (I didn't check
                                            that, though).

                                            Opinions?

                                              Jan
