http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
deleted file mode 100644
index fb2b4d5..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/NeverTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Never}.
- */
-@RunWith(JUnit4.class)
-public class NeverTest {
-  private SimpleTriggerTester<IntervalWindow> triggerTester;
-
-  @Before
-  public void setup() throws Exception {
-    triggerTester =
-        TriggerTester.forTrigger(
-            Never.ever(), FixedWindows.of(Duration.standardMinutes(5)));
-  }
-
-  @Test
-  public void falseAfterEndOfWindow() throws Exception {
-    triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0), new 
Instant(0).plus(Duration.standardMinutes(5)));
-    assertThat(triggerTester.shouldFire(window), is(false));
-    triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(triggerTester.shouldFire(window), is(false));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
deleted file mode 100644
index 7289d97..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/OrFinallyTriggerTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link OrFinallyTrigger}.
- */
-@RunWith(JUnit4.class)
-public class OrFinallyTriggerTest {
-
-  private SimpleTriggerTester<IntervalWindow> tester;
-
-  /**
-   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
-   * fires and finishes, the {@code OrFinally} also fires and finishes.
-   */
-  @Test
-  public void testActualFiresAndFinishes() throws Exception {
-    tester = TriggerTester.forTrigger(
-        new OrFinallyTrigger(
-            AfterPane.elementCountAtLeast(2),
-            AfterPane.elementCountAtLeast(100)),
-        FixedWindows.of(Duration.millis(100)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
-
-    // Not yet firing
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-    assertFalse(tester.isMarkedFinished(window));
-
-    // The actual fires and finishes
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  /**
-   * Tests that for {@code OrFinally(actual, ...)} when {@code actual}
-   * fires but does not finish, the {@code OrFinally} also fires and also does 
not
-   * finish.
-   */
-  @Test
-  public void testActualFiresOnly() throws Exception {
-    tester = TriggerTester.forTrigger(
-        new OrFinallyTrigger(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
-            AfterPane.elementCountAtLeast(100)),
-        FixedWindows.of(Duration.millis(100)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
-
-    // Not yet firing
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-    assertFalse(tester.isMarkedFinished(window));
-
-    // The actual fires but does not finish
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // And again
-    tester.injectElements(3, 4);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-  }
-
-  /**
-   * Tests that if the first trigger rewinds to be non-finished in the merged 
window,
-   * then it becomes the currently active trigger again, with real triggers.
-   */
-  @Test
-  public void testShouldFireAfterMerge() throws Exception {
-    tester = TriggerTester.forTrigger(
-        AfterEach.inOrder(
-            AfterPane.elementCountAtLeast(5)
-                .orFinally(AfterWatermark.pastEndOfWindow()),
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    // Finished the orFinally in the first window
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-    tester.advanceInputWatermark(new Instant(11));
-    assertTrue(tester.shouldFire(firstWindow));
-    tester.fireIfShouldFire(firstWindow);
-
-    // Set up second window where it is not done
-    tester.injectElements(5);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Merge them, if the merged window were on the second trigger, it would 
be ready
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
-    assertFalse(tester.shouldFire(mergedWindow));
-
-    // Now adding 3 more makes the main trigger ready to fire
-    tester.injectElements(1, 2, 3, 4, 5);
-    tester.mergeWindows();
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  /**
-   * Tests that for {@code OrFinally(actual, until)} when {@code actual}
-   * fires but does not finish, then {@code until} fires and finishes, the
-   * whole thing fires and finished.
-   */
-  @Test
-  public void testActualFiresButUntilFinishes() throws Exception {
-    tester = TriggerTester.forTrigger(
-        new OrFinallyTrigger(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
-                AfterPane.elementCountAtLeast(3)),
-        FixedWindows.of(Duration.millis(10)));
-
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
-
-    // Before any firing
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-    assertFalse(tester.isMarkedFinished(window));
-
-    // The actual fires but doesn't finish
-    tester.injectElements(2);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.isMarkedFinished(window));
-
-    // The until fires and finishes; the trigger is finished
-    tester.injectElements(3);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertTrue(tester.isMarkedFinished(window));
-  }
-
-  @Test
-  public void testFireDeadline() throws Exception {
-    BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10));
-
-    assertEquals(new Instant(9),
-        Repeatedly.forever(AfterWatermark.pastEndOfWindow())
-        .getWatermarkThatGuaranteesFiring(window));
-    assertEquals(new Instant(9), 
Repeatedly.forever(AfterWatermark.pastEndOfWindow())
-        .orFinally(AfterPane.elementCountAtLeast(1))
-        .getWatermarkThatGuaranteesFiring(window));
-    assertEquals(new Instant(9), 
Repeatedly.forever(AfterPane.elementCountAtLeast(1))
-        .orFinally(AfterWatermark.pastEndOfWindow())
-        .getWatermarkThatGuaranteesFiring(window));
-    assertEquals(new Instant(9),
-        AfterPane.elementCountAtLeast(100)
-            .orFinally(AfterWatermark.pastEndOfWindow())
-            .getWatermarkThatGuaranteesFiring(window));
-
-    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE,
-        Repeatedly.forever(AfterPane.elementCountAtLeast(1))
-        .orFinally(AfterPane.elementCountAtLeast(10))
-        .getWatermarkThatGuaranteesFiring(window));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    OnceTrigger triggerA = AfterProcessingTime.pastFirstElementInPane();
-    OnceTrigger triggerB = AfterWatermark.pastEndOfWindow();
-    Trigger aOrFinallyB = triggerA.orFinally(triggerB);
-    Trigger bOrFinallyA = triggerB.orFinally(triggerA);
-    assertEquals(
-        Repeatedly.forever(
-            
triggerA.getContinuationTrigger().orFinally(triggerB.getContinuationTrigger())),
-        aOrFinallyB.getContinuationTrigger());
-    assertEquals(
-        Repeatedly.forever(
-            
triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())),
-        bOrFinallyA.getContinuationTrigger());
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = 
StubTrigger.named("t1").orFinally(StubTrigger.named("t2"));
-    assertEquals("t1.orFinally(t2)", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
deleted file mode 100644
index 6e8930d..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/RepeatedlyTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.util.TriggerTester;
-import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link Repeatedly}.
- */
-@RunWith(JUnit4.class)
-public class RepeatedlyTest {
-
-  @Mock private Trigger mockTrigger;
-  private SimpleTriggerTester<IntervalWindow> tester;
-  private static Trigger.TriggerContext anyTriggerContext() {
-    return Mockito.<Trigger.TriggerContext>any();
-  }
-
-  public void setUp(WindowFn<Object, IntervalWindow> windowFn) throws 
Exception {
-    MockitoAnnotations.initMocks(this);
-    tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), 
windowFn);
-  }
-
-  /**
-   * Tests that onElement correctly passes the data on to the subtrigger.
-   */
-  @Test
-  public void testOnElement() throws Exception {
-    setUp(FixedWindows.of(Duration.millis(10)));
-    tester.injectElements(37);
-    verify(mockTrigger).onElement(Mockito.<Trigger.OnElementContext>any());
-  }
-
-  /**
-   * Tests that the repeatedly is ready to fire whenever the subtrigger is 
ready.
-   */
-  @Test
-  public void testShouldFire() throws Exception {
-    setUp(FixedWindows.of(Duration.millis(10)));
-
-    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
-    assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
-
-    when(mockTrigger.shouldFire(Mockito.<Trigger.TriggerContext>any()))
-        .thenReturn(false);
-    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new 
Instant(10))));
-  }
-
-  /**
-   * Tests that the watermark that guarantees firing is that of the subtrigger.
-   */
-  @Test
-  public void testFireDeadline() throws Exception {
-    setUp(FixedWindows.of(Duration.millis(10)));
-    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
-    Instant arbitraryInstant = new Instant(34957849);
-
-    
when(mockTrigger.getWatermarkThatGuaranteesFiring(Mockito.<IntervalWindow>any()))
-        .thenReturn(arbitraryInstant);
-
-    assertThat(
-        
Repeatedly.forever(mockTrigger).getWatermarkThatGuaranteesFiring(window),
-        equalTo(arbitraryInstant));
-  }
-
-  @Test
-  public void testContinuation() throws Exception {
-    Trigger trigger = AfterProcessingTime.pastFirstElementInPane();
-    Trigger repeatedly = Repeatedly.forever(trigger);
-    assertEquals(
-        Repeatedly.forever(trigger.getContinuationTrigger()), 
repeatedly.getContinuationTrigger());
-    assertEquals(
-        
Repeatedly.forever(trigger.getContinuationTrigger().getContinuationTrigger()),
-        repeatedly.getContinuationTrigger().getContinuationTrigger());
-  }
-
-  @Test
-  public void testShouldFireAfterMerge() throws Exception {
-    tester = TriggerTester.forTrigger(
-        Repeatedly.forever(AfterPane.elementCountAtLeast(2)),
-        Sessions.withGapDuration(Duration.millis(10)));
-
-    tester.injectElements(1);
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
-    assertFalse(tester.shouldFire(firstWindow));
-
-    tester.injectElements(5);
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
-    assertFalse(tester.shouldFire(secondWindow));
-
-    // Merge them, if the merged window were on the second trigger, it would 
be ready
-    tester.mergeWindows();
-    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
-    assertTrue(tester.shouldFire(mergedWindow));
-  }
-
-  @Test
-  public void testRepeatedlyAfterFirstElementCount() throws Exception {
-    SimpleTriggerTester<GlobalWindow> tester =
-        TriggerTester.forTrigger(
-            Repeatedly.forever(
-                AfterFirst.of(
-                    AfterProcessingTime.pastFirstElementInPane()
-                        .plusDelayOf(Duration.standardMinutes(15)),
-                    AfterPane.elementCountAtLeast(5))),
-            new GlobalWindows());
-
-    GlobalWindow window = GlobalWindow.INSTANCE;
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2, 3, 4, 5);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.shouldFire(window));
-  }
-
-  @Test
-  public void testRepeatedlyAfterFirstProcessingTime() throws Exception {
-    SimpleTriggerTester<GlobalWindow> tester =
-        TriggerTester.forTrigger(
-            Repeatedly.forever(
-                AfterFirst.of(
-                    AfterProcessingTime.pastFirstElementInPane()
-                        .plusDelayOf(Duration.standardMinutes(15)),
-                    AfterPane.elementCountAtLeast(5))),
-            new GlobalWindows());
-
-    GlobalWindow window = GlobalWindow.INSTANCE;
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.shouldFire(window));
-  }
-
-  @Test
-  public void testRepeatedlyElementCount() throws Exception {
-    SimpleTriggerTester<GlobalWindow> tester =
-        TriggerTester.forTrigger(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(5)),
-            new GlobalWindows());
-
-    GlobalWindow window = GlobalWindow.INSTANCE;
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.injectElements(2, 3, 4, 5);
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.shouldFire(window));
-  }
-
-  @Test
-  public void testRepeatedlyProcessingTime() throws Exception {
-    SimpleTriggerTester<GlobalWindow> tester =
-        TriggerTester.forTrigger(
-            Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane()
-                        .plusDelayOf(Duration.standardMinutes(15))),
-            new GlobalWindows());
-
-    GlobalWindow window = GlobalWindow.INSTANCE;
-
-    tester.injectElements(1);
-    assertFalse(tester.shouldFire(window));
-
-    tester.advanceProcessingTime(new 
Instant(0).plus(Duration.standardMinutes(15)));
-    assertTrue(tester.shouldFire(window));
-    tester.fireIfShouldFire(window);
-    assertFalse(tester.shouldFire(window));
-  }
-
-
-  @Test
-  public void testToString() {
-    Trigger trigger = Repeatedly.forever(new StubTrigger() {
-        @Override
-        public String toString() {
-          return "innerTrigger";
-        }
-      });
-
-    assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
deleted file mode 100644
index 83077f4..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/ReshuffleTriggerTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ReshuffleTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ReshuffleTriggerTest {
-
-  /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */
-  public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() {
-    return new ReshuffleTrigger<>();
-  }
-
-  @Test
-  public void testShouldFire() throws Exception {
-    TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
-        new ReshuffleTrigger<IntervalWindow>(), 
FixedWindows.of(Duration.millis(100)));
-    IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new 
Instant(400));
-    assertTrue(tester.shouldFire(arbitraryWindow));
-  }
-
-  @Test
-  public void testOnTimer() throws Exception {
-    TriggerTester<Integer, IntervalWindow> tester = TriggerTester.forTrigger(
-        new ReshuffleTrigger<IntervalWindow>(), 
FixedWindows.of(Duration.millis(100)));
-    IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new 
Instant(200));
-    tester.fireIfShouldFire(arbitraryWindow);
-    assertFalse(tester.isMarkedFinished(arbitraryWindow));
-  }
-
-  @Test
-  public void testToString() {
-    Trigger trigger = new ReshuffleTrigger<>();
-    assertEquals("ReshuffleTrigger()", trigger.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
deleted file mode 100644
index b258a79..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/StubTrigger.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.collect.Lists;
-import java.util.List;
-import org.joda.time.Instant;
-
-/**
- * No-op {@link OnceTrigger} implementation for testing.
- */
-abstract class StubTrigger extends Trigger.OnceTrigger {
-  /**
-   * Create a stub {@link Trigger} instance which returns the specified name 
on {@link #toString()}.
-   */
-  static StubTrigger named(final String name) {
-    return new StubTrigger() {
-      @Override
-      public String toString() {
-        return name;
-      }
-    };
-  }
-
-  protected StubTrigger() {
-    super(Lists.<Trigger>newArrayList());
-  }
-
-  @Override
-  protected void onOnlyFiring(TriggerContext context) throws Exception {
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-  }
-
-  @Override
-  public boolean shouldFire(TriggerContext context) throws Exception {
-    return false;
-  }
-
-  @Override
-  protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) 
{
-    return null;
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
deleted file mode 100644
index cfc03b2..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.List;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link Trigger}.
- */
-@RunWith(JUnit4.class)
-public class TriggerTest {
-
-  @Test
-  public void testTriggerToString() throws Exception {
-    assertEquals("AfterWatermark.pastEndOfWindow()", 
AfterWatermark.pastEndOfWindow().toString());
-    assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())",
-        Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString());
-  }
-
-  @Test
-  public void testIsCompatible() throws Exception {
-    assertTrue(new Trigger1(null).isCompatible(new Trigger1(null)));
-    assertTrue(new Trigger1(Arrays.<Trigger>asList(new Trigger2(null)))
-        .isCompatible(new Trigger1(Arrays.<Trigger>asList(new 
Trigger2(null)))));
-
-    assertFalse(new Trigger1(null).isCompatible(new Trigger2(null)));
-    assertFalse(new Trigger1(Arrays.<Trigger>asList(new Trigger1(null)))
-        .isCompatible(new Trigger1(Arrays.<Trigger>asList(new 
Trigger2(null)))));
-  }
-
-  private static class Trigger1 extends Trigger {
-
-    private Trigger1(List<Trigger> subTriggers) {
-      super(subTriggers);
-    }
-
-    @Override
-    public void onElement(Trigger.OnElementContext c) { }
-
-    @Override
-    public void onMerge(Trigger.OnMergeContext c) { }
-
-    @Override
-    protected Trigger getContinuationTrigger(
-        List<Trigger> continuationTriggers) {
-      return null;
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return null;
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
-      return false;
-    }
-
-    @Override
-    public void onFire(Trigger.TriggerContext context) throws Exception { }
-  }
-
-  private static class Trigger2 extends Trigger {
-
-    private Trigger2(List<Trigger> subTriggers) {
-      super(subTriggers);
-    }
-
-    @Override
-    public void onElement(Trigger.OnElementContext c) { }
-
-    @Override
-    public void onMerge(Trigger.OnMergeContext c) { }
-
-    @Override
-    protected Trigger getContinuationTrigger(
-        List<Trigger> continuationTriggers) {
-      return null;
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return null;
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception 
{
-      return false;
-    }
-
-    @Override
-    public void onFire(Trigger.TriggerContext context) throws Exception { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
deleted file mode 100644
index 5fe17ad..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/reactors/TriggerTester.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Test utility that runs a {@link Trigger}, using in-memory stub 
implementation to provide
- * the {@link StateInternals}.
- *
- * @param <W> The type of windows being used.
- */
-public class TriggerTester<InputT, W extends BoundedWindow> {
-
-  /**
-   * A {@link TriggerTester} specialized to {@link Integer} values, so 
elements and timestamps
-   * can be conflated. Today, triggers should not observed the element type, 
so this is the
-   * only trigger tester that needs to be used.
-   */
-  public static class SimpleTriggerTester<W extends BoundedWindow>
-      extends TriggerTester<Integer, W> {
-
-    private SimpleTriggerTester(WindowingStrategy<Object, W> 
windowingStrategy) throws Exception {
-      super(windowingStrategy);
-    }
-
-    public void injectElements(int... values) throws Exception {
-      List<TimestampedValue<Integer>> timestampedValues =
-          Lists.newArrayListWithCapacity(values.length);
-      for (int value : values) {
-        timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
-      }
-      injectElements(timestampedValues);
-    }
-
-    public SimpleTriggerTester<W> withAllowedLateness(Duration 
allowedLateness) throws Exception {
-      return new SimpleTriggerTester<>(
-          windowingStrategy.withAllowedLateness(allowedLateness));
-    }
-  }
-
-  protected final WindowingStrategy<Object, W> windowingStrategy;
-
-  private final TestInMemoryStateInternals<?> stateInternals =
-      new TestInMemoryStateInternals<Object>(null /* key */);
-  private final InMemoryTimerInternals timerInternals = new 
InMemoryTimerInternals();
-  private final TriggerContextFactory<W> contextFactory;
-  private final WindowFn<Object, W> windowFn;
-  private final ActiveWindowSet<W> activeWindows;
-  private final Map<W, W> windowToMergeResult;
-
-  /**
-   * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link 
Trigger}
-   * under test.
-   */
-  private final ExecutableTrigger executableTrigger;
-
-  /**
-   * A map from a window and trigger to whether that trigger is finished for 
the window.
-   */
-  private final Map<W, FinishedTriggers> finishedSets;
-
-  public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
-      Trigger trigger, WindowFn<Object, W> windowFn)
-          throws Exception {
-    WindowingStrategy<Object, W> windowingStrategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger)
-        // Merging requires accumulation mode or early firings can break up a 
session.
-        // Not currently an issue with the tester (because we never GC) but we 
don't want
-        // mystery failures due to violating this need.
-        .withMode(windowFn.isNonMerging()
-            ? AccumulationMode.DISCARDING_FIRED_PANES
-            : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-    return new SimpleTriggerTester<>(windowingStrategy);
-  }
-
-  public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> 
forAdvancedTrigger(
-      Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
-    WindowingStrategy<Object, W> strategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger)
-        // Merging requires accumulation mode or early firings can break up a 
session.
-        // Not currently an issue with the tester (because we never GC) but we 
don't want
-        // mystery failures due to violating this need.
-        .withMode(windowFn.isNonMerging()
-            ? AccumulationMode.DISCARDING_FIRED_PANES
-            : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-    return new TriggerTester<>(strategy);
-  }
-
-  protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) 
throws Exception {
-    this.windowingStrategy = windowingStrategy;
-    this.windowFn = windowingStrategy.getWindowFn();
-    this.executableTrigger = windowingStrategy.getTrigger();
-    this.finishedSets = new HashMap<>();
-
-    this.activeWindows =
-        windowFn.isNonMerging()
-            ? new NonMergingActiveWindowSet<W>()
-            : new MergingActiveWindowSet<W>(windowFn, stateInternals);
-    this.windowToMergeResult = new HashMap<>();
-
-    this.contextFactory =
-        new TriggerContextFactory<>(windowingStrategy.getWindowFn(), 
stateInternals, activeWindows);
-  }
-
-  /**
-   * Instructs the trigger to clear its state for the given window.
-   */
-  public void clearState(W window) throws Exception {
-    executableTrigger.invokeClear(contextFactory.base(window,
-        new TestTimers(windowNamespace(window)), executableTrigger, 
getFinishedSet(window)));
-  }
-
-  /**
-   * Asserts that the trigger has actually cleared all of its state for the 
given window. Since
-   * the trigger under test is the root, this makes the assert for all 
triggers regardless
-   * of their position in the trigger tree.
-   */
-  public void assertCleared(W window) {
-    for (StateNamespace untypedNamespace : 
stateInternals.getNamespacesInUse()) {
-      if (untypedNamespace instanceof WindowAndTriggerNamespace) {
-        @SuppressWarnings("unchecked")
-        WindowAndTriggerNamespace<W> namespace = 
(WindowAndTriggerNamespace<W>) untypedNamespace;
-        if (namespace.getWindow().equals(window)) {
-          Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
-          assertTrue("Trigger has not cleared tags: " + tagsInUse, 
tagsInUse.isEmpty());
-        }
-      }
-    }
-  }
-
-  /**
-   * Returns {@code true} if the {@link Trigger} under test is finished for 
the given window.
-   */
-  public boolean isMarkedFinished(W window) {
-    FinishedTriggers finishedSet = finishedSets.get(window);
-    if (finishedSet == null) {
-      return false;
-    }
-
-    return finishedSet.isFinished(executableTrigger);
-  }
-
-  private StateNamespace windowNamespace(W window) {
-    return StateNamespaces.window(windowFn.windowCoder(), 
checkNotNull(window));
-  }
-
-  /**
-   * Advance the input watermark to the specified time, then advance the 
output watermark as far as
-   * possible.
-   */
-  public void advanceInputWatermark(Instant newInputWatermark) throws 
Exception {
-    // TODO: Should test timer firings: see 
https://issues.apache.org/jira/browse/BEAM-694
-    timerInternals.advanceInputWatermark(TimerCallback.NO_OP, 
newInputWatermark);
-  }
-
-  /** Advance the processing time to the specified time. */
-  public void advanceProcessingTime(Instant newProcessingTime) throws 
Exception {
-    // TODO: Should test timer firings: see 
https://issues.apache.org/jira/browse/BEAM-694
-    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, 
newProcessingTime);
-  }
-
-  /**
-   * Inject all the timestamped values (after passing through the window 
function) as if they
-   * arrived in a single chunk of a bundle (or work-unit).
-   */
-  @SafeVarargs
-  public final void injectElements(TimestampedValue<InputT>... values) throws 
Exception {
-    injectElements(Arrays.asList(values));
-  }
-
-  public final void injectElements(Collection<TimestampedValue<InputT>> 
values) throws Exception {
-    for (TimestampedValue<InputT> value : values) {
-      WindowTracing.trace("TriggerTester.injectElements: {}", value);
-    }
-
-    List<WindowedValue<InputT>> windowedValues = 
Lists.newArrayListWithCapacity(values.size());
-
-    for (TimestampedValue<InputT> input : values) {
-      try {
-        InputT value = input.getValue();
-        Instant timestamp = input.getTimestamp();
-        Collection<W> assignedWindows = windowFn.assignWindows(new 
TestAssignContext<W>(
-            windowFn, value, timestamp, GlobalWindow.INSTANCE));
-
-        for (W window : assignedWindows) {
-          activeWindows.addActiveForTesting(window);
-
-          // Today, triggers assume onTimer firing at the watermark time, 
whether or not they
-          // explicitly set the timer themselves. So this tester must set it.
-          timerInternals.setTimer(
-              TimerData.of(windowNamespace(window), window.maxTimestamp(), 
TimeDomain.EVENT_TIME));
-        }
-
-        windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, 
PaneInfo.NO_FIRING));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    for (WindowedValue<InputT> windowedValue : windowedValues) {
-      for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
-        // SDK is responsible for type safety
-        @SuppressWarnings("unchecked")
-        W window = mergeResult((W) untypedWindow);
-
-        Trigger.OnElementContext context = 
contextFactory.createOnElementContext(window,
-            new TestTimers(windowNamespace(window)), 
windowedValue.getTimestamp(),
-            executableTrigger, getFinishedSet(window));
-
-        if (!context.trigger().isFinished()) {
-          executableTrigger.invokeOnElement(context);
-        }
-      }
-    }
-  }
-
-  public boolean shouldFire(W window) throws Exception {
-    Trigger.TriggerContext context = contextFactory.base(
-        window,
-        new TestTimers(windowNamespace(window)),
-        executableTrigger, getFinishedSet(window));
-    executableTrigger.getSpec().prefetchShouldFire(context.state());
-    return executableTrigger.invokeShouldFire(context);
-  }
-
-  public void fireIfShouldFire(W window) throws Exception {
-    Trigger.TriggerContext context = contextFactory.base(
-        window,
-        new TestTimers(windowNamespace(window)),
-        executableTrigger, getFinishedSet(window));
-
-    executableTrigger.getSpec().prefetchShouldFire(context.state());
-    if (executableTrigger.invokeShouldFire(context)) {
-      executableTrigger.getSpec().prefetchOnFire(context.state());
-      executableTrigger.invokeOnFire(context);
-      if (context.trigger().isFinished()) {
-        activeWindows.remove(window);
-        executableTrigger.invokeClear(context);
-      }
-    }
-  }
-
-  public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, 
boolean value) {
-    
getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex),
 value);
-  }
-
-  /**
-   * Invokes merge from the {@link WindowFn} a single time and passes the 
resulting merge
-   * events on to the trigger under test. Does not persist the fact that 
merging happened,
-   * since it is just to test the trigger's {@code OnMerge} method.
-   */
-  public final void mergeWindows() throws Exception {
-    windowToMergeResult.clear();
-    activeWindows.merge(new MergeCallback<W>() {
-      @Override
-      public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) 
throws Exception {}
-
-      @Override
-      public void onMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
-        List<W> activeToBeMerged = new ArrayList<W>();
-        for (W window : toBeMerged) {
-          windowToMergeResult.put(window, mergeResult);
-          if (activeWindows.isActive(window)) {
-            activeToBeMerged.add(window);
-          }
-        }
-        Map<W, FinishedTriggers> mergingFinishedSets =
-            Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
-        for (W oldWindow : activeToBeMerged) {
-          mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
-        }
-        
executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
-            new TestTimers(windowNamespace(mergeResult)), executableTrigger,
-            getFinishedSet(mergeResult), mergingFinishedSets));
-        timerInternals.setTimer(TimerData.of(
-            windowNamespace(mergeResult), mergeResult.maxTimestamp(), 
TimeDomain.EVENT_TIME));
-      }
-    });
-  }
-
-  public  W mergeResult(W window) {
-    W result = windowToMergeResult.get(window);
-    return result == null ? window : result;
-  }
-
-  private FinishedTriggers getFinishedSet(W window) {
-    FinishedTriggers finishedSet = finishedSets.get(window);
-    if (finishedSet == null) {
-      finishedSet = FinishedTriggersSet.fromSet(new 
HashSet<ExecutableTrigger>());
-      finishedSets.put(window, finishedSet);
-    }
-    return finishedSet;
-  }
-
-  private static class TestAssignContext<W extends BoundedWindow>
-      extends WindowFn<Object, W>.AssignContext {
-    private Object element;
-    private Instant timestamp;
-    private BoundedWindow window;
-
-    public TestAssignContext(
-        WindowFn<Object, W> windowFn, Object element, Instant timestamp, 
BoundedWindow window) {
-      windowFn.super();
-      this.element = element;
-      this.timestamp = timestamp;
-      this.window = window;
-    }
-
-    @Override
-    public Object element() {
-      return element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return window;
-    }
-  }
-
-  private class TestTimers implements Timers {
-    private final StateNamespace namespace;
-
-    public TestTimers(StateNamespace namespace) {
-      checkArgument(namespace instanceof WindowNamespace);
-      this.namespace = namespace;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, 
timeDomain));
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timerInternals.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timerInternals.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    public Instant currentEventTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
new file mode 100644
index 0000000..907292c
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterAllStateMachineTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterAllStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterAllStateMachineTest {
+
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+  @Test
+  public void testT1FiresFirst() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterAllStateMachine.of(
+            AfterPaneStateMachine.elementCountAtLeast(1),
+            AfterPaneStateMachine.elementCountAtLeast(2)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testT2FiresFirst() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterAllStateMachine.of(
+            AfterPaneStateMachine.elementCountAtLeast(2),
+            AfterPaneStateMachine.elementCountAtLeast(1)),
+        FixedWindows.of(Duration.millis(100)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(100));
+
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that the AfterAll properly unsets finished bits when a merge 
causing it to become
+   * unfinished.
+   */
+  @Test
+  public void testOnMergeRewinds() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterEachStateMachine.inOrder(
+            AfterAllStateMachine.of(
+                AfterWatermarkStateMachine.pastEndOfWindow(),
+                AfterPaneStateMachine.elementCountAtLeast(1)),
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+
+    // Finish the AfterAll in the first window
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Merge them; the AfterAll should not be finished
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertFalse(tester.isMarkedFinished(mergedWindow));
+
+    // Confirm that we are back on the first trigger by probing that it is not 
ready to fire
+    // after an element (with merging)
+    tester.injectElements(3);
+    tester.mergeWindows();
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Fire the AfterAll in the merged window
+    tester.advanceInputWatermark(new Instant(15));
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+
+    // Confirm that we are on the second trigger by probing
+    tester.injectElements(2);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+    tester.injectElements(2);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+  }
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger =
+        AfterAllStateMachine.of(
+            StubTriggerStateMachine.named("t1"), 
StubTriggerStateMachine.named("t2"));
+    assertEquals("AfterAll.of(t1, t2)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
new file mode 100644
index 0000000..4fae8f1
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterEachStateMachineTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link AfterEachStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterEachStateMachineTest {
+
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Tests that the {@link AfterEachStateMachine} trigger fires and finishes 
the first trigger then
+   * the second.
+   */
+  @Test
+  public void testAfterEachInSequence() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterEachStateMachine.inOrder(
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(2))
+                .orFinally(AfterPaneStateMachine.elementCountAtLeast(3)),
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(5))
+                .orFinally(AfterWatermarkStateMachine.pastEndOfWindow())),
+            FixedWindows.of(Duration.millis(10)));
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+
+    // AfterCount(2) not ready
+    tester.injectElements(1);
+    assertFalse(tester.shouldFire(window));
+
+    // AfterCount(2) ready, not finished
+    tester.injectElements(2);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // orFinally(AfterCount(3)) ready and will finish the first
+    tester.injectElements(1, 2, 3);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // Now running as the second trigger
+    assertFalse(tester.shouldFire(window));
+    // This quantity of elements would fire and finish if it were erroneously 
still the first
+    tester.injectElements(1, 2, 3, 4);
+    assertFalse(tester.shouldFire(window));
+
+    // Now fire once
+    tester.injectElements(5);
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertFalse(tester.isMarkedFinished(window));
+
+    // This time advance the watermark to finish the whole mess.
+    tester.advanceInputWatermark(new Instant(10));
+    assertTrue(tester.shouldFire(window));
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger = AfterEachStateMachine.inOrder(
+        StubTriggerStateMachine.named("t1"),
+        StubTriggerStateMachine.named("t2"),
+        StubTriggerStateMachine.named("t3"));
+
+    assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
new file mode 100644
index 0000000..453c8ff
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link AfterFirstStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterFirstStateMachineTest {
+
+  @Mock private OnceTriggerStateMachine mockTrigger1;
+  @Mock private OnceTriggerStateMachine mockTrigger2;
+  private SimpleTriggerStateMachineTester<IntervalWindow> tester;
+  private static TriggerStateMachine.TriggerContext anyTriggerContext() {
+    return Mockito.<TriggerStateMachine.TriggerContext>any();
+  }
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testNeitherShouldFireFixedWindows() throws Exception {
+    tester =
+        TriggerStateMachineTester.forTrigger(
+            AfterFirstStateMachine.of(mockTrigger1, mockTrigger2),
+            FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
+
+    assertFalse(tester.shouldFire(window)); // should not fire
+    assertFalse(tester.isMarkedFinished(window)); // not finished
+  }
+
+  @Test
+  public void testOnlyT1ShouldFireFixedWindows() throws Exception {
+    tester =
+        TriggerStateMachineTester.forTrigger(
+            AfterFirstStateMachine.of(mockTrigger1, mockTrigger2),
+            FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(1), new 
Instant(11));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false);
+
+    assertTrue(tester.shouldFire(window)); // should fire
+
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testOnlyT2ShouldFireFixedWindows() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+    AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), 
FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(1), new 
Instant(11));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // should fire
+
+    tester.fireIfShouldFire(window); // now finished
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  @Test
+  public void testBothShouldFireFixedWindows() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+    AfterFirstStateMachine.of(mockTrigger1, mockTrigger2), 
FixedWindows.of(Duration.millis(10)));
+    tester.injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(1), new 
Instant(11));
+
+    when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true);
+    when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true);
+    assertTrue(tester.shouldFire(window)); // should fire
+
+    tester.fireIfShouldFire(window);
+    assertTrue(tester.isMarkedFinished(window));
+  }
+
+  /**
+   * Tests that if the first trigger rewinds to be non-finished in the merged 
window,
+   * then it becomes the currently active trigger again, with real triggers.
+   */
+  @Test
+  public void testShouldFireAfterMerge() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterEachStateMachine.inOrder(
+            AfterFirstStateMachine.of(
+                AfterPaneStateMachine.elementCountAtLeast(5),
+                AfterWatermarkStateMachine.pastEndOfWindow()),
+            
RepeatedlyStateMachine.forever(AfterPaneStateMachine.elementCountAtLeast(1))),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    // Finished the AfterFirst in the first window
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+    tester.advanceInputWatermark(new Instant(11));
+    assertTrue(tester.shouldFire(firstWindow));
+    tester.fireIfShouldFire(firstWindow);
+
+    // Set up second window where it is not done
+    tester.injectElements(5);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new 
Instant(15));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Merge them, if the merged window were on the second trigger, it would 
be ready
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(15));
+    assertFalse(tester.shouldFire(mergedWindow));
+
+    // Now adding 3 more makes the AfterFirst ready to fire
+    tester.injectElements(1, 2, 3, 4, 5);
+    tester.mergeWindows();
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
new file mode 100644
index 0000000..4240174
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachineTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link AfterPaneStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterPaneStateMachineTest {
+
+  SimpleTriggerStateMachineTester<IntervalWindow> tester;
+  /**
+   * Tests that the trigger does fire when enough elements are in a window, 
and that it only
+   * fires that window (no leakage).
+   */
+  @Test
+  public void testAfterPaneElementCountFixedWindows() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterPaneStateMachine.elementCountAtLeast(2),
+        FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1); // [0, 10)
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    assertFalse(tester.shouldFire(window));
+
+    tester.injectElements(2); // [0, 10)
+    tester.injectElements(11); // [10, 20)
+
+    assertTrue(tester.shouldFire(window)); // ready to fire
+    tester.fireIfShouldFire(window); // and finished
+    assertTrue(tester.isMarkedFinished(window));
+
+    // But don't finish the other window
+    assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), 
new Instant(20))));
+  }
+
+  @Test
+  public void testClear() throws Exception {
+    SimpleTriggerStateMachineTester<IntervalWindow> tester = 
TriggerStateMachineTester.forTrigger(
+        AfterPaneStateMachine.elementCountAtLeast(2),
+        FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1, 2, 3);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.clearState(window);
+    tester.assertCleared(window);
+  }
+
+  @Test
+  public void testAfterPaneElementCountSessions() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterPaneStateMachine.elementCountAtLeast(2),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.injectElements(
+        1, // in [1, 11)
+        2); // in [2, 12)
+
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new 
Instant(11))));
+    assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new 
Instant(12))));
+
+    tester.mergeWindows();
+
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(12));
+    assertTrue(tester.shouldFire(mergedWindow));
+    tester.fireIfShouldFire(mergedWindow);
+    assertTrue(tester.isMarkedFinished(mergedWindow));
+
+    // Because we closed the previous window, we don't have it around to merge 
with. So there
+    // will be a new FIRE_AND_FINISH result.
+    tester.injectElements(
+        7,  // in [7, 17)
+        9); // in [9, 19)
+
+    tester.mergeWindows();
+
+    IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new 
Instant(19));
+    assertTrue(tester.shouldFire(newMergedWindow));
+    tester.fireIfShouldFire(newMergedWindow);
+    assertTrue(tester.isMarkedFinished(newMergedWindow));
+  }
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger = AfterPaneStateMachine.elementCountAtLeast(5);
+    assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
new file mode 100644
index 0000000..9fbf801
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterProcessingTimeStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterProcessingTimeStateMachineTest {
+
+  /**
+   * Tests the basic property that the trigger does wait for processing time 
to be
+   * far enough advanced.
+   */
+  @Test
+  public void testAfterProcessingTimeFixedWindows() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerStateMachineTester<IntervalWindow> tester = 
TriggerStateMachineTester.forTrigger(
+        AfterProcessingTimeStateMachine
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+
+    // Timer at 15
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.advanceProcessingTime(new Instant(12));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    // Load up elements in the next window, timer at 17 for them
+    tester.injectElements(11, 12, 13);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new 
Instant(20));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Not quite time to fire
+    tester.advanceProcessingTime(new Instant(14));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Timer at 19 for these in the first window; it should be ignored since 
the 15 will fire first
+    tester.injectElements(2, 3);
+
+    // Advance past the first timer and fire, finishing the first window
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.isMarkedFinished(firstWindow));
+
+    // The next window fires and finishes now
+    tester.advanceProcessingTime(new Instant(18));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(secondWindow);
+    assertTrue(tester.isMarkedFinished(secondWindow));
+  }
+
+  /**
+   * Tests that when windows merge, if the trigger is waiting for "N millis 
after the first
+   * element" that it is relative to the earlier of the two merged windows.
+   */
+  @Test
+  public void testClear() throws Exception {
+    SimpleTriggerStateMachineTester<IntervalWindow> tester = 
TriggerStateMachineTester.forTrigger(
+        AfterProcessingTimeStateMachine
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(Duration.millis(10)));
+
+    tester.injectElements(1, 2, 3);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.clearState(window);
+    tester.assertCleared(window);
+  }
+
+  @Test
+  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+    SimpleTriggerStateMachineTester<IntervalWindow> tester = 
TriggerStateMachineTester.forTrigger(
+        AfterProcessingTimeStateMachine
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        Sessions.withGapDuration(Duration.millis(10)));
+
+    tester.advanceProcessingTime(new Instant(10));
+    tester.injectElements(1); // in [1, 11), timer for 15
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.advanceProcessingTime(new Instant(12));
+    tester.injectElements(3); // in [3, 13), timer for 17
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new 
Instant(13));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(13));
+
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+
+  /**
+   * Basic test of compatibility check between identical triggers.
+   */
+  @Test
+  public void testCompatibilityIdentical() throws Exception {
+    TriggerStateMachine t1 = 
AfterProcessingTimeStateMachine.pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(1L));
+    TriggerStateMachine t2 = 
AfterProcessingTimeStateMachine.pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(1L));
+    assertTrue(t1.isCompatible(t2));
+  }
+
+  @Test
+  public void testToString() {
+    TriggerStateMachine trigger = 
AfterProcessingTimeStateMachine.pastFirstElementInPane();
+    assertEquals("AfterProcessingTime.pastFirstElementInPane()", 
trigger.toString());
+  }
+
+  @Test
+  public void testWithDelayToString() {
+    TriggerStateMachine trigger = 
AfterProcessingTimeStateMachine.pastFirstElementInPane()
+        .plusDelayOf(Duration.standardMinutes(5));
+
+    assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 
minutes)",
+        trigger.toString());
+  }
+
+  @Test
+  public void testBuiltUpToString() {
+    TriggerStateMachine trigger = AfterWatermarkStateMachine.pastEndOfWindow()
+        .withLateFirings(AfterProcessingTimeStateMachine
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.standardMinutes(10)));
+
+    String expected = "AfterWatermark.pastEndOfWindow()"
+        + ".withLateFirings(AfterProcessingTime"
+        + ".pastFirstElementInPane()"
+        + ".plusDelayOf(10 minutes))";
+
+    assertEquals(expected, trigger.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69b1efda/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
new file mode 100644
index 0000000..140bd62
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachineTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests the {@link AfterSynchronizedProcessingTimeStateMachine}.
+ */
+@RunWith(JUnit4.class)
+public class AfterSynchronizedProcessingTimeStateMachineTest {
+
+  private TriggerStateMachine underTest = new 
AfterSynchronizedProcessingTimeStateMachine();
+
+  @Test
+  public void testAfterProcessingTimeWithFixedWindows() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerStateMachineTester<IntervalWindow> tester = 
TriggerStateMachineTester.forTrigger(
+        AfterProcessingTimeStateMachine
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        FixedWindows.of(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+
+    // Timer at 15
+    tester.injectElements(1);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
+    tester.advanceProcessingTime(new Instant(12));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    // Load up elements in the next window, timer at 17 for them
+    tester.injectElements(11, 12, 13);
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new 
Instant(20));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Not quite time to fire
+    tester.advanceProcessingTime(new Instant(14));
+    assertFalse(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    // Timer at 19 for these in the first window; it should be ignored since 
the 15 will fire first
+    tester.injectElements(2, 3);
+
+    // Advance past the first timer and fire, finishing the first window
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(firstWindow));
+    assertFalse(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(firstWindow);
+    assertTrue(tester.isMarkedFinished(firstWindow));
+
+    // The next window fires and finishes now
+    tester.advanceProcessingTime(new Instant(18));
+    assertTrue(tester.shouldFire(secondWindow));
+    tester.fireIfShouldFire(secondWindow);
+    assertTrue(tester.isMarkedFinished(secondWindow));
+  }
+
+  @Test
+  public void testAfterProcessingTimeWithMergingWindow() throws Exception {
+    Duration windowDuration = Duration.millis(10);
+    SimpleTriggerStateMachineTester<IntervalWindow> tester = 
TriggerStateMachineTester.forTrigger(
+        AfterProcessingTimeStateMachine
+            .pastFirstElementInPane()
+            .plusDelayOf(Duration.millis(5)),
+        Sessions.withGapDuration(windowDuration));
+
+    tester.advanceProcessingTime(new Instant(10));
+    tester.injectElements(1); // in [1, 11), timer for 15
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new 
Instant(11));
+    assertFalse(tester.shouldFire(firstWindow));
+
+    tester.advanceProcessingTime(new Instant(12));
+    tester.injectElements(3); // in [3, 13), timer for 17
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new 
Instant(13));
+    assertFalse(tester.shouldFire(secondWindow));
+
+    tester.mergeWindows();
+    IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new 
Instant(13));
+
+    tester.advanceProcessingTime(new Instant(16));
+    assertTrue(tester.shouldFire(mergedWindow));
+  }
+}

Reply via email to