See https://s.apache.org/beam-lateness for detailed rationale about where
the holds end up. It is a pretty massive read, but at this point I think
even the details there are relevant.

TL;DR is that the hold policy:

 - never makes on time data late
 - never makes non-droppable data droppable
 - never holds up downstream processing more than required

I don't believe there is another policy that achieves these.

Kenn

On Tue, Jun 2, 2020 at 6:57 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> agree that for on-time elements, the hold has to respect the output
> timestamps. For already late elements, it should be possible to calculate
> the output timestamp independently. Currently, the output watermark is
> *always* the value of watermark hold, which might be inappropriate for
> cases when the hold is explicitly cleared (because input is late and the
> hold is cleared in order not to hold downstream processing). My proposal
> would be to update the hold to input watermark (or more precisely to result
> of TimestampCombiner.assign(window, inputWatermark)), which seems to work,
> although it might hold watermark in cases when watermark updates even when
> no data arrives (I'm not sure how exactly to solve this). Another way
> around would be to clearly separate output timestamp (store it in different
> state than the watermark hold), because the output watermark should be
> cleared on different condition (hold clears after every firing to enable
> watemark progress, while output timestamp can be kept in case of
> accumulating panes).
>
> Jan
> On 6/2/20 1:42 AM, Kenneth Knowles wrote:
>
> Quick reply about one top-level thing: output timestamps and watermark
> holds are closely related. A hold is precisely reserving the right to
> output at a particular time. The part that is unintuitive is that these are
> ever different. That is, really, a hack to allow downstream processing to
> proceed more quickly when input is already late. I wonder if there is
> another way to set up the requirements so that they are always the same
> value, but the downstream on time processing is not held up by late data.
>
>
>
> On Sun, May 31, 2020 at 3:44 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Minor self-correction - in the property c) it MIGHT be possible to update
>> output watermark to time greater than input watermark, _as long as any
>> future element cannot be assigned timestamp that is less than the output
>> watermark_. That seems to be the case only for
>> TimestampCombiner.END_OF_WINDOW, as that actually does not depend on
>> timestamps of the actual elements. This doesn't quite change the reasoning
>> below, it might be possible to not store input watermark to watermark hold
>> for this combiner, although it would probably have negligible practical
>> impact.
>> On 5/31/20 12:17 PM, Jan Lukavský wrote:
>>
>> Hi Reuven,
>>
>> the asynchronicity of watermark update is what I was missing - it is what
>> relates watermarkhold with element output timestamp. On the other hand, we
>> have some invariants that have to hold, namely:
>>
>>  a) element arriving as non-late MUST NOT be changed to late
>>
>>  b) element arriving as late MIGHT be changed to non-late
>>
>>  c) operator's output watermark MUST be less than input watermark at any
>> time
>>
>> Properties a) and b) are somewhat natural requirements and property c)
>> follows the fact, that it is impossible to exactly predict future.
>>
>> Now, having these three properties, would it be possible to:
>>
>>  a) when pane contains both late and on-time elements, split the pane
>> into two, containing only late and on-time elements
>>
>>  b) calculate output timestamp of all panes using timestamp combiner (now
>> pane contains only late or on time elements, so no timestamp combiner
>> should be able to violate neither of properties a) or b))
>>
>>  c) calculate when there is pane that contains only late elements, update
>> watermark hold to min(current input watermark, window gc time) - so that
>> the output watermark can progress up to input watermark (and not violate
>> property c) above)
>>
>> I seems to me that what currently stands in the way is that
>>
>>  a) panes are not split to late and non-late only (and this might be
>> tricky, mostly for combining transforms)
>>
>>  b) the watermark hold with late-only pane is set to window gc time
>> (instead of adding the input watermark as well) - [1]
>>
>> With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting
>> the panes would not be required, as the timestamp combiner can only shift
>> late elements forward (make use of property b)). TimestampCombiner.EARLIEST
>> would probably require splitting the panes, which seems to solve the
>> mentioned [BEAM-2262].
>>
>> WDYT?
>>
>> [1]
>> https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f
>> On 5/29/20 5:01 PM, Reuven Lax wrote:
>>
>> This does seem non intuitive, though I'm not sure what the best approach
>> is.
>>
>> The problem with using currentOutputWatermark as the output timestamp is
>> that Beam does not define watermark advancement to be synchronous, and at
>> least the Dataflow runner calculates watermarks completely independent of
>> bundle processing. This means that the output watermark could advance
>> immediately after being checked, which would cause the records output to be
>> arbitrarily late. So for example, if allowedLateness is 10 seconds, then
>> this trigger will accept a record that is 5 seconds late. However if
>> currentOutputWaternark advances by 15 seconds after checking it, then you
>> would end up outputting a result that is 15 seconds late and therefore
>> would be dropped.
>>
>> IMO it's most important that on-time elements are never turned into late.
>> elements. However the above behavior also seems confusing to users.
>>
>> Worth noting that I don't think that the current behavior is that much
>> better. If the output watermark is close to the end of the window, then I
>> think the existing model can also cause this scenario to happen.
>>
>> Reuven
>>
>> On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> what seems the most "surprising" to me is that we are using
>>> TimestampCombiners to actually do two (orthogonal) things:
>>>
>>>  a) calculate a watermark hold for a window, so on-time elements emitted
>>> from a pane are not late in downstream processing
>>>
>>>  b) calculate timestamp of elements in output pane
>>>
>>> These two follow a little different constraints - while in case a) it is
>>> not allowed to shift watermark "back in time" in case b) it seems OK to
>>> output data with timestamp lower than output watermark (what comes late,
>>> might leave late). So, while it seems OK to discard late elements for the
>>> sake of calculation output watermark, it seems wrong to discard them when
>>> calculating output timestamp. Maybe these two timestamps might be held in
>>> different states (the state will be held until GC time for accumulating
>>> panes and reset on discarding panes)?
>>>
>>> Jan
>>> On 5/28/20 5:02 PM, David Morávek wrote:
>>>
>>> Hi,
>>>
>>> I've came across "unexpected" model behaviour when dealing with late
>>> data and custom timestamp combiners. Let's take a following pipeline as an
>>> example:
>>>
>>> final PCollection<String> input = ...;
>>> input.apply(
>>>       "GlobalWindows",
>>>       Window.<String>into(new GlobalWindows())
>>>           .triggering(
>>>               AfterWatermark.pastEndOfWindow()
>>>                   .withEarlyFirings(
>>>                       AfterProcessingTime.pastFirstElementInPane()
>>>                           .plusDelayOf(Duration.standardSeconds(10))))
>>>           .withTimestampCombiner(TimestampCombiner.LATEST)
>>>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>>           .accumulatingFiredPanes())
>>>   .apply("Aggregate", Count.perElement())
>>>
>>> The above pipeline emits updates with the latest input timestamp it has
>>> seen so far (from non-late elements). We write the output from this
>>> timestamp to kafka and read it from another pipeline.
>>>
>>> Problem comes when we need to handle late elements behind output
>>> watermark. In this case beam can not use combined timestamp and uses EOW
>>> timestamp instead. Unfortunately this results in downstream pipeline
>>> progressing it's input watermark to end of global window. Also if we would
>>> use fixed windows after this aggregation, it would yield unexpected results.
>>>
>>> There is no reasoning about this behaviour in the last section of lateness
>>> design doc <https://s.apache.org/beam-lateness> [1], so I'd like to
>>> open a discussion about what the expected result should be.
>>>
>>> My personal opinion is, that correct approach would be emitting late
>>> elements with currentOutputWatermark rather than EOW in case of EARLIEST
>>> and LATEST timestamp combiners.
>>>
>>> I've prepared a faling test case for ReduceFnRunner
>>> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
>>> if anyone wants to play around with the issue.
>>>
>>> I also think that BEAM-2262
>>> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
>>> this discussion.
>>>
>>> [1] https://s.apache.org/beam-lateness
>>> [2] https://issues.apache.org/jira/browse/BEAM-2262
>>> [3]
>>> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>>
>>> Looking forward to hearing your thoughts.
>>>
>>> Thanks,
>>> D.
>>>
>>>

Reply via email to