Hold output watermark according to pending timers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d Branch: refs/heads/master Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16 Parents: 7f14c46 Author: Kenneth Knowles <k...@google.com> Authored: Tue Dec 20 13:37:40 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 13:45:37 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/WatermarkManager.java | 59 ++++++++++++++++---- 1 file changed, 48 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index f7bafd1..248fafd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -211,12 +211,18 @@ public class WatermarkManager { private static class AppliedPTransformInputWatermark implements Watermark { private final Collection<? extends Watermark> inputWatermarks; private final SortedMultiset<CommittedBundle<?>> pendingElements; - private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers; + + // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key + // minimum + private final SortedMultiset<Instant> pendingTimers; // Entries in this table represent the authoritative timestamp for which // a per-key-and-StateNamespace timer is set. private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers; + // This per-key sorted set allows quick retrieval of timers that should fire for a key + private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers; + private AtomicReference<Instant> currentWatermark; public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) { @@ -224,10 +230,13 @@ public class WatermarkManager { // The ordering must order elements by timestamp, and must not compare two distinct elements // as equal. This is built on the assumption that any element added as a pending element will // be consumed without modifications. + // + // The same logic is applied for pending timers Ordering<CommittedBundle<?>> pendingBundleComparator = new BundleByElementTimestampComparator().compound(Ordering.arbitrary()); this.pendingElements = TreeMultiset.create(pendingBundleComparator); + this.pendingTimers = TreeMultiset.create(); this.objectTimers = new HashMap<>(); this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); @@ -278,6 +287,14 @@ public class WatermarkManager { pendingElements.remove(completed); } + private synchronized Instant getEarliestTimerTimestamp() { + if (pendingTimers.isEmpty()) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } else { + return pendingTimers.firstEntry().getElement(); + } + } + private synchronized void updateTimers(TimerUpdate update) { NavigableSet<TimerData> keyTimers = objectTimers.get(update.key); if (keyTimers == null) { @@ -291,27 +308,43 @@ public class WatermarkManager { existingTimers.put(update.key, existingTimersForKey); } - for (TimerData timer : update.setTimers) { + for (TimerData timer : update.getSetTimers()) { + if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + @Nullable + TimerData existingTimer = + existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); + + if (existingTimer == null) { + pendingTimers.add(timer.getTimestamp()); + keyTimers.add(timer); + } else if (!existingTimer.equals(timer)) { + keyTimers.remove(existingTimer); + keyTimers.add(timer); + } // else the timer is already set identically, so noop + + existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); + } + } + + for (TimerData timer : update.getDeletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { @Nullable TimerData existingTimer = existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); if (existingTimer != null) { + pendingTimers.remove(existingTimer.getTimestamp()); keyTimers.remove(existingTimer); + existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId()); } - keyTimers.add(timer); - existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); } } - for (TimerData timer : update.deletedTimers) { + for (TimerData timer : update.getCompletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { - keyTimers.remove(timer); - existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId()); + pendingTimers.remove(timer.getTimestamp()); } } - // We don't keep references to timers that have been fired and delivered via #getFiredTimers() } private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() { @@ -336,11 +369,12 @@ public class WatermarkManager { * {@link #refresh()} for more information. */ private static class AppliedPTransformOutputWatermark implements Watermark { - private final Watermark inputWatermark; + private final AppliedPTransformInputWatermark inputWatermark; private final PerKeyHolds holds; private AtomicReference<Instant> currentWatermark; - public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) { + public AppliedPTransformOutputWatermark( + AppliedPTransformInputWatermark inputWatermark) { this.inputWatermark = inputWatermark; holds = new PerKeyHolds(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); @@ -377,7 +411,10 @@ public class WatermarkManager { @Override public synchronized WatermarkUpdate refresh() { Instant oldWatermark = currentWatermark.get(); - Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold()); + Instant newWatermark = INSTANT_ORDERING.min( + inputWatermark.get(), + inputWatermark.getEarliestTimerTimestamp(), + holds.getMinHold()); newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark); currentWatermark.set(newWatermark); return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);