Hi Reuven,

the asynchronicity of watermark updates is what I was missing - it is what relates the watermark hold to the element output timestamp. On the other hand, we have some invariants that have to hold, namely:

 a) element arriving as non-late MUST NOT be changed to late

 b) element arriving as late MIGHT be changed to non-late

 c) operator's output watermark MUST be less than input watermark at any time

Properties a) and b) are somewhat natural requirements, and property c) follows from the fact that it is impossible to predict the future exactly.

Now, having these three properties, would it be possible to:

 a) when a pane contains both late and on-time elements, split it into two panes, one containing only late elements and the other only on-time elements

 b) calculate the output timestamp of all panes using the timestamp combiner (each pane now contains only late or only on-time elements, so no timestamp combiner should be able to violate either of properties a) or b))

 c) when there is a pane that contains only late elements, update the watermark hold to min(current input watermark, window gc time) - so that the output watermark can progress up to the input watermark (and not violate property c) above)
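
Steps a) to c) above can be sketched in plain Java as follows. This is only a minimal illustration under stated assumptions, not Beam's ReduceFnRunner: the class LatePaneSketch, the Element type with its arrivedLate flag, and all method names are hypothetical, and timestamps are plain milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch of proposed steps a) to c); this is not Beam's ReduceFnRunner. */
public class LatePaneSketch {

  /** Hypothetical element: its event timestamp plus whether it was late on arrival. */
  static final class Element {
    final long timestampMillis;
    final boolean arrivedLate;

    Element(long timestampMillis, boolean arrivedLate) {
      this.timestampMillis = timestampMillis;
      this.arrivedLate = arrivedLate;
    }
  }

  /** Step a): split a pane into a late-only part (key true) and an on-time-only part (key false). */
  static Map<Boolean, List<Element>> splitPane(List<Element> pane) {
    return pane.stream().collect(Collectors.partitioningBy(e -> e.arrivedLate));
  }

  /** Step b): a LATEST-style combiner applied to one homogeneous, non-empty sub-pane. */
  static long latestTimestamp(List<Element> subPane) {
    return subPane.stream().mapToLong(e -> e.timestampMillis).max().getAsLong();
  }

  /** Step c): hold for a late-only pane, so the output watermark may follow the input watermark. */
  static long lateOnlyHold(long inputWatermarkMillis, long windowGcTimeMillis) {
    return Math.min(inputWatermarkMillis, windowGcTimeMillis);
  }

  public static void main(String[] args) {
    List<Element> pane = List.of(
        new Element(100L, false), new Element(40L, true), new Element(120L, false));
    Map<Boolean, List<Element>> split = splitPane(pane);
    System.out.println("on-time pane timestamp: " + latestTimestamp(split.get(false)));
    System.out.println("late pane timestamp: " + latestTimestamp(split.get(true)));
    System.out.println("late-only hold: " + lateOnlyHold(150L, 1_000L));
  }
}
```

Because each sub-pane is homogeneous, the combiner never mixes late and on-time timestamps, which is what keeps properties a) and b) intact in this sketch.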

It seems to me that what currently stands in the way is that

 a) panes are not split into late-only and non-late-only panes (and this might be tricky, mostly for combining transforms)

 b) the watermark hold with late-only pane is set to window gc time (instead of adding the input watermark as well) - [1]

With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting the panes would not be required, as the timestamp combiner can only shift late elements forward (making use of property b)). TimestampCombiner.EARLIEST would probably require splitting the panes, which seems to solve the mentioned [BEAM-2262].

WDYT?

[1] https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f

On 5/29/20 5:01 PM, Reuven Lax wrote:
This does seem non intuitive, though I'm not sure what the best approach is.

The problem with using currentOutputWatermark as the output timestamp is that Beam does not define watermark advancement to be synchronous, and at least the Dataflow runner calculates watermarks completely independently of bundle processing. This means that the output watermark could advance immediately after being checked, which would cause the output records to be arbitrarily late. So for example, if allowedLateness is 10 seconds, then this trigger will accept a record that is 5 seconds late. However, if currentOutputWatermark advances by 15 seconds after checking it, then you would end up outputting a result that is 15 seconds late and therefore would be dropped.
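
The arithmetic of this example can be written out as a small Java sketch. It is a simplified model, not how Dataflow or any runner actually tracks watermarks: the class AsyncWatermarkExample, the isDropped helper, and the starting watermark value are hypothetical, and the sketch assumes a record is dropped once it lags the watermark by more than allowedLateness.

```java
/** Worked arithmetic for the example above; a simplified model, not a runner implementation. */
public class AsyncWatermarkExample {

  /** Assumption: a record is dropped once it lags the watermark by more than allowedLateness. */
  static boolean isDropped(long timestampMillis, long watermarkMillis, long allowedLatenessMillis) {
    return watermarkMillis - timestampMillis > allowedLatenessMillis;
  }

  public static void main(String[] args) {
    long allowedLateness = 10_000L;
    long watermarkAtCheck = 100_000L; // arbitrary starting point for the illustration

    // The input record is 5 seconds late, which is within allowedLateness, so it is accepted.
    long inputTimestamp = watermarkAtCheck - 5_000L;
    System.out.println(isDropped(inputTimestamp, watermarkAtCheck, allowedLateness)); // false

    // The result is stamped with the watermark value that was read at check time ...
    long outputTimestamp = watermarkAtCheck;
    // ... but the watermark advances by 15 seconds before the result is emitted.
    long watermarkAtEmit = watermarkAtCheck + 15_000L;
    System.out.println(isDropped(outputTimestamp, watermarkAtEmit, allowedLateness)); // true
  }
}
```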

IMO it's most important that on-time elements are never turned into late elements. However, the above behavior also seems confusing to users.

Worth noting that I don't think that the current behavior is that much better. If the output watermark is close to the end of the window, then I think the existing model can also cause this scenario to happen.

Reuven

On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi,

    what seems the most "surprising" to me is that we are using
    TimestampCombiners to actually do two (orthogonal) things:

     a) calculate a watermark hold for a window, so on-time elements
    emitted from a pane are not late in downstream processing

     b) calculate timestamp of elements in output pane

    These two follow slightly different constraints - while in case a)
    it is not allowed to shift the watermark "back in time", in case b)
    it seems OK to output data with a timestamp lower than the output
    watermark (what comes late, might leave late). So, while it seems
    OK to discard late elements for the sake of calculating the output
    watermark, it seems wrong to discard them when calculating the
    output timestamp. Maybe these two timestamps might be held in
    different states (the state will be held until GC time for
    accumulating panes and reset on discarding panes)?

    Jan

    On 5/28/20 5:02 PM, David Morávek wrote:
    Hi,

    I've come across "unexpected" model behaviour when dealing with
    late data and custom timestamp combiners. Let's take the following
    pipeline as an example:

    final PCollection<String> input = ...;
    input
        .apply(
            "GlobalWindows",
            Window.<String>into(new GlobalWindows())
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(10))))
                .withTimestampCombiner(TimestampCombiner.LATEST)
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                .accumulatingFiredPanes())
        .apply("Aggregate", Count.perElement());

    The above pipeline emits updates with the latest input timestamp
    it has seen so far (from non-late elements). We write the output
    with this timestamp to Kafka and read it from another pipeline.

    The problem comes when we need to handle late elements behind the
    output watermark. In this case Beam cannot use the combined
    timestamp and uses the EOW timestamp instead. Unfortunately, this
    results in the downstream pipeline progressing its input watermark
    to the end of the global window. Also, if we used fixed windows
    after this aggregation, it would yield unexpected results.

    There is no reasoning about this behaviour in the last section of
    lateness design doc <https://s.apache.org/beam-lateness> [1], so
    I'd like to open a discussion about what the expected result
    should be.

    My personal opinion is that the correct approach would be to emit
    late elements with currentOutputWatermark rather than EOW in the
    case of the EARLIEST and LATEST timestamp combiners.

    I've prepared a failing test case for ReduceFnRunner
    <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>
    [3], if anyone wants to play around with the issue.

    I also think that BEAM-2262
    <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be
    related to this discussion.

    [1] https://s.apache.org/beam-lateness
    [2] https://issues.apache.org/jira/browse/BEAM-2262
    [3] https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b

    Looking forward to hearing your thoughts.

    Thanks,
    D.
