Agreed, let's resolve the conversation in that PR and we should be good to land it.
On Mon, Apr 4, 2022 at 5:15 PM Reuven Lax <[email protected]> wrote:

> FYI - the bug you found is the one I was referring to. I actually had
> fixed this as part of a larger PR (one that fixes bugs in DirectRunner),
> but to get this in faster we should definitely move forward with the
> more-targeted PR.
>
> On Mon, Apr 4, 2022 at 6:59 AM Steve Niemitz <[email protected]> wrote:
>
>> Oh, I had forgotten about that thread; good point, it is very related to
>> this. I agree we should fix this. To force the conversation, I've opened
>> a PR to do so: https://github.com/apache/beam/pull/17262
>>
>> As a good example of this behavior in the Beam SDK itself,
>> GroupIntoBatches exhibits this problem by default, because it calls
>> output() from inside a processing-time timer:
>>
>> - During normal operation (when using maxBufferingDuration), the
>> timestamps of the elements it outputs may fall arbitrarily outside the
>> window it's outputting into. I wrote a quick pipeline to verify this and
>> almost immediately hit this case.
>> - When draining a pipeline (in Dataflow), any buffered elements are
>> output with a timestamp of BoundedWindow.TIMESTAMP_MAX_VALUE, since the
>> input watermark has advanced to that point.
>>
>> Also, not to derail this conversation, but I believe there's a bug
>> (BEAM-14239) in Dataflow's timer implementation, where resetting a timer
>> with a new output timestamp doesn't clear the existing one, resulting in
>> multiple timers being set and firing.
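The failure mode described above can be sketched as a toy model in plain java.time. This is not Beam SDK code and the class/method names are made up; the watermark-as-output-timestamp behavior for processing-time timers is the one reported for SimpleDoFnRunner in this thread:

```java
import java.time.Instant;

public class TimerTimestampSketch {
    // Toy model of the reported behavior: for a processing-time timer, the
    // runner stamps output() with the current input watermark rather than
    // the timer's own output timestamp.
    static Instant processingTimeTimerOutput(Instant inputWatermark) {
        return inputWatermark;
    }

    public static void main(String[] args) {
        // A buffered element's window ends here...
        Instant windowMax = Instant.parse("2022-04-01T19:19:59.999Z");

        // ...but by the time the buffering timer fires, the input watermark
        // has moved past the window (and during a drain it jumps all the way
        // to BoundedWindow.TIMESTAMP_MAX_VALUE).
        Instant watermark = Instant.parse("2022-04-01T19:23:00Z");

        Instant out = processingTimeTimerOutput(watermark);
        System.out.println(out.isAfter(windowMax)); // true: output falls outside its window
    }
}
```

Under this model, any gap between the window's end and the timer's firing time produces an output timestamp outside the window, which is consistent with hitting the case "almost immediately" in a test pipeline.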
>>
>> On Mon, Apr 4, 2022 at 4:47 AM Jan Lukavský <[email protected]> wrote:
>>
>>> On 4/2/22 16:41, Steve Niemitz wrote:
>>>
>>> I've dug into this some more and have a couple of observations/questions:
>>> - I added some logging to my DoFn in both @ProcessElement and @OnTimer,
>>> and I can confirm that I never have late data coming into ProcessElement
>>> (element.timestamp() is never after the end of the window).
>>> - The OnTimer method does end up having an output timestamp >
>>> window.maxTimestamp (unsurprisingly).
>>>
>>> I traced into SimpleDoFnRunner.onTimer [1], which seems to ignore the
>>> output timestamp completely for processing-time timers and uses the
>>> input watermark instead [1]. I believe this is what Luke was talking
>>> about above?
>>>
>>> "There is another bug about the wrong output timestamp being used for
>>> processing time timers that exacerbates this but this only impacts
>>> non-portable runners."
>>>
>>> I'm unclear on why the input watermark would be used here. My
>>> understanding is that the input watermark can progress, but the _output_
>>> watermark is held by the stateful DoFn.
>>>
>>> Is there a reason the input watermark is used here, rather than the
>>> actual output timestamp of the timer (or the output watermark)?
>>>
>>> This was touched on in [1], but the discussion stopped there. I think we
>>> should move the discussion forward, because the input watermark can move
>>> arbitrarily, and the "position" in event time is really given by the
>>> output watermark.
>>>
>>> [1] https://lists.apache.org/thread/klj24hbl88lpby3q0ormg7op8x84tj4v
>>>
>>> On Fri, Apr 1, 2022 at 8:12 PM Steve Niemitz <[email protected]> wrote:
>>>
>>>> While I do agree the symptoms are similar, I don't believe these are
>>>> related. The ESIO bug was centered around both "bundle" operations
>>>> (StartBundle/EndBundle) and watermark updates during bundle processing.
>>>> In my case I'm not using anything related to bundle operations (no
>>>> Start/EndBundle), and I'm running on Dataflow, which holds the
>>>> watermark during bundle processing.
>>>>
>>>> I believe the issue here is that the input timestamp of the element is
>>>> "impossible" given the current window it's in, which manifests later as
>>>> the timer output timestamp validation firing, as well as the timestamp
>>>> combiner error I linked in the thread.
>>>>
>>>> On Fri, Apr 1, 2022 at 7:50 PM Evan Galpin <[email protected]> wrote:
>>>>
>>>>> I believe that this thread is entirely related to another thread [1],
>>>>> where there is discussion that the correct fix for this issue could be
>>>>> to enforce that watermark updates only happen at bundle boundaries.
>>>>> There's another related thread [2] citing the same error with
>>>>> ElasticsearchIO.
>>>>>
>>>>> This appears to be the same root cause, where multiple elements from
>>>>> different windows end up in the same bundle, and the watermark updates
>>>>> between elements of that bundle cause some timestamps to be "invalid".
>>>>>
>>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>> [2] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
>>>>>
>>>>> - Evan
>>>>>
>>>>> On Fri, Apr 1, 2022 at 18:35 Steve Niemitz <[email protected]> wrote:
>>>>>
>>>>>> > but this only impacts non-portable runners.
>>>>>>
>>>>>> heh, like Dataflow? :(
>>>>>>
>>>>>> I'm not sure what the solution is here, then? I managed to hit this
>>>>>> bug within 2 hours of running my first pipeline on 2.37. I can't just
>>>>>> live with pipelines breaking randomly, and it seems like everything
>>>>>> worked fine before that check was introduced (even if the output
>>>>>> timestamp was technically wrong?). Is the answer to add a 1 ms
>>>>>> allowed skew to all my DoFns that use processing-time timers? Remove
>>>>>> the check again? (which is probably what I'll do in the short term in
>>>>>> our fork)
>>>>>>
>>>>>> When you say "there is another bug about the wrong output timestamp
>>>>>> being used for processing time timers", which timestamp is it that's
>>>>>> incorrect? The element input timestamp? What should it be instead?
>>>>>>
>>>>>> On Fri, Apr 1, 2022 at 6:09 PM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> The longstanding issue was that this was always happening but was
>>>>>>> not visible; recently, validation was added to make it visible that
>>>>>>> this was wrong [1].
>>>>>>>
>>>>>>> There is another bug about the wrong output timestamp being used for
>>>>>>> processing time timers that exacerbates this, but that one only
>>>>>>> impacts non-portable runners.
>>>>>>>
>>>>>>> 1: https://issues.apache.org/jira/browse/BEAM-12931
>>>>>>>
>>>>>>> On Fri, Apr 1, 2022 at 2:17 PM Steve Niemitz <[email protected]> wrote:
>>>>>>>
>>>>>>>> I'm unclear on how the timer would ever have been set to output at
>>>>>>>> that timestamp, though. The output timestamp falls into the next
>>>>>>>> window, and if unset (like in this case) the output timestamp is
>>>>>>>> derived from the element timestamp [1]. This means we somehow had
>>>>>>>> an element in the wrong window? This seems strangely similar to
>>>>>>>> BEAM-6757, actually.
>>>>>>>>
>>>>>>>> What long-standing bug were you talking about here? We've been
>>>>>>>> running these pipelines for years now and never ran into this until
>>>>>>>> now. Although maybe we had been hitting it all along, and there was
>>>>>>>> just no validation to catch it?
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1284
>>>>>>>>
>>>>>>>> On Fri, Apr 1, 2022 at 5:11 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> There is a long-standing bug with processing timestamps.
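For context on the allowed-skew question raised above, the rule the validation enforces can be modeled in a few lines of plain java.time. This is a sketch of the documented rule only, not the actual Beam implementation; the class and method names here are made up:

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheckSketch {
    // Rule per the validation (BEAM-12931): an output timestamp from
    // @OnTimer must be no earlier than the timer's output timestamp minus
    // the DoFn's allowed skew.
    static boolean isAllowed(Instant output, Instant timerOutput, Duration allowedSkew) {
        return !output.isBefore(timerOutput.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant timerOutput = Instant.parse("2022-04-01T19:20:00.000Z");
        Instant emitted = Instant.parse("2022-04-01T19:19:59.999Z"); // window.maxTimestamp()

        // The default skew of 0 ms rejects an output that is 1 ms before
        // the timer's output timestamp.
        System.out.println(isAllowed(emitted, timerOutput, Duration.ZERO)); // false

        // A 1 ms skew (via DoFn#getAllowedTimestampSkew) would accept it.
        System.out.println(isAllowed(emitted, timerOutput, Duration.ofMillis(1))); // true
    }
}
```

This is why a blanket 1 ms skew on every DoFn with processing-time timers would suppress the error without addressing why the timestamps disagree in the first place.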
>>>>>>>>>
>>>>>>>>> On Fri, Apr 1, 2022 at 2:01 PM Steve Niemitz <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> We have a job that uses processing-time timers, and we just
>>>>>>>>>> upgraded from 2.33 to 2.37. Sporadically, we've started seeing
>>>>>>>>>> jobs fail with this error:
>>>>>>>>>>
>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>>>> 2022-04-01T19:19:59.999Z. Output timestamps must be no earlier
>>>>>>>>>> than the output timestamp of the timer (2022-04-01T19:20:00.000Z)
>>>>>>>>>> minus the allowed skew (0 milliseconds) and no later than
>>>>>>>>>> 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew()
>>>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.checkTimestamp(SimpleDoFnRunner.java:883)
>>>>>>>>>> at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:863)
>>>>>>>>>> at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>>>>>>>>>> <user code>
>>>>>>>>>>
>>>>>>>>>> The windowing is configured with 10-minute fixed windows and 10
>>>>>>>>>> minutes of allowed lateness. We're not specifically setting the
>>>>>>>>>> output time on the timer, so it seems like it's getting inferred
>>>>>>>>>> from the element timestamp? The code that emits elements from the
>>>>>>>>>> timer uses window.maxTimestamp() to set the output timestamp. I'm
>>>>>>>>>> not sure I understand how an element with a timestamp in what
>>>>>>>>>> should be the next window ended up in the previous one, given that
>>>>>>>>>> this is the first stateful operation in the pipeline and we read
>>>>>>>>>> from Pub/Sub using Pub/Sub timestamps, so there should be no late
>>>>>>>>>> data.
>>>>>>>>>>
>>>>>>>>>> I know there was a change recently to better validate the output
>>>>>>>>>> timestamp from timers [1]; I'm having trouble understanding
>>>>>>>>>> whether there's a bug in that, or if this is actually exposing a
>>>>>>>>>> real bug in our pipeline.
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/commit/15048929495ad66963b528d5bd71eb7b4a844c96
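To make the 1 ms mismatch in the error above concrete: with 10-minute fixed windows, 2022-04-01T19:19:59.999Z is the maxTimestamp of the 19:10-19:20 window, while the timer's output timestamp of 19:20:00.000Z is the start of the next one. Below is a sketch of epoch-aligned fixed-window arithmetic in plain java.time, not the Beam FixedWindows implementation; names are illustrative:

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowSketch {
    // 10-minute fixed windows, as in the pipeline described above.
    static final Duration SIZE = Duration.ofMinutes(10);

    // Start of the fixed window containing ts (windows aligned to the epoch).
    static Instant windowStart(Instant ts) {
        long sizeMs = SIZE.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs);
    }

    // A window's maxTimestamp is its end minus 1 ms.
    static Instant maxTimestamp(Instant ts) {
        return windowStart(ts).plus(SIZE).minusMillis(1);
    }

    public static void main(String[] args) {
        // The timer's output timestamp from the error message is the
        // *start* of the 19:20-19:30 window...
        Instant timerOutput = Instant.parse("2022-04-01T19:20:00.000Z");
        System.out.println(windowStart(timerOutput));  // 2022-04-01T19:20:00Z

        // ...so emitting at the previous window's maxTimestamp is 1 ms
        // earlier than the timer's output timestamp, tripping the check.
        Instant emitted = Instant.parse("2022-04-01T19:19:59.999Z");
        System.out.println(maxTimestamp(emitted));     // 2022-04-01T19:19:59.999Z
    }
}
```

In other words, an element timestamped exactly at 19:20:00.000Z belongs to the next window, which is why an output timestamp inferred from it can never satisfy the previous window's maxTimestamp under zero skew.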
