I realized the problem. I misinterpreted the LateDataDroppingDoFnRunner. It doesn't drop *all* late data (data arriving behind watermark - allowed lateness), but only data that arrives after maxTimestamp + allowedLateness of its respective window.

A stateful DoFn can run in the global window (which was the case in my tests), and nothing is dropped then.
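
To make the difference concrete, here is a minimal sketch in plain Python. It is not Beam code: the function names, the integer timestamps and the infinite stand-in for the global window's maximum timestamp are all assumptions made for illustration. It models only the window-based rule described above (drop once the watermark has passed maxTimestamp + allowedLateness of the element's window) and replays the TestStream scenario from my earlier mail, once in the global window and once in fixed windows of size 1.

```python
# Simplified model of window-based late data dropping (illustration only).

# Assumption: the global window never expires, so model its max timestamp as +inf.
GLOBAL_WINDOW_MAX_TS = float("inf")


def window_expired(window_max_ts, allowed_lateness, watermark):
    """True once the watermark has passed maxTimestamp + allowedLateness."""
    return watermark > window_max_ts + allowed_lateness


def fixed_window_max_ts(ts, size):
    """Max timestamp of the fixed window [start, start + size) containing ts."""
    start = ts - ts % size
    return start + size - 1


def run(events, max_ts_of, allowed_lateness=0):
    """Replay events in order; keep elements whose window has not expired."""
    watermark = float("-inf")
    kept = []
    for kind, value in events:
        if kind == "watermark":
            watermark = value
        elif not window_expired(max_ts_of(value), allowed_lateness, watermark):
            kept.append(value)
    return kept


# Elements with timestamps 2, 1, 0; the watermark moves to 1 before element 0.
EVENTS = [("element", 2), ("element", 1), ("watermark", 1), ("element", 0)]

global_kept = run(EVENTS, lambda ts: GLOBAL_WINDOW_MAX_TS)
fixed_kept = run(EVENTS, lambda ts: fixed_window_max_ts(ts, 1))

print(len(global_kept), len(fixed_kept))
```

In this model the global window keeps all three elements, while fixed windows of size 1 drop the element with timestamp 0, which matches the 3 versus 2 outputs I observed.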

Two questions arise then:

 a) does this mean that this is one more argument for moving this logic to StatefulDoFnRunner? StatefulDoFnRunner performs state cleanup at window GC time, so without LateDataDroppingDoFnRunner, late data will see empty state and will produce wrong results.

 b) is this behavior generally intentional and correct? Windows and triggers are (from my point of view) features of GBK, not of stateful DoFn. Stateful DoFn is a low-level primitive, which can be viewed as operating on "instant" windows, which should then probably be defined as dropping every single element arriving after the allowed lateness. This probably relates to the question of whether operations should be built bottom-up, from the most primitive and generic ones to more specific ones, that is, whether GBK should be implemented on top of stateful DoFn and not vice versa.

Thoughts?

Jan

On 1/4/20 1:03 AM, Steve Niemitz wrote:
I do agree that the direct runner doesn't drop late data arriving at a stateful DoFn (I just tested as well).

However, I believe this is consistent with other runners. I'm fairly certain (at least the last time I checked) that at least Dataflow will also only drop late data at GBK operations, and NOT at stateful DoFns. Whether or not this is intentional is debatable. However, without being able to inspect the watermark inside the stateful DoFn, it'd be very difficult to do anything useful with late data.


On Fri, Jan 3, 2020 at 5:47 PM Jan Lukavský wrote:

    I did write a test that checked whether data is dropped in a plain
    stateful DoFn. I did this as part of validating that PR [1] didn't
    drop more data when using @RequiresTimeSortedInput than it would
    without this annotation. The test failed, and I haven't committed
    it yet.

    The test was basically as follows:

     - use TestStream to generate three elements with timestamps 2, 1
    and 0

     - between elements with timestamp 1 and 0 move watermark to 1

     - use allowed lateness of zero

     - use a stateful DoFn that just emits arbitrary data for each input
    element

     - use Count.globally to count outputs

    The outcome was that the stateful DoFn using @RequiresTimeSortedInput
    output 2 elements, while without the annotation it output 3 elements.
    I think the correct result would be 2 elements in this case. The
    difference is caused by the annotation having (currently) its own
    logic for dropping data, which could be removed if we agree that
    the data should be dropped in all cases.

    On 1/3/20 11:23 PM, Kenneth Knowles wrote:
    Did you write such a @Category(ValidatesRunner.class) test? I
    believe the Java direct runner does drop late data, for both GBK
    and stateful ParDo.

    Stateful ParDo is implemented on top of GBK:
    
https://github.com/apache/beam/blob/64262a61402fad67d9ad8a66eaf6322593d3b5dc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java#L172

    And GroupByKey, via DirectGroupByKey, via
    DirectGroupAlsoByWindow, does drop late data:
    
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java#L220

    I'm not sure why it has its own code, since ReduceFnRunner also
    drops late data, and it does use ReduceFnRunner (the same code
    path all Java-based runners use).

    Kenn


    On Fri, Jan 3, 2020 at 1:02 PM Jan Lukavský wrote:

        Yes, the unreliability of late data dropping in a distributed
        runner is understood. But this is exactly where DirectRunner can
        play its role, because only there is it actually possible to
        emulate and test specific watermark conditions. A question
        regarding this for the Java DirectRunner: should we
        completely drop LateDataDroppingDoFnRunner and delegate the
        late data dropping to StatefulDoFnRunner? That seems logical to
        me, because if we agree that late data should always be dropped,
        then there would be no "valid" use of StatefulDoFnRunner without
        the late data dropping functionality.

        On 1/3/20 9:32 PM, Robert Bradshaw wrote:
        I agree; in fact, we just recently enabled late data dropping
        in the direct runner in Python to be able to develop better
        tests for Dataflow.

        It should be noted, however, that in a distributed runner
        (absent the quiescence of TestStream) one can't *count*
        on late data being dropped at a certain point, and in fact
        (due to delays in fully propagating the watermark) late data
        can even become on-time, so the promises about what happens
        behind the watermark are necessarily a bit loose.

        On Fri, Jan 3, 2020 at 9:15 AM Luke Cwik wrote:

            I agree that the DirectRunner should drop late data.
            Late data dropping is optional, but the DirectRunner is
            used by many for testing, and we should have the same
            behaviour they would get on other runners, or users may
            be surprised.

            On Fri, Jan 3, 2020 at 3:33 AM Jan Lukavský wrote:

                Hi,

                I just found out that DirectRunner is apparently not
                using LateDataDroppingDoFnRunner, which means that it
                doesn't drop late data in cases where there is no GBK
                operation involved (dropping in GBK seems to be
                correct). There is apparently no
                @Category(ValidatesRunner) test for that behavior
                (because DirectRunner would fail it), so the question
                is: should late data dropping be considered part of
                the model (of which DirectRunner should be a canonical
                implementation), and therefore be fixed there, or is
                late data dropping an optional feature of a runner?

                I'm strongly in favor of the first option, and I
                think it is likely that all real-world runners adhere
                to that (I didn't check that, though).

                Opinions?

                  Jan
