This does seem non-intuitive, though I'm not sure what the best approach is.

The problem with using currentOutputWatermark as the output timestamp is
that Beam does not define watermark advancement to be synchronous, and at
least the Dataflow runner calculates watermarks completely independently of
bundle processing. This means that the output watermark could advance
immediately after being checked, which would cause the output records to be
arbitrarily late. For example, if allowedLateness is 10 seconds, then
this trigger will accept a record that is 5 seconds late. However, if
currentOutputWatermark advances by 15 seconds after it is checked, then you
would end up outputting a result that is 15 seconds late and that would
therefore be dropped.
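
To make the arithmetic of that example concrete, here is a minimal sketch
in plain Java. It is a toy model, not Beam code: the class LatenessRace,
the method isDroppable, and the concrete timestamps are all made up for
illustration, and the per-record check is only a simplified stand-in for
whatever late-data logic a runner actually applies.

```java
import java.time.Duration;
import java.time.Instant;

/** Toy model of the race in the example above. Hypothetical names, not Beam's API. */
final class LatenessRace {

  /** Simplified check: a record is droppable once it trails the watermark by more than allowedLateness. */
  static boolean isDroppable(Instant timestamp, Instant watermark, Duration allowedLateness) {
    return timestamp.isBefore(watermark.minus(allowedLateness));
  }

  public static void main(String[] args) {
    Duration allowedLateness = Duration.ofSeconds(10);

    // Arbitrary instant standing in for the output watermark at the moment it is checked.
    Instant watermarkAtCheck = Instant.parse("2020-05-29T00:00:00Z");

    // The input record is 5 seconds behind the watermark, within the allowed lateness.
    Instant inputTimestamp = watermarkAtCheck.minusSeconds(5);
    System.out.println("input droppable:  "
        + isDroppable(inputTimestamp, watermarkAtCheck, allowedLateness));

    // The result is stamped with the watermark value that was just read.
    Instant outputTimestamp = watermarkAtCheck;

    // The watermark then advances by 15 seconds, independently of bundle processing.
    Instant watermarkAtEmit = watermarkAtCheck.plusSeconds(15);
    System.out.println("output droppable: "
        + isDroppable(outputTimestamp, watermarkAtEmit, allowedLateness));
  }
}
```

The point is only that the check and the emit observe two different
watermark values, so no timestamp derived from the first observation is
guaranteed to still be within allowedLateness at the second.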

IMO it's most important that on-time elements are never turned into late
elements. However, the above behavior also seems confusing to users.

Worth noting that I don't think that the current behavior is that much
better. If the output watermark is close to the end of the window, then I
think the existing model can also cause this scenario to happen.

Reuven

On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> what seems the most "surprising" to me is that we are using
> TimestampCombiners to actually do two (orthogonal) things:
>
>  a) calculate a watermark hold for a window, so on-time elements emitted
> from a pane are not late in downstream processing
>
>  b) calculate timestamp of elements in output pane
>
> These two follow slightly different constraints: while in case a) it is
> not allowed to shift the watermark "back in time", in case b) it seems OK to
> output data with a timestamp lower than the output watermark (what comes
> late, might leave late). So, while it seems OK to discard late elements for
> the sake of calculating the output watermark, it seems wrong to discard them
> when calculating the output timestamp. Maybe these two timestamps might be
> held in different states (the state will be held until GC time for
> accumulating panes and reset on discarding panes)?
>
> Jan
> On 5/28/20 5:02 PM, David Morávek wrote:
>
> Hi,
>
> I've come across "unexpected" model behaviour when dealing with late data
> and custom timestamp combiners. Let's take the following pipeline as an
> example:
>
> final PCollection<String> input = ...;
> input.apply(
>       "GlobalWindows",
>       Window.<String>into(new GlobalWindows())
>           .triggering(
>               AfterWatermark.pastEndOfWindow()
>                   .withEarlyFirings(
>                       AfterProcessingTime.pastFirstElementInPane()
>                           .plusDelayOf(Duration.standardSeconds(10))))
>           .withTimestampCombiner(TimestampCombiner.LATEST)
>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>           .accumulatingFiredPanes())
>   .apply("Aggregate", Count.perElement());
>
> The above pipeline emits updates with the latest input timestamp it has
> seen so far (from non-late elements). We write the output with this
> timestamp to Kafka and read it from another pipeline.
>
> The problem comes when we need to handle late elements behind the output
> watermark. In this case Beam cannot use the combined timestamp and uses the
> EOW (end-of-window) timestamp instead. Unfortunately, this results in the
> downstream pipeline advancing its input watermark to the end of the global
> window. Also, if we used fixed windows after this aggregation, it would
> yield unexpected results.
>
> There is no reasoning about this behaviour in the last section of the
> lateness design doc <https://s.apache.org/beam-lateness> [1], so I'd like
> to open a discussion about what the expected result should be.
>
> My personal opinion is that the correct approach would be to emit late
> elements with currentOutputWatermark rather than EOW in the case of the
> EARLIEST and LATEST timestamp combiners.
>
> I've prepared a failing test case for ReduceFnRunner
> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b> [3],
> if anyone wants to play around with the issue.
>
> I also think that BEAM-2262
> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
> this discussion.
>
> [1] https://s.apache.org/beam-lateness
> [2] https://issues.apache.org/jira/browse/BEAM-2262
> [3]
> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>
> Looking forward to hearing your thoughts.
>
> Thanks,
> D.
>
>

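For reference, the behaviour David reports in the quoted message and the
alternative he proposes differ only in which timestamp replaces a combined
timestamp that is already behind the output watermark. A rough sketch of
the two rules in plain Java follows. It is an illustration of the
description in this thread, not the ReduceFnRunner implementation; the
class and method names are hypothetical, and the end-of-window value is an
arbitrary far-future placeholder.

```java
import java.time.Instant;

/** Hypothetical sketch of the two fallback rules discussed in the thread. Not Beam code. */
final class OutputTimestampSketch {

  /** As reported: a combined timestamp behind the output watermark is replaced by end of window. */
  static Instant reportedBehaviour(Instant combined, Instant outputWatermark, Instant endOfWindow) {
    return combined.isBefore(outputWatermark) ? endOfWindow : combined;
  }

  /** As proposed: a combined timestamp behind the output watermark is replaced by that watermark. */
  static Instant proposedBehaviour(Instant combined, Instant outputWatermark) {
    return combined.isBefore(outputWatermark) ? outputWatermark : combined;
  }

  public static void main(String[] args) {
    Instant outputWatermark = Instant.parse("2020-05-28T15:00:00Z");
    // Placeholder for the end of the global window, assumed only to be far in the future.
    Instant endOfWindow = Instant.parse("9999-12-31T23:59:59Z");
    // Combined (LATEST) timestamp of a pane whose newest element is late.
    Instant lateCombined = outputWatermark.minusSeconds(30);

    System.out.println("reported: " + reportedBehaviour(lateCombined, outputWatermark, endOfWindow));
    System.out.println("proposed: " + proposedBehaviour(lateCombined, outputWatermark));
  }
}
```

Under the proposed rule the emitted timestamp is only as fresh as the
watermark value that was read, which is where the asynchronous advancement
described at the top of this reply comes in.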