Oh, I had forgotten about that thread, good point, it is very related to this.

I agree we should fix this. To force the conversation, I've opened a PR to do so: https://github.com/apache/beam/pull/17262
As a good example of this behavior in the Beam SDK itself, GroupIntoBatches exhibits this problem by default, because it uses output() from inside a processing-time timer:

- During normal operation (when using maxBufferingDuration), the timestamps of the elements it outputs may fall arbitrarily far outside the window it's outputting into. I wrote a quick pipeline to verify this and almost immediately hit this case.
- When draining a pipeline (in Dataflow), any buffered elements are output with a timestamp of BoundedWindow.TIMESTAMP_MAX_VALUE (since the input watermark has advanced to that point).

Also, not to derail this conversation, but I believe there's a bug (BEAM-14239) in Dataflow's timer implementation, where resetting a timer with a new output timestamp doesn't clear the existing one, resulting in multiple timers being set and firing.

On Mon, Apr 4, 2022 at 4:47 AM Jan Lukavský <[email protected]> wrote:

> On 4/2/22 16:41, Steve Niemitz wrote:
>
> I've dug into this some more and have a couple observations/questions:
>
> - I added some logging to my DoFn in both @ProcessElement and @OnTimer; I can confirm that I never have late data coming into ProcessElement (element.timestamp() is never after the end of the window)
> - The OnTimer method does end up having an output timestamp > window.maxTimestamp (unsurprisingly)
>
> I traced into SimpleDoFnRunner.onTimer [1], which seems to ignore the output timestamp completely for processing-time timers and uses the input watermark instead [1]. I believe this is what Luke was talking about above?
>
> "There is another bug about the wrong output timestamp being used for processing time timers that exacerbates this but this only impacts non-portable runners."
>
> I'm unclear why the input watermark would be used here. My understanding is that the input watermark can progress, but the _output_ watermark is held by the stateful DoFn.
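To make concrete how stamping a processing-time timer's output with the input watermark can land a timestamp outside the window, here is a minimal sketch in plain Python. It models the behavior described above under the assumption of 10-minute fixed windows; it is not Beam or runner code, and the helper names are hypothetical.

```python
WINDOW_SIZE_MS = 10 * 60 * 1000  # 10-minute fixed windows, as in this pipeline

def window_max_timestamp(element_ts_ms: int) -> int:
    # Last timestamp (inclusive) of the fixed window containing the element.
    window_start = element_ts_ms - (element_ts_ms % WINDOW_SIZE_MS)
    return window_start + WINDOW_SIZE_MS - 1

def timer_output_timestamp(input_watermark_ms: int) -> int:
    # Models the reported behavior: the processing-time timer's output
    # timestamp tracks the input watermark rather than a held output timestamp.
    return input_watermark_ms

element_ts = 1648840799999   # 2022-04-01T19:19:59.999Z, last instant of its window
w_max = window_max_timestamp(element_ts)
watermark = 1648840800000    # 2022-04-01T19:20:00.000Z, watermark already past the window
out_ts = timer_output_timestamp(watermark)
print(out_ts > w_max)  # True: the timer's output timestamp falls outside the element's window
```

In other words, the moment the input watermark crosses the end of the window, any flush from a processing-time timer gets a timestamp that is "impossible" for that window, which is exactly the 1 ms discrepancy in the error discussed downthread.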
> Is there a reason the input watermark is used here, rather than the actual output timestamp of the timer (or the output watermark)?
>
> This was touched on in [1], but the discussion stopped there. I think we should move the discussion forward, because the input watermark can move arbitrarily, and the "position" in event time is really given by the output watermark.
>
> [1] https://lists.apache.org/thread/klj24hbl88lpby3q0ormg7op8x84tj4v
>
> On Fri, Apr 1, 2022 at 8:12 PM Steve Niemitz <[email protected]> wrote:
>
>> While I do agree the symptoms are similar, I don't believe these are related. The ESIO bug was centered around both "bundle" operations (StartBundle/EndBundle) and watermark updates during bundle processing. In my case I'm not using anything related to bundle operations (no Start/EndBundle), and I'm running on Dataflow, which holds the watermark during bundle processing.
>>
>> I believe the issue here is that the input timestamp of the element is "impossible" given the current window it's in, which manifests later as the timer output timestamp validation firing, as well as the timestamp combiner error I linked in the thread.
>>
>> On Fri, Apr 1, 2022 at 7:50 PM Evan Galpin <[email protected]> wrote:
>>
>>> I believe that this thread is entirely related to another thread [1], where there is discussion that the correct fix for this issue could be to enforce that watermark updates only happen at bundle boundaries. There's another related thread [2] citing the same error with ElasticsearchIO.
>>>
>>> This appears to be the same root cause, where multiple entities from different windows end up in the same bundle, and the watermark updating between elements of that bundle causes some timestamps to be "invalid".
>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>> [2] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
>>>
>>> - Evan
>>>
>>> On Fri, Apr 1, 2022 at 18:35 Steve Niemitz <[email protected]> wrote:
>>>
>>>> > but this only impacts non-portable runners.
>>>>
>>>> heh, like Dataflow? :(
>>>>
>>>> I'm not sure what the solution is here, then. I managed to hit this bug within 2 hours of running my first pipeline on 2.37. I can't just live with pipelines breaking randomly, and it seems like everything worked fine before that check was introduced (even if the output timestamp was technically wrong?). Is the answer to add a 1 ms allowed skew to all my DoFns that use processing-time timers? Remove the check again? (Which is probably what I'll do in the short term in our fork.)
>>>>
>>>> When you say "there is another bug about the wrong output timestamp being used for processing time timers", which timestamp is it that's incorrect? The element input timestamp? What should it be instead?
>>>>
>>>> On Fri, Apr 1, 2022 at 6:09 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> The longstanding issue was that this was always happening but was not visible; recently, validation was added to make it visible that this was wrong [1].
>>>>>
>>>>> There is another bug about the wrong output timestamp being used for processing time timers that exacerbates this, but it only impacts non-portable runners.
>>>>>
>>>>> 1: https://issues.apache.org/jira/browse/BEAM-12931
>>>>>
>>>>> On Fri, Apr 1, 2022 at 2:17 PM Steve Niemitz <[email protected]> wrote:
>>>>>
>>>>>> I'm unclear how the timer would have even ever been set to output at that timestamp, though. The output timestamp falls into the next window, and if unset (like in this case) the output timestamp is derived from the element timestamp [1].
>>>>>> This means we somehow had an element in the wrong window? This seems strangely similar to BEAM-6757, actually.
>>>>>>
>>>>>> What long-standing bug were you talking about here? We've been running these pipelines for years now and never ran into this until now. Although maybe we had been hitting it all along, and there was just no validation in the past to catch it?
>>>>>>
>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1284
>>>>>>
>>>>>> On Fri, Apr 1, 2022 at 5:11 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> There is a long-standing bug with processing timestamps.
>>>>>>>
>>>>>>> On Fri, Apr 1, 2022 at 2:01 PM Steve Niemitz <[email protected]> wrote:
>>>>>>>
>>>>>>>> We have a job that uses processing-time timers, and we just upgraded from 2.33 to 2.37. Sporadically, we've started seeing jobs fail with this error:
>>>>>>>>
>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp 2022-04-01T19:19:59.999Z. Output timestamps must be no earlier than the output timestamp of the timer (2022-04-01T19:20:00.000Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
>>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.checkTimestamp(SimpleDoFnRunner.java:883)
>>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:863)
>>>>>>>> at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>>>>>>>> <user code>
>>>>>>>>
>>>>>>>> The windowing is configured with 10-minute fixed windows and 10 minutes of allowed lateness. We're not specifically setting the output time on the timer, so it seems like it's being inferred from the element timestamp? The code that emits elements from the timer uses window.maxTimestamp() to set the output timestamp. I'm not sure I understand how an element with a timestamp in what should be the next window ended up in the previous one, given that this is the first stateful operation in the pipeline and we read from Pub/Sub using Pub/Sub timestamps, so there should be no late data.
>>>>>>>>
>>>>>>>> I know there was a change recently to better validate the output timestamp from timers [1]; I'm having trouble understanding whether there's a bug in that, or whether this is actually exposing a real bug in our pipeline.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/commit/15048929495ad66963b528d5bd71eb7b4a844c96
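The check that fires in the error above can be modeled in a few lines of plain Python, using the exact timestamps from the stack trace. This is a sketch of the rule the message states, not the actual SimpleDoFnRunner code, and the far-future upper bound on timestamps is omitted.

```python
def check_timestamp(output_ms: int, timer_output_ms: int, allowed_skew_ms: int) -> bool:
    # Per the error message: output timestamps must be no earlier than the
    # timer's output timestamp minus the allowed skew.
    return output_ms >= timer_output_ms - allowed_skew_ms

window_max   = 1648840799999  # 2022-04-01T19:19:59.999Z, window.maxTimestamp()
timer_output = 1648840800000  # 2022-04-01T19:20:00.000Z, the timer's inferred output time

print(check_timestamp(window_max, timer_output, allowed_skew_ms=0))  # False: the reported failure
print(check_timestamp(window_max, timer_output, allowed_skew_ms=1))  # True: a 1 ms skew passes
```

This is why overriding DoFn#getAllowedTimestampSkew() to allow 1 ms of skew, as discussed upthread, sidesteps the validation without changing which timestamps are actually produced.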
