Updated Branches: refs/heads/master ac17f4f72 -> acdf396b5
CRUNCH-265: Allow clients to specify the number of reducers to use for default and one-to-many joins

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/acdf396b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/acdf396b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/acdf396b

Branch: refs/heads/master
Commit: acdf396b516e9a9c8348eb041be6adc1cd0de278
Parents: ac17f4f
Author: Josh Wills <[email protected]>
Authored: Sun Sep 15 13:14:56 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Sun Sep 15 13:19:46 2013 -0700

----------------------------------------------------------------------
 .../crunch/lib/join/DefaultJoinStrategy.java | 21 ++++++++++++++++----
 .../apache/crunch/lib/join/OneToManyJoin.java | 19 ++++++++++++++++--
 2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/acdf396b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
index 3edceeb..bfc8ab3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
@@ -32,7 +32,17 @@ import org.apache.crunch.types.PTypeFamily;
  * efficient due to its passing all data through the shuffle phase.
  */
 public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
-
+
+  private final int numReducers;
+
+  public DefaultJoinStrategy() {
+    this(-1);
+  }
+
+  public DefaultJoinStrategy(int numReducers) {
+    this.numReducers = numReducers;
+  }
+
   @Override
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
     switch (joinType) {
@@ -60,14 +70,15 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
    */
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {
     PTypeFamily ptf = left.getTypeFamily();
-    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right, numReducers);
     PTableType<K, Pair<U, V>> ret = ptf
         .tableOf(left.getKeyType(), ptf.pairs(left.getValueType(), right.getValueType()));
 
     return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
   }
 
-  static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
+  static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right,
+      int numReducers) {
     PTypeFamily ptf = left.getTypeFamily();
     PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
         ptf.pairs(left.getValueType(), right.getValueType()));
@@ -89,7 +100,9 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
 
     GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
     optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
-
+    if (numReducers > 0) {
+      optionsBuilder.numReducers(numReducers);
+    }
     return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/acdf396b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
index 25556ec..c09fd05 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
@@ -62,9 +62,24 @@ public class OneToManyJoin {
    * @return the post-processed output of the join
    */
   public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right,
-      DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) {
+      DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) {
+    return oneToManyJoin(left, right, postProcessFn, ptype, -1);
+  }
+
+  /**
+   * Supports a user-specified number of reducers for the one-to-many join.
+   *
+   * @param left left-side table to join
+   * @param right right-side table to join
+   * @param postProcessFn DoFn to process the results of the join
+   * @param ptype type of the output of the postProcessFn
+   * @param numReducers The number of reducers to use
+   * @return the post-processed output of the join
+   */
+  public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right,
+      DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype, int numReducers) {
 
-    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right);
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right, numReducers);
     return grouped.parallelDo("One to many join " + grouped.getName(),
         new OneToManyJoinFn<K, U, V, T>(left.getValueType(), postProcessFn), ptype);
   }
