Repository: incubator-beam Updated Branches: refs/heads/master b9116ac42 -> 659cf2ee0
Fix update sequence in InMemoryWatermarkManager. Because the WatermarkManager is not synchronized for the duration of the call to updatePending, pending queues must be updated in an order that first adds any new restrictions and only then removes restrictions that no longer apply. This ensures that any intermediate read sees, at worst, a more restrictive (held-back) watermark than the actual watermark, and never one that has advanced too early. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/367f3aca Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/367f3aca Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/367f3aca Branch: refs/heads/master Commit: 367f3acac60fa421b797009eeb131f8c47d75f1d Parents: b9116ac Author: Thomas Groh <tg...@google.com> Authored: Thu Apr 28 17:38:50 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Mon May 2 11:03:52 2016 -0700 ---------------------------------------------------------------------- .../direct/InMemoryWatermarkManager.java | 34 +++++++++++++------- 1 file changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/367f3aca/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java index 07b6bb4..769457a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java @@ -469,9 +469,6 @@ public class InMemoryWatermarkManager { } private synchronized void updateTimers(TimerUpdate 
update) { - for (TimerData completedTimer : update.completedTimers) { - pendingTimers.remove(completedTimer); - } Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key); for (TimerData addedTimer : update.setTimers) { NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain()); @@ -479,6 +476,10 @@ public class InMemoryWatermarkManager { timerQueue.add(addedTimer); } } + + for (TimerData completedTimer : update.completedTimers) { + pendingTimers.remove(completedTimer); + } for (TimerData deletedTimer : update.deletedTimers) { NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain()); if (timerQueue != null) { @@ -832,10 +833,16 @@ public class InMemoryWatermarkManager { } /** - * Removes all of the completed Timers from the collection of pending timers, adds all new timers, - * and removes all deleted timers. Removes all elements consumed by the input bundle from the - * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced - * by the {@link PTransform} to the pending queue of each consumer. + * First adds all produced elements to the queue of pending elements for each consumer, then adds + * all pending timers to the collection of pending timers, then removes all completed and deleted + * timers from the collection of pending timers, then removes all completed elements from the + * pending queue of the transform. + * + * <p>It is required that all newly pending elements are added to the queue of pending elements + * for each consumer prior to the completed elements being removed, as doing otherwise could cause + * a Watermark to appear in a state in which the upstream (completed) element does not hold the + * watermark but the element it produced is not yet pending. This can cause the watermark to + * erroneously advance. */ private void updatePending( CommittedBundle<?> input, @@ -843,17 +850,22 @@ public class InMemoryWatermarkManager { TimerUpdate timerUpdate, Iterable<? 
extends CommittedBundle<?>> outputs) { TransformWatermarks completedTransform = transformToWatermarks.get(transform); - completedTransform.updateTimers(timerUpdate); - if (input != null) { - completedTransform.removePending(input); - } + // Newly pending elements must be added before completed elements are removed, as the two + // do not share a Mutex within this call and thus can be interleaved with external calls to + // refresh. for (CommittedBundle<?> bundle : outputs) { for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) { TransformWatermarks watermarks = transformToWatermarks.get(consumer); watermarks.addPending(bundle); } } + + completedTransform.updateTimers(timerUpdate); + if (input != null) { + completedTransform.removePending(input); + } + } /**