Repository: beam Updated Branches: refs/heads/master 01408c864 -> 73da9cc40
ReduceFnRunner: test when watermark leapfrogs EOW and GC This is known to fail in older versions; forward porting regression test. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/22b82969 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/22b82969 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/22b82969 Branch: refs/heads/master Commit: 22b82969828508cfbd45e4f90fe74dfed7914b88 Parents: 01408c8 Author: Kenneth Knowles <[email protected]> Authored: Wed Jul 19 15:27:20 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Jul 25 10:52:55 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunnerTest.java | 83 ++++++++++++++++++++ 1 file changed, 83 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/22b82969/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 4f13af1..2341502 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -39,6 +39,7 @@ import static org.mockito.Mockito.withSettings; import com.google.common.collect.Iterables; import java.util.List; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.MetricName; @@ -247,6 +248,88 @@ public class ReduceFnRunnerTest { } /** + * When the watermark passes the end-of-window and window expiration time + * in a single update, this tests that it does not crash. + */ + @Test + public void testSessionEowAndGcTogether() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = + ReduceFnTester.nonCombining( + Sessions.withGapDuration(Duration.millis(10)), + DefaultTriggerStateMachine.<IntervalWindow>of(), + AccumulationMode.ACCUMULATING_FIRED_PANES, + Duration.millis(50), + ClosingBehavior.FIRE_ALWAYS); + + tester.setAutoAdvanceOutputWatermark(true); + + tester.advanceInputWatermark(new Instant(0)); + injectElement(tester, 1); + tester.advanceInputWatermark(new Instant(100)); + + assertThat( + tester.extractOutput(), + contains( + isSingleWindowedValue( + contains(1), 1, 1, 11, PaneInfo.createPane(true, true, Timing.ON_TIME)))); + } + + /** + * When the watermark passes the end-of-window and window expiration time + * in a single update, this tests that it does not crash. + */ + @Test + public void testFixedWindowsEowAndGcTogether() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = + ReduceFnTester.nonCombining( + FixedWindows.of(Duration.millis(10)), + DefaultTriggerStateMachine.<IntervalWindow>of(), + AccumulationMode.ACCUMULATING_FIRED_PANES, + Duration.millis(50), + ClosingBehavior.FIRE_ALWAYS); + + tester.setAutoAdvanceOutputWatermark(true); + + tester.advanceInputWatermark(new Instant(0)); + injectElement(tester, 1); + tester.advanceInputWatermark(new Instant(100)); + + assertThat( + tester.extractOutput(), + contains( + isSingleWindowedValue( + contains(1), 1, 0, 10, PaneInfo.createPane(true, true, Timing.ON_TIME)))); + } + + /** + * When the watermark passes the end-of-window and window expiration time + * in a single update, this tests that it does not crash. + */ + @Test + public void testFixedWindowsEowAndGcTogetherFireIfNonEmpty() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = + ReduceFnTester.nonCombining( + FixedWindows.of(Duration.millis(10)), + DefaultTriggerStateMachine.<IntervalWindow>of(), + AccumulationMode.ACCUMULATING_FIRED_PANES, + Duration.millis(50), + ClosingBehavior.FIRE_IF_NON_EMPTY); + + tester.setAutoAdvanceOutputWatermark(true); + + tester.advanceInputWatermark(new Instant(0)); + injectElement(tester, 1); + tester.advanceInputWatermark(new Instant(100)); + + List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); + assertThat( + output, + contains( + isSingleWindowedValue( + contains(1), 1, 0, 10, PaneInfo.createPane(true, true, Timing.ON_TIME)))); + } + + /** * Tests that with the default trigger we will not produce two ON_TIME panes, even * if there are two outputs that are both candidates. */
