Invoke onMerge in AfterWatermarkEarly. This ensures that any triggering state manipulations appropriately notify the early subtrigger before the finished bit is reset, so that any timers or state are appropriately migrated to the merged window.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/13728c1b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/13728c1b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/13728c1b Branch: refs/heads/master Commit: 13728c1b940ece4474708dc1a3989dfb4b4d86e3 Parents: 311425c Author: Thomas Groh <[email protected]> Authored: Thu Feb 2 09:23:45 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Thu Feb 2 10:27:17 2017 -0800 ---------------------------------------------------------------------- .../triggers/AfterWatermarkStateMachine.java | 1 + .../triggers/AfterWatermarkStateMachineTest.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/13728c1b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 524c057..e83c2f8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -129,6 +129,7 @@ public class AfterWatermarkStateMachine { // the new merged window, because even if the merged window is "done" some pending elements // haven't had a chance to fire. 
if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { + earlySubtrigger.invokeOnMerge(earlyContext); earlyContext.trigger().setFinished(false); if (lateTrigger != null) { ExecutableTriggerStateMachine lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); http://git-wip-us.apache.org/repos/asf/beam/blob/13728c1b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java index 119c937..e4d10a0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java @@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -293,6 +295,23 @@ public class AfterWatermarkStateMachineTest { assertTrue(tester.shouldFire(mergedWindow)); } + @Test + public void testEarlyAndLateOnMergeSubtriggerMerges() throws Exception { + tester = + TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(mockEarly) + .withLateFirings(mockLate), + 
Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + + // Merging should re-activate the early trigger in the merged window + tester.mergeWindows(); + verify(mockEarly).onMerge(Mockito.any(OnMergeContext.class)); + } + /** * Tests that the trigger rewinds to be non-finished in the merged window. *
