Hi Robert,

I had the same impression: holding the watermark between bundles is not actually part of the computational model. The question now is: should it be?

As you said, buffering data in memory and emitting it later means we may output data that arrived as ON_TIME but is emitted as LATE (or even as droppable, which is worse). My understanding is that this is why DoFn has the (deprecated) getAllowedTimestampSkew() method, but that only bypasses the check; it does not solve the issue (which, I suppose, is why it is deprecated). I strongly believe that outputting elements that switch from ON_TIME to LATE is a correctness bug, because it can violate causality (which is strongly counter-intuitive in our universe :)). For some pipelines it can definitely produce incorrect output.
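To make the problem concrete, here is a toy simulation in plain Python (not the Beam API; `Element`, `BufferingStage` and its methods are made-up names). The stage buffers elements and flushes them in `finish_bundle`, while the "runner" forwards a watermark update in the middle of the open bundle. It assumes the usual convention that an element is late when its timestamp is strictly behind the watermark.

```python
from dataclasses import dataclass, field


@dataclass
class Element:
    value: str
    timestamp: int  # event time


@dataclass
class BufferingStage:
    """Toy model: a DoFn that buffers input and flushes it in finish_bundle.

    Watermark updates are applied immediately, even while a bundle is open,
    i.e. the interleaving discussed in this thread.
    """
    output_watermark: float = float("-inf")
    buffer: list = field(default_factory=list)
    emitted: list = field(default_factory=list)  # (value, status) pairs

    def process(self, element: Element) -> None:
        # Every element is on time when it arrives.
        if element.timestamp < self.output_watermark:
            raise ValueError("element was already late on arrival")
        self.buffer.append(element)

    def advance_watermark(self, watermark: int) -> None:
        # Forwarded right away, regardless of whether a bundle is open.
        self.output_watermark = max(self.output_watermark, watermark)

    def finish_bundle(self) -> None:
        # Classify buffered elements against the watermark at emission time.
        for element in self.buffer:
            late = element.timestamp < self.output_watermark
            self.emitted.append((element.value, "LATE" if late else "ON_TIME"))
        self.buffer.clear()


stage = BufferingStage()
stage.process(Element("a", 10))
stage.process(Element("b", 20))
stage.advance_watermark(15)  # watermark moves while the bundle is still open
stage.finish_bundle()
print(stage.emitted)  # [('a', 'LATE'), ('b', 'ON_TIME')]
```

Element "a" was on time when it arrived, but it is emitted behind the watermark only because the watermark advanced before the bundle was flushed.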

If we could ensure that the output watermark is updated only between the @FinishBundle and @StartBundle calls, this problem would go away. I looked into the FlinkRunner code, and it seems we could ensure this quite easily by not emitting the watermark while a bundle is open and emitting it once the bundle finishes. I didn't dig into it too deeply, so I don't know whether there are any caveats. The open questions are whether we could make the same guarantee for other runners, and whether we could sensibly create a @ValidatesRunner test for it.
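A minimal sketch of the "hold the watermark while a bundle is open" idea, again as a self-contained Python toy rather than the actual DoFnOperator code. `BundleAwareOperator`, `max_bundle_size` and the event trace format are hypothetical. Watermarks that arrive mid-bundle are parked and released right after the bundle finishes.

```python
class BundleAwareOperator:
    """Toy operator that forwards watermarks only when no bundle is open."""

    def __init__(self, max_bundle_size=3):
        self.max_bundle_size = max_bundle_size
        self.bundle_open = False
        self.bundle_size = 0
        self.pending_watermark = None  # newest watermark received mid-bundle
        self.emitted_watermarks = []   # watermarks forwarded downstream
        self.events = []               # trace of what the operator did

    def on_element(self, value):
        if not self.bundle_open:  # lazily open a bundle (@StartBundle)
            self.bundle_open = True
            self.events.append(("start",))
        self.events.append(("element", value))
        self.bundle_size += 1
        if self.bundle_size >= self.max_bundle_size:
            self.finish_bundle()

    def on_watermark(self, watermark):
        if self.bundle_open:
            # Hold the watermark back until the bundle finishes.
            if self.pending_watermark is None:
                self.pending_watermark = watermark
            else:
                self.pending_watermark = max(self.pending_watermark, watermark)
        else:
            self._emit_watermark(watermark)

    def finish_bundle(self):
        if not self.bundle_open:
            return
        self.events.append(("finish",))  # @FinishBundle
        self.bundle_open = False
        self.bundle_size = 0
        if self.pending_watermark is not None:
            self._emit_watermark(self.pending_watermark)
            self.pending_watermark = None

    def _emit_watermark(self, watermark):
        self.emitted_watermarks.append(watermark)
        self.events.append(("watermark", watermark))


op = BundleAwareOperator(max_bundle_size=2)
op.on_element("a")
op.on_watermark(10)  # bundle is open: held back
op.on_element("b")   # bundle reaches its size limit and finishes; 10 is released
op.on_watermark(20)  # no bundle open: forwarded immediately
print(op.emitted_watermarks)  # [10, 20]
```

The obvious trade-off is that a long-running bundle now delays the watermark for everything downstream, so this approach presumably relies on bundles being bounded in size or time.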

WDYT?

 Jan

On 3/25/22 23:06, Robert Bradshaw wrote:
I do not think there is a hard and fast rule about updating watermarks
only at bundle boundaries. Updating them mid-bundle seems perfectly legal
for a pure 1:1 mapping DoFn. The issue is that DoFns are allowed to buffer
data and emit it in a later process call (or in finishBundle). If the
watermark has moved on by then, that may result in late data. We don't
really have a way for a DoFn to declare *its* output watermark (i.e. "I
promise not to emit any data before this timestamp.")
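One way to picture such a declaration, as a hypothetical Python sketch (no such Beam API is implied; `HoldingBufferStage` and its methods are invented for illustration): the stage reports an output watermark held at the smallest timestamp it still has buffered, so nothing it flushes later can be behind it.

```python
import math


class HoldingBufferStage:
    """Hypothetical buffering stage that declares its own output watermark."""

    def __init__(self):
        self.input_watermark = -math.inf
        self.buffer = []  # timestamps of buffered elements

    def process(self, timestamp):
        self.buffer.append(timestamp)

    def advance_input_watermark(self, watermark):
        self.input_watermark = max(self.input_watermark, watermark)

    def output_watermark(self):
        # "I promise not to emit any data before this timestamp."
        hold = min(self.buffer, default=math.inf)
        return min(self.input_watermark, hold)

    def flush(self):
        flushed, self.buffer = self.buffer, []
        return flushed


stage = HoldingBufferStage()
stage.process(10)
stage.process(20)
stage.advance_input_watermark(15)
print(stage.output_watermark())  # 10: held back by the buffered element at 10
stage.flush()
print(stage.output_watermark())  # 15: the hold is released after flushing
```

With this hold in place, the runner could advance the watermark mid-bundle without turning buffered on-time data into late data.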

On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin wrote:
Thanks for starting this thread, Jan; I'm keen to hear thoughts and outcomes! I
thought I would mention that answers to the questions posed here will help
unblock a 2.38.0 release blocker [1].

[1] https://issues.apache.org/jira/browse/BEAM-14064

On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský wrote:
Hi,

this is a follow-up to the thread started in [1]. That thread mentions several
times that (in a stateless ParDo) the output watermark is allowed to advance
only at bundle boundaries [2]. Essentially, that would mean that everything
between the calls to @StartBundle and @FinishBundle is processed at a single
instant in (output) event time. This makes perfect sense.

The issue is that not all runners seem to implement this behavior. FlinkRunner,
for instance, has no "natural" concept of bundles; they are created in a more
ad-hoc way to adhere to the DoFn life-cycle (see [3]). Watermark updates and
elements are completely interleaved, without any synchronization with bundle
"open" or "close". If watermark updates are allowed to happen only at bundle
boundaries, then this seems to break that contract.

The question therefore is: should we consider FlinkRunner non-compliant with
this aspect of the Apache Beam model, or is this an "optional" part that runners
are free to implement at will? If the former, are we missing some
@ValidatesRunner tests for this?
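For what it's worth, the property such a test would assert can be stated as a check over a trace of bundle and watermark events. The sketch below is plain Python and the trace format is an assumption; how a real @ValidatesRunner test would actually observe a runner's watermark updates is a separate (and probably harder) question.

```python
def watermark_updates_inside_bundles(events):
    """Return the watermark values that advanced while a bundle was open.

    `events` is a hypothetical trace of tuples: ("start",), ("element", value),
    ("watermark", w) or ("finish",). An empty result means watermarks moved
    only at bundle boundaries.
    """
    violations = []
    bundle_open = False
    for event in events:
        kind = event[0]
        if kind == "start":
            if bundle_open:
                raise ValueError("@StartBundle while a bundle is already open")
            bundle_open = True
        elif kind == "finish":
            if not bundle_open:
                raise ValueError("@FinishBundle without @StartBundle")
            bundle_open = False
        elif kind == "watermark" and bundle_open:
            violations.append(event[1])
    return violations


interleaved = [("start",), ("element", "a"), ("watermark", 10), ("element", "b"), ("finish",)]
aligned = [("start",), ("element", "a"), ("element", "b"), ("finish",), ("watermark", 10)]
print(watermark_updates_inside_bundles(interleaved))  # [10]
print(watermark_updates_inside_bundles(aligned))      # []
```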

  Jan

[1] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db

[2] https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50

[3] https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786

