Ignore processing time timers in expired windows
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/951f3cab Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/951f3cab Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/951f3cab Branch: refs/heads/DSL_SQL Commit: 951f3cab3f6558524ee1146e0e3f347bcd02ecda Parents: c167d10 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 18:09:11 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:00 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunner.java | 10 ++++++ .../beam/runners/core/ReduceFnRunnerTest.java | 32 ++++++++++++++++++++ 2 files changed, 42 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index ef33bef..0632c05 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -693,6 +693,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @SuppressWarnings("unchecked") WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace(); W window = windowNamespace.getWindow(); + + if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) { + continue; + } + ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(window, StateStyle.DIRECT); ReduceFn<K, InputT, OutputT, W>.Context renamedContext = @@ -1090,4 +1095,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } + private boolean windowIsExpired(BoundedWindow w) { + return timerInternals + .currentInputWatermarkTime() + .isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness())); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 3a2c220..79ee91b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -286,6 +286,38 @@ public class ReduceFnRunnerTest { /** * Tests that when a processing time timer comes in after a window is expired + * it is just ignored. + */ + @Test + public void testLateProcessingTimeTimer() throws Exception { + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.ZERO) + .withTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceProcessingTime(new Instant(5000)); + injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 + injectElement(tester, 5); + + // After this advancement, the window is expired and only the GC process + // should be allowed to touch it + tester.advanceInputWatermarkNoTimers(new Instant(100)); + + // This should not output + tester.advanceProcessingTime(new Instant(6000)); + + assertThat(tester.extractOutput(), emptyIterable()); + } + + /** + * Tests that when a processing time timer comes in after a window is expired * but in the same bundle it does not cause a spurious output. */ @Test
