http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManagerTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManagerTest.java
deleted file mode 100644
index d4979f2..0000000
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InMemoryWatermarkManagerTest.java
+++ /dev/null
@@ -1,1099 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess.util;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.FiredTimers;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TransformWatermarks;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Filter;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TimestampedValue;
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.joda.time.Instant;
-import org.joda.time.ReadableInstant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests for {@link InMemoryWatermarkManager}.
- */
-@RunWith(JUnit4.class)
-public class InMemoryWatermarkManagerTest implements Serializable {
-  private transient MockClock clock;
-
-  private transient PCollection<Integer> createdInts;
-
-  private transient PCollection<Integer> filtered;
-  private transient PCollection<Integer> filteredTimesTwo;
-  private transient PCollection<KV<String, Integer>> keyed;
-
-  private transient PCollection<Integer> intsToFlatten;
-  private transient PCollection<Integer> flattened;
-
-  private transient InMemoryWatermarkManager manager;
-
-  @Before
-  public void setup() {
-    TestPipeline p = TestPipeline.create();
-
-    createdInts = p.apply("createdInts", Create.of(1, 2, 3));
-
-    filtered = createdInts.apply("filtered", Filter.greaterThan(1));
-    filteredTimesTwo = filtered.apply("timesTwo", ParDo.of(new DoFn<Integer, 
Integer>() {
-      @Override
-      public void processElement(DoFn<Integer, Integer>.ProcessContext c) 
throws Exception {
-        c.output(c.element() * 2);
-      }
-    }));
-
-    keyed = createdInts.apply("keyed", WithKeys.<String, Integer>of("MyKey"));
-
-    intsToFlatten = p.apply("intsToFlatten", Create.of(-1, 256, 65535));
-    PCollectionList<Integer> preFlatten = 
PCollectionList.of(createdInts).and(intsToFlatten);
-    flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
-
-    Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
-            createdInts.getProducingTransformInternal(),
-            intsToFlatten.getProducingTransformInternal());
-
-    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers = new 
HashMap<>();
-    consumers.put(
-        createdInts,
-        ImmutableList.<AppliedPTransform<?, ?, 
?>>of(filtered.getProducingTransformInternal(),
-            keyed.getProducingTransformInternal(), 
flattened.getProducingTransformInternal()));
-    consumers.put(
-        filtered,
-        Collections.<AppliedPTransform<?, ?, ?>>singleton(
-            filteredTimesTwo.getProducingTransformInternal()));
-    consumers.put(filteredTimesTwo, Collections.<AppliedPTransform<?, ?, 
?>>emptyList());
-    consumers.put(keyed, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
-
-    consumers.put(
-        intsToFlatten,
-        Collections.<AppliedPTransform<?, ?, ?>>singleton(
-            flattened.getProducingTransformInternal()));
-    consumers.put(flattened, Collections.<AppliedPTransform<?, ?, 
?>>emptyList());
-
-    clock = MockClock.fromInstant(new Instant(1000));
-
-    manager = InMemoryWatermarkManager.create(clock, rootTransforms, 
consumers);
-  }
-
-  /**
-   * Demonstrates that getWatermark, when called on an {@link 
AppliedPTransform} that has not
-   * processed any elements, returns the {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}.
-   */
-  @Test
-  public void getWatermarkForUntouchedTransform() {
-    TransformWatermarks watermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-
-    assertThat(watermarks.getInputWatermark(), 
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    assertThat(watermarks.getOutputWatermark(), 
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-  }
-
-  /**
-   * Demonstrates that getWatermark for a transform that consumes no input 
uses the Watermark
-   * Hold value provided to it as the output watermark.
-   */
-  @Test
-  public void getWatermarkForUpdatedSourceTransform() {
-    CommittedBundle<Integer> output = globallyWindowedBundle(createdInts, 1);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(output), new Instant(8000L));
-    TransformWatermarks updatedSourceWatermark =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-
-    assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new 
Instant(8000L)));
-  }
-
-  /**
-   * Demonstrates that getWatermark for a transform that takes multiple inputs 
is held to the
-   * minimum watermark across all of its inputs.
-   */
-  @Test
-  public void getWatermarkForMultiInputTransform() {
-    CommittedBundle<Integer> secondPcollectionBundle = 
globallyWindowedBundle(intsToFlatten, -1);
-
-    manager.updateWatermarks(null, 
intsToFlatten.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    // We didn't do anything for the first source, so we shouldn't have 
progressed the watermark
-    TransformWatermarks firstSourceWatermark =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(
-        firstSourceWatermark.getOutputWatermark(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    // the Second Source output all of the elements so it should be done (with 
a watermark at the
-    // end of time).
-    TransformWatermarks secondSourceWatermark =
-        manager.getWatermarks(intsToFlatten.getProducingTransformInternal());
-    assertThat(
-        secondSourceWatermark.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    // We haven't consumed anything yet, so our watermark should be at the 
beginning of time
-    TransformWatermarks transformWatermark =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
-    assertThat(
-        transformWatermark.getInputWatermark(), 
not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        transformWatermark.getOutputWatermark(), 
not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    CommittedBundle<Integer> flattenedBundleSecondCreate = 
globallyWindowedBundle(flattened, -1);
-    // We have finished processing the bundle from the second PCollection, but 
we haven't consumed
-    // anything from the first PCollection yet; so our watermark shouldn't 
advance
-    manager.updateWatermarks(secondPcollectionBundle, 
flattened.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
-        null);
-    TransformWatermarks transformAfterProcessing =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
-    manager.updateWatermarks(secondPcollectionBundle, 
flattened.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate),
-        null);
-    assertThat(
-        transformAfterProcessing.getInputWatermark(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        transformAfterProcessing.getOutputWatermark(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    Instant firstCollectionTimestamp = new Instant(10000);
-    CommittedBundle<Integer> firstPcollectionBundle =
-        timestampedBundle(createdInts, TimestampedValue.<Integer>of(5, 
firstCollectionTimestamp));
-    // the source is done, but elements are still buffered. The source output 
watermark should be
-    // past the end of the global window
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle),
-        new Instant(Long.MAX_VALUE));
-    TransformWatermarks firstSourceWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(
-        firstSourceWatermarks.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    // We still haven't consumed any of the first source's input, so the 
watermark should still not
-    // progress
-    TransformWatermarks flattenAfterSourcesProduced =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
-    assertThat(
-        flattenAfterSourcesProduced.getInputWatermark(), 
not(laterThan(firstCollectionTimestamp)));
-    assertThat(
-        flattenAfterSourcesProduced.getOutputWatermark(), 
not(laterThan(firstCollectionTimestamp)));
-
-    // We have buffered inputs, but since the PCollection has all of the 
elements (has a WM past the
-    // end of the global window), we should have a watermark equal to the min 
among buffered
-    // elements
-    TransformWatermarks withBufferedElements =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
-    assertThat(withBufferedElements.getInputWatermark(), 
equalTo(firstCollectionTimestamp));
-    assertThat(withBufferedElements.getOutputWatermark(), 
equalTo(firstCollectionTimestamp));
-
-    CommittedBundle<?> completedFlattenBundle =
-        
InProcessBundle.unkeyed(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(firstPcollectionBundle, 
flattened.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(completedFlattenBundle),
-        null);
-    TransformWatermarks afterConsumingAllInput =
-        manager.getWatermarks(flattened.getProducingTransformInternal());
-    assertThat(
-        afterConsumingAllInput.getInputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        afterConsumingAllInput.getOutputWatermark(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that pending elements are independent among
-   * {@link AppliedPTransform AppliedPTransforms} that consume the same input 
{@link PCollection}.
-   */
-  @Test
-  public void getWatermarkForMultiConsumedCollection() {
-    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)), 
TimestampedValue.of(2, new Instant(1234L)),
-        TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new 
Instant(Long.MAX_VALUE));
-    TransformWatermarks createdAfterProducing =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(
-        createdAfterProducing.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new 
Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(createdBundle, 
keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(keyBundle), null);
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
-    assertThat(
-        keyedWatermarks.getInputWatermark(), 
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        keyedWatermarks.getOutputWatermark(), 
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(filteredWatermarks.getInputWatermark(), not(laterThan(new 
Instant(-1000L))));
-    assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new 
Instant(-1000L))));
-
-    CommittedBundle<Integer> filteredBundle =
-        timestampedBundle(filtered, TimestampedValue.of(2, new 
Instant(1234L)));
-    manager.updateWatermarks(createdBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(filteredBundle), null);
-    TransformWatermarks filteredProcessedWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        filteredProcessedWatermarks.getInputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        filteredProcessedWatermarks.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that the watermark of an {@link AppliedPTransform} is held 
to the provided
-   * watermark hold.
-   */
-  @Test
-  public void updateWatermarkWithWatermarkHolds() {
-    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)), 
TimestampedValue.of(2, new Instant(1234L)),
-        TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new 
Instant(Long.MAX_VALUE));
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new 
Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(createdBundle, 
keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(keyBundle),
-        new Instant(500L));
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
-    assertThat(
-        keyedWatermarks.getInputWatermark(), 
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new 
Instant(500L))));
-  }
-
-  /**
-   * Demonstrates that the watermark of an {@link AppliedPTransform} is held 
to the provided
-   * watermark hold.
-   */
-  @Test
-  public void updateWatermarkWithKeyedWatermarkHolds() {
-    CommittedBundle<Integer> firstKeyBundle =
-        InProcessBundle.keyed(createdInts, "Odd")
-            .add(WindowedValue.timestampedValueInGlobalWindow(1, new 
Instant(1_000_000L)))
-            .add(WindowedValue.timestampedValueInGlobalWindow(3, new 
Instant(-1000L)))
-            .commit(clock.now());
-
-    CommittedBundle<Integer> secondKeyBundle =
-        InProcessBundle.keyed(createdInts, "Even")
-            .add(WindowedValue.timestampedValueInGlobalWindow(2, new 
Instant(1234L)))
-            .commit(clock.now());
-
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        ImmutableList.of(firstKeyBundle, secondKeyBundle), 
BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    manager.updateWatermarks(firstKeyBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new 
Instant(-1000L));
-    manager.updateWatermarks(secondKeyBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new 
Instant(1234L));
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        filteredWatermarks.getInputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new 
Instant(-1000L))));
-
-    CommittedBundle<Integer> fauxFirstKeyTimerBundle =
-        InProcessBundle.keyed(createdInts, "Odd").commit(clock.now());
-    manager.updateWatermarks(fauxFirstKeyTimerBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new 
Instant(1234L)));
-
-    CommittedBundle<Integer> fauxSecondKeyTimerBundle =
-        InProcessBundle.keyed(createdInts, "Even").commit(clock.now());
-    manager.updateWatermarks(fauxSecondKeyTimerBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new 
Instant(5678L));
-    assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new 
Instant(5678L)));
-
-    manager.updateWatermarks(fauxSecondKeyTimerBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(
-        filteredWatermarks.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that updated output watermarks are monotonic in the presence 
of late data, when
-   * called on an {@link AppliedPTransform} that consumes no input.
-   */
-  @Test
-  public void updateOutputWatermarkShouldBeMonotonic() {
-    CommittedBundle<?> firstInput =
-        
InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(firstInput), new 
Instant(0L));
-    TransformWatermarks firstWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
-
-    CommittedBundle<?> secondInput =
-        
InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(secondInput), new 
Instant(-250L));
-    TransformWatermarks secondWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new 
Instant(0L))));
-  }
-
-  /**
-   * Demonstrates that updated output watermarks are monotonic in the presence 
of watermark holds
-   * that become earlier than a previous watermark hold.
-   */
-  @Test
-  public void updateWatermarkWithHoldsShouldBeMonotonic() {
-    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, new Instant(1_000_000L)), 
TimestampedValue.of(2, new Instant(1234L)),
-        TimestampedValue.of(3, new Instant(-1000L)));
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new 
Instant(Long.MAX_VALUE));
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new 
Instant(1_000_000L)),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)),
-            TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L)));
-    manager.updateWatermarks(createdBundle, 
keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(keyBundle),
-        new Instant(500L));
-    TransformWatermarks keyedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
-    assertThat(
-        keyedWatermarks.getInputWatermark(), 
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(keyedWatermarks.getOutputWatermark(), not(laterThan(new 
Instant(500L))));
-    Instant oldOutputWatermark = keyedWatermarks.getOutputWatermark();
-
-    TransformWatermarks updatedWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
-    assertThat(
-        updatedWatermarks.getInputWatermark(), 
not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    // We added a hold prior to the old watermark; we shouldn't progress (due 
to the earlier hold)
-    // but the watermark is monotonic and should not backslide to the new, 
earlier hold
-    assertThat(updatedWatermarks.getOutputWatermark(), 
equalTo(oldOutputWatermark));
-  }
-
-  /**
-   * Demonstrates that updateWatermarks in the presence of late data is 
monotonic.
-   */
-  @Test
-  public void updateWatermarkWithLateData() {
-    Instant sourceWatermark = new Instant(1_000_000L);
-    CommittedBundle<Integer> createdBundle = timestampedBundle(createdInts,
-        TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new 
Instant(1234L)));
-
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), 
sourceWatermark);
-
-    CommittedBundle<KV<String, Integer>> keyBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), 
sourceWatermark),
-            TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)));
-
-    // Finish processing the on-time data. The watermarks should progress to 
be equal to the source
-    manager.updateWatermarks(createdBundle, 
keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(keyBundle), null);
-    TransformWatermarks onTimeWatermarks =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
-    assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
-    assertThat(onTimeWatermarks.getOutputWatermark(), 
equalTo(sourceWatermark));
-
-    CommittedBundle<Integer> lateDataBundle =
-        timestampedBundle(createdInts, TimestampedValue.of(3, new 
Instant(-1000L)));
-    // the late data arrives in a downstream PCollection after its watermark 
has advanced past it;
-    // we don't advance the watermark past the current watermark until we've 
consumed the late data
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(lateDataBundle), new 
Instant(2_000_000L));
-    TransformWatermarks bufferedLateWm =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new 
Instant(2_000_000L)));
-
-    // The input watermark should be held to its previous value (not advanced 
due to late data; not
-    // moved backwards in the presence of watermarks due to monotonicity).
-    TransformWatermarks lateDataBufferedWatermark =
-        manager.getWatermarks(keyed.getProducingTransformInternal());
-    assertThat(lateDataBufferedWatermark.getInputWatermark(), 
not(earlierThan(sourceWatermark)));
-    assertThat(lateDataBufferedWatermark.getOutputWatermark(), 
not(earlierThan(sourceWatermark)));
-
-    CommittedBundle<KV<String, Integer>> lateKeyedBundle =
-        timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new 
Instant(-1000L)));
-    manager.updateWatermarks(lateDataBundle, 
keyed.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(lateKeyedBundle), null);
-  }
-
-  /**
-   * Demonstrates that after watermarks of an upstream transform are updated, 
but no output has been
-   * produced, the watermarks of a downstream process are advanced.
-   */
-  @Test
-  public void getWatermarksAfterOnlyEmptyOutput() {
-    CommittedBundle<Integer> emptyCreateOutput = 
globallyWindowedBundle(createdInts);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-
-    assertThat(
-        updatedSourceWatermarks.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    TransformWatermarks finishedFilterWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        finishedFilterWatermarks.getInputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(
-        finishedFilterWatermarks.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-  }
-
-  /**
-   * Demonstrates that after watermarks of an upstream transform are updated, 
but no output has been
-   * produced, and the downstream transform has a watermark hold, the 
watermark is held to the hold.
-   */
-  @Test
-  public void getWatermarksAfterHoldAndEmptyOutput() {
-    CommittedBundle<Integer> firstCreateOutput = 
globallyWindowedBundle(createdInts, 1, 2);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(firstCreateOutput), new 
Instant(12_000L));
-
-    CommittedBundle<Integer> firstFilterOutput = 
globallyWindowedBundle(filtered);
-    manager.updateWatermarks(firstCreateOutput, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(firstFilterOutput),
-        new Instant(10_000L));
-    TransformWatermarks firstFilterWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(firstFilterWatermarks.getInputWatermark(), not(earlierThan(new 
Instant(12_000L))));
-    assertThat(firstFilterWatermarks.getOutputWatermark(), not(laterThan(new 
Instant(10_000L))));
-
-    CommittedBundle<Integer> emptyCreateOutput = 
globallyWindowedBundle(createdInts);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(emptyCreateOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks updatedSourceWatermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-
-    assertThat(
-        updatedSourceWatermarks.getOutputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-
-    TransformWatermarks finishedFilterWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        finishedFilterWatermarks.getInputWatermark(),
-        not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
-    assertThat(finishedFilterWatermarks.getOutputWatermark(), 
not(laterThan(new Instant(10_000L))));
-  }
-
-  @Test
-  public void 
getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() {
-    TransformWatermarks watermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(watermarks.getSynchronizedProcessingInputTime(), 
equalTo(clock.now()));
-    assertThat(
-        watermarks.getSynchronizedProcessingOutputTime(),
-        equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    // Non-root processing watermarks don't progress until data has been 
processed
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingInputTime(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingOutputTime(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    CommittedBundle<Integer> createOutput =
-        InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
-
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createOutput), 
BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks createAfterUpdate =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), 
equalTo(clock.now()));
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
-
-    TransformWatermarks filterAfterProduced =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingInputTime(), 
not(laterThan(clock.now())));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingOutputTime(), 
not(laterThan(clock.now())));
-
-    clock.set(new Instant(1500L));
-    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), 
equalTo(clock.now()));
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingInputTime(),
-        not(laterThan(new Instant(1250L))));
-    assertThat(
-        filterAfterProduced.getSynchronizedProcessingOutputTime(),
-        not(laterThan(new Instant(1250L))));
-
-    CommittedBundle<?> filterOutputBundle =
-        InProcessBundle.unkeyed(intsToFlatten).commit(new Instant(1250L));
-    manager.updateWatermarks(createOutput, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(filterOutputBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks filterAfterConsumed =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    assertThat(
-        filterAfterConsumed.getSynchronizedProcessingInputTime(),
-        
not(laterThan(createAfterUpdate.getSynchronizedProcessingOutputTime())));
-    assertThat(
-        filterAfterConsumed.getSynchronizedProcessingOutputTime(),
-        
not(laterThan(filterAfterConsumed.getSynchronizedProcessingInputTime())));
-  }
-
-  /**
-   * Demonstrates that the Synchronized Processing Time output watermark 
cannot progress past
-   * pending timers in the same set. This propagates to all downstream 
SynchronizedProcessingTimes.
-   *
-   * <p>Also demonstrate that the result is monotonic.
-   */
-  //  @Test
-  public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() {
-    CommittedBundle<Integer> createdBundle = 
globallyWindowedBundle(createdInts, 1, 2, 4, 8);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createdBundle), new 
Instant(1248L));
-
-    TransformWatermarks filteredWms =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    TransformWatermarks filteredDoubledWms =
-        
manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
-    Instant initialFilteredWm = 
filteredWms.getSynchronizedProcessingOutputTime();
-    Instant initialFilteredDoubledWm = 
filteredDoubledWms.getSynchronizedProcessingOutputTime();
-
-    CommittedBundle<Integer> filteredBundle = globallyWindowedBundle(filtered, 
2, 8);
-    TimerData pastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(250L), 
TimeDomain.PROCESSING_TIME);
-    TimerData futureTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(4096L), 
TimeDomain.PROCESSING_TIME);
-    TimerUpdate timers =
-        
TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build();
-    manager.updateWatermarks(createdBundle, 
filtered.getProducingTransformInternal(), timers,
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    Instant startTime = clock.now();
-    clock.set(startTime.plus(250L));
-    // We're held based on the past timer
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(startTime)));
-    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(startTime)));
-    // And we're monotonic
-    assertThat(
-        filteredWms.getSynchronizedProcessingOutputTime(), 
not(earlierThan(initialFilteredWm)));
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
-        not(earlierThan(initialFilteredDoubledWm)));
-
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        firedTimers.get(filtered.getProducingTransformInternal())
-            .get("key")
-            .getTimers(TimeDomain.PROCESSING_TIME),
-        contains(pastTimer));
-    // Our timer has fired, but has not been completed, so it holds our 
synchronized processing WM
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(startTime)));
-    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(startTime)));
-
-    CommittedBundle<Integer> filteredTimerBundle =
-        InProcessBundle.keyed(filtered, 
"key").commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    CommittedBundle<Integer> filteredTimerResult =
-        InProcessBundle.keyed(filteredTimesTwo, "key")
-            .commit(filteredWms.getSynchronizedProcessingOutputTime());
-    // Complete the processing time timer
-    manager.updateWatermarks(filteredTimerBundle, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.builder("key")
-            .withCompletedTimers(Collections.<TimerData>singleton(pastTimer))
-            .build(),
-        Collections.<CommittedBundle<?>>singleton(filteredTimerResult),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    clock.set(startTime.plus(500L));
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(clock.now())));
-    // filtered should be held to the time at which the filteredTimerResult 
fired
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(),
-        
not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark())));
-
-    manager.updateWatermarks(filteredTimerResult, 
filteredTimesTwo.getProducingTransformInternal(),
-        TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
-
-    clock.set(new Instant(Long.MAX_VALUE));
-    assertThat(filteredWms.getSynchronizedProcessingOutputTime(), equalTo(new 
Instant(4096)));
-    assertThat(
-        filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(new 
Instant(4096)));
-  }
-
-  /**
-   * Demonstrates that if any earlier processing holds appear in the 
synchronized processing time
-   * output hold the result is monotonic.
-   */
-  @Test
-  public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() {
-    Instant startTime = clock.now();
-    TransformWatermarks watermarks =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(watermarks.getSynchronizedProcessingInputTime(), 
equalTo(startTime));
-
-    TransformWatermarks filteredWatermarks =
-        manager.getWatermarks(filtered.getProducingTransformInternal());
-    // Non-root processing watermarks don't progress until data has been 
processed
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingInputTime(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-    assertThat(
-        filteredWatermarks.getSynchronizedProcessingOutputTime(),
-        not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
-
-    CommittedBundle<Integer> createOutput =
-        InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L));
-
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createOutput), 
BoundedWindow.TIMESTAMP_MAX_VALUE);
-    TransformWatermarks createAfterUpdate =
-        manager.getWatermarks(createdInts.getProducingTransformInternal());
-    assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), 
not(laterThan(clock.now())));
-    assertThat(
-        createAfterUpdate.getSynchronizedProcessingOutputTime(), 
not(laterThan(clock.now())));
-
-    CommittedBundle<Integer> createSecondOutput =
-        InProcessBundle.unkeyed(createdInts).commit(new Instant(750L));
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(createSecondOutput),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
-  }
-
-  @Test
-  public void 
synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() {
-    CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 
2, 3);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(created), new 
Instant(40_900L));
-
-    CommittedBundle<Integer> filteredBundle = globallyWindowedBundle(filtered, 
2, 4);
-    Instant upstreamHold = new Instant(2048L);
-    TimerData upstreamProcessingTimer =
-        TimerData.of(StateNamespaces.global(), upstreamHold, 
TimeDomain.PROCESSING_TIME);
-    manager.updateWatermarks(created, filtered.getProducingTransformInternal(),
-        TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    TransformWatermarks downstreamWms =
-        
manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
equalTo(clock.now()));
-
-    clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
equalTo(upstreamHold));
-
-    manager.extractFiredTimers();
-    // Pending processing time timers that have been fired but aren't 
completed hold the
-    // synchronized processing time
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
equalTo(upstreamHold));
-
-    CommittedBundle<Integer> otherCreated = 
globallyWindowedBundle(createdInts, 4, 8, 12);
-    manager.updateWatermarks(otherCreated, 
filtered.getProducingTransformInternal(),
-        TimerUpdate.builder("key")
-            
.withCompletedTimers(Collections.singleton(upstreamProcessingTimer))
-            .build(),
-        Collections.<CommittedBundle<?>>emptyList(), 
BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
not(earlierThan(clock.now())));
-  }
-
-  @Test
-  public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() {
-    CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 
2, 3);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(created), new 
Instant(29_919_235L));
-
-    Instant upstreamHold = new Instant(2048L);
-    CommittedBundle<Integer> filteredBundle =
-        InProcessBundle.keyed(filtered, "key").commit(upstreamHold);
-    manager.updateWatermarks(created, 
filtered.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>singleton(filteredBundle),
-        BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    TransformWatermarks downstreamWms =
-        
manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
equalTo(clock.now()));
-
-    clock.set(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
equalTo(upstreamHold));
-  }
-
-  @Test
-  public void extractFiredTimersReturnsFiredEventTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
-        manager.extractFiredTimers();
-    // Watermarks haven't advanced
-    assertThat(initialTimers.entrySet(), emptyIterable());
-
-    // Advance WM of keyed past the first timer, but ahead of the second and 
third
-    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(filtered);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.singleton(createdBundle), new Instant(1500L));
-
-    TimerData earliestTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(1000), 
TimeDomain.EVENT_TIME);
-    TimerData middleTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(5000L), 
TimeDomain.EVENT_TIME);
-    TimerData lastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(10000L), 
TimeDomain.EVENT_TIME);
-    Object key = new Object();
-    TimerUpdate update =
-        TimerUpdate.builder(key)
-            .setTimer(earliestTimer)
-            .setTimer(middleTimer)
-            .setTimer(lastTimer)
-            .build();
-
-    manager.updateWatermarks(createdBundle, 
filtered.getProducingTransformInternal(), update,
-        
Collections.<CommittedBundle<?>>singleton(globallyWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
firstTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<Object, FiredTimers> firstFilteredTimers =
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(firstFilteredTimers.get(key), not(nullValue()));
-    FiredTimers firstFired = firstFilteredTimers.get(key);
-    assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), 
contains(earliestTimer));
-
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<Object, FiredTimers> secondFilteredTimers =
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(secondFilteredTimers.get(key), not(nullValue()));
-    FiredTimers secondFired = secondFilteredTimers.get(key);
-    // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(TimeDomain.EVENT_TIME), 
contains(middleTimer, lastTimer));
-  }
-
-  @Test
-  public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
-        manager.extractFiredTimers();
-    // Watermarks haven't advanced
-    assertThat(initialTimers.entrySet(), emptyIterable());
-
-    // Advance WM of keyed past the first timer, but ahead of the second and 
third
-    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(filtered);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.singleton(createdBundle), new Instant(1500L));
-
-    TimerData earliestTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(999L), 
TimeDomain.PROCESSING_TIME);
-    TimerData middleTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(5000L), 
TimeDomain.PROCESSING_TIME);
-    TimerData lastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(10000L), 
TimeDomain.PROCESSING_TIME);
-    Object key = new Object();
-    TimerUpdate update =
-        TimerUpdate.builder(key)
-            .setTimer(lastTimer)
-            .setTimer(earliestTimer)
-            .setTimer(middleTimer)
-            .build();
-
-    manager.updateWatermarks(createdBundle, 
filtered.getProducingTransformInternal(), update,
-        
Collections.<CommittedBundle<?>>singleton(globallyWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
firstTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<Object, FiredTimers> firstFilteredTimers =
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(firstFilteredTimers.get(key), not(nullValue()));
-    FiredTimers firstFired = firstFilteredTimers.get(key);
-    assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), 
contains(earliestTimer));
-
-    clock.set(new Instant(50_000L));
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<Object, FiredTimers> secondFilteredTimers =
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(secondFilteredTimers.get(key), not(nullValue()));
-    FiredTimers secondFired = secondFilteredTimers.get(key);
-    // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(TimeDomain.PROCESSING_TIME), 
contains(middleTimer, lastTimer));
-  }
-
-  @Test
-  public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() 
{
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
-        manager.extractFiredTimers();
-    // Watermarks haven't advanced
-    assertThat(initialTimers.entrySet(), emptyIterable());
-
-    // Advance WM of keyed past the first timer, but ahead of the second and 
third
-    CommittedBundle<Integer> createdBundle = globallyWindowedBundle(filtered);
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.singleton(createdBundle), new Instant(1500L));
-
-    TimerData earliestTimer = TimerData.of(
-        StateNamespaces.global(), new Instant(999L), 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData middleTimer = TimerData.of(
-        StateNamespaces.global(), new Instant(5000L), 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData lastTimer = TimerData.of(
-        StateNamespaces.global(), new Instant(10000L), 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    Object key = new Object();
-    TimerUpdate update =
-        TimerUpdate.builder(key)
-            .setTimer(lastTimer)
-            .setTimer(earliestTimer)
-            .setTimer(middleTimer)
-            .build();
-
-    manager.updateWatermarks(createdBundle, 
filtered.getProducingTransformInternal(), update,
-        
Collections.<CommittedBundle<?>>singleton(globallyWindowedBundle(intsToFlatten)),
-        new Instant(1000L));
-
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
firstTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<Object, FiredTimers> firstFilteredTimers =
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(firstFilteredTimers.get(key), not(nullValue()));
-    FiredTimers firstFired = firstFilteredTimers.get(key);
-    assertThat(
-        firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), 
contains(earliestTimer));
-
-    clock.set(new Instant(50_000L));
-    manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(),
-        Collections.<CommittedBundle<?>>emptyList(), new Instant(50_000L));
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<Object, FiredTimers> secondFilteredTimers =
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(secondFilteredTimers.get(key), not(nullValue()));
-    FiredTimers secondFired = secondFilteredTimers.get(key);
-    // Contains, in order, middleTimer and then lastTimer
-    assertThat(
-        secondFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME),
-        contains(middleTimer, lastTimer));
-  }
-
-  @Test
-  public void timerUpdateBuilderBuildAddsAllAddedTimers() {
-    TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), 
TimeDomain.EVENT_TIME);
-    TimerData deleted =
-        TimerData.of(StateNamespaces.global(), new Instant(24L), 
TimeDomain.PROCESSING_TIME);
-    TimerData completedOne = TimerData.of(
-        StateNamespaces.global(), new Instant(1024L), 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData completedTwo =
-        TimerData.of(StateNamespaces.global(), new Instant(2048L), 
TimeDomain.EVENT_TIME);
-
-    TimerUpdate update =
-        TimerUpdate.builder("foo")
-            .withCompletedTimers(ImmutableList.of(completedOne, completedTwo))
-            .setTimer(set)
-            .deletedTimer(deleted)
-            .build();
-
-    assertThat(update.getCompletedTimers(), containsInAnyOrder(completedOne, 
completedTwo));
-    assertThat(update.getSetTimers(), contains(set));
-    assertThat(update.getDeletedTimers(), contains(deleted));
-  }
-
-  @Test
-  public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), 
TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build();
-
-    assertThat(built.getSetTimers(), emptyIterable());
-    assertThat(built.getDeletedTimers(), contains(timer));
-  }
-
-  @Test
-  public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), 
TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build();
-
-    assertThat(built.getSetTimers(), contains(timer));
-    assertThat(built.getDeletedTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), 
TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    builder.setTimer(timer);
-    assertThat(built.getSetTimers(), emptyIterable());
-    builder.build();
-    assertThat(built.getSetTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), 
TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    builder.deletedTimer(timer);
-    assertThat(built.getDeletedTimers(), emptyIterable());
-    builder.build();
-    assertThat(built.getDeletedTimers(), emptyIterable());
-  }
-
-  @Test
-  public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() {
-    TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), 
TimeDomain.EVENT_TIME);
-
-    TimerUpdate built = builder.build();
-    builder.withCompletedTimers(ImmutableList.of(timer));
-    assertThat(built.getCompletedTimers(), emptyIterable());
-    builder.build();
-    assertThat(built.getCompletedTimers(), emptyIterable());
-  }
-
-  private static Matcher<Instant> earlierThan(final Instant laterInstant) {
-    return new BaseMatcher<Instant>() {
-      @Override
-      public boolean matches(Object item) {
-        ReadableInstant instant = (ReadableInstant) item;
-        return instant.isBefore(laterInstant);
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description.appendText("earlier than ").appendValue(laterInstant);
-      }
-    };
-  }
-
-  private static Matcher<Instant> laterThan(final Instant shouldBeEarlier) {
-    return new BaseMatcher<Instant>() {
-      @Override
-      public boolean matches(Object item) {
-        ReadableInstant instant = (ReadableInstant) item;
-        return instant.isAfter(shouldBeEarlier);
-      }
-
-      @Override
-      public void describeTo(Description description) {
-        description.appendText("later than ").appendValue(shouldBeEarlier);
-      }
-    };
-  }
-
-  @SafeVarargs
-  private final <T> CommittedBundle<T> timestampedBundle(
-      PCollection<T> pc, TimestampedValue<T>... values) {
-    UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
-    for (TimestampedValue<T> value : values) {
-      bundle.add(
-          WindowedValue.timestampedValueInGlobalWindow(value.getValue(), 
value.getTimestamp()));
-    }
-    return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-
-  @SafeVarargs
-  private final <T> CommittedBundle<T> globallyWindowedBundle(PCollection<T> 
pc, T... values) {
-    UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc);
-    for (T value : values) {
-      bundle.add(WindowedValue.valueInGlobalWindow(value));
-    }
-    return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessBundleTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessBundleTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessBundleTest.java
deleted file mode 100644
index 57d8c90..0000000
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessBundleTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess.util;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertThat;
-
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * Tests for {@link InProcessBundle}.
- */
-@RunWith(JUnit4.class)
-public class InProcessBundleTest {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void unkeyedShouldCreateWithNullKey() {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.of(1));
-
-    InProcessBundle<Integer> inFlightBundle = 
InProcessBundle.unkeyed(pcollection);
-
-    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-
-    assertThat(bundle.isKeyed(), is(false));
-    assertThat(bundle.getKey(), nullValue());
-  }
-
-  private void keyedCreateBundle(Object key) {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.of(1));
-
-    InProcessBundle<Integer> inFlightBundle = 
InProcessBundle.keyed(pcollection, key);
-
-    CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-    assertThat(bundle.isKeyed(), is(true));
-    assertThat(bundle.getKey(), equalTo(key));
-  }
-
-  @Test
-  public void keyedWithNullKeyShouldCreateKeyedBundle() {
-    keyedCreateBundle(null);
-  }
-
-  @Test
-  public void keyedWithKeyShouldCreateKeyedBundle() {
-    keyedCreateBundle(new Object());
-  }
-
-  private <T> void 
afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) 
{
-    PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of());
-
-    InProcessBundle<T> bundle = InProcessBundle.unkeyed(pcollection);
-    Collection<Matcher<? super WindowedValue<T>>> expectations = new 
ArrayList<>();
-    for (WindowedValue<T> elem : elems) {
-      bundle.add(elem);
-      expectations.add(equalTo(elem));
-    }
-    Matcher<Iterable<? extends WindowedValue<T>>> containsMatcher =
-        Matchers.<WindowedValue<T>>containsInAnyOrder(expectations);
-    assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher);
-  }
-
-  @Test
-  public void getElementsBeforeAddShouldReturnEmptyIterable() {
-    
afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList());
-  }
-
-  @Test
-  public void getElementsAfterAddShouldReturnAddedElements() {
-    WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1);
-    WindowedValue<Integer> secondValue =
-        WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L));
-
-    afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, 
secondValue));
-  }
-
-  @Test
-  public void addAfterCommitShouldThrowException() {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.<Integer>of());
-
-    InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection);
-    bundle.add(WindowedValue.valueInGlobalWindow(1));
-    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
-    assertThat(firstCommit.getElements(), 
containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("3");
-    thrown.expectMessage("committed");
-
-    bundle.add(WindowedValue.valueInGlobalWindow(3));
-  }
-
-  @Test
-  public void commitAfterCommitShouldThrowException() {
-    PCollection<Integer> pcollection = 
TestPipeline.create().apply(Create.<Integer>of());
-
-    InProcessBundle<Integer> bundle = InProcessBundle.unkeyed(pcollection);
-    bundle.add(WindowedValue.valueInGlobalWindow(1));
-    CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now());
-    assertThat(firstCommit.getElements(), 
containsInAnyOrder(WindowedValue.valueInGlobalWindow(1)));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("committed");
-
-    bundle.commit(Instant.now());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessTimerInternalsTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessTimerInternalsTest.java
deleted file mode 100644
index cfe820d..0000000
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/InProcessTimerInternalsTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess.util;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import 
com.google.cloud.dataflow.sdk.runners.inprocess.util.InMemoryWatermarkManager.TransformWatermarks;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link InProcessTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class InProcessTimerInternalsTest {
-  private MockClock clock;
-  @Mock private TransformWatermarks watermarks;
-
-  private TimerUpdateBuilder timerUpdateBuilder;
-
-  private InProcessTimerInternals internals;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    clock = MockClock.fromInstant(new Instant(0));
-
-    timerUpdateBuilder = TimerUpdate.builder(1234);
-
-    internals = InProcessTimerInternals.create(clock, watermarks, 
timerUpdateBuilder);
-  }
-
-  @Test
-  public void setTimerAddsToBuilder() {
-    TimerData eventTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(20145L), 
TimeDomain.EVENT_TIME);
-    TimerData processingTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(125555555L), 
TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTimer =
-        TimerData.of(
-            StateNamespaces.global(),
-            new Instant(98745632189L),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    internals.setTimer(eventTimer);
-    internals.setTimer(processingTimer);
-    internals.setTimer(synchronizedProcessingTimer);
-
-    assertThat(
-        internals.getTimerUpdate().getSetTimers(),
-        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, 
processingTimer));
-  }
-
-  @Test
-  public void deleteTimerDeletesOnBuilder() {
-    TimerData eventTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(20145L), 
TimeDomain.EVENT_TIME);
-    TimerData processingTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(125555555L), 
TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTimer =
-        TimerData.of(
-            StateNamespaces.global(),
-            new Instant(98745632189L),
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    internals.deleteTimer(eventTimer);
-    internals.deleteTimer(processingTimer);
-    internals.deleteTimer(synchronizedProcessingTimer);
-
-    assertThat(
-        internals.getTimerUpdate().getDeletedTimers(),
-        containsInAnyOrder(eventTimer, synchronizedProcessingTimer, 
processingTimer));
-  }
-
-  @Test
-  public void getProcessingTimeIsClockNow() {
-    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
-    Instant oldProcessingTime = internals.currentProcessingTime();
-
-    clock.advance(Duration.standardHours(12));
-
-    assertThat(internals.currentProcessingTime(), equalTo(clock.now()));
-    assertThat(
-        internals.currentProcessingTime(),
-        equalTo(oldProcessingTime.plus(Duration.standardHours(12))));
-  }
-
-  @Test
-  public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() {
-    when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new 
Instant(12345L));
-    assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new 
Instant(12345L)));
-  }
-
-  @Test
-  public void getInputWatermarkTimeUsesWatermarkTime() {
-    when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L));
-    assertThat(internals.currentInputWatermarkTime(), equalTo(new 
Instant(8765L)));
-  }
-
-  @Test
-  public void getOutputWatermarkTimeUsesWatermarkTime() {
-    when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L));
-    assertThat(internals.currentOutputWatermarkTime(), equalTo(new 
Instant(25525L)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c8911e/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/MockClock.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/MockClock.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/MockClock.java
deleted file mode 100644
index 440ed43..0000000
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/util/MockClock.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A clock that returns a constant value for now which can be set with calls to
- * {@link #set(Instant)}.
- *
- * <p>For uses of the {@link Clock} interface in unit tests.
- */
-public class MockClock implements Clock {
-
-  private Instant now;
-
-  public static MockClock fromInstant(Instant initial) {
-    return new MockClock(initial);
-  }
-
-  private MockClock(Instant initialNow) {
-    this.now = initialNow;
-  }
-
-  public void set(Instant newNow) {
-    checkArgument(!newNow.isBefore(now), "Cannot move MockClock backwards in 
time from %s to %s",
-        now, newNow);
-    this.now = newNow;
-  }
-
-  public void advance(Duration duration) {
-    checkArgument(
-        duration.getMillis() > 0,
-        "Cannot move MockClock backwards in time by duration %s",
-        duration);
-    set(now.plus(duration));
-  }
-
-  @Override
-  public Instant now() {
-    return now;
-  }
-
-}

Reply via email to