Updated Branches: refs/heads/master a3dd33f45 -> 643e41063
CRUNCH-238: Add numReducers options to the SecondarySort lib Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/643e4106 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/643e4106 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/643e4106 Branch: refs/heads/master Commit: 643e4106374c95b2f17c63da3800dbdde991e341 Parents: a3dd33f Author: Josh Wills <[email protected]> Authored: Wed Jul 17 18:17:08 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Wed Jul 17 18:17:08 2013 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/lib/SecondarySort.java | 58 +++++++++++++++----- 1 file changed, 45 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/643e4106/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java index 54b4396..32bff38 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java @@ -47,24 +47,54 @@ public class SecondarySort { * Perform a secondary sort on the given {@code PTable} instance and then apply a * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}. */ - public static <K, V1, V2, T> PCollection<T> sortAndApply(PTable<K, Pair<V1, V2>> input, - DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, PType<T> ptype) { - return prepare(input) - .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype); + public static <K, V1, V2, T> PCollection<T> sortAndApply( + PTable<K, Pair<V1, V2>> input, + DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, + PType<T> ptype) { + return sortAndApply(input, doFn, ptype, -1); } /** * Perform a secondary sort on the given {@code PTable} instance and then apply a + * {@code DoFn} to the resulting sorted data to yield an output {@code PCollection<T>}, using + * the given number of reducers. + */ + public static <K, V1, V2, T> PCollection<T> sortAndApply( + PTable<K, Pair<V1, V2>> input, + DoFn<Pair<K, Iterable<Pair<V1, V2>>>, T> doFn, + PType<T> ptype, + int numReducers) { + return prepare(input, numReducers) + .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, T>(doFn), ptype); + } + + /** + * Perform a secondary sort on the given {@code PTable} instance and then apply a * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}. */ - public static <K, V1, V2, U, V> PTable<U, V> sortAndApply(PTable<K, Pair<V1, V2>> input, - DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, PTableType<U, V> ptype) { - return prepare(input) + public static <K, V1, V2, U, V> PTable<U, V> sortAndApply( + PTable<K, Pair<V1, V2>> input, + DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, + PTableType<U, V> ptype) { + return sortAndApply(input, doFn, ptype, -1); + } + + /** + * Perform a secondary sort on the given {@code PTable} instance and then apply a + * {@code DoFn} to the resulting sorted data to yield an output {@code PTable<U, V>}, using + * the given number of reducers. + */ + public static <K, V1, V2, U, V> PTable<U, V> sortAndApply( + PTable<K, Pair<V1, V2>> input, + DoFn<Pair<K, Iterable<Pair<V1, V2>>>, Pair<U, V>> doFn, + PTableType<U, V> ptype, + int numReducers) { + return prepare(input, numReducers) .parallelDo("SecondarySort.apply", new SSWrapFn<K, V1, V2, Pair<U, V>>(doFn), ptype); } private static <K, V1, V2> PGroupedTable<Pair<K, V1>, Pair<V1, V2>> prepare( - PTable<K, Pair<V1, V2>> input) { + PTable<K, Pair<V1, V2>> input, int numReducers) { PTypeFamily ptf = input.getTypeFamily(); PType<Pair<V1, V2>> valueType = input.getValueType(); PTableType<Pair<K, V1>, Pair<V1, V2>> inter = ptf.tableOf( @@ -72,12 +102,14 @@ public class SecondarySort { valueType); PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(), ptf.collections(input.getValueType())); + GroupingOptions.Builder gob = GroupingOptions.builder() + .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) + .partitionerClass(JoinUtils.getPartitionerClass(ptf)); + if (numReducers > 0) { + gob.numReducers(numReducers); + } return input.parallelDo("SecondarySort.format", new SSFormatFn<K, V1, V2>(), inter) - .groupByKey( - GroupingOptions.builder() - .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf)) - .partitionerClass(JoinUtils.getPartitionerClass(ptf)) - .build()); + .groupByKey(gob.build()); } private static class SSFormatFn<K, V1, V2> extends MapFn<Pair<K, Pair<V1, V2>>, Pair<Pair<K, V1>, Pair<V1, V2>>> {
