Repository: crunch Updated Branches: refs/heads/master f5d3858aa -> dee0fcf51
CRUNCH-458: Eliminate random split decisions with TreeSets/TreeMap

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dee0fcf5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dee0fcf5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dee0fcf5

Branch: refs/heads/master
Commit: dee0fcf51bfaa96c3dde773bbc8745fcc1ecb6f5
Parents: f5d3858
Author: Josh Wills <[email protected]>
Authored: Wed Aug 6 18:12:19 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Tue Aug 12 09:05:09 2014 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/impl/mr/plan/Edge.java | 30 ++++++++++++++++++--
 1 file changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/dee0fcf5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
index 4006930..111905c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.impl.mr.plan;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,7 +43,7 @@ class Edge {
   Edge(Vertex head, Vertex tail) {
     this.head = head;
     this.tail = tail;
-    this.paths = Sets.newHashSet();
+    this.paths = Sets.newTreeSet(NODE_CMP);
   }
 
   public Vertex getHead() {
@@ -68,7 +69,7 @@ class Edge {
   public Map<NodePath, PCollectionImpl> getSplitPoints(boolean breakpointsOnly) {
     List<NodePath> np = Lists.newArrayList(paths);
     List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size());
-    Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newHashMap();
+    Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newTreeMap(PCOL_CMP);
     Map<NodePath, PCollectionImpl> splitPoints = Maps.newHashMap();
     for (int i = 0; i < np.size(); i++) {
       long bestSize = Long.MAX_VALUE;
@@ -165,4 +166,29 @@ class Edge {
   public String toString() {
     return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
   }
+
+  private static Comparator<NodePath> NODE_CMP = new Comparator<NodePath>() {
+    @Override
+    public int compare(NodePath left, NodePath right) {
+      if (left == right || left.equals(right)) {
+        return 0;
+      }
+      return left.toString().compareTo(right.toString());
+    }
+  };
+
+  private static Comparator<PCollectionImpl<?>> PCOL_CMP = new Comparator<PCollectionImpl<?>>() {
+    @Override
+    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
+      if (left == right || left.equals(right)) {
+        return 0;
+      }
+      String leftName = left.getName();
+      String rightName = right.getName();
+      if (leftName == null || rightName == null || leftName.equals(rightName)) {
+        return left.hashCode() < right.hashCode() ? -1 : 1;
+      }
+      return leftName.compareTo(rightName);
+    }
+  };
 }
