Repository: incubator-beam
Updated Branches:
  refs/heads/master b9116ac42 -> 659cf2ee0


Fix update sequence in InMemoryWatermarkManager

Because the WatermarkManager is not synchronized for the duration of a call
to updatePending, the pending queues must be updated in a specific order:
first add any new restrictions, then remove any restrictions which no longer
apply. This ensures that any intermediate read will see, at worst, a more
restricted watermark than the actual watermark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/367f3aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/367f3aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/367f3aca

Branch: refs/heads/master
Commit: 367f3acac60fa421b797009eeb131f8c47d75f1d
Parents: b9116ac
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 28 17:38:50 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Mon May 2 11:03:52 2016 -0700

----------------------------------------------------------------------
 .../direct/InMemoryWatermarkManager.java        | 34 +++++++++++++-------
 1 file changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/367f3aca/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 07b6bb4..769457a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -469,9 +469,6 @@ public class InMemoryWatermarkManager {
     }
 
     private synchronized void updateTimers(TimerUpdate update) {
-      for (TimerData completedTimer : update.completedTimers) {
-        pendingTimers.remove(completedTimer);
-      }
       Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
       for (TimerData addedTimer : update.setTimers) {
         NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
@@ -479,6 +476,10 @@ public class InMemoryWatermarkManager {
           timerQueue.add(addedTimer);
         }
       }
+
+      for (TimerData completedTimer : update.completedTimers) {
+        pendingTimers.remove(completedTimer);
+      }
       for (TimerData deletedTimer : update.deletedTimers) {
         NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
         if (timerQueue != null) {
@@ -832,10 +833,16 @@ public class InMemoryWatermarkManager {
   }
 
   /**
-   * Removes all of the completed Timers from the collection of pending timers, adds all new timers,
-   * and removes all deleted timers. Removes all elements consumed by the input bundle from the
-   * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced
-   * by the {@link PTransform} to the pending queue of each consumer.
+   * First adds all produced elements to the queue of pending elements for each consumer, then adds
+   * all pending timers to the collection of pending timers, then removes all completed and deleted
+   * timers from the collection of pending timers, then removes all completed elements from the
+   * pending queue of the transform.
+   *
+   * <p>It is required that all newly pending elements are added to the queue of pending elements
+   * for each consumer prior to the completed elements being removed, as doing otherwise could cause
+   * a Watermark to appear in a state in which the upstream (completed) element does not hold the
+   * watermark but the element it produced is not yet pending. This can cause the watermark to
+   * erroneously advance.
    */
   private void updatePending(
       CommittedBundle<?> input,
@@ -843,17 +850,22 @@ public class InMemoryWatermarkManager {
       TimerUpdate timerUpdate,
       Iterable<? extends CommittedBundle<?>> outputs) {
     TransformWatermarks completedTransform = transformToWatermarks.get(transform);
-    completedTransform.updateTimers(timerUpdate);
-    if (input != null) {
-      completedTransform.removePending(input);
-    }
 
+    // Newly pending elements must be added before completed elements are removed, as the two
+    // do not share a Mutex within this call and thus can be interleaved with external calls to
+    // refresh.
     for (CommittedBundle<?> bundle : outputs) {
       for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
         TransformWatermarks watermarks = transformToWatermarks.get(consumer);
         watermarks.addPending(bundle);
       }
     }
+
+    completedTransform.updateTimers(timerUpdate);
+    if (input != null) {
+      completedTransform.removePending(input);
+    }
+
   }
 
   /**

Reply via email to