Repository: incubator-beam Updated Branches: refs/heads/master 0952f4433 -> f1aa490b9
Add test for empty ON_TIME and no empty final pane Add a test that we get an empty `ON_TIME` pane, and don't get the empty final pane when using accumulation mode with the only if non-empty `ClosingBehavior`. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4c5f530 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4c5f530 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4c5f530 Branch: refs/heads/master Commit: e4c5f530effc591ef56f8d49162a0d82069a9e31 Parents: fa45809 Author: bchambers <[email protected]> Authored: Tue Apr 19 12:43:52 2016 -0700 Committer: bchambers <[email protected]> Committed: Tue Apr 19 15:23:53 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/util/ReduceFnRunnerTest.java | 54 ++++++++++++++++++++ 1 file changed, 54 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4c5f530/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 5eccb04..65b5ee6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -548,6 +548,60 @@ public class ReduceFnRunnerTest { } @Test + public void noEmptyPanesFinalIfNonEmpty() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining( + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of( + AfterPane.elementCountAtLeast(2), + AfterWatermark.pastEndOfWindow()))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); + + tester.advanceInputWatermark(new Instant(0)); + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(2, new Instant(2))); + tester.advanceInputWatermark(new Instant(20)); + tester.advanceInputWatermark(new Instant(250)); + + List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); + assertThat(output, contains( + // Trigger with 2 elements + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), + // Trigger for the empty on time pane + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); + } + + @Test + public void noEmptyPanesFinalAlways() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining( + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of( + AfterPane.elementCountAtLeast(2), + AfterWatermark.pastEndOfWindow()))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); + + tester.advanceInputWatermark(new Instant(0)); + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(2, new Instant(2))); + tester.advanceInputWatermark(new Instant(20)); + tester.advanceInputWatermark(new Instant(250)); + + List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); + assertThat(output, contains( + // Trigger with 2 elements + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), + // Trigger for the empty on time pane + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10), + // Trigger for the final pane + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); + } + + @Test public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
