See https://s.apache.org/beam-lateness for detailed rationale about where the holds end up. It is a pretty massive read, but at this point I think even the details there are relevant.
TL;DR is that the hold policy:

 - never makes on time data late
 - never makes non-droppable data droppable
 - never holds up downstream processing more than required

I don't believe there is another policy that achieves these.

Kenn

On Tue, Jun 2, 2020 at 6:57 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> agree that for on-time elements, the hold has to respect the output
> timestamps. For already late elements, it should be possible to calculate
> the output timestamp independently. Currently, the output watermark is
> *always* the value of watermark hold, which might be inappropriate for
> cases when the hold is explicitly cleared (because input is late and the
> hold is cleared in order not to hold downstream processing). My proposal
> would be to update the hold to input watermark (or more precisely to result
> of TimestampCombiner.assign(window, inputWatermark)), which seems to work,
> although it might hold watermark in cases when watermark updates even when
> no data arrives (I'm not sure how exactly to solve this). Another way
> around would be to clearly separate output timestamp (store it in different
> state than the watermark hold), because the output watermark should be
> cleared on different condition (hold clears after every firing to enable
> watermark progress, while output timestamp can be kept in case of
> accumulating panes).
>
> Jan
> On 6/2/20 1:42 AM, Kenneth Knowles wrote:
>
> Quick reply about one top-level thing: output timestamps and watermark
> holds are closely related. A hold is precisely reserving the right to
> output at a particular time. The part that is unintuitive is that these are
> ever different. That is, really, a hack to allow downstream processing to
> proceed more quickly when input is already late. I wonder if there is
> another way to set up the requirements so that they are always the same
> value, but the downstream on time processing is not held up by late data.
>
> On Sun, May 31, 2020 at 3:44 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Minor self-correction - in the property c) it MIGHT be possible to update
>> output watermark to time greater than input watermark, _as long as any
>> future element cannot be assigned timestamp that is less than the output
>> watermark_. That seems to be the case only for
>> TimestampCombiner.END_OF_WINDOW, as that actually does not depend on
>> timestamps of the actual elements. This doesn't quite change the reasoning
>> below, it might be possible to not store input watermark to watermark hold
>> for this combiner, although it would probably have negligible practical
>> impact.
>> On 5/31/20 12:17 PM, Jan Lukavský wrote:
>>
>> Hi Reuven,
>>
>> the asynchronicity of watermark update is what I was missing - it is what
>> relates watermark hold with element output timestamp. On the other hand, we
>> have some invariants that have to hold, namely:
>>
>> a) element arriving as non-late MUST NOT be changed to late
>>
>> b) element arriving as late MIGHT be changed to non-late
>>
>> c) operator's output watermark MUST be less than input watermark at any
>> time
>>
>> Properties a) and b) are somewhat natural requirements and property c)
>> follows from the fact that it is impossible to exactly predict the future.
>>
>> Now, having these three properties, would it be possible to:
>>
>> a) when pane contains both late and on-time elements, split the pane
>> into two, containing only late and on-time elements
>>
>> b) calculate output timestamp of all panes using timestamp combiner (now
>> pane contains only late or on time elements, so no timestamp combiner
>> should be able to violate either of properties a) or b))
>>
>> c) when there is a pane that contains only late elements, update
>> watermark hold to min(current input watermark, window gc time) - so that
>> the output watermark can progress up to input watermark (and not violate
>> property c) above)
>>
>> It seems to me that what currently stands in the way is that
>>
>> a) panes are not split to late and non-late only (and this might be
>> tricky, mostly for combining transforms)
>>
>> b) the watermark hold with late-only pane is set to window gc time
>> (instead of adding the input watermark as well) - [1]
>>
>> With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting
>> the panes would not be required, as the timestamp combiner can only shift
>> late elements forward (make use of property b)). TimestampCombiner.EARLIEST
>> would probably require splitting the panes, which seems to solve the
>> mentioned [BEAM-2262].
>>
>> WDYT?
>>
>> [1]
>> https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f
>> On 5/29/20 5:01 PM, Reuven Lax wrote:
>>
>> This does seem non-intuitive, though I'm not sure what the best approach
>> is.
>>
>> The problem with using currentOutputWatermark as the output timestamp is
>> that Beam does not define watermark advancement to be synchronous, and at
>> least the Dataflow runner calculates watermarks completely independently of
>> bundle processing. This means that the output watermark could advance
>> immediately after being checked, which would cause the records output to be
>> arbitrarily late.
>> So for example, if allowedLateness is 10 seconds, then
>> this trigger will accept a record that is 5 seconds late. However if
>> currentOutputWatermark advances by 15 seconds after checking it, then you
>> would end up outputting a result that is 15 seconds late and therefore
>> would be dropped.
>>
>> IMO it's most important that on-time elements are never turned into late
>> elements. However the above behavior also seems confusing to users.
>>
>> Worth noting that I don't think that the current behavior is that much
>> better. If the output watermark is close to the end of the window, then I
>> think the existing model can also cause this scenario to happen.
>>
>> Reuven
>>
>> On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> what seems the most "surprising" to me is that we are using
>>> TimestampCombiners to actually do two (orthogonal) things:
>>>
>>> a) calculate a watermark hold for a window, so on-time elements emitted
>>> from a pane are not late in downstream processing
>>>
>>> b) calculate timestamp of elements in output pane
>>>
>>> These two follow slightly different constraints - while in case a) it is
>>> not allowed to shift watermark "back in time", in case b) it seems OK to
>>> output data with timestamp lower than output watermark (what comes late,
>>> might leave late). So, while it seems OK to discard late elements for the
>>> sake of calculating the output watermark, it seems wrong to discard them
>>> when calculating output timestamp. Maybe these two timestamps might be held
>>> in different states (the state will be held until GC time for accumulating
>>> panes and reset on discarding panes)?
>>>
>>> Jan
>>> On 5/28/20 5:02 PM, David Morávek wrote:
>>>
>>> Hi,
>>>
>>> I've come across "unexpected" model behaviour when dealing with late
>>> data and custom timestamp combiners.
>>> Let's take the following pipeline as an
>>> example:
>>>
>>> final PCollection<String> input = ...;
>>> input.apply(
>>>         "GlobalWindows",
>>>         Window.<String>into(new GlobalWindows())
>>>             .triggering(
>>>                 AfterWatermark.pastEndOfWindow()
>>>                     .withEarlyFirings(
>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>                             .plusDelayOf(Duration.standardSeconds(10))))
>>>             .withTimestampCombiner(TimestampCombiner.LATEST)
>>>             .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>>             .accumulatingFiredPanes())
>>>     .apply("Aggregate", Count.perElement());
>>>
>>> The above pipeline emits updates with the latest input timestamp it has
>>> seen so far (from non-late elements). We write the output from this
>>> timestamp to Kafka and read it from another pipeline.
>>>
>>> The problem comes when we need to handle late elements behind the output
>>> watermark. In this case Beam cannot use the combined timestamp and uses the
>>> EOW timestamp instead. Unfortunately this results in the downstream pipeline
>>> progressing its input watermark to the end of the global window. Also if we
>>> would use fixed windows after this aggregation, it would yield unexpected
>>> results.
>>>
>>> There is no reasoning about this behaviour in the last section of lateness
>>> design doc <https://s.apache.org/beam-lateness> [1], so I'd like to
>>> open a discussion about what the expected result should be.
>>>
>>> My personal opinion is that the correct approach would be emitting late
>>> elements with currentOutputWatermark rather than EOW in case of EARLIEST
>>> and LATEST timestamp combiners.
>>>
>>> I've prepared a failing test case for ReduceFnRunner
>>> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b> [3],
>>> if anyone wants to play around with the issue.
>>>
>>> I also think that BEAM-2262
>>> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
>>> this discussion.
>>>
>>> [1] https://s.apache.org/beam-lateness
>>> [2] https://issues.apache.org/jira/browse/BEAM-2262
>>> [3]
>>> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>>
>>> Looking forward to hearing your thoughts.
>>>
>>> Thanks,
>>> D.
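
For readers following the arithmetic in the thread, here is a minimal plain-Java sketch (no Beam dependency) of two points made above: Reuven's example of a result becoming droppable when the watermark advances after it was read, and Jan's proposed hold of min(current input watermark, window gc time) for late-only panes. The class and method names (LatenessSketch, latenessMs, isDroppable, lateOnlyPaneHold) are hypothetical and are not Beam APIs. The drop rule is deliberately simplified to "lateness greater than allowedLateness", as in Reuven's wording; it is not a description of how any runner actually decides what to drop.

```java
final class LatenessSketch {

    /** Milliseconds by which a timestamp lags the watermark (0 if it does not lag). */
    static long latenessMs(long timestampMs, long watermarkMs) {
        return Math.max(0L, watermarkMs - timestampMs);
    }

    /** Simplified rule (assumption): droppable once lateness exceeds allowed lateness. */
    static boolean isDroppable(long timestampMs, long watermarkMs, long allowedLatenessMs) {
        return latenessMs(timestampMs, watermarkMs) > allowedLatenessMs;
    }

    /** Hold for a late-only pane as proposed in the thread: min(input watermark, window GC time). */
    static long lateOnlyPaneHold(long inputWatermarkMs, long windowGcTimeMs) {
        return Math.min(inputWatermarkMs, windowGcTimeMs);
    }

    public static void main(String[] args) {
        long allowedLatenessMs = 10_000L;
        long watermarkAtCheckMs = 100_000L;

        // A record 5 seconds behind the watermark is late but still accepted.
        long inputTimestampMs = watermarkAtCheckMs - 5_000L;
        System.out.println(isDroppable(inputTimestampMs, watermarkAtCheckMs, allowedLatenessMs)); // false

        // The result is stamped with the watermark value that was read,
        // then the watermark advances by 15 seconds before the result is seen downstream.
        long outputTimestampMs = watermarkAtCheckMs;
        long watermarkAfterMs = watermarkAtCheckMs + 15_000L;
        System.out.println(isDroppable(outputTimestampMs, watermarkAfterMs, allowedLatenessMs)); // true

        // Proposed hold for a late-only pane, with an arbitrary GC time of 200 s.
        System.out.println(lateOnlyPaneHold(watermarkAfterMs, 200_000L)); // 115000
    }
}
```

The numbers (a watermark of 100 s, a GC time of 200 s) are arbitrary; only the 10 s allowed lateness, 5 s input lateness and 15 s advancement come from the thread.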