Process timer firings for a window together
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/935c0773 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/935c0773 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/935c0773 Branch: refs/heads/gearpump-runner Commit: 935c077341de580dddd4b29ffee3926795acf403 Parents: bd631b8 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 18:43:39 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jul 6 14:12:39 2017 -0700 ---------------------------------------------------------------------- .../examples/complete/game/LeaderBoardTest.java | 2 + .../beam/runners/core/ReduceFnRunner.java | 98 +++++++++++++------- .../beam/runners/core/ReduceFnRunnerTest.java | 49 +++++++++- 3 files changed, 115 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java index 745c210..611e2b3 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java @@ -276,6 +276,8 @@ public class LeaderBoardTest implements Serializable { .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)), event(TestUser.BLUE_TWO, 3, Duration.ZERO), event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3))) + // Move the watermark to the end of the window to output on time + .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION)) // Move the watermark past the end of the allowed lateness plus the end of the window 
.advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS) .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1))) http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 0632c05..634a2d1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -29,7 +29,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -638,11 +637,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } /** - * Enriches TimerData with state necessary for processing a timer as well as - * common queries about a timer. + * A descriptor of the activation for a window based on a timer. */ - private class EnrichedTimerData { - public final Instant timestamp; + private class WindowActivation { public final ReduceFn<K, InputT, OutputT, W>.Context directContext; public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext; // If this is an end-of-window timer then we may need to set a garbage collection timer @@ -653,19 +650,34 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // end-of-window time to be a signal to garbage collect. 
public final boolean isGarbageCollection; - EnrichedTimerData( - TimerData timer, + WindowActivation( ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext) { - this.timestamp = timer.getTimestamp(); this.directContext = directContext; this.renamedContext = renamedContext; W window = directContext.window(); - this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() - && timer.getTimestamp().equals(window.maxTimestamp()); - Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); + + // The output watermark is before the end of the window if it is either unknown + // or it is known to be before it. If it is unknown, that means that there hasn't been + // enough data to advance it. + boolean outputWatermarkBeforeEOW = + timerInternals.currentOutputWatermarkTime() == null + || !timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp()); + + // The "end of the window" is reached when the local input watermark (for this key) surpasses + // it but the local output watermark (also for this key) has not. After data is emitted and + // the output watermark hold is released, the output watermark on this key will immediately + // exceed the end of the window (otherwise we could see multiple ON_TIME outputs) + this.isEndOfWindow = + timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp()) + && outputWatermarkBeforeEOW; + + // The "GC time" is reached when the input watermark surpasses the end of the window + // plus allowed lateness. After this, the window is expired and expunged. this.isGarbageCollection = - TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime); + timerInternals + .currentInputWatermarkTime() + .isAfter(LateDataUtils.garbageCollectionTime(window, windowingStrategy)); } // Has this window had its trigger finish? 
@@ -684,9 +696,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { return; } - // Create a reusable context for each timer and begin prefetching necessary + // Create a reusable context for each window and begin prefetching necessary // state. - List<EnrichedTimerData> enrichedTimers = new LinkedList(); + Map<BoundedWindow, WindowActivation> windowActivations = new HashMap(); + for (TimerData timer : timers) { checkArgument(timer.getNamespace() instanceof WindowNamespace, "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); @@ -694,7 +707,24 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace(); W window = windowNamespace.getWindow(); - if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) { + WindowTracing.debug("{}: Received timer key:{}; window:{}; data:{} with " + + "inputWatermark:{}; outputWatermark:{}", + ReduceFnRunner.class.getSimpleName(), + key, window, timer, + timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + + // Processing time timers for an expired window are ignored, just like elements + // that show up too late. Window GC is managed by an event time timer. + if (TimeDomain.EVENT_TIME != timer.getDomain() && windowIsExpired(window)) { + continue; + } + + // How a window is processed is a function only of the current state, not the details + // of the timer. This makes us robust to large leaps in processing time and watermark + // time, where both EOW and GC timers come in together and we need to GC and emit + // the final pane. 
+ if (windowActivations.containsKey(window)) { continue; } @@ -702,11 +732,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { contextFactory.base(window, StateStyle.DIRECT); ReduceFn<K, InputT, OutputT, W>.Context renamedContext = contextFactory.base(window, StateStyle.RENAMED); - EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext); - enrichedTimers.add(enrichedTimer); + WindowActivation windowActivation = new WindowActivation(directContext, renamedContext); + windowActivations.put(window, windowActivation); // Perform prefetching of state to determine if the trigger should fire. - if (enrichedTimer.isGarbageCollection) { + if (windowActivation.isGarbageCollection) { triggerRunner.prefetchIsClosed(directContext.state()); } else { triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); @@ -714,7 +744,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } // For those windows that are active and open, prefetch the triggering or emitting state. - for (EnrichedTimerData timer : enrichedTimers) { + for (WindowActivation timer : windowActivations.values()) { if (timer.windowIsActiveAndOpen()) { ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext; if (timer.isGarbageCollection) { @@ -727,25 +757,27 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } // Perform processing now that everything is prefetched. 
- for (EnrichedTimerData timer : enrichedTimers) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext; - ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext; + for (WindowActivation windowActivation : windowActivations.values()) { + ReduceFn<K, InputT, OutputT, W>.Context directContext = windowActivation.directContext; + ReduceFn<K, InputT, OutputT, W>.Context renamedContext = windowActivation.renamedContext; - if (timer.isGarbageCollection) { - WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " - + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), timer.timestamp, + if (windowActivation.isGarbageCollection) { + WindowTracing.debug( + "{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}", + ReduceFnRunner.class.getSimpleName(), + key, + directContext.window(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen(); + boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen(); if (windowIsActiveAndOpen) { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. @Nullable Instant newHold = onTrigger( - directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow); + directContext, renamedContext, true /* isFinished */, windowActivation.isEndOfWindow); checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); } @@ -753,18 +785,20 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // see elements for it again. 
clearAllState(directContext, renamedContext, windowIsActiveAndOpen); } else { - WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + WindowTracing.debug( + "{}.onTimers: Triggering for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", - key, directContext.window(), timer.timestamp, + key, + directContext.window(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - if (timer.windowIsActiveAndOpen() + if (windowActivation.windowIsActiveAndOpen() && triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { emit(directContext, renamedContext); } - if (timer.isEndOfWindow) { + if (windowActivation.isEndOfWindow) { // If the window strategy trigger includes a watermark trigger then at this point // there should be no data holds, either because we'd already cleared them on an // earlier onTrigger, or because we just cleared them on the above emit. http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 79ee91b..4f13af1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import 
org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -79,7 +80,6 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -246,6 +246,52 @@ public class ReduceFnRunnerTest { tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); } + /** + * Tests that with the default trigger we will not produce two ON_TIME panes, even + * if there are two outputs that are both candidates. + */ + @Test + public void testOnlyOneOnTimePane() throws Exception { + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) + .withTrigger(DefaultTrigger.of()) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceInputWatermark(new Instant(0)); + + int value1 = 1; + int value2 = 3; + + // A single element that should be in the ON_TIME output + tester.injectElements( + TimestampedValue.of(value1, new Instant(1))); + + // Should fire ON_TIME + tester.advanceInputWatermark(new Instant(10)); + + // The DefaultTrigger should cause output labeled LATE, even though it does not have to be + // labeled as such. 
+ tester.injectElements( + TimestampedValue.of(value2, new Instant(3))); + + List<WindowedValue<Integer>> output = tester.extractOutput(); + assertEquals(2, output.size()); + + assertThat(output.get(0), WindowMatchers.isWindowedValue(equalTo(value1))); + assertThat(output.get(1), WindowMatchers.isWindowedValue(equalTo(value1 + value2))); + + assertThat( + output.get(0), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0))); + assertThat( + output.get(1), + WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 1))); + } + @Test public void testOnElementCombiningDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. @@ -458,7 +504,6 @@ public class ReduceFnRunnerTest { * marked as final. */ @Test - @Ignore("https://issues.apache.org/jira/browse/BEAM-2505") public void testCombiningAccumulatingEventTime() throws Exception { WindowingStrategy<?, IntervalWindow> strategy = WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
