Updated Branches: refs/heads/master 125f1b4e1 -> ceaa6a5e0
CRUNCH-210: Remove deprecated MapValuesFn references from cogroup and add support for user-specified parallelism for cogroup jobs Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ceaa6a5e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ceaa6a5e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ceaa6a5e Branch: refs/heads/master Commit: ceaa6a5e0ab4c1d2b23e55c0d9a7cc0b63a41000 Parents: 125f1b4 Author: Josh Wills <[email protected]> Authored: Sat May 25 17:05:31 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Sun May 26 12:08:52 2013 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/crunch/lib/Cogroup.java | 62 ++++++++++----- 1 files changed, 41 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ceaa6a5e/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java index 07d873c..3bf3e4d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -19,11 +19,10 @@ package org.apache.crunch.lib; import java.util.Collection; -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; +import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; -import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; @@ -34,43 +33,65 @@ public class Cogroup { /** * Co-groups the two {@link PTable} arguments. * - * @return a {@code PTable} representing the co-grouped tables. + * @param left The left (smaller) PTable + * @param right The right (larger) PTable + * @return a {@code PTable} representing the co-grouped tables */ public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) { + return cogroup(left, right, 0); + } + + /** + * Co-groups the two {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of + * reducers.) + * + * @param left The left (smaller) PTable + * @param right The right (larger) PTable + * @param numReducers The number of reducers to use + * @return A new {@code PTable} representing the co-grouped tables + */ + public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup( + PTable<K, U> left, + PTable<K, V> right, + int numReducers) { PTypeFamily ptf = left.getTypeFamily(); - PType<K> keyType = left.getPTableType().getKeyType(); PType<U> leftType = left.getPTableType().getValueType(); PType<V> rightType = right.getPTableType().getValueType(); PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType); - PTable<K, Pair<U, V>> cgLeft = left.parallelDo("coGroupTag1", new CogroupFn1<K, U, V>(), - ptf.tableOf(keyType, itype)); - PTable<K, Pair<U, V>> cgRight = right.parallelDo("coGroupTag2", new CogroupFn2<K, U, V>(), - ptf.tableOf(keyType, itype)); + PTable<K, Pair<U, V>> cgLeft = left.mapValues("coGroupTag1", new CogroupFn1<U, V>(), + itype); + PTable<K, Pair<U, V>> cgRight = right.mapValues("coGroupTag2", new CogroupFn2<U, V>(), + itype); + PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), + ptf.collections(rightType)); PTable<K, Pair<U, V>> both = cgLeft.union(cgRight); - - PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), ptf.collections(rightType)); - return both.groupByKey().parallelDo("cogroup", - new PostGroupFn<K, U, V>(leftType, rightType), ptf.tableOf(keyType, otype)); + PGroupedTable<K, Pair<U, V>> grouped = null; + if (numReducers > 0) { + grouped = both.groupByKey(numReducers); + } else { + grouped = both.groupByKey(); + } + return grouped.mapValues("cogroup", new PostGroupFn<U, V>(leftType, rightType), otype); } - private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> { + private static class CogroupFn1<V, U> extends MapFn<V, Pair<V, U>> { @Override public Pair<V, U> map(V v) { return Pair.of(v, null); } } - private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> { + private static class CogroupFn2<V, U> extends MapFn<U, Pair<V, U>> { @Override public Pair<V, U> map(U u) { return Pair.of(null, u); } } - private static class PostGroupFn<K, V, U> extends - DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> { + private static class PostGroupFn<V, U> extends + MapFn<Iterable<Pair<V, U>>, Pair<Collection<V>, Collection<U>>> { private PType<V> ptypeV; private PType<U> ptypeU; @@ -88,18 +109,17 @@ public class Cogroup { } @Override - public void process(Pair<K, Iterable<Pair<V, U>>> input, - Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) { + public Pair<Collection<V>, Collection<U>> map(Iterable<Pair<V, U>> input) { Collection<V> cv = Lists.newArrayList(); Collection<U> cu = Lists.newArrayList(); - for (Pair<V, U> pair : input.second()) { + for (Pair<V, U> pair : input) { if (pair.first() != null) { cv.add(ptypeV.getDetachedValue(pair.first())); } else if (pair.second() != null) { cu.add(ptypeU.getDetachedValue(pair.second())); } } - emitter.emit(Pair.of(input.first(), Pair.of(cv, cu))); + return Pair.of(cv, cu); } }
