Use a NavigableSet Instead of a PriorityQueue in WatermarkManager. This removes an O(n) call to remove, replacing it with an O(log(n)) call. This significantly improves the scaling behavior of the DirectRunner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aeb3b3c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aeb3b3c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aeb3b3c4 Branch: refs/heads/master Commit: aeb3b3c4bfad3e02090b1f7f62695759e17f0189 Parents: 4cb1d10 Author: Thomas Groh <[email protected]> Authored: Wed Oct 26 16:35:31 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Oct 27 12:51:32 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/WatermarkManager.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aeb3b3c4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index f8cbc51..31b8091 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Objects; -import java.util.PriorityQueue; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -388,7 +387,7 @@ public class WatermarkManager { private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers; private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers; - private final PriorityQueue<TimerData> pendingTimers; + private final 
NavigableSet<TimerData> pendingTimers; private AtomicReference<Instant> earliestHold; @@ -397,7 +396,7 @@ public class WatermarkManager { this.pendingBundles = new HashSet<>(); this.processingTimers = new HashMap<>(); this.synchronizedProcessingTimers = new HashMap<>(); - this.pendingTimers = new PriorityQueue<>(); + this.pendingTimers = new TreeSet<>(); Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE; for (Watermark wm : inputWms) { initialHold = INSTANT_ORDERING.min(initialHold, wm.get()); @@ -466,7 +465,7 @@ public class WatermarkManager { } } if (!pendingTimers.isEmpty()) { - earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest); + earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest); } return earliest; } @@ -630,7 +629,7 @@ public class WatermarkManager { private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural(); /** - * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the + * For each (Object, NavigableSet) pair in the provided map, remove each Timer that is before the * latestTime argument and put in in the result with the same key, then remove all of the keys * which have no more pending timers. * @@ -1003,11 +1002,11 @@ public class WatermarkManager { private static class PerKeyHolds { private final Map<Object, KeyedHold> keyedHolds; - private final PriorityQueue<KeyedHold> allHolds; + private final NavigableSet<KeyedHold> allHolds; private PerKeyHolds() { this.keyedHolds = new HashMap<>(); - this.allHolds = new PriorityQueue<>(); + this.allHolds = new TreeSet<>(); } /** @@ -1015,7 +1014,7 @@ public class WatermarkManager { * there are no holds within this {@link PerKeyHolds}. */ public Instant getMinHold() { - return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp(); + return allHolds.isEmpty() ? 
THE_END_OF_TIME.get() : allHolds.first().getTimestamp(); } /** @@ -1026,7 +1025,7 @@ public class WatermarkManager { removeHold(key); KeyedHold newKeyedHold = KeyedHold.of(key, newHold); keyedHolds.put(key, newKeyedHold); - allHolds.offer(newKeyedHold); + allHolds.add(newKeyedHold); } /**
