Oh, I had forgotten about that thread; good point, it is very related to
this. I agree we should fix this. To force the conversation, I've opened a
PR to do so: https://github.com/apache/beam/pull/17262

As a good example of this behavior in the Beam SDK itself, GroupIntoBatches
exhibits this problem by default because it calls output() from inside a
processing-time timer.

- During normal operation (when using maxBufferingDuration), the timestamps
of the elements it outputs may fall arbitrarily outside the window it's
outputting into.  I wrote a quick pipeline to verify this and hit this case
almost immediately.
- When draining a pipeline (in Dataflow), any buffered elements are output
with a timestamp of BoundedWindow.TIMESTAMP_MAX_VALUE (since the input
watermark has advanced to that point)
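To make the failure mode concrete, here's a minimal standalone Java sketch of the kind of validation that fires (this is my own illustrative code, not Beam's actual implementation; the names are hypothetical): once the timer's effective output timestamp has moved past the end of the window, outputting at window.maxTimestamp() fails the default zero-skew check.

```java
import java.time.Instant;

// Hypothetical standalone model of the timestamp check performed when a
// timer callback outputs an element; illustrative only, not Beam's API.
public class TimerTimestampCheck {
    // Rejects outputs earlier than the timer's output timestamp minus the
    // DoFn's allowed skew (Beam's default allowed skew is 0 ms).
    static void checkTimestamp(Instant timerOutput, Instant output, long allowedSkewMillis) {
        if (output.toEpochMilli() < timerOutput.toEpochMilli() - allowedSkewMillis) {
            throw new IllegalArgumentException(
                "Cannot output with timestamp " + output
                    + ". Output timestamps must be no earlier than the output timestamp "
                    + "of the timer (" + timerOutput + ") minus the allowed skew ("
                    + allowedSkewMillis + " milliseconds).");
        }
    }

    public static void main(String[] args) {
        // End of a 10-minute fixed window: window.maxTimestamp() is one
        // millisecond before the window boundary.
        Instant windowMax = Instant.parse("2022-04-01T19:19:59.999Z");
        // The input watermark has already crossed into the next window, so
        // the timer's effective output timestamp is the window boundary.
        Instant timerOutput = Instant.parse("2022-04-01T19:20:00.000Z");

        try {
            checkTimestamp(timerOutput, windowMax, 0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // With a 1 ms allowed skew the same output passes.
        checkTimestamp(timerOutput, windowMax, 1);
        System.out.println("passed with 1 ms skew");
    }
}
```

This mirrors the error further down this thread, where the rejected output timestamp is exactly 1 ms before the timer's output timestamp.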

Also, not to derail this conversation, but I believe there's a bug
(BEAM-14239) in Dataflow's timer implementation, where resetting a timer
with a new output timestamp doesn't clear the existing one, resulting in
multiple timers being set and firing.
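For what it's worth, here's a tiny pure-Java model of the semantics I'd expect (again, my own hypothetical sketch, not Dataflow's code): re-setting a timer under the same timer ID should replace the pending firing rather than accumulate a second one.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key timer registry; names are illustrative only.
public class TimerRegistry {
    // At most one pending firing per timer ID.
    private final Map<String, Instant> pending = new HashMap<>();

    // Setting a timer with a new output timestamp should overwrite any
    // existing entry for the same ID (BEAM-14239 suggests the old entry is
    // instead left in place, so both eventually fire).
    void set(String timerId, Instant outputTimestamp) {
        pending.put(timerId, outputTimestamp);
    }

    int pendingCount() {
        return pending.size();
    }

    Instant outputTimestampOf(String timerId) {
        return pending.get(timerId);
    }

    public static void main(String[] args) {
        TimerRegistry timers = new TimerRegistry();
        timers.set("flush", Instant.parse("2022-04-01T19:20:00Z"));
        // Reset with a later output timestamp: still exactly one pending
        // timer, carrying the new timestamp.
        timers.set("flush", Instant.parse("2022-04-01T19:30:00Z"));
        System.out.println("pending=" + timers.pendingCount());
        System.out.println("outputTs=" + timers.outputTimestampOf("flush"));
    }
}
```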


On Mon, Apr 4, 2022 at 4:47 AM Jan Lukavský <[email protected]> wrote:

> On 4/2/22 16:41, Steve Niemitz wrote:
>
> I've dug into this some more and have a couple observations/questions:
> - I added some logging to my DoFn in both @ProcessElement and @OnTimer, and
> I can confirm that I never have late data coming into ProcessElement
> (element.timestamp() is never after the end of the window)
> - The OnTimer method does end up having an output timestamp >
> window.maxTimestamp (unsurprisingly)
>
> I traced into SimpleDoFnRunner.onTimer [1], which seems to ignore the
> output timestamp completely for processing-time timers and uses the input
> watermark instead.  I believe this is what Luke was talking about
> above?
>
> "There is another bug about the wrong output timestamp being used for
> processing time timers that exacerbates this but this only impacts
> non-portable runners."
>
> I'm unclear why the input watermark would be used here.  My understanding
> is that the input watermark can progress, but the _output_ watermark is
> held in the stateful DoFn.
>
> Is there a reason the input watermark is used here, rather than the actual
> output timestamp of the timer (or the output watermark)?
>
> This was touched on in [1], but the discussion stopped there. I think we
> should move the discussion forward, because the input watermark can move
> arbitrarily, while the "position" in event time is really given by the
> output watermark.
>
> [1] https://lists.apache.org/thread/klj24hbl88lpby3q0ormg7op8x84tj4v
>
>
>
> On Fri, Apr 1, 2022 at 8:12 PM Steve Niemitz <[email protected]> wrote:
>
>> While I do agree the symptoms are similar, I don't believe these are
>> related.  The ESIO bug was centered around both "bundle" operations
>> (StartBundle/EndBundle) and watermark updates during bundle processing.  In
>> my case I'm not using anything related to bundle operations (no
>> Start/EndBundle), and I'm running on Dataflow, which holds the watermark
>> during bundle processing.
>>
>> I believe the issue here is that the input timestamp of the element is
>> "impossible" given the current window it's in, which manifests later as the
>> timer output timestamp validation firing, as well as the timestamp combiner
>> error I linked in the thread.
>>
>> On Fri, Apr 1, 2022 at 7:50 PM Evan Galpin <[email protected]> wrote:
>>
>>> I believe that this thread is entirely related to another thread[1]
>>> where there is discussion that the correct fix for this issue could be to
>>> enforce that watermark updates only happen at bundle boundaries.  There's
>>> another related thread[2] citing the same error with ElasticsearchIO.
>>>
>>> This appears to be the same root cause, where multiple entities from
>>> different windows end up in the same bundle, and the watermark updating
>>> between elements of that bundle causes some timestamps to be "invalid".
>>>
>>> [1]
>>> https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>> [2]
>>> https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
>>>
>>> - Evan
>>>
>>> On Fri, Apr 1, 2022 at 18:35 Steve Niemitz <[email protected]> wrote:
>>>
>>>> > but this only impacts non-portable runners.
>>>>
>>>> heh like Dataflow? :(
>>>>
>>>> I'm not sure what the solution is here then?  I managed to hit this bug
>>>> within 2 hours of running my first pipeline on 2.37.  I can't just live
>>>> with pipelines breaking randomly, and it seems like everything worked fine
>>>> before that check was introduced (even if the output timestamp was
>>>> technically wrong?).  Is the answer to add a 1ms allowed skew to all my
>>>> DoFns that use processing time timers?  Remove the check again? (which is
>>>> probably what I'll do in the short term in our fork)
>>>>
>>>> When you say "There is another bug about the wrong output timestamp
>>>> being used for processing time timers", which timestamp is that that's
>>>> incorrect?  The element input timestamp?  What should it be instead?
>>>>
>>>> On Fri, Apr 1, 2022 at 6:09 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> The longstanding issue was that this was always happening but was not
>>>>> visible; validation was recently added to make it visible that this was
>>>>> wrong [1].
>>>>>
>>>>> There is another bug about the wrong output timestamp being used for
>>>>> processing time timers that exacerbates this but this only impacts
>>>>> non-portable runners.
>>>>>
>>>>> 1: https://issues.apache.org/jira/browse/BEAM-12931
>>>>>
>>>>>
>>>>> On Fri, Apr 1, 2022 at 2:17 PM Steve Niemitz <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I'm unclear how the timer would ever have been set to output at
>>>>>> that timestamp, though.  The output timestamp falls into the next window,
>>>>>> and if unset (like in this case) the output timestamp is derived from the
>>>>>> element timestamp [1].  This means we somehow had an element in the wrong
>>>>>> window?  This seems strangely similar to BEAM-6757, actually.
>>>>>>
>>>>>> What long-standing bug were you talking about here?  We've been
>>>>>> running these pipelines for years and never ran into this before.
>>>>>> Or maybe we had been hitting it, and there was just no validation to
>>>>>> catch it until now?
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1284
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 1, 2022 at 5:11 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> There is a long-standing bug with processing timestamps.
>>>>>>>
>>>>>>> On Fri, Apr 1, 2022 at 2:01 PM Steve Niemitz <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We have a job that uses processing time timers, and just upgraded
>>>>>>>> from 2.33 to 2.37.  Sporadically we've started seeing jobs fail with 
>>>>>>>> this
>>>>>>>> error:
>>>>>>>>
>>>>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>>>>> 2022-04-01T19:19:59.999Z. Output timestamps must be no earlier than the
>>>>>>>> output timestamp of the timer (2022-04-01T19:20:00.000Z) minus the 
>>>>>>>> allowed
>>>>>>>> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. 
>>>>>>>> See the
>>>>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the 
>>>>>>>> allowed
>>>>>>>> skew.
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.checkTimestamp(SimpleDoFnRunner.java:883)
>>>>>>>> at
>>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:863)
>>>>>>>> at
>>>>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>>>>>>>> <user code>
>>>>>>>>
>>>>>>>> The windowing is configured with 10-minute fixed windows and 10
>>>>>>>> minutes of allowed lateness.  We're not specifically setting the
>>>>>>>> output time on the timer, so it seems like it's getting inferred from
>>>>>>>> the element timestamp?  The code that emits elements from the timer
>>>>>>>> uses window.maxTimestamp() to set the output timestamp.  I'm not sure
>>>>>>>> I understand how an element with a timestamp in what should be the
>>>>>>>> next window ended up in the previous one?  This is the first stateful
>>>>>>>> operation in the pipeline, and we read from Pubsub using Pubsub
>>>>>>>> timestamps, so there should be no late data.
>>>>>>>>
>>>>>>>> I know there was a change recently to better validate the output
>>>>>>>> timestamp from timers [1], I'm having trouble understanding if there's 
>>>>>>>> a
>>>>>>>> bug in that, or if this is actually exposing a real bug in our 
>>>>>>>> pipeline.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/commit/15048929495ad66963b528d5bd71eb7b4a844c96
>>>>>>>>
>>>>>>>