Hi Kenn,

I agree that for on-time elements the hold has to respect the output timestamps. For elements that are already late, it should be possible to calculate the output timestamp independently. Currently, the output watermark is *always* the value of the watermark hold, which might be inappropriate when the hold is explicitly cleared (because the input is late and the hold is cleared in order not to hold up downstream processing). My proposal would be to update the hold to the input watermark (or, more precisely, to the result of TimestampCombiner.assign(window, inputWatermark)). This seems to work, although it might hold the watermark in cases where the watermark updates even when no data arrives (I'm not sure exactly how to solve this). Another option would be to clearly separate the output timestamp from the watermark hold (store it in different state), because the two should be cleared under different conditions: the hold clears after every firing to enable watermark progress, while the output timestamp can be kept in the case of accumulating panes.
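
The option of keeping the output timestamp in state separate from the watermark hold could be sketched roughly as follows. This is plain Java with no Beam dependency; the class `SeparateStateSketch`, its methods, and the use of epoch milliseconds are hypothetical, the combiner is assumed to behave like LATEST, and "late" is simplified to "timestamp behind the output watermark". It is not how ReduceFnRunner stores state today.

```java
import java.util.OptionalLong;

/** Hypothetical per-window state: hold and output timestamp are stored apart. */
final class SeparateStateSketch {
  // Watermark hold: released after every firing so the output watermark can progress.
  private OptionalLong hold = OptionalLong.empty();
  // Output timestamp (LATEST-style): kept across firings when panes accumulate.
  private OptionalLong outputTimestamp = OptionalLong.empty();
  private final boolean accumulating;

  SeparateStateSketch(boolean accumulating) {
    this.accumulating = accumulating;
  }

  void onElement(long elementTimestamp, long outputWatermark) {
    // Every element contributes to the output timestamp, late or not.
    outputTimestamp = OptionalLong.of(
        outputTimestamp.isPresent()
            ? Math.max(outputTimestamp.getAsLong(), elementTimestamp)
            : elementTimestamp);
    // Only elements that are not behind the output watermark may hold it.
    if (elementTimestamp >= outputWatermark) {
      hold = OptionalLong.of(
          hold.isPresent() ? Math.min(hold.getAsLong(), elementTimestamp) : elementTimestamp);
    }
  }

  /** Fires a pane and returns its output timestamp. */
  long fire() {
    long timestamp = outputTimestamp.getAsLong();
    hold = OptionalLong.empty(); // the hold is always released on firing
    if (!accumulating) {
      outputTimestamp = OptionalLong.empty(); // discarding panes start over
    }
    return timestamp;
  }

  OptionalLong hold() {
    return hold;
  }
}
```

With accumulating panes, a late element arriving after a firing does not reset the output timestamp, and it does not re-establish a hold either.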

Jan

On 6/2/20 1:42 AM, Kenneth Knowles wrote:
Quick reply about one top-level thing: output timestamps and watermark holds are closely related. A hold is precisely reserving the right to output at a particular time. The part that is unintuitive is that these are ever different. That is, really, a hack to allow downstream processing to proceed more quickly when input is already late. I wonder if there is another way to set up the requirements so that they are always the same value, but the downstream on time processing is not held up by late data.



On Sun, May 31, 2020 at 3:44 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Minor self-correction - in the property c) it MIGHT be possible to
    update output watermark to time greater than input watermark, _as
    long as any future element cannot be assigned timestamp that is
    less than the output watermark_. That seems to be the case only
    for TimestampCombiner.END_OF_WINDOW, as that actually does not
    depend on timestamps of the actual elements. This doesn't quite
    change the reasoning below, it might be possible to not store
    input watermark to watermark hold for this combiner, although it
    would probably have negligible practical impact.

    On 5/31/20 12:17 PM, Jan Lukavský wrote:

    Hi Reuven,

    the asynchronicity of the watermark update is what I was missing - it
    is what relates the watermark hold to the element output timestamp. On
    the other hand, we have some invariants that have to hold, namely:

     a) element arriving as non-late MUST NOT be changed to late

     b) element arriving as late MIGHT be changed to non-late

     c) operator's output watermark MUST be less than input watermark
    at any time

    Properties a) and b) are somewhat natural requirements, and
    property c) follows from the fact that it is impossible to predict
    the future exactly.
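
The three invariants can be written down as simple predicates. This is only a sketch in plain Java: the class and method names are hypothetical, "late" is simplified to "timestamp behind the watermark", and property c) is read here as "not greater than" the input watermark, which is an assumption.

```java
/** Hypothetical predicates for the invariants a), b) and c). */
final class LatenessInvariants {
  /** Simplified lateness: the timestamp is behind the watermark. */
  static boolean isLate(long timestamp, long watermark) {
    return timestamp < watermark;
  }

  /**
   * a) an on-time input must not become late on output;
   * b) a late input may become on-time on output.
   */
  static boolean lateTransitionAllowed(boolean lateOnInput, boolean lateOnOutput) {
    return lateOnInput || !lateOnOutput;
  }

  /** c) the output watermark must not run ahead of the input watermark. */
  static boolean outputWatermarkValid(long outputWatermark, long inputWatermark) {
    return outputWatermark <= inputWatermark;
  }
}
```

Only the on-time to late transition is rejected; the other three combinations are allowed.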

    Now, having these three properties, would it be possible to:

     a) when pane contains both late and on-time elements, split the
    pane into two, containing only late and on-time elements

     b) calculate the output timestamp of all panes using the timestamp
    combiner (each pane now contains only late or only on-time elements,
    so no timestamp combiner should be able to violate either of
    properties a) or b))

     c) when there is a pane that contains only late elements, update
    the watermark hold to min(current input watermark, window gc time) -
    so that the output watermark can progress up to the input watermark
    (and not violate property c) above)
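
Step c) amounts to a one-line computation. The sketch below is plain Java with hypothetical names; it assumes the window gc time is the window's maximum timestamp plus the allowed lateness, and it ignores overflow for very large windows such as the global window.

```java
/** Hypothetical helper for the hold of a pane that contains only late elements. */
final class LateOnlyPaneHold {
  /** Assumed gc time: end of window plus allowed lateness (no overflow handling). */
  static long garbageCollectionTime(long windowMaxTimestamp, long allowedLatenessMillis) {
    return windowMaxTimestamp + allowedLatenessMillis;
  }

  /** Proposed hold: min(current input watermark, window gc time). */
  static long proposedHold(long inputWatermark, long gcTime) {
    return Math.min(inputWatermark, gcTime);
  }
}
```

While the input watermark is still before the gc time, the hold follows the input watermark, so the output watermark can advance up to it but not past it.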

    It seems to me that what currently stands in the way is that

     a) panes are not split to late and non-late only (and this might
    be tricky, mostly for combining transforms)

     b) the watermark hold with late-only pane is set to window gc
    time (instead of adding the input watermark as well) - [1]

    With TimestampCombiner.LATEST and END_OF_WINDOW it seems that
    splitting the panes would not be required, as the timestamp
    combiner can only shift late elements forward (make use of
    property b)). TimestampCombiner.EARLIEST would probably require
    splitting the panes, which seems to solve the mentioned [BEAM-2262].
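
To illustrate why only EARLIEST needs the split, here is a small sketch in plain Java. The class `CombinerSketch` is hypothetical and only mimics what the three combiners compute for a pane; it does not use Beam's TimestampCombiner.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-ins for the three timestamp combiners. */
final class CombinerSketch {
  /** EARLIEST: the minimum element timestamp in the pane. */
  static long earliest(List<Long> elementTimestamps) {
    return Collections.min(elementTimestamps);
  }

  /** LATEST: the maximum element timestamp in the pane. */
  static long latest(List<Long> elementTimestamps) {
    return Collections.max(elementTimestamps);
  }

  /** END_OF_WINDOW: independent of the element timestamps. */
  static long endOfWindow(long windowMaxTimestamp) {
    return windowMaxTimestamp;
  }
}
```

Take a pane with one late element (timestamp 90) and one on-time element (timestamp 120), a watermark of 100 and a window ending at 199. EARLIEST yields 90, which is behind the watermark, so the on-time element would leave in a late pane. LATEST yields 120 and END_OF_WINDOW yields 199, so the late element is only shifted forward.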

    WDYT?

    [1] https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f

    On 5/29/20 5:01 PM, Reuven Lax wrote:
    This does seem non intuitive, though I'm not sure what the best
    approach is.

    The problem with using currentOutputWatermark as the output
    timestamp is that Beam does not define watermark advancement to
    be synchronous, and at least the Dataflow runner calculates
    watermarks completely independent of bundle processing. This
    means that the output watermark could advance immediately after
    being checked, which would cause the records output to be
    arbitrarily late. So for example, if allowedLateness is 10
    seconds, then this trigger will accept a record that is 5
    seconds late. However if currentOutputWatermark advances by 15
    seconds after checking it, then you would end up outputting a
    result that is 15 seconds late and therefore would be dropped.
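
The arithmetic of this example can be checked with a short sketch. It is plain Java with hypothetical names, and it simplifies "dropped" to "later than allowedLateness relative to the downstream watermark", which is an assumption made for illustration rather than Beam's exact dropping rule.

```java
/** Hypothetical check for the asynchronous watermark example. */
final class AsyncWatermarkExample {
  /** Simplified rule: a record is dropped when its lateness exceeds allowedLateness. */
  static boolean droppedDownstream(
      long outputTimestampMillis, long downstreamWatermarkMillis, long allowedLatenessMillis) {
    long lateness = downstreamWatermarkMillis - outputTimestampMillis;
    return lateness > allowedLatenessMillis;
  }
}
```

With allowedLateness of 10 seconds, a result stamped with the watermark value read at 100 s is 15 s late once the watermark has moved to 115 s, so it is dropped. A record that is only 5 s late is still accepted.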

    IMO it's most important that on-time elements are never turned
    into late elements. However, the above behavior also seems
    confusing to users.

    Worth noting that I don't think that the current behavior is
    that much better. If the output watermark is close to the end of
    the window, then I think the existing model can also cause this
    scenario to happen.

    Reuven

    On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Hi,

        what seems the most "surprising" to me is that we are using
        TimestampCombiners to actually do two (orthogonal) things:

         a) calculate a watermark hold for a window, so on-time
        elements emitted from a pane are not late in downstream
        processing

         b) calculate timestamp of elements in output pane

        These two follow a little different constraints - while in
        case a) it is not allowed to shift watermark "back in time"
        in case b) it seems OK to output data with timestamp lower
        than output watermark (what comes late, might leave late).
        So, while it seems OK to discard late elements for the sake
        of calculation output watermark, it seems wrong to discard
        them when calculating output timestamp. Maybe these two
        timestamps might be held in different states (the state will
        be held until GC time for accumulating panes and reset on
        discarding panes)?

        Jan

        On 5/28/20 5:02 PM, David Morávek wrote:
        Hi,

        I've come across "unexpected" model behaviour when dealing
        with late data and custom timestamp combiners. Let's take the
        following pipeline as an example:

        final PCollection<String> input = ...;
        input
            .apply(
                "GlobalWindows",
                Window.<String>into(new GlobalWindows())
                    .triggering(
                        AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(10))))
                    .withTimestampCombiner(TimestampCombiner.LATEST)
                    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                    .accumulatingFiredPanes())
            .apply("Aggregate", Count.perElement());

        The above pipeline emits updates with the latest input
        timestamp it has seen so far (from non-late elements). We
        write the output with this timestamp to Kafka and read it
        from another pipeline.

        The problem comes when we need to handle late elements behind
        the output watermark. In this case Beam cannot use the combined
        timestamp and uses the EOW timestamp instead. Unfortunately,
        this results in the downstream pipeline advancing its input
        watermark to the end of the global window. Also, if we used
        fixed windows after this aggregation, it would yield
        unexpected results.

        There is no reasoning about this behaviour in the last
        section of lateness design doc
        <https://s.apache.org/beam-lateness> [1], so I'd like to
        open a discussion about what the expected result should be.

        My personal opinion is that the correct approach would be to
        emit late elements with currentOutputWatermark rather than EOW
        in the case of the EARLIEST and LATEST timestamp combiners.
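
The difference between the behaviour described above and this suggestion can be sketched as follows. This is plain Java with hypothetical names, not Beam code: the fallback rule only models what is described in this message, and the window end is passed in as a parameter rather than taken from Beam.

```java
/** Hypothetical comparison of the described fallback and the suggested one. */
final class LateFallbackSketch {
  /** As described: a combined timestamp behind the output watermark falls back to EOW. */
  static long describedOutputTimestamp(
      long combinedTimestamp, long outputWatermark, long windowMaxTimestamp) {
    return combinedTimestamp >= outputWatermark ? combinedTimestamp : windowMaxTimestamp;
  }

  /** As suggested: fall back to the current output watermark instead of EOW. */
  static long suggestedOutputTimestamp(long combinedTimestamp, long outputWatermark) {
    return Math.max(combinedTimestamp, outputWatermark);
  }
}
```

For a combined timestamp of 90 and an output watermark of 100, the described behaviour jumps to the end of the window, while the suggestion emits at 100. For on-time data both return the combined timestamp unchanged.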

        I've prepared a failing test case for ReduceFnRunner
        <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b> [3],
        if anyone wants to play around with the issue.

        I also think that BEAM-2262
        <https://issues.apache.org/jira/browse/BEAM-2262> [2] may
        be related to this discussion.

        [1] https://s.apache.org/beam-lateness
        [2] https://issues.apache.org/jira/browse/BEAM-2262
        [3] https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b

        Looking forward to hearing your thoughts.

        Thanks,
        D.
