[BEAM-80] Decide EARLY or ON_TIME based on the input watermark

This change is based on the trigger specification.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/968494f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/968494f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/968494f3 Branch: refs/heads/master Commit: 968494f3e427cc242d91ef6de25f5c7c408540dc Parents: f87f35b Author: Pei He <[email protected]> Authored: Wed Mar 2 16:28:43 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Wed Mar 2 19:01:09 2016 -0800 ---------------------------------------------------------------------- .../dataflow/sdk/util/PaneInfoTracker.java | 30 +++++++++---------- .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 31 ++++++++------------ .../dataflow/sdk/util/ReduceFnRunnerTest.java | 9 +++--- 3 files changed, 32 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java index 38499c2..a7818a3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java @@ -54,13 +54,11 @@ public class PaneInfoTracker { * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane * info includes the timing for the pane, who's calculation is quite subtle. * - * @param isEndOfWindow should be {@code true} only if the pane is being emitted - * because an end-of-window timer has fired and the trigger agreed we should fire. 
* @param isFinal should be {@code true} only if the triggering machinery can guarantee * no further firings for the */ - public ReadableState<PaneInfo> getNextPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, - final boolean isEndOfWindow, final boolean isFinal) { + public ReadableState<PaneInfo> getNextPaneInfo( + ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) { final Object key = context.key(); final ReadableState<PaneInfo> previousPaneFuture = context.state().access(PaneInfoTracker.PANE_INFO_TAG); @@ -76,7 +74,7 @@ public class PaneInfoTracker { @Override public PaneInfo read() { PaneInfo previousPane = previousPaneFuture.read(); - return describePane(key, windowMaxTimestamp, previousPane, isEndOfWindow, isFinal); + return describePane(key, windowMaxTimestamp, previousPane, isFinal); } }; } @@ -85,8 +83,8 @@ public class PaneInfoTracker { context.state().access(PANE_INFO_TAG).write(currentPane); } - private <W> PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneInfo previousPane, - boolean isEndOfWindow, boolean isFinal) { + private <W> PaneInfo describePane( + Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) { boolean isFirst = previousPane == null; Timing previousTiming = isFirst ? null : previousPane.getTiming(); long index = isFirst ? 0 : previousPane.getIndex() + 1; @@ -104,26 +102,28 @@ public class PaneInfoTracker { // if the output watermark is behind the end of the window. boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY; + // True is the input watermark hasn't passed the window's max timestamp. + boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp); + Timing timing; if (isLateForOutput || !onlyEarlyPanesSoFar) { // The output watermark has already passed the end of this window, or we have already // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must // consider this pane LATE. 
timing = Timing.LATE; - } else if (isEndOfWindow) { - // This is the unique ON_TIME firing for the window. - timing = Timing.ON_TIME; - } else { - // All other cases are EARLY. + } else if (isEarlyForInput) { + // This is an EARLY firing. timing = Timing.EARLY; nonSpeculativeIndex = -1; + } else { + // This is the unique ON_TIME firing for the window. + timing = Timing.ON_TIME; } WindowTracing.debug( "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; " - + "inputWatermark:{}; outputWatermark:{}; isEndOfWindow:{}; isLateForOutput:{}", - timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isEndOfWindow, - isLateForOutput); + + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}", + timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput); if (previousPane != null) { // Timing transitions should follow EARLY* ON_TIME? LATE* http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index fe5c474..1a009bb 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -289,7 +289,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { ReduceFn<K, InputT, OutputT, W>.Context renamedContext = contextFactory.base(mergedWindow, StateStyle.RENAMED); triggerRunner.prefetchShouldFire(mergedWindow, directContext.state()); - emitIfAppropriate(directContext, renamedContext, false/* isEndOfWindow */); + emitIfAppropriate(directContext, renamedContext); } // We're all done with merging and emitting elements so can compress the activeWindow state. 
@@ -532,14 +532,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window); } - // If this is an end-of-window timer then: - // 1. We need to set a GC timer - // 2. We need to let the PaneInfoTracker know that we are transitioning from early to late, - // and possibly emitting an on-time pane. - boolean isEndOfWindow = - TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); - // If this is a garbage collection timer then we should trigger and garbage collect the window. Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); boolean isGarbageCollection = @@ -556,7 +548,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. - onTrigger(directContext, renamedContext, isEndOfWindow, true/* isFinished */); + onTrigger(directContext, renamedContext, true/* isFinished */); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -569,9 +561,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); if (windowIsActive) { - emitIfAppropriate(directContext, renamedContext, isEndOfWindow); + emitIfAppropriate(directContext, renamedContext); } + // If this is an end-of-window timer then, we need to set a GC timer + boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { // Since we are processing an on-time firing we should schedule the garbage collection // timer. 
(If getAllowedLateness is zero then the timer event will be considered a @@ -649,7 +644,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state. */ private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext, - ReduceFn<K, InputT, OutputT, W>.Context renamedContext, boolean isEndOfWindow) + ReduceFn<K, InputT, OutputT, W>.Context renamedContext) throws Exception { if (!triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { @@ -667,7 +662,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // Run onTrigger to produce the actual pane contents. // As a side effect it will clear all element holds, but not necessarily any // end-of-window or garbage collection holds. - onTrigger(directContext, renamedContext, isEndOfWindow, isFinished); + onTrigger(directContext, renamedContext, isFinished); // Now that we've triggered, the pane is empty. nonEmptyPanes.clearPane(renamedContext.state()); @@ -692,13 +687,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { /** * Do we need to emit a pane? */ - private boolean needToEmit( - boolean isEmpty, boolean isEndOfWindow, boolean isFinished, PaneInfo.Timing timing) { + private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) { if (!isEmpty) { // The pane has elements. return true; } - if (isEndOfWindow && timing == Timing.ON_TIME) { + if (timing == Timing.ON_TIME) { // This is the unique ON_TIME pane. 
return true; } @@ -715,14 +709,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { private void onTrigger( final ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext, - boolean isEndOfWindow, boolean isFinished) throws Exception { // Prefetch necessary states ReadableState<Instant> outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState<PaneInfo> paneFuture = - paneInfoTracker.getNextPaneInfo(directContext, isEndOfWindow, isFinished).readLater(); + paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); ReadableState<Boolean> isEmptyFuture = nonEmptyPanes.isEmpty(renamedContext.state()).readLater(); @@ -735,7 +728,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { final Instant outputTimestamp = outputTimestampFuture.read(); // Only emit a pane if it has data or empty panes are observable. - if (needToEmit(isEmptyFuture.read(), isEndOfWindow, isFinished, pane.getTiming())) { + if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. 
final List<W> windows = Collections.singletonList(directContext.window()); ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index c85b1ca..4fb3e37 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -479,19 +479,20 @@ public class ReduceFnRunnerTest { when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 3); assertThat(tester.extractOutput(), contains( - // This is late, because the trigger wasn't waiting for AfterWatermark - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 2, -1)))); + WindowMatchers.valueWithPaneInfo( + PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)))); when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 4); assertThat(tester.extractOutput(), contains( - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 3, -1)))); + WindowMatchers.valueWithPaneInfo( + PaneInfo.createPane(false, false, Timing.LATE, 3, 1)))); when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); triggerShouldFinish(mockTrigger); injectElement(tester, 5); assertThat(tester.extractOutput(), contains( - WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.EARLY, 4, -1)))); + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2)))); } @Test
