Add tests for corner cases of processing time timers

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d2b384a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d2b384a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d2b384a2

Branch: refs/heads/gearpump-runner
Commit: d2b384a20dbb0213d0f63e74713a06d63bad8d39
Parents: fda589c
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 22 13:05:42 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunnerTest.java   | 70 ++++++++++++++++++++
 .../beam/sdk/transforms/GroupByKeyTest.java     | 39 +++++++++++
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d2b384a2/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index fa5ba8b..4f68038 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -284,6 +284,44 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * Tests that when a processing time timer comes in after a window is expired
+   * but in the same bundle it does not cause a spurious output.
+   */
+  @Test
+  public void testCombiningAccumulatingProcessingTime() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) 
FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceInputWatermarkNoTimers(new Instant(100));
+    tester.advanceProcessingTimeNoTimers(new Instant(5010));
+
+    // Fires the GC/EOW timer at the same time as the processing time timer.
+    tester.fireTimers(
+        new IntervalWindow(new Instant(0), new Instant(100)),
+        TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)),
+        TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, 
Timing.ON_TIME, 0, 0))));
+  }
+
+  /**
    * Tests that the garbage collection time for a fixed window does not 
overflow the end of time.
    */
   @Test
@@ -351,6 +389,38 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * Tests that when a processing time timer comes in after a window is 
expired
+   * and GC'd it does not cause a spurious output.
+   */
+  @Test
+  public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws 
Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of((WindowFn<?, IntervalWindow>) 
FixedWindows.of(Duration.millis(100)))
+            .withTimestampCombiner(TimestampCombiner.EARLIEST)
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+            .withAllowedLateness(Duration.ZERO)
+            .withTrigger(
+                Repeatedly.forever(
+                    
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+    tester.advanceProcessingTime(new Instant(5000));
+    injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+    injectElement(tester, 5);
+
+    tester.advanceInputWatermark(new Instant(100));
+    tester.advanceProcessingTime(new Instant(5011));
+
+    assertThat(
+        tester.extractOutput(),
+        contains(
+            isSingleWindowedValue(
+                equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, 
Timing.ON_TIME, 0, 0))));
+  }
+
+  /**
    * Tests that if end-of-window and GC timers come in together, that the pane 
is correctly
    * marked as final.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/d2b384a2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 171171f..4b5d5f5 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -45,14 +45,19 @@ import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.LargeKeys;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -184,6 +189,40 @@ public class GroupByKeyTest {
     p.run();
   }
 
+  /**
+   * Tests that when a processing time timer comes in after a window is 
expired it does not cause a
+   * spurious output.
+   */
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testCombiningAccumulatingProcessingTime() throws Exception {
+    PCollection<Integer> triggeredSums =
+        p.apply(
+                TestStream.create(VarIntCoder.of())
+                    .advanceWatermarkTo(new Instant(0))
+                    .addElements(
+                        TimestampedValue.of(2, new Instant(2)),
+                        TimestampedValue.of(5, new Instant(5)))
+                    .advanceWatermarkTo(new Instant(100))
+                    .advanceProcessingTime(Duration.millis(10))
+                    .advanceWatermarkToInfinity())
+            .apply(
+                Window.<Integer>into(FixedWindows.of(Duration.millis(100)))
+                    .withTimestampCombiner(TimestampCombiner.EARLIEST)
+                    .accumulatingFiredPanes()
+                    .withAllowedLateness(Duration.ZERO)
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.millis(10)))))
+            .apply(Sum.integersGlobally().withoutDefaults());
+
+    PAssert.that(triggeredSums)
+        .containsInAnyOrder(7);
+
+    p.run();
+  }
+
   @Test
   public void testGroupByKeyNonDeterministic() throws Exception {
 

Reply via email to