Repository: beam Updated Branches: refs/heads/master 219b9caa1 -> a5254e730
ReduceFnRunner: Do not manage EOW hold or timer, set GC hold and timer always Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c994ca4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c994ca4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c994ca4 Branch: refs/heads/master Commit: 9c994ca44a161c13120bd44f1415ffec38c4b244 Parents: 296cba0 Author: Kenneth Knowles <[email protected]> Authored: Mon Oct 16 16:46:05 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Nov 13 15:03:21 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunner.java | 72 ++---- .../apache/beam/runners/core/WatermarkHold.java | 221 +++---------------- .../beam/runners/core/ReduceFnRunnerTest.java | 191 ++++++++++++---- 3 files changed, 197 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 634a2d1..bc1d0db 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -45,7 +45,6 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import 
org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; @@ -596,28 +595,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED); nonEmptyPanes.recordContent(renamedContext.state()); - - // Make sure we've scheduled the end-of-window or garbage collection timer for this window. - Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext); + scheduleGarbageCollectionTimer(directContext); // Hold back progress of the output watermark until we have processed the pane this - // element will be included within. If the element is too late for that, place a hold at - // the end-of-window or garbage collection time to allow empty panes to contribute elements - // which won't be dropped due to lateness by a following computation (assuming the following - // computation uses the same allowed lateness value...) - @Nullable Instant hold = watermarkHold.addHolds(renamedContext); - - if (hold != null) { - // Assert that holds have a proximate timer. - boolean holdInWindow = !hold.isAfter(window.maxTimestamp()); - boolean timerInWindow = !timer.isAfter(window.maxTimestamp()); - checkState( - holdInWindow == timerInWindow, - "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s", - hold, - timer, - directContext.window()); - } + // element will be included within. If the element is later than the output watermark, the + // hold will be at GC time. + watermarkHold.addHolds(renamedContext); // Execute the reduceFn, which will buffer the value as appropriate reduceFn.processValue(renamedContext); @@ -1070,48 +1053,33 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } /** - * Make sure we'll eventually have a timer fire which will tell us to garbage collect - * the window state. For efficiency we may need to do this in two steps rather - * than one. Return the time at which the timer will fire. 
+ * Schedule a timer to garbage collect the window. + * + * <p>The timer: * * <ul> - * <li>If allowedLateness is zero then we'll garbage collect at the end of the window. - * For simplicity we'll set our own timer for this situation even though an - * {@link AfterWatermark} trigger may have also set an end-of-window timer. - * ({@code setTimer} is idempotent.) - * <li>If allowedLateness is non-zero then we could just always set a timer for the garbage - * collection time. However if the windows are large (eg hourly) and the allowedLateness is small - * (eg seconds) then we'll end up with nearly twice the number of timers in-flight. So we - * instead set an end-of-window timer and then roll that forward to a garbage collection timer - * when it fires. We use the input watermark to distinguish those cases. + * <li>...must be fired strictly after the expiration of the window. + * <li>...should be as close to the expiration as possible, to have a timely output of + * remaining buffered data, and GC. 
* </ul> */ - private Instant scheduleEndOfWindowOrGarbageCollectionTimer( - ReduceFn<?, ?, ?, W>.Context directContext) { + private void scheduleGarbageCollectionTimer(ReduceFn<?, ?, ?, W>.Context directContext) { Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant endOfWindow = directContext.window().maxTimestamp(); - String which; - Instant timer; - if (endOfWindow.isBefore(inputWM)) { - timer = LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); - which = "garbage collection"; - } else { - timer = endOfWindow; - which = "end-of-window"; - } + Instant gcTime = + LateDataUtils.garbageCollectionTime(directContext.window(), windowingStrategy); WindowTracing.trace( - "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for " + "ReduceFnRunner.scheduleGarbageCollectionTimer: Scheduling at {} for " + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", - which, - timer, + gcTime, key, directContext.window(), inputWM, timerInternals.currentOutputWatermarkTime()); - checkState(!timer.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Timer %s is beyond end-of-time", timer); - directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME); - return timer; + checkState( + !gcTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Timer %s is beyond end-of-time", + gcTime); + directContext.timers().setTimer(gcTime, TimeDomain.EVENT_TIME); } private void cancelEndOfWindowAndGarbageCollectionTimers( http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 8859bbb..9890826 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java 
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -25,9 +25,9 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.state.ReadableState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; @@ -84,112 +84,22 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { /** * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp - * of the element in {@code context}. We allow the actual hold time to be shifted later by the - * {@link TimestampCombiner}, but no further than the end of the window. The hold will - * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold - * was placed, or {@literal null} if no hold was placed. + * of the element in {@code context}. * - * <p>In the following we'll write {@code E} to represent an element's timestamp after passing - * through the window strategy's output time function, {@code IWM} for the local input watermark, - * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection - * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right, - * and we write {@code [ ... ]} to denote a bounded window with implied lower bound. + * <p>The target time for the aggregated output is shifted by the {@link WindowFn} and combined + * with a {@link TimestampCombiner} to determine where the output watermark is held. 
* - * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness} - * is {@code ZERO}. + * <p>If the target time would be late, then we do not set this hold, but instead add the hold + * to allow a final output at GC time. * - * <p>Here are the cases we need to handle. They are conceptually considered in the - * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM. - * <ol> - * <li>(Normal) - * <pre> - * | - * [ | E ] - * | - * IWM - * </pre> - * This is, hopefully, the common and happy case. The element is locally on-time and can - * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer - * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's - * timestamp (depending on the output time function). Thus the OWM will not proceed past E - * until the next pane fires. - * - * <li>(Discard - no target window) - * <pre> - * | | - * [ E ] | | - * | | - * GCWM <-getAllowedLateness-> IWM - * </pre> - * The element is very locally late. The window has been garbage collected, thus there - * is no target pane E could be assigned to. We discard E. - * - * <li>(Unobservably late) - * <pre> - * | | - * [ | E | ] - * | | - * OWM IWM - * </pre> - * The element is locally late, however we can still treat this case as for 'Normal' above - * since the IWM has not yet passed the end of the window and the element is ahead of the - * OWM. In effect, we get to 'launder' the locally late element and consider it as locally - * on-time because no downstream computation can observe the difference. - * - * <li>(Maybe late 1) - * <pre> - * | | - * [ | E ] | - * | | - * OWM IWM - * </pre> - * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME} - * pane may have already been emitted. However, if timer firings have been delayed then it - * is possible the {@code ON_TIME} pane has not yet been emitted. 
We can't place an element - * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find - * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's - * timestamp. We may however set a garbage collection hold if required. - * - * <li>(Maybe late 2) - * <pre> - * | | - * [ E | | ] - * | | - * OWM IWM - * </pre> - * The end-of-window timer has not yet fired, so this element may still appear in an - * {@code ON_TIME} pane. However the element is too late to contribute to the output - * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an - * end-of-window hold. - * - * <li>(Maybe late 3) - * <pre> - * | | - * [ E | ] | - * | | - * OWM IWM - * </pre> - * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer - * has already fired, or it is about to fire. We can place only the garbage collection hold, - * if required. - * - * <li>(Definitely late) - * <pre> - * | | - * [ E ] | | - * | | - * OWM IWM - * </pre> - * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to - * place an end-of-window hold. We can still place a garbage collection hold if required. - * - * </ol> + * <p>See https://s.apache.org/beam-lateness for the full design of how late data and watermarks + * interact. */ @Nullable public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) { Instant hold = addElementHold(context); if (hold == null) { - hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/); + hold = addGarbageCollectionHold(context, false /*paneIsEmpty*/); } return hold; } @@ -268,94 +178,22 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { } /** - * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required). - * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added. 
- */ - @Nullable - private Instant addEndOfWindowOrGarbageCollectionHolds( - ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { - Instant hold = addEndOfWindowHold(context, paneIsEmpty); - if (hold == null) { - hold = addGarbageCollectionHold(context, paneIsEmpty); - } - return hold; - } - - /** - * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added - * (ie the end of window time), or {@literal null} if no end of window hold is possible and we - * should fallback to a garbage collection hold. - * - * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner}) - * to clear it. In other words, the input watermark cannot be ahead of the end of window time. - * - * <p>An end-of-window hold is added in two situations: - * <ol> - * <li>An incoming element came in behind the output watermark (so we are too late for placing - * the usual element hold), but it may still be possible to include the element in an - * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will - * not be considered late by any downstream computation. - * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at - * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in - * a pane are processed due to a fired trigger we must set both an end of window timer and an end - * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered - * late by any downstream computation. 
- * </ol> - */ - @Nullable - private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eowHold = context.window().maxTimestamp(); - - if (eowHold.isBefore(inputWM)) { - WindowTracing.trace( - "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for " - + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - eowHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - checkState(outputWM == null || !eowHold.isBefore(outputWM), - "End-of-window hold %s cannot be before output watermark %s", - eowHold, outputWM); - checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "End-of-window hold %s is beyond end-of-time", eowHold); - // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep - // the hold away from the combining function in elementHoldTag. - // However if !paneIsEmpty then it could make sense to use the elementHoldTag here. - // Alas, onMerge is forced to add an end of window or garbage collection hold without - // knowing whether an element hold is already in place (stopping to check is too expensive). - // This it would end up adding an element hold at the end of the window which could - // upset the elementHoldTag combining function. - context.state().access(EXTRA_HOLD_TAG).add(eowHold); - WindowTracing.trace( - "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - eowHold, context.key(), context.window(), inputWM, outputWM); - return eowHold; - } - - /** * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at - * which the hold was added (ie the end of window time plus allowed lateness), - * or {@literal null} if no hold was added. 
- * - * <p>We only add the hold if it is distinct from what would be added by - * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness} - * must be non-zero. + * which the hold was added (ie the end of window time plus allowed lateness), or {@literal null} + * if no hold was added. * * <p>A garbage collection hold is added in two situations: + * * <ol> - * <li>An incoming element came in behind the output watermark, and was too late for placing - * the usual element hold or an end of window hold. Place the garbage collection hold so that - * we can guarantee when the pane is finally triggered its output will not be dropped due to - * excessive lateness by any downstream computation. - * <li>The {@link WindowingStrategy#getClosingBehavior()} is - * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted - * for all windows which saw at least one element. Again, the garbage collection hold guarantees - * that any empty final pane can be given a timestamp which will not be considered beyond - * allowed lateness by any downstream computation. + * <li>An incoming element has a timestamp earlier than the output watermark, and was too late + * for placing the usual element hold or an end of window hold. Place the garbage collection + * hold so that we can guarantee when the pane is finally triggered its output will not be + * dropped due to excessive lateness by any downstream computation. + * <li>The {@link WindowingStrategy#getClosingBehavior()} is {@link + * ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted for all + * windows which saw at least one element. Again, the garbage collection hold guarantees + * that any empty final pane can be given a timestamp which will not be considered beyond + * allowed lateness by any downstream computation. * </ol> * * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2. 
@@ -367,12 +205,11 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { Instant inputWM = timerInternals.currentInputWatermarkTime(); Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), windowingStrategy); - if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { + if (gcHold.isBefore(inputWM)) { WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); + "{}.addGarbageCollectionHold: gc hold would be before the input watermark " + + "for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}", + getClass().getSimpleName(), context.key(), context.window(), inputWM, outputWM); return null; } @@ -432,7 +269,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { // the hold depends on the min of the element timestamps. // At least one merged window must be non-empty for the merge to have been triggered. StateMerging.clear(context.state(), EXTRA_HOLD_TAG); - addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/); + addGarbageCollectionHold(context, false /*paneIsEmpty*/); } /** @@ -497,7 +334,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { oldHold = extraHold; } if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { - // If no hold (eg because all elements came in behind the output watermark), or + // If no hold (eg because all elements came in before the output watermark), or // the hold was for garbage collection, take the end of window as the result. 
WindowTracing.debug( "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " @@ -514,9 +351,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { @Nullable Instant newHold = null; if (!isFinished) { - // Only need to leave behind an end-of-window or garbage collection hold - // if future elements will be processed. - newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/); + newHold = addGarbageCollectionHold(context, true /*paneIsEmpty*/); } return new OldAndNewHolds(oldHold, newHold); http://git-wip-us.apache.org/repos/asf/beam/blob/9c994ca4/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 2341502..982e934 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -84,7 +85,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -670,9 +670,9 @@ public class ReduceFnRunnerTest { TestOptions options = PipelineOptionsFactory.as(TestOptions.class); options.setValue(expectedValue); - 
when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true); - when(mockSideInputReader.get( - Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class))) + when(mockSideInputReader.contains(org.mockito.Matchers.any(PCollectionView.class))) + .thenReturn(true); + when(mockSideInputReader.get(any(PCollectionView.class), any(BoundedWindow.class))) .then( new Answer<Integer>() { @Override @@ -721,9 +721,13 @@ public class ReduceFnRunnerTest { MetricsContainerImpl container = new MetricsContainerImpl("any"); MetricsEnvironment.setCurrentContainer(container); // Test handling of late data. Specifically, ensure the watermark hold is correct. + Duration allowedLateness = Duration.millis(10); ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, - AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10), + ReduceFnTester.nonCombining( + FixedWindows.of(Duration.millis(10)), + mockTriggerStateMachine, + AccumulationMode.ACCUMULATING_FIRED_PANES, + allowedLateness, ClosingBehavior.FIRE_IF_NON_EMPTY); // Input watermark -> null @@ -731,22 +735,27 @@ public class ReduceFnRunnerTest { assertEquals(null, tester.getOutputWatermark()); // All on time data, verify watermark hold. 
+ IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10)); injectElement(tester, 1); injectElement(tester, 3); assertEquals(new Instant(1), tester.getWatermarkHold()); when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 2); List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); - assertThat(output, contains( - isSingleWindowedValue(containsInAnyOrder(1, 2, 3), - 1, // timestamp - 0, // window start - 10))); // window end + assertThat( + output, + contains( + isSingleWindowedValue( + containsInAnyOrder(1, 2, 3), + equalTo(new Instant(1)), + equalTo((BoundedWindow) expectedWindow)))); assertThat(output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); - // Holding for the end-of-window transition. - assertEquals(new Instant(9), tester.getWatermarkHold()); + // There is no end-of-window hold, but the timer set by the trigger holds the watermark + assertThat( + tester.getWatermarkHold(), nullValue()); + // Nothing dropped. long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, @@ -763,9 +772,16 @@ public class ReduceFnRunnerTest { tester.advanceInputWatermark(new Instant(4)); injectElement(tester, 2); injectElement(tester, 3); - assertEquals(new Instant(9), tester.getWatermarkHold()); + + // Late data has arrived behind the _output_ watermark. The ReduceFnRunner sets a GC hold + // since this data is not permitted to hold up the output watermark. 
+ assertThat( + tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness))); + + // Now data just ahead of the output watermark arrives and sets an earlier "element" hold injectElement(tester, 5); assertEquals(new Instant(5), tester.getWatermarkHold()); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 4); output = tester.extractOutput(); @@ -780,17 +796,29 @@ public class ReduceFnRunnerTest { assertThat(output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))); - // All late -- output at end of window timestamp. + // Since the element hold is cleared, there is no hold remaining + assertThat(tester.getWatermarkHold(), nullValue()); + + // All behind the output watermark -- hold is at GC time (if we imagine the + // trigger sets a timer for ON_TIME firing, that is actually when they'll be emitted) when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); tester.advanceInputWatermark(new Instant(8)); injectElement(tester, 6); injectElement(tester, 5); - assertEquals(new Instant(9), tester.getWatermarkHold()); + assertThat( + tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness))); + injectElement(tester, 4); // Fire the ON_TIME pane when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); - tester.advanceInputWatermark(new Instant(10)); + + // To get an ON_TIME pane, we need the output watermark to be held back a little; this would + // be done by way of the timers set by the trigger, which are mocked here + tester.setAutoAdvanceOutputWatermark(false); + + tester.advanceInputWatermark(expectedWindow.maxTimestamp().plus(1)); + tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME); // Output time is end of the window, because all the new data was late, but the pane // is the ON_TIME pane. 
@@ -806,6 +834,8 @@ public class ReduceFnRunnerTest { assertThat(output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))); + tester.setAutoAdvanceOutputWatermark(true); + // This is "pending" at the time the watermark makes it way-late. // Because we're about to expire the window, we output it. when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); @@ -845,26 +875,32 @@ public class ReduceFnRunnerTest { tester.assertHasOnlyGlobalAndFinishedSetsFor(); } + /** Make sure that if data comes in too late to make it on time, the hold is the GC time. */ @Test public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception { - // Make sure holds are only set if they are accompanied by an end-of-window timer. ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, - AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10), + ReduceFnTester.nonCombining( + FixedWindows.of(Duration.millis(10)), + mockTriggerStateMachine, + AccumulationMode.ACCUMULATING_FIRED_PANES, + Duration.millis(10), ClosingBehavior.FIRE_ALWAYS); tester.setAutoAdvanceOutputWatermark(false); - // Case: Unobservably late + // Case: Unobservably "late" relative to input watermark, but on time for output watermark tester.advanceInputWatermark(new Instant(15)); tester.advanceOutputWatermark(new Instant(11)); + + IntervalWindow expectedWindow = new IntervalWindow(new Instant(10), new Instant(20)); injectElement(tester, 14); // Hold was applied, waiting for end-of-window timer. assertEquals(new Instant(14), tester.getWatermarkHold()); - assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME)); - // Trigger the end-of-window timer. 
+ // Trigger the end-of-window timer, fire a timer as though the mock trigger set it when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.advanceInputWatermark(new Instant(20)); + tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); // Hold has been replaced with garbage collection hold. Waiting for garbage collection. assertEquals(new Instant(29), tester.getWatermarkHold()); @@ -887,10 +923,15 @@ public class ReduceFnRunnerTest { @Test public void testPaneInfoAllStates() throws Exception { ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, - AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), + ReduceFnTester.nonCombining( + FixedWindows.of(Duration.millis(10)), + mockTriggerStateMachine, + AccumulationMode.DISCARDING_FIRED_PANES, + Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceInputWatermark(new Instant(0)); when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 1); @@ -903,7 +944,9 @@ public class ReduceFnRunnerTest { WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)))); when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); + tester.setAutoAdvanceOutputWatermark(false); tester.advanceInputWatermark(new Instant(15)); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 3); assertThat(tester.extractOutput(), contains( @@ -911,6 +954,7 @@ public class ReduceFnRunnerTest { PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)))); when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + 
tester.setAutoAdvanceOutputWatermark(true); injectElement(tester, 4); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo( @@ -1032,6 +1076,54 @@ public class ReduceFnRunnerTest { WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); } + /** + * If the trigger does not care about the watermark, the ReduceFnRunner should still emit an + * element for the ON_TIME pane. + */ + @Test + public void testNoWatermarkTriggerNoHold() throws Exception { + Duration allowedLateness = Duration.standardDays(1); + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = + ReduceFnTester.nonCombining( + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardSeconds(5)))) + .withAllowedLateness(allowedLateness)); + + // First, an element comes in on time in [0, 10) but ReduceFnRunner should + // not set a hold or timer for 9. That is the trigger's job. + IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10)); + tester.advanceInputWatermark(new Instant(0)); + tester.advanceProcessingTime(new Instant(0)); + + tester.injectElements(TimestampedValue.of(1, new Instant(1))); + + // Since some data arrived, the element hold will be the end of the window. 
+ assertThat(tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp())); + + tester.advanceProcessingTime(new Instant(6000)); + + // Sanity check; we aren't trying to verify output in this test + assertThat(tester.getOutputSize(), equalTo(1)); + + // Since we did not request empty final panes, no hold + assertThat(tester.getWatermarkHold(), nullValue()); + + // So when the input watermark advanced, the output advances with it (automated by tester) + tester.advanceInputWatermark( + new Instant(expectedWindow.maxTimestamp().plus(Duration.standardHours(1)))); + + // Now late data arrives + tester.injectElements(TimestampedValue.of(3, new Instant(3))); + + // The ReduceFnRunner should set a GC hold since the element was too late and its timestamp + // will be ignored for the purposes of the watermark hold + assertThat( + tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness))); + } + @Test public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining( @@ -1216,32 +1308,39 @@ public class ReduceFnRunnerTest { */ @Test public void testMergingWithCloseTrigger() throws Exception { + Duration allowedLateness = Duration.millis(50); ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + ReduceFnTester.nonCombining( + Sessions.withGapDuration(Duration.millis(10)), mockTriggerStateMachine, - AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), - ClosingBehavior.FIRE_IF_NON_EMPTY); + AccumulationMode.DISCARDING_FIRED_PANES, + allowedLateness, + ClosingBehavior.FIRE_IF_NON_EMPTY); // Create a new merged session window. 
- tester.injectElements(TimestampedValue.of(1, new Instant(1)), - TimestampedValue.of(2, new Instant(2))); + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12)); + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); // Force the trigger to be closed for the merged window. when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); triggerShouldFinish(mockTriggerStateMachine); + + // Fire an end-of-window timer as though the trigger set it tester.advanceInputWatermark(new Instant(13)); + tester.fireTimer(mergedWindow, mergedWindow.maxTimestamp(), TimeDomain.EVENT_TIME); // Trigger is now closed. - assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12)))); + assertTrue(tester.isMarkedFinished(mergedWindow)); when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); // Revisit the same session window. - tester.injectElements(TimestampedValue.of(1, new Instant(1)), - TimestampedValue.of(2, new Instant(2))); + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); // Trigger is still closed. 
- assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12)))); + assertTrue(tester.isMarkedFinished(mergedWindow)); } /** @@ -1250,11 +1349,16 @@ public class ReduceFnRunnerTest { */ @Test public void testMergingWithReusedWindow() throws Exception { + Duration allowedLateness = Duration.millis(50); ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + ReduceFnTester.nonCombining( + Sessions.withGapDuration(Duration.millis(10)), mockTriggerStateMachine, - AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), - ClosingBehavior.FIRE_IF_NON_EMPTY); + AccumulationMode.DISCARDING_FIRED_PANES, + allowedLateness, + ClosingBehavior.FIRE_IF_NON_EMPTY); + + IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(11)); // One elements in one session window. tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. @@ -1263,6 +1367,7 @@ public class ReduceFnRunnerTest { when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); triggerShouldFinish(mockTriggerStateMachine); tester.advanceInputWatermark(new Instant(15)); + tester.fireTimer(mergedWindow, mergedWindow.maxTimestamp(), TimeDomain.EVENT_TIME); // Another element in the same session window. // Should be discarded with 'window closed'. @@ -1276,11 +1381,13 @@ public class ReduceFnRunnerTest { List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); assertThat(output.size(), equalTo(1)); - assertThat(output.get(0), - isSingleWindowedValue(containsInAnyOrder(1), - 1, // timestamp - 1, // window start - 11)); // window end + assertThat( + output.get(0), + isSingleWindowedValue( + containsInAnyOrder(1), + equalTo(new Instant(1)), // timestamp + equalTo((BoundedWindow) mergedWindow))); + assertThat( output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
