Agreed, let's resolve the conversation in that PR and we should be good to
land it.

On Mon, Apr 4, 2022 at 5:15 PM Reuven Lax <[email protected]> wrote:

> FYI - the bug you found is the one I was referring to. I actually had
> fixed this as part of a larger PR (that fixes bugs in DirectRunner), but to
> get this in faster we should definitely move forward with the more-targeted
> PR.
>
> On Mon, Apr 4, 2022 at 6:59 AM Steve Niemitz <[email protected]> wrote:
>
>> Oh I had forgotten about that thread, good point, it is very related to
>> this. I agree we should fix this, to force the conversation, I've opened a
>> PR to do so: https://github.com/apache/beam/pull/17262
>>
>> As a good example of this behavior simply in the beam SDK,
>> GroupIntoBatches exhibits this problem by default because it uses output()
>> from inside a processing time timer.
>>
>> - During normal operation (when using maxBufferingDuration), the
>> timestamp of elements it outputs may be arbitrarily far outside the
>> window it's outputting into.  I wrote a quick pipeline to verify this and
>> almost immediately hit this case.
>> - When draining a pipeline (in Dataflow), any buffered elements are
>> output with a timestamp of BoundedWindow.TIMESTAMP_MAX_VALUE (since the
>> input watermark has advanced to that point)
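The drain case in particular reduces to simple timestamp arithmetic. A minimal, Beam-free sketch (the element's window bounds here are illustrative assumptions, and BoundedWindow.TIMESTAMP_MAX_VALUE is hardcoded from the validation error quoted further down this thread):

```java
import java.time.Instant;

public class DrainOutputSketch {
    public static void main(String[] args) {
        // BoundedWindow.TIMESTAMP_MAX_VALUE, hardcoded from the validation
        // error quoted further down this thread.
        Instant timestampMax = Instant.parse("+294247-01-10T04:00:54.775Z");

        // Maximum timestamp of a buffered element's 10-minute window
        // (illustrative bounds, not taken from a real pipeline).
        Instant windowMax = Instant.parse("2022-04-01T19:19:59.999Z");

        // On drain the input watermark advances to TIMESTAMP_MAX_VALUE, so a
        // processing-time flush that stamps output with the watermark lands
        // far past the element's window:
        System.out.println(timestampMax.isAfter(windowMax)); // true
    }
}
```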
>>
>> Also, not to derail this conversation, but I believe there's a bug
>> (BEAM-14239) in dataflow's timer implementation, where resetting a timer
>> with a new output timestamp doesn't clear the existing one, resulting in
>> multiple timers being set and firing.
>>
>>
>> On Mon, Apr 4, 2022 at 4:47 AM Jan Lukavský <[email protected]> wrote:
>>
>>> On 4/2/22 16:41, Steve Niemitz wrote:
>>>
>>> I've dug into this some more and have a couple observations/questions:
>>> - I added some logging to my DoFn in both @ProcessElement and @OnTimer,
>>> I can confirm that I never have late data coming into ProcessElement
>>> (element.timestamp() is never after the end of the window)
>>> - The OnTimer method does end up having an output timestamp >
>>> window.maxTimestamp (unsurprisingly)
>>>
>>> I traced into SimpleDoFnRunner.onTimer [1], which seems to ignore the
>>> output timestamp completely for processing time timers and uses the input
>>> watermark instead.  I believe this is what Luke was talking about
>>> above?
>>>
>>> "There is another bug about the wrong output timestamp being used for
>>> processing time timers that exacerbates this but this only impacts
>>> non-portable runners."
>>>
>>> I'm unclear why the input watermark would be used here.  My
>>> understanding is that the input watermark can progress, but the _output_
>>> watermark is held in the stateful DoFn.
>>>
>>> Is there a reason the input watermark is used here, rather than the
>>> actual output timestamp of the timer (or the output watermark)?
>>>
>>> This was touched on in [1], but the discussion stopped there. I think we
>>> should move the discussion forward, because the input watermark can move
>>> arbitrarily and the "position" in event time is really given by the
>>> output watermark.
>>>
>>> [1] https://lists.apache.org/thread/klj24hbl88lpby3q0ormg7op8x84tj4v
>>>
>>>
>>>
>>> On Fri, Apr 1, 2022 at 8:12 PM Steve Niemitz <[email protected]>
>>> wrote:
>>>
>>>> While I do agree the symptoms are similar, I don't believe these are
>>>> related.  The ESIO bug centered on both "bundle" operations
>>>> (StartBundle/EndBundle) and watermark updates during bundle processing.  In
>>>> my case I'm not using anything related to bundle operations (no
>>>> Start/EndBundle), and am running on Dataflow, which holds the watermark
>>>> during bundle processing.
>>>>
>>>> I believe the issue here is that the input timestamp of the element is
>>>> "impossible" given the current window it's in, which manifests later as the
>>>> timer output timestamp validation firing, as well as the timestamp combiner
>>>> error I linked in the thread.
>>>>
>>>> On Fri, Apr 1, 2022 at 7:50 PM Evan Galpin <[email protected]> wrote:
>>>>
>>>>> I believe that this thread is entirely related to another thread[1]
>>>>> where there is discussion that the correct fix for this issue could be to
>>>>> enforce that watermark updates would only happen at bundle boundaries.
>>>>> There's another related thread[2] citing the same error with
>>>>> ElasticsearchIO.
>>>>>
>>>>> This appears to be the same root cause: multiple elements from
>>>>> different windows end up in the same bundle, and the watermark updates
>>>>> between elements of that bundle cause some timestamps to be "invalid".
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>> [2]
>>>>> https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
>>>>>
>>>>> - Evan
>>>>>
>>>>> On Fri, Apr 1, 2022 at 18:35 Steve Niemitz <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > but this only impacts non-portable runners.
>>>>>>
>>>>>> heh like Dataflow? :(
>>>>>>
>>>>>> I'm not sure what the solution is here then?  I managed to hit this
>>>>>> bug within 2 hours of running my first pipeline on 2.37.  I can't just 
>>>>>> live
>>>>>> with pipelines breaking randomly, and it seems like everything worked 
>>>>>> fine
>>>>>> before that check was introduced (even if the output timestamp was
>>>>>> technically wrong?).  Is the answer to add a 1ms allowed skew to all my
>>>>>> DoFns that use processing time timers?  Remove the check again? (which is
>>>>>> probably what I'll do in the short term in our fork)
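For concreteness, the check that fails can be reduced to a few lines of java.time arithmetic. This is a sketch of the rule described in the DoFn#getAllowedTimestampSkew() Javadoc, using the timestamps from the error quoted further down this thread (not the actual SimpleDoFnRunner code): an output timestamp passes only if it is no earlier than the timer's output timestamp minus the allowed skew, so 1ms of skew would admit the failing case.

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheck {
    // The validation rule: output must be no earlier than the timer's output
    // timestamp minus the allowed skew (which defaults to zero).
    static boolean allowed(Instant output, Instant timerOutput, Duration allowedSkew) {
        return !output.isBefore(timerOutput.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant output = Instant.parse("2022-04-01T19:19:59.999Z");      // window.maxTimestamp()
        Instant timerOutput = Instant.parse("2022-04-01T19:20:00.000Z"); // inferred timer output timestamp

        System.out.println(allowed(output, timerOutput, Duration.ZERO));        // false: the reported failure
        System.out.println(allowed(output, timerOutput, Duration.ofMillis(1))); // true: 1ms of skew admits it
    }
}
```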
>>>>>>
>>>>>> When you say "There is another bug about the wrong output timestamp
>>>>>> being used for processing time timers", which timestamp is that that's
>>>>>> incorrect?  The element input timestamp?  What should it be instead?
>>>>>>
>>>>>> On Fri, Apr 1, 2022 at 6:09 PM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> The longstanding issue is that this was always happening but was not
>>>>>>> visible; validation was recently added to surface that it was
>>>>>>> wrong [1].
>>>>>>>
>>>>>>> There is another bug about the wrong output timestamp being used for
>>>>>>> processing time timers that exacerbates this but this only impacts
>>>>>>> non-portable runners.
>>>>>>>
>>>>>>> 1: https://issues.apache.org/jira/browse/BEAM-12931
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 1, 2022 at 2:17 PM Steve Niemitz <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm unclear how the timer would have even ever been set to output
>>>>>>>> at that timestamp though.  The output timestamp falls into the next 
>>>>>>>> window,
>>>>>>>> and if unset (like in this case) the output timestamp is derived from 
>>>>>>>> the
>>>>>>>> element timestamp [1].  This means we somehow had an element in the 
>>>>>>>> wrong
>>>>>>>> window?  This seems strangely similar to BEAM-6757 actually.
>>>>>>>>
>>>>>>>> What long-standing bug were you talking about here?  We've been
>>>>>>>> running these pipelines for years and never ran into this until now,
>>>>>>>> or maybe we had been hitting it all along and there was simply no
>>>>>>>> validation to catch it?
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1284
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 1, 2022 at 5:11 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> There is a long-standing bug with processing timestamps.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 1, 2022 at 2:01 PM Steve Niemitz <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> We have a job that uses processing time timers, and just upgraded
>>>>>>>>>> from 2.33 to 2.37.  Sporadically we've started seeing jobs fail with 
>>>>>>>>>> this
>>>>>>>>>> error:
>>>>>>>>>>
>>>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>>>> 2022-04-01T19:19:59.999Z. Output timestamps must be no earlier than 
>>>>>>>>>> the
>>>>>>>>>> output timestamp of the timer (2022-04-01T19:20:00.000Z) minus the 
>>>>>>>>>> allowed
>>>>>>>>>> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. 
>>>>>>>>>> See the
>>>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the 
>>>>>>>>>> allowed
>>>>>>>>>> skew.
>>>>>>>>>> at
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.checkTimestamp(SimpleDoFnRunner.java:883)
>>>>>>>>>> at
>>>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:863)
>>>>>>>>>> at
>>>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>>>>>>>>>> <user code>
>>>>>>>>>>
>>>>>>>>>> The windowing is configured with 10 minute fixed windows and 10
>>>>>>>>>> minutes of allowed lateness.  We're not explicitly setting the output
>>>>>>>>>> timestamp on the timer, so it seems like it's being inferred from the
>>>>>>>>>> element timestamp?  The code that emits elements from the timer uses
>>>>>>>>>> window.maxTimestamp() as the output timestamp.  I don't understand how
>>>>>>>>>> an element with a timestamp in what should be the next window ended up
>>>>>>>>>> in the previous one: this is the first stateful operation in the
>>>>>>>>>> pipeline and we read from Pub/Sub using Pub/Sub timestamps, so there
>>>>>>>>>> should be no late data.
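For reference, the two timestamps in the error above can be reconstructed from the window configuration alone. A sketch assuming epoch-aligned 10-minute fixed windows (not pipeline code):

```java
import java.time.Duration;
import java.time.Instant;

public class WindowBoundarySketch {
    public static void main(String[] args) {
        // The 10-minute fixed window implied by the error (epoch-aligned).
        Instant windowStart = Instant.parse("2022-04-01T19:10:00.000Z");
        Instant windowEnd = windowStart.plus(Duration.ofMinutes(10));

        // window.maxTimestamp() is the window end minus 1ms: the value the
        // OnTimer code outputs with.
        Instant maxTimestamp = windowEnd.minusMillis(1);
        System.out.println(maxTimestamp); // 2022-04-01T19:19:59.999Z

        // The timer's output timestamp from the error is the window end
        // itself, i.e. the first instant of the *next* window, so the
        // output fails the zero-skew check by exactly 1ms.
        System.out.println(windowEnd);    // 2022-04-01T19:20:00Z
    }
}
```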
>>>>>>>>>>
>>>>>>>>>> I know there was a change recently to better validate the output
>>>>>>>>>> timestamp from timers [1], I'm having trouble understanding if 
>>>>>>>>>> there's a
>>>>>>>>>> bug in that, or if this is actually exposing a real bug in our 
>>>>>>>>>> pipeline.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/beam/commit/15048929495ad66963b528d5bd71eb7b4a844c96
>>>>>>>>>>
>>>>>>>>>
