Do not GC windows based on processing time timer!

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50c43d96
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50c43d96
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50c43d96

Branch: refs/heads/gearpump-runner
Commit: 50c43d96adb8c2523cf38c09f32e241eacc47823
Parents: 412fd7e
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 22 12:56:34 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunner.java       |  3 +-
 .../beam/runners/core/ReduceFnRunnerTest.java   | 35 +++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index b5c3e3e..75b6acd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -663,7 +663,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
       Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
-      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
+      this.isGarbageCollection =
+          TimeDomain.EVENT_TIME == timer.getDomain() && 
!timer.getTimestamp().isBefore(cleanupTime);
     }
 
     // Has this window had its trigger finish?

http://git-wip-us.apache.org/repos/asf/beam/blob/50c43d96/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 9e71300..2b66162 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -140,7 +140,40 @@ public class ReduceFnRunnerTest {
       }
     })
     .when(mockTrigger).onFire(anyTriggerContext());
- }
+  }
+
+  /**
+   * Tests that a processing time timer does not cause window GC.
+   */
+  @Test
+  public void testProcessingTimeTimerDoesNotGc() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) 
FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceProcessingTime(new Instant(10000));
+
+    tester.assertHasOnlyGlobalAndStateFor(
+        new IntervalWindow(new Instant(0), new Instant(100)));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, false, 
Timing.EARLY, 0, 0))));
+  }
 
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {

Reply via email to