@Jan I'm +1 on the idea.  Just confirming: this would not negate the
ability to buffer or otherwise make use of a setting like
FlinkPipelineOptions#setMaxBundleSize[1]?  The change would simply mean
refraining from outputting a watermark change until @FinishBundle is called
(across all runners)?  Assuming this garners the required support, I'd be
interested in collaborating/contributing if development could be
parallelized across the various runners.
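
To make sure I'm reading the proposal correctly, here is a minimal sketch of
the behaviour I have in mind. It is plain Java with no Beam or Flink
dependency, and BundleWatermarkHold is a hypothetical name of my own, not an
existing class in any runner. The assumption is that a runner can intercept
incoming watermarks and knows when a bundle is open; how that would hook into
a real operator is exactly the part I have not looked at.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical helper (not a Beam class): holds watermarks while a bundle is open. */
class BundleWatermarkHold {
  private static final long NONE = Long.MIN_VALUE;

  private final LongConsumer downstream; // receives watermarks that are safe to emit
  private boolean bundleOpen = false;
  private long held = NONE;        // highest watermark seen while the bundle was open
  private long lastEmitted = NONE; // keeps emitted watermarks strictly increasing

  BundleWatermarkHold(LongConsumer downstream) {
    this.downstream = downstream;
  }

  /** Would be called where the runner invokes @StartBundle. */
  void startBundle() {
    bundleOpen = true;
  }

  /** Called for every incoming watermark; defers it if a bundle is open. */
  void onWatermark(long watermark) {
    if (bundleOpen) {
      held = Math.max(held, watermark);
    } else {
      emit(watermark);
    }
  }

  /** Would be called after @FinishBundle has flushed any buffered elements. */
  void finishBundle() {
    bundleOpen = false;
    if (held != NONE) {
      long toEmit = held;
      held = NONE;
      emit(toEmit);
    }
  }

  private void emit(long watermark) {
    if (watermark > lastEmitted) {
      lastEmitted = watermark;
      downstream.accept(watermark);
    }
  }

  public static void main(String[] args) {
    List<Long> emitted = new ArrayList<>();
    BundleWatermarkHold hold = new BundleWatermarkHold(w -> emitted.add(w));

    hold.startBundle();
    hold.onWatermark(10L); // held, bundle is open
    hold.onWatermark(20L); // held, replaces 10
    hold.finishBundle();   // emits 20
    hold.onWatermark(30L); // no bundle open, emitted immediately

    System.out.println(emitted); // prints [20, 30]
  }
}
```

If that is roughly right, buffering inside a bundle keeps working unchanged,
and a larger max bundle size would only mean the watermark is held for longer
before it is released.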

[1]
https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L207-L211

- Evan

On Mon, Mar 28, 2022 at 4:07 AM Jan Lukavský <[email protected]> wrote:

> Hi Robert,
>
> I had the same impression that holding the watermark between bundles is
> actually not part of the computational model. Now the question is -
> should it be?
>
> As you said, buffering and emitting in-memory buffered data means
> possibly outputting data that arrived as ON_TIME but is output as
> LATE (or droppable, which is even worse). My understanding is that this
> is why there is the (deprecated) getAllowedTimestampSkew() method of
> DoFn, but that only bypasses the check and does not solve the issue (which
> is why it is deprecated, I suppose). I strongly believe that outputting
> elements that switch from ON_TIME to LATE is a correctness bug, because
> it has the potential to violate causality (which is strongly
> counter-intuitive in our universe :)). For some pipelines it can
> definitely cause incorrect outputs.
>
> If we could ensure the output watermark gets updated only between
> @FinishBundle and @StartBundle calls, then this problem would go away. I
> looked into the code of FlinkRunner and it seems to me that we could
> quite easily ensure this by not outputting the watermark while a bundle is
> open and outputting it once the bundle finishes. I didn't dig into that
> too deeply, so I don't know if there would be any caveats. The question
> is whether we could make these guarantees for other runners as well, and
> whether we could sensibly create a @ValidatesRunner test.
>
> WDYT?
>
>   Jan
>
> On 3/25/22 23:06, Robert Bradshaw wrote:
> > I do not think there is a hard and fast rule about updating watermarks
> > only at bundle boundaries. Updating mid-bundle seems perfectly legal for
> > a pure 1:1 mapping DoFn. The issue is that DoFns are allowed to buffer
> > data and emit it in a later process (or finishBundle) call. If the
> > watermark has moved on, that may result in late data. We don't really
> > have a way for a DoFn to declare *its* output watermark (i.e. "I promise
> > not to emit any data before this timestamp.")
> >
> > On Thu, Mar 24, 2022 at 8:10 AM Evan Galpin <[email protected]> wrote:
> >> Thanks for starting this thread Jan, I'm keen to hear thoughts and
> outcomes!  I thought I would mention that answers to the questions posed
> here will help to unblock a 2.38.0 release blocker[1].
> >>
> >> [1] https://issues.apache.org/jira/browse/BEAM-14064
> >>
> >> On Thu, Mar 24, 2022 at 5:28 AM Jan Lukavský <[email protected]> wrote:
> >>> Hi,
> >>>
> >>> this is a follow-up to the thread started in [1]. That thread
> mentions multiple times that (in a stateless ParDo) the output watermark is
> allowed to advance only on bundle boundaries [2]. Essentially that would
> mean that anything in between calls to @StartBundle and @FinishBundle would
> be processed in a single instant in (output) event-time. This makes perfect
> sense.
> >>>
> >>> The issue is that it seems that not all runners actually implement
> this behavior. FlinkRunner for instance does not have a "natural" concept
> of bundles and those are created in a more ad-hoc way to adhere to the
> DoFn life-cycle (see [3]). Watermark updates and elements are completely
> interleaved without any synchronization with bundle "open" or "close". If
> watermark updates are allowed to happen only on boundaries of bundles, then
> this seems to break this contract.
> >>>
> >>> The question therefore is - should we consider FlinkRunner as
> non-compliant with this aspect of the Apache Beam model or is this an
> "optional" part that runners are free to implement at will? In the case of
> the former, are we missing some @ValidatesRunner tests for this?
> >>>
> >>>   Jan
> >>>
> >>> [1] https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> >>>
> >>> [2] https://lists.apache.org/thread/7foy455spg43xo77zhrs62gc1m383t50
> >>>
> >>> [3]
> https://github.com/apache/beam/blob/14862ccbdf2879574b6ce49149bdd7c9bf197322/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L786
> >>>
> >>>
>
