Add unit test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045471c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045471c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045471c1 Branch: refs/heads/master Commit: 045471c1dc7ffeecc8ea8b6c0695498261aa631b Parents: c415be8 Author: Mark Shields <[email protected]> Authored: Wed Mar 9 16:50:16 2016 -0800 Committer: Mark Shields <[email protected]> Committed: Wed Mar 9 16:50:16 2016 -0800 ---------------------------------------------------------------------- .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 5 +-- .../dataflow/sdk/util/ReduceFnRunnerTest.java | 38 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2b6e0d4..2e2d1f6 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -602,7 +602,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { boolean windowIsActive) throws Exception { if (windowIsActive) { - // Since window was still active the trigger may not have closed. + // Since both the window is in the active window set AND the trigger was not yet closed, + // it is possible we still have state. 
reduceFn.clearState(renamedContext); watermarkHold.clearHolds(renamedContext); nonEmptyPanes.clearPane(renamedContext.state()); @@ -623,7 +624,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } paneInfoTracker.clear(directContext.state()); if (activeWindows.isActive(directContext.window())) { - // Don't need to track address state windows anymore + // Don't need to track address state windows anymore. activeWindows.remove(directContext.window()); } // We'll never need to test for the trigger being closed again. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index 4fb3e37..e1348f7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -687,6 +687,44 @@ public class ReduceFnRunnerTest { } /** + * It is possible for a session window's trigger to be closed at the point at which + * the (merged) session window is garbage collected. Make sure we don't accidentally + * assume the window is still active. + */ + @Test + public void testMergingWithCloseBeforeGC() throws Exception { + ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), + ClosingBehavior.FIRE_IF_NON_EMPTY); + + // Two elements in two overlapping session windows. 
+ tester.injectElements( + TimestampedValue.of(1, new Instant(1)), // in [1, 11) + TimestampedValue.of(10, new Instant(10))); // in [10, 20) + + // Close the trigger, but the garbage collection timer is still pending. + when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTrigger); + tester.advanceInputWatermark(new Instant(30)); + + // Now the garbage collection timer will fire, finding the trigger already closed. + tester.advanceInputWatermark(new Instant(100)); + + List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); + assertThat(output.size(), equalTo(1)); + assertThat(output.get(0), + isSingleWindowedValue(containsInAnyOrder(1, 10), + 1, // timestamp + 1, // window start + 20)); // window end + assertThat( + output.get(0).getPane(), + equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); + } + + + /** * Tests that when data is assigned to multiple windows but some of those windows have * had their triggers finish, then the data is dropped and counted accurately. */
