Invoke onMerge in AfterWatermarkEarly

This ensures that any triggering state manipulations appropriately
notify the early subtrigger before resetting the finished bit, so that
any timers and state are appropriately migrated to the merged window.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/13728c1b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/13728c1b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/13728c1b

Branch: refs/heads/master
Commit: 13728c1b940ece4474708dc1a3989dfb4b4d86e3
Parents: 311425c
Author: Thomas Groh <[email protected]>
Authored: Thu Feb 2 09:23:45 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Thu Feb 2 10:27:17 2017 -0800

----------------------------------------------------------------------
 .../triggers/AfterWatermarkStateMachine.java     |  1 +
 .../triggers/AfterWatermarkStateMachineTest.java | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/13728c1b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 524c057..e83c2f8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -129,6 +129,7 @@ public class AfterWatermarkStateMachine {
       // the new merged window, because even if the merged window is "done" 
some pending elements
       // haven't had a chance to fire.
       if (!earlyContext.trigger().finishedInAllMergingWindows() || 
!endOfWindowReached(c)) {
+        earlySubtrigger.invokeOnMerge(earlyContext);
         earlyContext.trigger().setFinished(false);
         if (lateTrigger != null) {
           ExecutableTriggerStateMachine lateSubtrigger = 
c.trigger().subTrigger(LATE_INDEX);

http://git-wip-us.apache.org/repos/asf/beam/blob/13728c1b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
index 119c937..e4d10a0 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -293,6 +295,23 @@ public class AfterWatermarkStateMachineTest {
     assertTrue(tester.shouldFire(mergedWindow));
   }
 
+  @Test
+  public void testEarlyAndLateOnMergeSubtriggerMerges() throws Exception {
+    tester =
+        TriggerStateMachineTester.forTrigger(
+            AfterWatermarkStateMachine.pastEndOfWindow()
+                .withEarlyFirings(mockEarly)
+                .withLateFirings(mockLate),
+            Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    tester.injectElements(5);
+
+    // Merging should re-activate the early trigger in the merged window
+    tester.mergeWindows();
+    verify(mockEarly).onMerge(Mockito.any(OnMergeContext.class));
+  }
+
   /**
    * Tests that the trigger rewinds to be non-finished in the merged window.
    *

Reply via email to