Repository: incubator-beam
Updated Branches:
  refs/heads/master 487052588 -> 874ddef05


Update Watermarks Outside of handleResult

Remove excess mutual exclusion


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46bc6e14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46bc6e14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46bc6e14

Branch: refs/heads/master
Commit: 46bc6e14b5e247905f567e617ffc017f98cc6f44
Parents: 4870525
Author: Thomas Groh <[email protected]>
Authored: Mon May 2 17:26:47 2016 -0700
Committer: Thomas Groh <[email protected]>
Committed: Tue May 10 10:49:25 2016 -0700

----------------------------------------------------------------------
 .../direct/InMemoryWatermarkManager.java        | 113 +++++++++++++++----
 .../direct/InProcessEvaluationContext.java      |  14 ++-
 .../direct/WatermarkCallbackExecutor.java       |   9 +-
 .../direct/InMemoryWatermarkManagerTest.java    |  58 ++++++++--
 .../direct/InProcessEvaluationContextTest.java  |   8 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   4 +-
 6 files changed, 169 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 87ea4d5..f8cf343 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
@@ -57,6 +58,7 @@ import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
@@ -686,6 +688,17 @@ public class InMemoryWatermarkManager {
   private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
 
   /**
+   * A queue of pending updates to the state of this {@link InMemoryWatermarkManager}.
+   */
+  private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
+
+  /**
+   * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially
+   * stale data.
+   */
+  private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;
+
+  /**
    * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created
    * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the
    * minimum watermark, with no watermark holds or pending elements.
@@ -707,6 +720,8 @@ public class InMemoryWatermarkManager {
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
     this.clock = clock;
     this.consumers = consumers;
+    this.pendingUpdates = new ConcurrentLinkedQueue<>();
+    this.pendingRefreshes = new ConcurrentLinkedQueue<>();
 
     transformToWatermarks = new HashMap<>();
 
@@ -810,27 +825,38 @@ public class InMemoryWatermarkManager {
       @Nullable CommittedBundle<?> completed,
       TimerUpdate timerUpdate,
       CommittedResult result,
-      @Nullable Instant earliestHold) {
-    AppliedPTransform<?, ?, ?> transform = result.getTransform();
-    updatePending(completed, timerUpdate, result);
-    TransformWatermarks transformWms = transformToWatermarks.get(transform);
-    transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold);
-    refreshWatermarks(transform);
+      Instant earliestHold) {
+    pendingUpdates.offer(PendingWatermarkUpdate.create(completed,
+        timerUpdate,
+        result,
+        earliestHold));
   }
 
-  private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    TransformWatermarks myWatermarks = transformToWatermarks.get(transform);
-    WatermarkUpdate updateResult = myWatermarks.refresh();
-    if (updateResult.isAdvanced()) {
-      for (PValue outputPValue : transform.getOutput().expand()) {
-        Collection<AppliedPTransform<?, ?, ?>> downstreamTransforms = consumers.get(outputPValue);
-        if (downstreamTransforms != null) {
-          for (AppliedPTransform<?, ?, ?> downstreamTransform : downstreamTransforms) {
-            refreshWatermarks(downstreamTransform);
-          }
-        }
-      }
-    }
+  /**
+   * Applies all pending updates to this {@link InMemoryWatermarkManager}, causing the pending state
+   * of all {@link TransformWatermarks} to be advanced as far as possible.
+   */
+  private void applyPendingUpdates() {
+    Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>();
+    PendingWatermarkUpdate pending = pendingUpdates.poll();
+    while (pending != null) {
+      applyPendingUpdate(pending);
+      updatedTransforms.add(pending.getTransform());
+      pending = pendingUpdates.poll();
+    }
+    pendingRefreshes.addAll(updatedTransforms);
+  }
+
+  private void applyPendingUpdate(PendingWatermarkUpdate pending) {
+    CommittedResult result = pending.getResult();
+    AppliedPTransform transform = result.getTransform();
+    CommittedBundle<?> inputBundle = pending.getInputBundle();
+
+    updatePending(inputBundle, pending.getTimerUpdate(), result);
+
+    TransformWatermarks transformWms = transformToWatermarks.get(transform);
+    transformWms.setEventTimeHold(inputBundle == null ? null : inputBundle.getKey(),
+        pending.getEarliestHold());
   }
 
   /**
@@ -871,6 +897,29 @@ public class InMemoryWatermarkManager {
   }
 
   /**
+   * Refresh the watermarks contained within this {@link InMemoryWatermarkManager}, causing all
+   * watermarks to be advanced as far as possible.
+   */
+  synchronized void refreshAll() {
+    applyPendingUpdates();
+    while (!pendingRefreshes.isEmpty()) {
+      refreshWatermarks(pendingRefreshes.poll());
+    }
+  }
+
+  private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
+    TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
+    WatermarkUpdate updateResult = myWatermarks.refresh();
+    Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
+    if (updateResult.isAdvanced()) {
+      for (PValue outputPValue : toRefresh.getOutput().expand()) {
+        additionalRefreshes.addAll(consumers.get(outputPValue));
+      }
+    }
+    pendingRefreshes.addAll(additionalRefreshes);
+  }
+
+  /**
    * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
    * pending timers will be removed from this {@link InMemoryWatermarkManager}.
    */
@@ -1338,4 +1387,30 @@ public class InMemoryWatermarkManager {
     }
     return result;
   }
+
+  @AutoValue
+  abstract static class PendingWatermarkUpdate {
+    @Nullable
+    public abstract CommittedBundle<?> getInputBundle();
+    public abstract TimerUpdate getTimerUpdate();
+    public abstract CommittedResult getResult();
+    public abstract Instant getEarliestHold();
+
+    /**
+     * Gets the {@link AppliedPTransform} that generated this result.
+     */
+    public AppliedPTransform<?, ?, ?> getTransform() {
+      return getResult().getTransform();
+    }
+
+    public static PendingWatermarkUpdate create(
+        CommittedBundle<?> inputBundle,
+        TimerUpdate timerUpdate,
+        CommittedResult result, Instant earliestHold) {
+      return new AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate(inputBundle,
+          timerUpdate,
+          result,
+          earliestHold);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index a6dffba..9eeafbb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
@@ -51,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -128,7 +130,7 @@ class InProcessEvaluationContext {
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.mergedCounters = new CounterSet();
 
-    this.callbackExecutor = WatermarkCallbackExecutor.create();
+    this.callbackExecutor = WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());
   }
 
   /**
@@ -146,7 +148,7 @@ class InProcessEvaluationContext {
    * @param result the result of evaluating the input bundle
    * @return the committed bundles contained within the handled {@code result}
    */
-  public synchronized CommittedResult handleResult(
+  public CommittedResult handleResult(
       @Nullable CommittedBundle<?> completedBundle,
       Iterable<TimerData> completedTimers,
       InProcessTransformResult result) {
@@ -163,7 +165,6 @@ class InProcessEvaluationContext {
         result.getTimerUpdate().withCompletedTimers(completedTimers),
         committedResult,
         result.getWatermarkHold());
-    fireAllAvailableCallbacks();
     // Update counters
     if (result.getCounters() != null) {
       mergedCounters.merge(result.getCounters());
@@ -359,6 +360,12 @@ class InProcessEvaluationContext {
     return mergedCounters;
   }
 
+  @VisibleForTesting
+  void forceRefresh() {
+    watermarkManager.refreshAll();
+    fireAllAvailableCallbacks();
+  }
+
   /**
    * Extracts all timers that have been fired and have not already been extracted.
    *
@@ -366,6 +373,7 @@ class InProcessEvaluationContext {
    * for each time they are set.
    */
   public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
+    forceRefresh();
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired =
         watermarkManager.extractFiredTimers();
     return fired;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 4a3a517..1c9b050 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -30,7 +30,6 @@ import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * Executes callbacks that occur based on the progression of the watermark per-step.
@@ -52,17 +51,17 @@ class WatermarkCallbackExecutor {
   /**
    * Create a new {@link WatermarkCallbackExecutor}.
    */
-  public static WatermarkCallbackExecutor create() {
-    return new WatermarkCallbackExecutor();
+  public static WatermarkCallbackExecutor create(ExecutorService executor) {
+    return new WatermarkCallbackExecutor(executor);
   }
 
   private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
       callbacks;
   private final ExecutorService executor;
 
-  private WatermarkCallbackExecutor() {
+  private WatermarkCallbackExecutor(ExecutorService executor) {
     this.callbacks = new ConcurrentHashMap<>();
-    this.executor = Executors.newSingleThreadExecutor();
+    this.executor = executor;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index b45440d..7f202fb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -167,6 +167,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(output)),
         new Instant(8000L));
+    manager.refreshAll();
     TransformWatermarks updatedSourceWatermark =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
 
@@ -187,6 +188,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             
Collections.<CommittedBundle<?>>singleton(secondPcollectionBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     // We didn't do anything for the first source, so we shouldn't have 
progressed the watermark
     TransformWatermarks firstSourceWatermark =
@@ -219,7 +221,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(flattened.getProducingTransformInternal(),
             
secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
-        null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
     TransformWatermarks transformAfterProcessing =
         manager.getWatermarks(flattened.getProducingTransformInternal());
     manager.updateWatermarks(secondPcollectionBundle,
@@ -227,7 +229,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(flattened.getProducingTransformInternal(),
             
secondPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(flattenedBundleSecondCreate)),
-        null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     assertThat(
         transformAfterProcessing.getInputWatermark(),
         not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE)));
@@ -245,7 +248,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(createdInts.getProducingTransformInternal(),
             null,
             Collections.<CommittedBundle<?>>singleton(firstPcollectionBundle)),
-        new Instant(Long.MAX_VALUE));
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks firstSourceWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(
@@ -276,7 +280,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(flattened.getProducingTransformInternal(),
             
firstPcollectionBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(completedFlattenBundle)),
-        null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks afterConsumingAllInput =
         manager.getWatermarks(flattened.getProducingTransformInternal());
     assertThat(
@@ -302,6 +307,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(Long.MAX_VALUE));
+    manager.refreshAll();
     TransformWatermarks createdAfterProducing =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(
@@ -317,7 +323,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(keyed.getProducingTransformInternal(),
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
-            null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(
@@ -337,7 +344,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(filtered.getProducingTransformInternal(),
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
-        null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks filteredProcessedWatermarks =
         manager.getWatermarks(filtered.getProducingTransformInternal());
     assertThat(
@@ -375,6 +383,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
+    manager.refreshAll();
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(
@@ -418,6 +427,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
secondKeyBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(1234L));
+    manager.refreshAll();
 
     TransformWatermarks filteredWatermarks =
         manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -433,6 +443,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
fauxFirstKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new 
Instant(1234L)));
 
@@ -444,6 +455,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(5678L));
+    manager.refreshAll();
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new 
Instant(5678L)));
 
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
@@ -452,6 +464,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
fauxSecondKeyTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     assertThat(filteredWatermarks.getOutputWatermark(),
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
   }
@@ -469,6 +482,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(firstInput)),
         new Instant(0L));
+    manager.refreshAll();
     TransformWatermarks firstWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L)));
@@ -481,6 +495,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(secondInput)),
         new Instant(-250L));
+    manager.refreshAll();
     TransformWatermarks secondWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new 
Instant(0L))));
@@ -513,6 +528,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
         new Instant(500L));
+    manager.refreshAll();
     TransformWatermarks keyedWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(
@@ -589,7 +605,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(keyed.getProducingTransformInternal(),
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(keyBundle)),
-        null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks onTimeWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark));
@@ -605,6 +622,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateDataBundle)),
         new Instant(2_000_000L));
+    manager.refreshAll();
     TransformWatermarks bufferedLateWm =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new 
Instant(2_000_000L)));
@@ -623,7 +641,8 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
         result(keyed.getProducingTransformInternal(),
             
lateDataBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(lateKeyedBundle)),
-        null);
+        BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
   }
 
   public void updateWatermarkWithDifferentWindowedValueInstances() {
@@ -646,6 +665,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         null);
+    manager.refreshAll();
     TransformWatermarks onTimeWatermarks =
         manager.getWatermarks(keyed.getProducingTransformInternal());
     assertThat(onTimeWatermarks.getInputWatermark(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
@@ -664,6 +684,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks updatedSourceWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
 
@@ -702,6 +723,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
firstCreateOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(firstFilterOutput)),
         new Instant(10_000L));
+    manager.refreshAll();
     TransformWatermarks firstFilterWatermarks =
         manager.getWatermarks(filtered.getProducingTransformInternal());
     assertThat(firstFilterWatermarks.getInputWatermark(), not(earlierThan(new 
Instant(12_000L))));
@@ -714,6 +736,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(emptyCreateOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks updatedSourceWatermarks =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
 
@@ -757,6 +780,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks createAfterUpdate =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), 
equalTo(clock.now()));
@@ -787,6 +811,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createOutput.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filterOutputBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks filterAfterConsumed =
         manager.getWatermarks(filtered.getProducingTransformInternal());
     assertThat(
@@ -812,6 +837,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(createdBundle)),
         new Instant(1248L));
+    manager.refreshAll();
 
     TransformWatermarks filteredWms =
         manager.getWatermarks(filtered.getProducingTransformInternal());
@@ -833,6 +859,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     Instant startTime = clock.now();
     clock.set(startTime.plus(250L));
     // We're held based on the past timer
@@ -871,6 +898,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredTimerResult)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     clock.set(startTime.plus(500L));
     assertThat(filteredWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(clock.now())));
@@ -885,6 +913,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
filteredTimerResult.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
 
     clock.set(new Instant(Long.MAX_VALUE));
@@ -923,6 +952,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(createOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
     TransformWatermarks createAfterUpdate =
         manager.getWatermarks(createdInts.getProducingTransformInternal());
     assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), 
not(laterThan(clock.now())));
@@ -937,6 +967,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(createSecondOutput)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), 
equalTo(clock.now()));
   }
@@ -950,6 +981,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>singleton(created)),
         new Instant(40_900L));
+    manager.refreshAll();
 
     CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 
4);
     Instant upstreamHold = new Instant(2048L);
@@ -961,6 +993,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     TransformWatermarks downstreamWms =
         
manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
@@ -982,6 +1015,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>emptyList()),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     assertThat(downstreamWms.getSynchronizedProcessingInputTime(), 
not(earlierThan(clock.now())));
   }
@@ -1007,6 +1041,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
         BoundedWindow.TIMESTAMP_MAX_VALUE);
+    manager.refreshAll();
 
     TransformWatermarks downstreamWms =
         
manager.getWatermarks(filteredTimesTwo.getProducingTransformInternal());
@@ -1031,6 +1066,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.singleton(createdBundle)),
         new Instant(1500L));
+    manager.refreshAll();
 
     TimerData earliestTimer =
         TimerData.of(StateNamespaces.global(), new Instant(1000), 
TimeDomain.EVENT_TIME);
@@ -1052,6 +1088,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
+    manager.refreshAll();
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
firstTransformFiredTimers =
         manager.extractFiredTimers();
@@ -1069,6 +1106,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
+    manager.refreshAll();
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
@@ -1118,6 +1156,7 @@ public class InMemoryWatermarkManagerTest implements 
Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
+    manager.refreshAll();
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
firstTransformFiredTimers =
         manager.extractFiredTimers();
@@ -1136,6 +1175,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
+    manager.refreshAll();
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
@@ -1185,6 +1225,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             
createdBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             
Collections.<CommittedBundle<?>>singleton(multiWindowedBundle(intsToFlatten))),
         new Instant(1000L));
+    manager.refreshAll();
 
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
         manager.extractFiredTimers();
@@ -1204,6 +1245,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             null,
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
+    manager.refreshAll();
     Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index 59c4d8e..b73e41a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -317,7 +317,6 @@ public class InProcessEvaluationContextTest {
             .build();
 
     context.handleResult(null, ImmutableList.<TimerData>of(), result);
-
     // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit
     // will likely be flaky if this logic is broken
     assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
@@ -325,6 +324,7 @@ public class InProcessEvaluationContextTest {
     InProcessTransformResult finishedResult =
         
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
+    context.forceRefresh();
     // Obtain the value via blocking call
     assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
   }
@@ -336,6 +336,7 @@ public class InProcessEvaluationContextTest {
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
 
     final CountDownLatch callLatch = new CountDownLatch(1);
+    context.extractFiredTimers();
     Runnable callback =
         new Runnable() {
           @Override
@@ -426,6 +427,7 @@ public class InProcessEvaluationContextTest {
         null,
         ImmutableList.<TimerData>of(),
         
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+    context.extractFiredTimers();
     assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
   }
 
@@ -450,6 +452,7 @@ public class InProcessEvaluationContextTest {
         null,
         ImmutableList.<TimerData>of(),
         
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+    context.extractFiredTimers();
     assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
   }
 
@@ -482,6 +485,7 @@ public class InProcessEvaluationContextTest {
           ImmutableList.<TimerData>of(),
           StepTransformResult.withoutHold(consumers).build());
     }
+    context.extractFiredTimers();
     assertThat(context.isDone(), is(true));
   }
 
@@ -502,12 +506,14 @@ public class InProcessEvaluationContextTest {
         context.createRootBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+    context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
 
     context.handleResult(
         context.createRootBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         
StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+    context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46bc6e14/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index d47cf5e..b6b2bf5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -47,7 +48,8 @@ import java.util.concurrent.TimeUnit;
  */
 @RunWith(JUnit4.class)
 public class WatermarkCallbackExecutorTest {
-  private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create();
+  private WatermarkCallbackExecutor executor =
+      WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());
   private AppliedPTransform<?, ?, ?> create;
   private AppliedPTransform<?, ?, ?> sum;
 


Reply via email to