On 4/2/22 16:41, Steve Niemitz wrote:
I've dug into this some more and have a couple observations/questions:
- I added some logging to my DoFn in both @ProcessElement and @OnTimer. I can confirm that I never have late data coming into ProcessElement (element.timestamp() is never after the end of the window).
- The OnTimer method does end up having an output timestamp > window.maxTimestamp (unsurprisingly).

I traced into SimpleDoFnRunner.onTimer [1], which seems to ignore the output timestamp completely for processing-time timers and uses the input watermark instead. I believe this is what Luke was talking about above?

"There is another bug about the wrong output timestamp being used for processing time timers that exacerbates this but this only impacts non-portable runners."

I'm unclear why the input watermark would be used here.  My understanding is that the input watermark can progress, but the _output_ watermark is held in the stateful DoFn.

Is there a reason the input watermark is used here, rather than the actual output timestamp of the timer (or the output watermark)?
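To make the question concrete, here is a simplified, stdlib-only Java model of the behavior described above. It assumes, following the reading of SimpleDoFnRunner.onTimer above (not independently verified), that a processing-time timer takes its lower output bound from the input watermark rather than from its own output timestamp. The class, enum, and method names and the sample timestamps are hypothetical; this is not Beam code.

```java
import java.time.Instant;

// Hypothetical model of how the lower bound for @OnTimer outputs is chosen,
// as described in this thread. Not Beam's implementation.
public class EffectiveTimestampModel {
  enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

  // Event-time timers use their own output timestamp; in this model,
  // processing-time timers use the current input watermark instead.
  static Instant effectiveTimestamp(
      TimeDomain domain, Instant timerOutputTimestamp, Instant inputWatermark) {
    if (domain == TimeDomain.EVENT_TIME) {
      return timerOutputTimestamp;
    }
    return inputWatermark;
  }

  public static void main(String[] args) {
    // Max timestamp of the window [19:10, 19:20).
    Instant windowMax = Instant.parse("2022-04-01T19:19:59.999Z");
    // Hypothetical timer output timestamp, inside the window.
    Instant timerOutput = Instant.parse("2022-04-01T19:19:30.000Z");
    // Hypothetical input watermark that has already reached the window end.
    Instant inputWatermark = Instant.parse("2022-04-01T19:20:00.000Z");

    Instant bound =
        effectiveTimestamp(TimeDomain.PROCESSING_TIME, timerOutput, inputWatermark);
    // With zero allowed skew, emitting at windowMax is accepted only if it is not before the bound.
    System.out.println("lower bound = " + bound
        + ", emit at window.maxTimestamp() allowed? " + !windowMax.isBefore(bound));
  }
}
```

Under this model, the same inputs with an event-time timer would give a bound of 19:19:30 and the output at window.maxTimestamp() would pass; only the processing-time branch rejects it.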

This was touched on in [1], but the discussion stopped there. I think we should move the discussion forward, because the input watermark can move arbitrarily and the "position" in event time is really given by the output watermark.

[1] https://lists.apache.org/thread/klj24hbl88lpby3q0ormg7op8x84tj4v



On Fri, Apr 1, 2022 at 8:12 PM Steve Niemitz <[email protected]> wrote:

    While I do agree the symptoms are similar, I don't believe these
    are related.  The ESIO bug was centered around both "bundle"
    operations (StartBundle/EndBundle) and watermark updates during
    bundle processing.  In my case I'm not using anything related to
    bundle operations (no Start/EndBundle), and am running on dataflow
    which holds the watermark during bundle processing.

    I believe the issue here is that the input timestamp of the
    element is "impossible" given the current window it's in, which
    manifests later as the timer output timestamp validation firing,
    as well as the timestamp combiner error I linked in the thread.
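Why 19:20:00.000 is "impossible" for the window in question can be seen from fixed-window arithmetic. The sketch below is a plain-Java illustration of assigning a timestamp to a 10-minute fixed window with zero offset (window start = timestamp rounded down to a multiple of the size since the epoch, max timestamp = end minus 1 ms). The class and method names are hypothetical; it does not call Beam's FixedWindows.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of 10-minute fixed-window assignment (zero offset).
public class FixedWindowModel {
  static final Duration SIZE = Duration.ofMinutes(10);

  // Start of the window containing ts: round down to a multiple of SIZE since the epoch.
  static Instant windowStart(Instant ts) {
    long sizeMs = SIZE.toMillis();
    long ms = ts.toEpochMilli();
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
  }

  // Largest timestamp still inside [start, start + SIZE): the window end minus 1 ms.
  static Instant maxTimestamp(Instant ts) {
    return windowStart(ts).plus(SIZE).minusMillis(1);
  }

  public static void main(String[] args) {
    Instant lastInWindow = Instant.parse("2022-04-01T19:19:59.999Z");
    Instant timerOutput = Instant.parse("2022-04-01T19:20:00.000Z");
    // The two timestamps from the error message land in different windows.
    System.out.println(lastInWindow + " -> window starting " + windowStart(lastInWindow));
    System.out.println(timerOutput + " -> window starting " + windowStart(timerOutput));
  }
}
```

An element whose timestamp is 19:20:00.000 would be assigned to [19:20, 19:30), so a timer output timestamp of 19:20:00.000 in the [19:10, 19:20) window cannot come from a correctly windowed element timestamp.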

    On Fri, Apr 1, 2022 at 7:50 PM Evan Galpin <[email protected]> wrote:

        I believe that this thread is entirely related to another
        thread [1], where there is discussion that the correct fix for
        this issue could be to enforce that watermark updates would
        only happen at bundle boundaries. There’s another related
        thread[2] citing the same error with ElasticsearchIO.

        This appears to be the same root cause, where multiple elements
        from different windows end up in the same bundle, and watermark
        updates between elements of that bundle cause some
        timestamps to be “invalid”.

        [1]
        https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
        [2]
        https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db

        - Evan

        On Fri, Apr 1, 2022 at 18:35 Steve Niemitz
        <[email protected]> wrote:

            > but this only impacts non-portable runners.

            heh like Dataflow? :(

            I'm not sure what the solution is here then? I managed to
            hit this bug within 2 hours of running my first pipeline
            on 2.37.  I can't just live with pipelines breaking
            randomly, and it seems like everything worked fine before
            that check was introduced (even if the output timestamp
            was technically wrong?).  Is the answer to add a 1ms
            allowed skew to all my DoFns that use processing time
            timers?  Remove the check again? (which is probably what
            I'll do in the short term in our fork)
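The effect of a 1 ms allowed skew on the exact timestamps from the error message can be checked with a small model of the lower-bound rule the message states (output must be no earlier than the timer's output timestamp minus the allowed skew). This is a hypothetical stdlib-only sketch: it omits the upper bound from the message and does not call Beam's DoFn#getAllowedTimestampSkew(), which is the real setting the error message points to.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of the lower-bound check quoted in the error message.
// Not Beam's implementation; the upper bound (year 294247) is omitted.
public class SkewCheckModel {
  // Accept the output if it is no earlier than (timerOutput - allowedSkew).
  static boolean isAllowed(Instant output, Instant timerOutput, Duration allowedSkew) {
    return !output.isBefore(timerOutput.minus(allowedSkew));
  }

  public static void main(String[] args) {
    Instant output = Instant.parse("2022-04-01T19:19:59.999Z");      // window.maxTimestamp()
    Instant timerOutput = Instant.parse("2022-04-01T19:20:00.000Z"); // from the error message
    System.out.println("0 ms skew: " + isAllowed(output, timerOutput, Duration.ZERO));        // false
    System.out.println("1 ms skew: " + isAllowed(output, timerOutput, Duration.ofMillis(1))); // true
  }
}
```

A 1 ms skew only covers this particular off-by-one. If, as discussed in this thread, the bound for processing-time timers comes from the input watermark, the gap could be larger whenever that watermark has moved further past the window end.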

            When you say "There is another bug about the wrong output
            timestamp being used for processing time timers", which
            timestamp is incorrect?  The element input
            timestamp?  What should it be instead?

            On Fri, Apr 1, 2022 at 6:09 PM Luke Cwik
            <[email protected]> wrote:

                The longstanding issue was that this was
                always happening but was not visible; validation was
                recently added to make it visible that this was
                wrong [1].

                There is another bug about the wrong output timestamp
                being used for processing time timers that exacerbates
                this but this only impacts non-portable runners.

                1: https://issues.apache.org/jira/browse/BEAM-12931


                On Fri, Apr 1, 2022 at 2:17 PM Steve Niemitz
                <[email protected]> wrote:

                    I'm unclear how the timer would ever have
                    been set to output at that timestamp, though.  The
                    output timestamp falls into the next window, and
                    if unset (like in this case) the output timestamp
                    is derived from the element timestamp [1].  This
                    means we somehow had an element in the wrong
                    window?  This seems strangely similar to BEAM-6757
                    actually.
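The reasoning in the paragraph above can be modeled in a few lines: if no output timestamp is set on the timer, it falls back to the element timestamp, and a correctly windowed element timestamp always lies inside its window. This is a hypothetical sketch based only on the description above (the linked SimpleDoFnRunner line is not reproduced); the names and the sample element timestamp are illustrative.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical model: an unset timer output timestamp falls back to the element timestamp.
public class DefaultOutputTimestampModel {
  static Instant resolve(Optional<Instant> explicitOutput, Instant elementTimestamp) {
    return explicitOutput.orElse(elementTimestamp);
  }

  // True if ts lies in [windowStart, windowMax].
  static boolean inWindow(Instant ts, Instant windowStart, Instant windowMax) {
    return !ts.isBefore(windowStart) && !ts.isAfter(windowMax);
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2022-04-01T19:10:00.000Z");
    Instant max = Instant.parse("2022-04-01T19:19:59.999Z");
    // Hypothetical element timestamp inside [19:10, 19:20).
    Instant element = Instant.parse("2022-04-01T19:19:30.000Z");
    Instant resolved = resolve(Optional.empty(), element);
    System.out.println(resolved + " in window? " + inWindow(resolved, start, max));
    // The timer output timestamp from the error message does not fit this window.
    System.out.println(inWindow(Instant.parse("2022-04-01T19:20:00.000Z"), start, max));
  }
}
```

So if the fallback really is the element timestamp, 19:20:00.000 points either to an element in the wrong window or to the timestamp coming from somewhere else.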

                    What long-standing bug were you talking about
                    here?  We've been running these pipelines for
                    years now and never ran into this until now,
                    although maybe we had been all along and there was
                    simply no validation to catch it?

                    [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1284


                    On Fri, Apr 1, 2022 at 5:11 PM Reuven Lax
                    <[email protected]> wrote:

                        There is a long-standing bug with processing
                        timestamps.

                        On Fri, Apr 1, 2022 at 2:01 PM Steve Niemitz
                        <[email protected]> wrote:

                            We have a job that uses processing time
                            timers, and just upgraded from 2.33 to
                            2.37. Sporadically we've started seeing
                            jobs fail with this error:

                            java.lang.IllegalArgumentException: Cannot
                            output with timestamp
                            2022-04-01T19:19:59.999Z. Output
                            timestamps must be no earlier than the
                            output timestamp of the timer
                            (2022-04-01T19:20:00.000Z) minus the
                            allowed skew (0 milliseconds) and no later
                            than 294247-01-10T04:00:54.775Z. See the
                            DoFn#getAllowedTimestampSkew() Javadoc for
                            details on changing the allowed skew.
                            at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.checkTimestamp(SimpleDoFnRunner.java:883)
                            at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:863)
                            at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
                            <user code>

                            The windowing is configured with 10-minute
                            fixed windows and 10 minutes of allowed
                            lateness. We're not explicitly setting
                            the output time on the timer, so it seems
                            like it's getting inferred from the
                            element timestamp?  The code that emits
                            elements from the timer uses
                            window.maxTimestamp() to set the output
                            timestamp.  I don't understand how an
                            element with a timestamp in what should
                            be the next window ended up in the
                            previous one.  This is the first stateful
                            operation in the pipeline, and we read
                            from Pub/Sub using Pub/Sub timestamps,
                            so there should be no late data.
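With 10 minutes of allowed lateness, the [19:10, 19:20) window outlives its own end by 10 minutes, so the input watermark can legitimately be past window.maxTimestamp() while that window's state and timers still exist. The sketch below models this with the common "max timestamp plus allowed lateness" expiry rule; it is a hypothetical stdlib-only illustration, not a call into Beam, and the exact boundary semantics (inclusive vs. exclusive) are an assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model: a window stays open until the watermark passes
// its max timestamp plus the allowed lateness.
public class LatenessModel {
  static final Duration ALLOWED_LATENESS = Duration.ofMinutes(10);

  static Instant garbageCollectionTime(Instant windowMaxTimestamp) {
    return windowMaxTimestamp.plus(ALLOWED_LATENESS);
  }

  // Assumed semantics: still open while the watermark is not after the expiry time.
  static boolean windowStillOpen(Instant windowMaxTimestamp, Instant inputWatermark) {
    return !inputWatermark.isAfter(garbageCollectionTime(windowMaxTimestamp));
  }

  public static void main(String[] args) {
    Instant windowMax = Instant.parse("2022-04-01T19:19:59.999Z"); // window [19:10, 19:20)
    Instant watermark = Instant.parse("2022-04-01T19:20:00.000Z");  // already past the window end
    System.out.println("expiry: " + garbageCollectionTime(windowMax));
    System.out.println("open with watermark past window end? " + windowStillOpen(windowMax, watermark));
  }
}
```

In that interval, a processing-time timer for the window can fire with an input watermark already beyond window.maxTimestamp(), which is exactly when an input-watermark-based bound would reject outputs stamped window.maxTimestamp().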

                            I know there was a recent change to
                            better validate the output timestamp from
                            timers [1]; I'm having trouble
                            understanding whether there's a bug in that,
                            or whether this is actually exposing a real
                            bug in our pipeline.

                            [1] https://github.com/apache/beam/commit/15048929495ad66963b528d5bd71eb7b4a844c96
