Repository: incubator-beam
Updated Branches:
  refs/heads/master 7d1976b26 -> ffe3ab3d6


Revert "Move InMemoryTimerInternals to runners-core"

This reverts commit ec0bf7b4023ff75f4ec6723d2e77ed507eb57c51.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45ed5c70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45ed5c70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45ed5c70

Branch: refs/heads/master
Commit: 45ed5c70c18a806d0fc2e7385886285206fd18e4
Parents: 954e57d
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Dec 16 16:33:51 2016 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Dec 16 16:39:20 2016 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   1 +
 .../runners/core/InMemoryTimerInternals.java    | 276 -------------------
 .../core/InMemoryTimerInternalsTest.java        | 155 -----------
 .../beam/runners/core/ReduceFnTester.java       |   1 +
 .../beam/runners/core/SplittableParDoTest.java  |  16 +-
 .../triggers/TriggerStateMachineTester.java     |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  36 +++
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 ++++++++++++++++++
 .../util/state/InMemoryTimerInternalsTest.java  | 153 ++++++++++
 10 files changed, 471 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index efcd771..9189191 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
deleted file mode 100644
index b22fcb3..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowTracing;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.joda.time.Instant;
-
-/**
- * Simulates the firing of timers and progression of input and output watermarks for a single
- * computation and key in a Windmill-like streaming environment.
- */
-public class InMemoryTimerInternals implements TimerInternals {
-
-  /** At most one timer per timestamp is kept. */
-  private Set<TimerData> existingTimers = new HashSet<>();
-
-  /** Pending input watermark timers, in timestamp order. */
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
-  /** Pending processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-  /** Pending synchronized processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
-
-  /** Current input watermark. */
-  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /** Current output watermark. */
-  @Nullable private Instant outputWatermarkTime = null;
-
-  /** Current processing time. */
-  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /** Current synchronized processing time. */
-  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return outputWatermarkTime;
-  }
-
-  /**
-   * Returns when the next timer in the given time domain will fire, or {@code null}
-   * if there are no timers scheduled in that time domain.
-   */
-  @Nullable
-  public Instant getNextTimer(TimeDomain domain) {
-    final TimerData data;
-    switch (domain) {
-      case EVENT_TIME:
-        data = watermarkTimers.peek();
-        break;
-      case PROCESSING_TIME:
-        data = processingTimers.peek();
-        break;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        data = synchronizedProcessingTimers.peek();
-        break;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
-    }
-    return (data == null) ? null : data.getTimestamp();
-  }
-
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
-    switch (domain) {
-      case EVENT_TIME:
-        return watermarkTimers;
-      case PROCESSING_TIME:
-        return processingTimers;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        return synchronizedProcessingTimers;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
-    }
-  }
-
-  @Override
-  public void setTimer(StateNamespace namespace, String timerId, Instant target,
-      TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
-  }
-
-  @Override
-  public void setTimer(TimerData timerData) {
-    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
-    if (existingTimers.add(timerData)) {
-      queue(timerData.getDomain()).add(timerData);
-    }
-  }
-
-  @Override
-  public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
-  }
-
-  @Override
-  public void deleteTimer(TimerData timer) {
-    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTime;
-  }
-
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return synchronizedProcessingTime;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return inputWatermarkTime;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("watermarkTimers", watermarkTimers)
-        .add("processingTimers", processingTimers)
-        .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
-        .add("inputWatermarkTime", inputWatermarkTime)
-        .add("outputWatermarkTime", outputWatermarkTime)
-        .add("processingTime", processingTime)
-        .toString();
-  }
-
-  /** Advances input watermark to the given value. */
-  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
-    checkNotNull(newInputWatermark);
-    checkState(
-        !newInputWatermark.isBefore(inputWatermarkTime),
-        "Cannot move input watermark time backwards from %s to %s",
-        inputWatermarkTime,
-        newInputWatermark);
-    WindowTracing.trace(
-        "{}.advanceInputWatermark: from {} to {}",
-        getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
-    inputWatermarkTime = newInputWatermark;
-  }
-
-  /** Advances output watermark to the given value. */
-  public void advanceOutputWatermark(Instant newOutputWatermark) {
-    checkNotNull(newOutputWatermark);
-    final Instant adjustedOutputWatermark;
-    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
-      WindowTracing.trace(
-          "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
-          getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
-      adjustedOutputWatermark = inputWatermarkTime;
-    } else {
-      adjustedOutputWatermark = newOutputWatermark;
-    }
-
-    checkState(
-        outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
-        "Cannot move output watermark time backwards from %s to %s",
-        outputWatermarkTime,
-        adjustedOutputWatermark);
-    WindowTracing.trace(
-        "{}.advanceOutputWatermark: from {} to {}",
-        getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
-    outputWatermarkTime = adjustedOutputWatermark;
-  }
-
-  /** Advances processing time to the given value. */
-  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
-    checkNotNull(newProcessingTime);
-    checkState(
-        !newProcessingTime.isBefore(processingTime),
-        "Cannot move processing time backwards from %s to %s",
-        processingTime,
-        newProcessingTime);
-    WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(), processingTime, newProcessingTime);
-    processingTime = newProcessingTime;
-  }
-
-  /** Advances synchronized processing time to the given value. */
-  public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
-      throws Exception {
-    checkNotNull(newSynchronizedProcessingTime);
-    checkState(
-        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
-        "Cannot move processing time backwards from %s to %s",
-        synchronizedProcessingTime,
-        newSynchronizedProcessingTime);
-    WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
-    synchronizedProcessingTime = newSynchronizedProcessingTime;
-  }
-
-  /** Returns the next eligible event time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextEventTimer() {
-    TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextEventTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, inputWatermarkTime);
-    }
-    return timer;
-  }
-
-  /** Returns the next eligible processing time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextProcessingTimer() {
-    TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, processingTime);
-    }
-    return timer;
-  }
-
-  /** Returns the next eligible synchronized processing time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextSynchronizedProcessingTimer() {
-    TimerData timer = removeNextTimer(
-        synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, synchronizedProcessingTime);
-    }
-    return timer;
-  }
-
-  @Nullable
-  private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
-    PriorityQueue<TimerData> queue = queue(domain);
-    if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
-      TimerData timer = queue.remove();
-      existingTimers.remove(timer);
-      return timer;
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
deleted file mode 100644
index 2caa874..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaceForTest;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link InMemoryTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class InMemoryTimerInternalsTest {
-
-  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
-
-  @Test
-  public void testFiringTimers() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(new Instant(20));
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(new Instant(21));
-    assertNull(underTest.removeNextProcessingTimer());
-
-    // Adding the timer and advancing a little should refire
-    underTest.setTimer(processingTime1);
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(new Instant(30));
-    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-  }
-
-  @Test
-  public void testFiringTimersWithCallback() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(new Instant(20));
-    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(new Instant(21));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
-    // Adding the timer and advancing a little should fire again
-    underTest.setTimer(processingTime1);
-    underTest.advanceProcessingTime(new Instant(21));
-    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(new Instant(30));
-    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-  }
-
-  @Test
-  public void testTimerOrdering() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTime1 = TimerData.of(
-        NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTime2 = TimerData.of(
-        NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(eventTime1);
-    underTest.setTimer(synchronizedProcessingTime1);
-    underTest.setTimer(processingTime2);
-    underTest.setTimer(eventTime2);
-    underTest.setTimer(synchronizedProcessingTime2);
-
-    assertNull(underTest.removeNextEventTimer());
-    underTest.advanceInputWatermark(new Instant(30));
-    assertEquals(eventTime1, underTest.removeNextEventTimer());
-    assertEquals(eventTime2, underTest.removeNextEventTimer());
-    assertNull(underTest.removeNextEventTimer());
-
-    assertNull(underTest.removeNextProcessingTimer());
-    underTest.advanceProcessingTime(new Instant(30));
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-
-    assertNull(underTest.removeNextSynchronizedProcessingTimer());
-    underTest.advanceSynchronizedProcessingTime(new Instant(30));
-    assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
-    assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-  }
-
-  @Test
-  public void testDeduplicate() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    underTest.setTimer(eventTime);
-    underTest.setTimer(eventTime);
-    underTest.setTimer(processingTime);
-    underTest.setTimer(processingTime);
-    underTest.advanceProcessingTime(new Instant(20));
-    underTest.advanceInputWatermark(new Instant(20));
-
-    assertEquals(processingTime, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-    assertEquals(eventTime, underTest.removeNextEventTimer());
-    assertNull(underTest.removeNextEventTimer());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 890195a..db0cf91 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -73,6 +73,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 41d419b..cf96b66 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -28,7 +28,6 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -191,8 +190,6 @@ public class SplittableParDoTest {
         tester;
     private Instant currentProcessingTime;
 
-    private InMemoryTimerInternals timerInternals;
-
     ProcessFnTester(
         Instant currentProcessingTime,
         DoFn<InputT, OutputT> fn,
@@ -203,7 +200,6 @@ public class SplittableParDoTest {
           new SplittableParDo.ProcessFn<>(
               fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
       this.tester = DoFnTester.of(processFn);
-      this.timerInternals = new InMemoryTimerInternals();
       processFn.setStateInternalsFactory(
           new StateInternalsFactory<String>() {
             @Override
@@ -215,7 +211,7 @@ public class SplittableParDoTest {
           new TimerInternalsFactory<String>() {
             @Override
             public TimerInternals timerInternalsForKey(String key) {
-              return timerInternals;
+              return tester.getTimerInternals();
             }
           });
       processFn.setOutputWindowedValue(
@@ -251,7 +247,7 @@ public class SplittableParDoTest {
       // through the state/timer/output callbacks.
       this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
       this.tester.startBundle();
-      timerInternals.advanceProcessingTime(currentProcessingTime);
+      this.tester.advanceProcessingTime(currentProcessingTime);
 
       this.currentProcessingTime = currentProcessingTime;
     }
@@ -289,13 +285,7 @@ public class SplittableParDoTest {
      */
     boolean advanceProcessingTimeBy(Duration duration) throws Exception {
       currentProcessingTime = currentProcessingTime.plus(duration);
-      timerInternals.advanceProcessingTime(currentProcessingTime);
-
-      List<TimerInternals.TimerData> timers = new ArrayList<>();
-      TimerInternals.TimerData nextTimer;
-      while ((nextTimer = timerInternals.removeNextProcessingTimer()) != null) {
-        timers.add(nextTimer);
-      }
+      List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime);
       if (timers.isEmpty()) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 2a626d4..be63c06 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -34,7 +34,6 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ActiveWindowSet;
 import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.MergingActiveWindowSet;
 import org.apache.beam.runners.core.NonMergingActiveWindowSet;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -47,6 +46,7 @@ import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 5432d58..87d3f50 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -38,6 +37,7 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 2d8684a..93b3f59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -46,10 +46,12 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -141,6 +143,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return (StateInternals<K>) stateInternals;
   }
 
+  public TimerInternals getTimerInternals() {
+    return timerInternals;
+  }
+
   /**
    * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage
    * the lifecycle of the {@link DoFn}.
@@ -227,6 +233,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     context.setupDelegateAggregators();
     // State and timer internals are per-bundle.
     stateInternals = InMemoryStateInternals.forKey(new Object());
+    timerInternals = new InMemoryTimerInternals();
     try {
       fnInvoker.invokeStartBundle(context);
     } catch (UserCodeException e) {
@@ -535,6 +542,34 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return extractAggregatorValue(agg.getName(), agg.getCombineFn());
   }
 
+  public List<TimerInternals.TimerData> advanceInputWatermark(Instant newWatermark) {
+    try {
+      timerInternals.advanceInputWatermark(newWatermark);
+      final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        firedTimers.add(timer);
+      }
+      return firedTimers;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public List<TimerInternals.TimerData> advanceProcessingTime(Instant newProcessingTime) {
+    try {
+      timerInternals.advanceProcessingTime(newProcessingTime);
+      final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        firedTimers.add(timer);
+      }
+      return firedTimers;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private <AccumT, AggregateT> AggregateT extractAggregatorValue(
       String name, CombineFn<?, AccumT, AggregateT> combiner) {
     @SuppressWarnings("unchecked")
@@ -779,6 +814,7 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
   private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
 
   private InMemoryStateInternals<?> stateInternals;
+  private InMemoryTimerInternals timerInternals;
 
   /** The state of processing of the {@link DoFn} under test. */
   private State state = State.UNINITIALIZED;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
new file mode 100644
index 0000000..44b44f0
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.joda.time.Instant;
+
+/**
+ * Simulates the firing of timers and progression of input and output 
watermarks for a single
+ * computation and key in a Windmill-like streaming environment.
+ */
+public class InMemoryTimerInternals implements TimerInternals {
+
+  /** At most one timer per timestamp is kept. */
+  private Set<TimerData> existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+  /** Pending processing time timers, in timestamp order. */
+  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  /** Pending synchronized processing time timers, in timestamp order. */
+  private PriorityQueue<TimerData> synchronizedProcessingTimers = new 
PriorityQueue<>(11);
+
+  /** Current input watermark. */
+  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current output watermark. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  /** Current processing time. */
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current synchronized processing time. */
+  private Instant synchronizedProcessingTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return outputWatermarkTime;
+  }
+
+  /**
+   * Returns when the next timer in the given time domain will fire, or {@code 
null}
+   * if there are no timers scheduled in that time domain.
+   */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+    final TimerData data;
+    switch (domain) {
+      case EVENT_TIME:
+        data = watermarkTimers.peek();
+        break;
+      case PROCESSING_TIME:
+        data = processingTimers.peek();
+        break;
+      case SYNCHRONIZED_PROCESSING_TIME:
+        data = synchronizedProcessingTimers.peek();
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + 
domain);
+    }
+    return (data == null) ? null : data.getTimestamp();
+  }
+
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    switch (domain) {
+      case EVENT_TIME:
+        return watermarkTimers;
+      case PROCESSING_TIME:
+        return processingTimers;
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return synchronizedProcessingTimers;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + 
domain);
+    }
+  }
+
+  @Override
+  public void setTimer(StateNamespace namespace, String timerId, Instant 
target,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Setting a timer by ID is not yet 
supported.");
+  }
+
+  @Override
+  public void setTimer(TimerData timerData) {
+    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), 
timerData);
+    if (existingTimers.add(timerData)) {
+      queue(timerData.getDomain()).add(timerData);
+    }
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException("Canceling a timer by ID is not 
yet supported.");
+  }
+
+  @Override
+  public void deleteTimer(TimerData timer) {
+    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), 
timer);
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return synchronizedProcessingTime;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return inputWatermarkTime;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
+        .add("inputWatermarkTime", inputWatermarkTime)
+        .add("outputWatermarkTime", outputWatermarkTime)
+        .add("processingTime", processingTime)
+        .toString();
+  }
+
+  /** Advances input watermark to the given value. */
+  public void advanceInputWatermark(Instant newInputWatermark) throws 
Exception {
+    checkNotNull(newInputWatermark);
+    checkState(
+        !newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s",
+        inputWatermarkTime,
+        newInputWatermark);
+    WindowTracing.trace(
+        "{}.advanceInputWatermark: from {} to {}",
+        getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+  }
+
+  /** Advances output watermark to the given value. */
+  public void advanceOutputWatermark(Instant newOutputWatermark) {
+    checkNotNull(newOutputWatermark);
+    final Instant adjustedOutputWatermark;
+    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
+      WindowTracing.trace(
+          "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
+          getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
+      adjustedOutputWatermark = inputWatermarkTime;
+    } else {
+      adjustedOutputWatermark = newOutputWatermark;
+    }
+
+    checkState(
+        outputWatermarkTime == null || 
!adjustedOutputWatermark.isBefore(outputWatermarkTime),
+        "Cannot move output watermark time backwards from %s to %s",
+        outputWatermarkTime,
+        adjustedOutputWatermark);
+    WindowTracing.trace(
+        "{}.advanceOutputWatermark: from {} to {}",
+        getClass().getSimpleName(), outputWatermarkTime, 
adjustedOutputWatermark);
+    outputWatermarkTime = adjustedOutputWatermark;
+  }
+
+  /** Advances processing time to the given value. */
+  public void advanceProcessingTime(Instant newProcessingTime) throws 
Exception {
+    checkNotNull(newProcessingTime);
+    checkState(
+        !newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s",
+        processingTime,
+        newProcessingTime);
+    WindowTracing.trace(
+        "{}.advanceProcessingTime: from {} to {}",
+        getClass().getSimpleName(), processingTime, newProcessingTime);
+    processingTime = newProcessingTime;
+  }
+
+  /** Advances synchronized processing time to the given value. */
+  public void advanceSynchronizedProcessingTime(Instant 
newSynchronizedProcessingTime)
+      throws Exception {
+    checkNotNull(newSynchronizedProcessingTime);
+    checkState(
+        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+        "Cannot move processing time backwards from %s to %s",
+        synchronizedProcessingTime,
+        newSynchronizedProcessingTime);
+    WindowTracing.trace(
+        "{}.advanceProcessingTime: from {} to {}",
+        getClass().getSimpleName(), synchronizedProcessingTime, 
newSynchronizedProcessingTime);
+    synchronizedProcessingTime = newSynchronizedProcessingTime;
+  }
+
+  /** Returns the next eligible event time timer, if none returns null. */
+  @Nullable
+  public TimerData removeNextEventTimer() {
+    TimerData timer = removeNextTimer(inputWatermarkTime, 
TimeDomain.EVENT_TIME);
+    if (timer != null) {
+      WindowTracing.trace(
+          "{}.removeNextEventTimer: firing {} at {}",
+          getClass().getSimpleName(), timer, inputWatermarkTime);
+    }
+    return timer;
+  }
+
+  /** Returns the next eligible processing time timer, if none returns null. */
+  @Nullable
+  public TimerData removeNextProcessingTimer() {
+    TimerData timer = removeNextTimer(processingTime, 
TimeDomain.PROCESSING_TIME);
+    if (timer != null) {
+      WindowTracing.trace(
+          "{}.removeNextProcessingTimer: firing {} at {}",
+          getClass().getSimpleName(), timer, processingTime);
+    }
+    return timer;
+  }
+
+  /** Returns the next eligible synchronized processing time timer, if none 
returns null. */
+  @Nullable
+  public TimerData removeNextSynchronizedProcessingTimer() {
+    TimerData timer = removeNextTimer(
+        synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    if (timer != null) {
+      WindowTracing.trace(
+          "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
+          getClass().getSimpleName(), timer, synchronizedProcessingTime);
+    }
+    return timer;
+  }
+
+  @Nullable
+  private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
+    PriorityQueue<TimerData> queue = queue(domain);
+    if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+      TimerData timer = queue.remove();
+      existingTimers.remove(timer);
+      return timer;
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ed5c70/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
new file mode 100644
index 0000000..4a2763c
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InMemoryTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryTimerInternalsTest {
+
+  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+
+  /** Timers fire once time has passed them, do not refire, and can be set again. */
+  @Test
+  public void testFiringTimers() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(processingTime2);
+
+    underTest.advanceProcessingTime(new Instant(20));
+    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+
+    // Advancing just a little shouldn't refire
+    underTest.advanceProcessingTime(new Instant(21));
+    assertNull(underTest.removeNextProcessingTimer());
+
+    // Re-adding the already-due timer should fire it again without advancing any further
+    underTest.setTimer(processingTime1);
+    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+
+    // And advancing the rest of the way should still have the other timer
+    underTest.advanceProcessingTime(new Instant(30));
+    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+  }
+
+  /** Same scenario as {@link #testFiringTimers}, but re-advancing to the current time first. */
+  @Test
+  public void testFiringTimersWithCallback() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(processingTime2);
+
+    underTest.advanceProcessingTime(new Instant(20));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+    // Advancing just a little shouldn't refire
+    underTest.advanceProcessingTime(new Instant(21));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+    // Adding the timer and advancing a little should fire again
+    underTest.setTimer(processingTime1);
+    underTest.advanceProcessingTime(new Instant(21));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+    // And advancing the rest of the way should still have the other timer
+    underTest.advanceProcessingTime(new Instant(30));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+  }
+
+  /** Each time domain has its own clock and queue, and yields its timers in timestamp order. */
+  @Test
+  public void testTimerOrdering() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTime1 = TimerData.of(
+        NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTime2 = TimerData.of(
+        NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(eventTime1);
+    underTest.setTimer(synchronizedProcessingTime1);
+    underTest.setTimer(processingTime2);
+    underTest.setTimer(eventTime2);
+    underTest.setTimer(synchronizedProcessingTime2);
+
+    assertNull(underTest.removeNextEventTimer());
+    underTest.advanceInputWatermark(new Instant(30));
+    assertEquals(eventTime1, underTest.removeNextEventTimer());
+    assertEquals(eventTime2, underTest.removeNextEventTimer());
+    assertNull(underTest.removeNextEventTimer());
+
+    assertNull(underTest.removeNextProcessingTimer());
+    underTest.advanceProcessingTime(new Instant(30));
+    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+
+    assertNull(underTest.removeNextSynchronizedProcessingTimer());
+    underTest.advanceSynchronizedProcessingTime(new Instant(30));
+    assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
+    assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
+    // Check the synchronized queue itself is drained, not the processing-time queue again.
+    assertNull(underTest.removeNextSynchronizedProcessingTimer());
+  }
+
+  /** Setting an equal timer twice schedules it only once. */
+  @Test
+  public void testDeduplicate() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    underTest.setTimer(eventTime);
+    underTest.setTimer(eventTime);
+    underTest.setTimer(processingTime);
+    underTest.setTimer(processingTime);
+    underTest.advanceProcessingTime(new Instant(20));
+    underTest.advanceInputWatermark(new Instant(20));
+
+    assertEquals(processingTime, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+    assertEquals(eventTime, underTest.removeNextEventTimer());
+    assertNull(underTest.removeNextEventTimer());
+  }
+}

Reply via email to