Do not garbage-collect (GC) windows based on a processing-time timer!
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50c43d96 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50c43d96 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50c43d96 Branch: refs/heads/gearpump-runner Commit: 50c43d96adb8c2523cf38c09f32e241eacc47823 Parents: 412fd7e Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 12:56:34 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:58:08 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunner.java | 3 +- .../beam/runners/core/ReduceFnRunnerTest.java | 35 +++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index b5c3e3e..75b6acd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -663,7 +663,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); - this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); + this.isGarbageCollection = + TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime); } // Has this window had its trigger finish? 
http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 9e71300..2b66162 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -140,7 +140,40 @@ public class ReduceFnRunnerTest { } }) .when(mockTrigger).onFire(anyTriggerContext()); - } + } + + /** + * Tests that a processing time timer does not cause window GC. + */ + @Test + public void testProcessingTimeTimerDoesNotGc() throws Exception { + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.ZERO) + .withTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceProcessingTime(new Instant(5000)); + injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 + injectElement(tester, 5); + + tester.advanceProcessingTime(new Instant(10000)); + + tester.assertHasOnlyGlobalAndStateFor( + new IntervalWindow(new Instant(0), new Instant(100))); + + assertThat( + tester.extractOutput(), + contains( + isSingleWindowedValue( + equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, Timing.EARLY, 0, 0)))); + } @Test public void testOnElementBufferingDiscarding() throws Exception {
