Hi,
what seems most "surprising" to me is that we are using
TimestampCombiners to do two (orthogonal) things:
a) calculate a watermark hold for a window, so on-time elements
emitted from a pane are not late in downstream processing
b) calculate timestamp of elements in output pane
These two follow slightly different constraints: while in case a) it is
not allowed to shift the watermark "back in time", in case b) it seems OK
to output data with a timestamp lower than the output watermark (what
comes late may leave late). So, while it seems OK to discard late
elements for the sake of calculating the output watermark, it seems
wrong to discard them when calculating the output timestamp. Maybe these
two timestamps could be held in different states (the state would be held
until GC time for accumulating panes and reset for discarding panes)?
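To make the two roles concrete, here is a minimal standalone sketch (plain Java, no Beam dependency; all class and method names are mine, not Beam's API) of how the two computations could differ when late elements are present:

```java
import java.util.List;

// Hypothetical sketch: the two orthogonal uses of a timestamp combiner,
// modeled with plain long millis instead of Beam's Instant.
class PaneTimestamps {

  // a) Watermark hold: the hold may never move the output watermark
  //    "back in time", so elements already behind the current output
  //    watermark cannot contribute a hold.
  static long watermarkHold(List<Long> elementTimestamps, long currentOutputWatermark) {
    return elementTimestamps.stream()
        .filter(ts -> ts >= currentOutputWatermark) // late elements are discarded here
        .min(Long::compare)
        .orElse(currentOutputWatermark);
  }

  // b) Output timestamp (LATEST-style combining): late elements may still
  //    contribute -- "what comes late may leave late".
  static long outputTimestamp(List<Long> elementTimestamps) {
    return elementTimestamps.stream().max(Long::compare).orElseThrow();
  }
}
```

With elements at 5, 15, 25 and the output watermark at 10, the hold lands at 15 (the late element at 5 is ignored), while the output timestamp is 25 regardless of lateness.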
Jan
On 5/28/20 5:02 PM, David Morávek wrote:
Hi,
I've come across "unexpected" model behaviour when dealing with late
data and custom timestamp combiners. Let's take the following pipeline
as an example:
final PCollection<String> input = ...;
input.apply(
        "GlobalWindows",
        Window.<String>into(new GlobalWindows())
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10))))
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
            .accumulatingFiredPanes())
    .apply("Aggregate", Count.perElement());
The above pipeline emits updates with the latest input timestamp it
has seen so far (from non-late elements). We write the output with
this timestamp to Kafka and read it from another pipeline.
The problem comes when we need to handle late elements behind the
output watermark. In this case Beam cannot use the combined timestamp
and uses the EOW timestamp instead. Unfortunately, this results in the
downstream pipeline progressing its input watermark to the end of the
global window. Also, if we used fixed windows after this aggregation,
it would yield unexpected results.
There is no reasoning about this behaviour in the last section of the
lateness design doc [1], so I'd like to open a discussion about what
the expected result should be.
My personal opinion is that the correct approach would be to emit late
elements with the currentOutputWatermark rather than EOW in the case
of the EARLIEST and LATEST timestamp combiners.
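A minimal sketch of the two alternatives (plain Java, hypothetical names; the real logic lives inside Beam, e.g. in ReduceFnRunner):

```java
// Hypothetical sketch of the behaviour under discussion: how the output
// timestamp of a pane could be chosen when the pane contains only late
// elements.
class LatePaneTimestamp {

  // Current behaviour (as reported): the combined timestamp cannot be used
  // for a late-only pane, so Beam falls back to the end-of-window timestamp.
  static long currentBehaviour(boolean paneIsLateOnly, long combined, long endOfWindow) {
    return paneIsLateOnly ? endOfWindow : combined;
  }

  // Proposed behaviour for EARLIEST/LATEST combiners: emit late-only panes
  // at the current output watermark instead, so the downstream watermark
  // never jumps to the end of the (global) window.
  static long proposedBehaviour(boolean paneIsLateOnly, long combined, long currentOutputWatermark) {
    return paneIsLateOnly ? currentOutputWatermark : combined;
  }
}
```

Under the proposal, a late-only pane in a GlobalWindows pipeline would surface at the current output watermark rather than at the (effectively infinite) end of the global window.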
I've prepared a failing test case for ReduceFnRunner [3], if anyone
wants to play around with the issue.
I also think that BEAM-2262 [2] may be related to this discussion.
[1] https://s.apache.org/beam-lateness
[2] https://issues.apache.org/jira/browse/BEAM-2262
[3]
https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
Looking forward to hearing your thoughts.
Thanks,
D.