Add tests for corner cases of processing time timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d2b384a2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d2b384a2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d2b384a2 Branch: refs/heads/gearpump-runner Commit: d2b384a20dbb0213d0f63e74713a06d63bad8d39 Parents: fda589c Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 13:05:42 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:58:08 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunnerTest.java | 70 ++++++++++++++++++++ .../beam/sdk/transforms/GroupByKeyTest.java | 39 +++++++++++ 2 files changed, 109 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d2b384a2/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index fa5ba8b..4f68038 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -284,6 +284,44 @@ public class ReduceFnRunnerTest { } /** + * Tests that when a processing time timer comes in after a window is expired + * but in the same bundle it does not cause a spurious output. 
+ */ + @Test + public void testCombiningAccumulatingProcessingTime() throws Exception { + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.ZERO) + .withTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceProcessingTime(new Instant(5000)); + injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 + injectElement(tester, 5); + + tester.advanceInputWatermarkNoTimers(new Instant(100)); + tester.advanceProcessingTimeNoTimers(new Instant(5010)); + + // Fires the GC/EOW timer at the same time as the processing time timer. + tester.fireTimers( + new IntervalWindow(new Instant(0), new Instant(100)), + TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)), + TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010))); + + assertThat( + tester.extractOutput(), + contains( + isSingleWindowedValue( + equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)))); + } + + /** + * Tests that the garbage collection time for a fixed window does not overflow the end of time. */ @Test @@ -351,6 +389,38 @@ public class ReduceFnRunnerTest { } /** + * Tests that when a processing time timer comes in after a window is expired + * and GC'd it does not cause a spurious output. 
+ */ + @Test + public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception { + WindowingStrategy<?, IntervalWindow> strategy = + WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.ZERO) + .withTrigger( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10)))); + + ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); + + tester.advanceProcessingTime(new Instant(5000)); + injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100 + injectElement(tester, 5); + + tester.advanceInputWatermark(new Instant(100)); + tester.advanceProcessingTime(new Instant(5011)); + + assertThat( + tester.extractOutput(), + contains( + isSingleWindowedValue( + equalTo(7), 2, 0, 100, PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)))); + } + + /** * Tests that if end-of-window and GC timers come in together, that the pane is correctly * marked as final. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/d2b384a2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 171171f..4b5d5f5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -45,14 +45,19 @@ import org.apache.beam.sdk.coders.CoderProviders; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.LargeKeys; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; @@ -184,6 +189,40 @@ public class GroupByKeyTest { p.run(); } + /** + * Tests that when a processing time timer comes in after a window is expired it does not cause a + * spurious output. 
+ */ + @Test + @Category({ValidatesRunner.class, UsesTestStream.class}) + public void testCombiningAccumulatingProcessingTime() throws Exception { + PCollection<Integer> triggeredSums = + p.apply( + TestStream.create(VarIntCoder.of()) + .advanceWatermarkTo(new Instant(0)) + .addElements( + TimestampedValue.of(2, new Instant(2)), + TimestampedValue.of(5, new Instant(5))) + .advanceWatermarkTo(new Instant(100)) + .advanceProcessingTime(Duration.millis(10)) + .advanceWatermarkToInfinity()) + .apply( + Window.<Integer>into(FixedWindows.of(Duration.millis(100))) + .withTimestampCombiner(TimestampCombiner.EARLIEST) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.ZERO) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.millis(10))))) + .apply(Sum.integersGlobally().withoutDefaults()); + + PAssert.that(triggeredSums) + .containsInAnyOrder(7); + + p.run(); + } + @Test public void testGroupByKeyNonDeterministic() throws Exception {
