Updated Branches: refs/heads/master 643e41063 -> 181b476fe
CRUNCH-174: Add support for cogrouping 3, 4, or N inputs. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/181b476f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/181b476f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/181b476f Branch: refs/heads/master Commit: 181b476fe25c9ba5efac7d65edcbfc8b27ae1077 Parents: 643e410 Author: Josh Wills <[email protected]> Authored: Wed Jul 17 19:30:21 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Wed Jul 17 22:47:22 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/CogroupIT.java | 82 +++++- .../src/main/java/org/apache/crunch/Tuple3.java | 37 +++ .../src/main/java/org/apache/crunch/Tuple4.java | 43 +++ .../java/org/apache/crunch/lib/Cogroup.java | 259 +++++++++++++++---- 4 files changed, 374 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java index 0d8b2b8..16c4c69 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -29,6 +29,8 @@ import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; @@ -53,13 +55,16 @@ public class CogroupIT { private MRPipeline pipeline; private PCollection<String> lines1; private 
PCollection<String> lines2; - + private PCollection<String> lines3; + private PCollection<String> lines4; @Before public void setUp() throws IOException { pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()); lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt"))); lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt"))); + lines3 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt"))); + lines4 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt"))); } @After @@ -77,6 +82,26 @@ public class CogroupIT { runCogroup(AvroTypeFamily.getInstance()); } + @Test + public void testCogroup3Writables() { + runCogroup3(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroup3Avro() { + runCogroup3(AvroTypeFamily.getInstance()); + } + + @Test + public void testCogroup4Writables() { + runCogroup4(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroup4Avro() { + runCogroup4(AvroTypeFamily.getInstance()); + } + public void runCogroup(PTypeFamily ptf) { PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); @@ -102,7 +127,62 @@ public class CogroupIT { assertThat(actual, is(expected)); } + /** Three-way cogroup of src1, src2 and src1 again (lines3), so the third collection mirrors the first. */ public void runCogroup3(PTypeFamily ptf) { + PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); + PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt); + PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt); + PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt); + + PTable<String, Tuple3.Collect<String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3); + + Map<String, Tuple3.Collect<String, String, String>> result = cg.materializeToMap(); + Map<String, Tuple3.Collect<String, String, String>> actual = Maps.newHashMap(); + for (Map.Entry<String, 
Tuple3.Collect<String, String,
String>> e : result.entrySet()) { + Collection<String> one = ImmutableSet.copyOf(e.getValue().first()); + Collection<String> two = ImmutableSet.copyOf(e.getValue().second()); + Collection<String> three = ImmutableSet.copyOf(e.getValue().third()); + actual.put(e.getKey(), new Tuple3.Collect<String, String, String>(one, two, three)); + } + Map<String, Tuple3.Collect<String, String, String>> expected = ImmutableMap.of( + "a", new Tuple3.Collect<String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4")), + "b", new Tuple3.Collect<String, String, String>(coll("1-2"), coll("2-1"), coll("1-2")), + "c", new Tuple3.Collect<String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3")), + "d", new Tuple3.Collect<String, String, String>(coll(), coll("2-4"), coll()) + ); + + assertThat(actual, is(expected)); + } + + /** Four-way cogroup of src1, src2, src1, src2, so the third and fourth collections mirror the first and second. */ public void runCogroup4(PTypeFamily ptf) { + PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); + + PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt); + PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt); + PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt); + PTable<String, String> kv4 = lines4.parallelDo("kv4", new KeyValueSplit(), tt); + + PTable<String, Tuple4.Collect<String, String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3, kv4); + + Map<String, Tuple4.Collect<String, String, String, String>> result = cg.materializeToMap(); + Map<String, Tuple4.Collect<String, String, String, String>> actual = Maps.newHashMap(); + for (Map.Entry<String, Tuple4.Collect<String, String, String, String>> e : result.entrySet()) { + Collection<String> one = ImmutableSet.copyOf(e.getValue().first()); + Collection<String> two = ImmutableSet.copyOf(e.getValue().second()); + Collection<String> three = 
ImmutableSet.copyOf(e.getValue().third()); + Collection<String> four = ImmutableSet.copyOf(e.getValue().fourth()); + actual.put(e.getKey(), new 
Tuple4.Collect<String, String, String, String>(one, two, three, four)); + } + Map<String, Tuple4.Collect<String, String, String, String>> expected = ImmutableMap.of( + "a", new Tuple4.Collect<String, String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4"), coll()), + "b", new Tuple4.Collect<String, String, String, String>(coll("1-2"), coll("2-1"), coll("1-2"), coll("2-1")), + "c", new Tuple4.Collect<String, String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3"), coll("2-2", "2-3")), + "d", new Tuple4.Collect<String, String, String, String>(coll(), coll("2-4"), coll(), coll("2-4")) + ); + + assertThat(actual, is(expected)); + } + private static class KeyValueSplit extends DoFn<String, Pair<String, String>> { @Override public void process(String input, Emitter<Pair<String, String>> emitter) { http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/Tuple3.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java index 4372811..922ed07 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java +++ b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java @@ -17,13 +17,50 @@ */ package org.apache.crunch; +import java.util.Collection; + import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; /** * A convenience class for three-element {@link Tuple}s.
*/ public class Tuple3<V1, V2, V3> implements Tuple { + /** A {@code Tuple3} whose three elements are {@link Collection}s of {@code V1}, {@code V2} and {@code V3} values; it is the value type produced by the three-table {@code Cogroup.cogroup} overloads. */ public static class Collect<V1, V2, V3> extends Tuple3<Collection<V1>, Collection<V2>, Collection<V3>> { + + /** Returns a {@link PType} for {@code Collect}, derived from {@code triples} of {@code collections} of the given element types, in the type family of {@code first}. */ public static <V1, V2, V3> PType<Tuple3.Collect<V1, V2, V3>> derived(PType<V1> first, + PType<V2> second, PType<V3> third) { + PTypeFamily tf = first.getFamily(); + PType<Tuple3<Collection<V1>, Collection<V2>, Collection<V3>>> pt = + tf.triples( + tf.collections(first), + tf.collections(second), + tf.collections(third)); + Object clazz = Tuple3.Collect.class; /* class literals cannot carry type arguments, so go through Object for the unchecked cast to a parameterized Class */ + return tf.derived((Class<Tuple3.Collect<V1, V2, V3>>) clazz, + new MapFn<Tuple3<Collection<V1>, Collection<V2>, Collection<V3>>, Collect<V1, V2, V3>>() { + @Override + public Collect<V1, V2, V3> map( + Tuple3<Collection<V1>, Collection<V2>, Collection<V3>> in) { + return new Collect<V1, V2, V3>(in.first(), in.second(), in.third()); + } + }, + new MapFn<Collect<V1, V2, V3>, Tuple3<Collection<V1>, Collection<V2>, Collection<V3>>>() { + @Override + public Tuple3<Collection<V1>, Collection<V2>, Collection<V3>> map( + Collect<V1, V2, V3> in) { + return in; /* a Collect already is a Tuple3 of collections, so no conversion is needed */ + } + }, pt); + } + + /** Creates a {@code Collect} holding the given collections as its first, second and third elements. */ public Collect(Collection<V1> first, Collection<V2> second, Collection<V3> third) { + super(first, second, third); + } + } + private final V1 first; private final V2 second; private final V3 third; http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/Tuple4.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java index f161371..94d23fd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java +++ b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java @@ -17,13 +17,56 @@ */ package org.apache.crunch; +import java.util.Collection; + import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily;
/** * A convenience class for four-element {@link Tuple}s. */ public class Tuple4<V1, V2, V3, V4> implements Tuple { + /** A {@code Tuple4} whose four elements are {@link Collection}s of {@code V1}, {@code V2}, {@code V3} and {@code V4} values; it is the value type produced by the four-table {@code Cogroup.cogroup} overloads. */ public static class Collect<V1, V2, V3, V4> extends Tuple4< + Collection<V1>, + Collection<V2>, + Collection<V3>, + Collection<V4>> { + + /** Returns a {@link PType} for {@code Collect}, derived from {@code quads} of {@code collections} of the given element types, in the type family of {@code first}. */ public static <V1, V2, V3, V4> PType<Tuple4.Collect<V1, V2, V3, V4>> derived(PType<V1> first, + PType<V2> second, PType<V3> third, PType<V4> fourth) { + PTypeFamily tf = first.getFamily(); + PType<Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>>> pt = + tf.quads( + tf.collections(first), + tf.collections(second), + tf.collections(third), + tf.collections(fourth)); + Object clazz = Tuple4.Collect.class; /* class literals cannot carry type arguments, so go through Object for the unchecked cast to a parameterized Class */ + return tf.derived((Class<Tuple4.Collect<V1, V2, V3, V4>>) clazz, + new MapFn<Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>>, + Collect<V1, V2, V3, V4>>() { + @Override + public Collect<V1, V2, V3, V4> map( + Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>> in) { + return new Collect<V1, V2, V3, V4>(in.first(), in.second(), in.third(), in.fourth()); + } + }, + new MapFn<Collect<V1, V2, V3, V4>, Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>>>() { + @Override + public Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>> map( + Collect<V1, V2, V3, V4> input) { + return input; /* a Collect already is a Tuple4 of collections, so no conversion is needed */ + } + }, pt); + } + + /** Creates a {@code Collect} holding the given collections as its first through fourth elements. */ public Collect(Collection<V1> first, Collection<V2> second, Collection<V3> third, Collection<V4> fourth) { + super(first, second, third, fourth); + } + } + private final V1 first; private final V2 second; private final V3 third; http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java index 3bf3e4d..7f5f70d 100644 --- 
a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -23,8 +23,13 @@ import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.Tuple; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.TupleFactory; import com.google.common.collect.Lists; @@ -38,88 +43,273 @@ public class Cogroup { * @return a {@code PTable} representing the co-grouped tables */ public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) { - return cogroup(left, right, 0); + return cogroup(0, left, right); } /** * Co-groups the two {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of * reducers.)
* + * @param numReducers The number of reducers to use * @param left The left (smaller) PTable * @param right The right (larger) PTable - * @param numReducers The number of reducers to use * @return A new {@code PTable} representing the co-grouped tables */ public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup( + int numReducers, PTable<K, U> left, - PTable<K, V> right, - int numReducers) { - PTypeFamily ptf = left.getTypeFamily(); - PType<U> leftType = left.getPTableType().getValueType(); - PType<V> rightType = right.getPTableType().getValueType(); - PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType); + PTable<K, V> right) { + PTypeFamily tf = left.getTypeFamily(); + return cogroup( + tf.pairs(tf.collections(left.getValueType()), + tf.collections(right.getValueType())), + TupleFactory.PAIR, + numReducers, + left, right); + } - PTable<K, Pair<U, V>> cgLeft = left.mapValues("coGroupTag1", new CogroupFn1<U, V>(), - itype); - PTable<K, Pair<U, V>> cgRight = right.mapValues("coGroupTag2", new CogroupFn2<U, V>(), + /** + * Co-groups the two {@link PTable} arguments with a user-specified number of reducers. + * + * @deprecated Use {@link #cogroup(int, PTable, PTable)}; this overload only preserves the original + * argument order so existing callers keep compiling. + */ + @Deprecated + public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup( + PTable<K, U> left, + PTable<K, V> right, + int numReducers) { + return cogroup(numReducers, left, right); + } + + /** + * Co-groups the three {@link PTable} arguments. + * + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The largest PTable + * @return a {@code PTable} representing the co-grouped tables + */ + public static <K, V1, V2, V3> PTable<K, Tuple3.Collect<V1, V2, V3>> cogroup( + PTable<K, V1> first, + PTable<K, V2> second, + PTable<K, V3> third) { + return cogroup(0, first, second, third); + } + + /** + * Co-groups the three {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of + * reducers.)
+ * + * @param numReducers The number of reducers to use + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The largest PTable + * @return A new {@code PTable} representing the co-grouped tables + */ + public static <K, V1, V2, V3> PTable<K, Tuple3.Collect<V1, V2, V3>> cogroup( + int numReducers, + PTable<K, V1> first, + PTable<K, V2> second, + PTable<K, V3> third) { + return cogroup( + Tuple3.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType()), + new TupleFactory<Tuple3.Collect<V1, V2, V3>>() { + @Override + public Tuple3.Collect<V1, V2, V3> makeTuple(Object... values) { + return new Tuple3.Collect<V1, V2, V3>( + (Collection<V1>) values[0], + (Collection<V2>) values[1], + (Collection<V3>) values[2]); + } + }, + numReducers, + first, second, third); + } + + /** + * Co-groups the four {@link PTable} arguments. + * + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The third-smallest PTable + * @param fourth The largest PTable + * @return a {@code PTable} representing the co-grouped tables + */ + public static <K, V1, V2, V3, V4> PTable<K, Tuple4.Collect<V1, V2, V3, V4>> cogroup( + PTable<K, V1> first, + PTable<K, V2> second, + PTable<K, V3> third, + PTable<K, V4> fourth) { + return cogroup(0, first, second, third, fourth); + } + + /** + * Co-groups the four {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of + * reducers.)
+ * + * @param numReducers The number of reducers to use + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The third-smallest PTable + * @param fourth The largest PTable + * @return A new {@code PTable} representing the co-grouped tables + */ + public static <K, V1, V2, V3, V4> PTable<K, Tuple4.Collect<V1, V2, V3, V4>> cogroup( + int numReducers, + PTable<K, V1> first, + PTable<K, V2> second, + PTable<K, V3> third, + PTable<K, V4> fourth) { + return cogroup( + Tuple4.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType(), + fourth.getValueType()), + new TupleFactory<Tuple4.Collect<V1, V2, V3, V4>>() { + @Override + public Tuple4.Collect<V1, V2, V3, V4> makeTuple(Object... values) { + return new Tuple4.Collect<V1, V2, V3, V4>( + (Collection<V1>) values[0], + (Collection<V2>) values[1], + (Collection<V3>) values[2], + (Collection<V4>) values[3]); + } + }, + numReducers, + first, second, third, fourth); + } + + /** + * Co-groups an arbitrary number of {@link PTable} arguments. The largest table should + * come last in the ordering. + * + * @param first The first (smallest) PTable to co-group + * @param rest The other (larger) PTables to co-group + * @return a {@code PTable} representing the co-grouped tables + */ + public static <K> PTable<K, TupleN> cogroup(PTable<K, ?> first, PTable<K, ?>... rest) { + return cogroup(0, first, rest); + } + + /** + * Co-groups an arbitrary number of {@link PTable} arguments with a user-specified degree of parallelism + * (a.k.a, number of reducers.) The largest table should come last in the ordering. + * + * @param numReducers The number of reducers to use + * @param first The first (smallest) PTable to co-group + * @param rest The other (larger) PTables to co-group + * @return A new {@code PTable} representing the co-grouped tables + */ + public static <K> PTable<K, TupleN> cogroup( + int numReducers, + PTable<K, ?> first, + PTable<K, ?>... 
rest) { + PTypeFamily tf = first.getTypeFamily(); + PType[] components = new PType[1 + rest.length]; + components[0] = tf.collections(first.getValueType()); + for (int i = 0; i < rest.length; i++) { + components[i + 1] = tf.collections(rest[i].getValueType()); + } + return cogroup( + tf.tuples(components), + TupleFactory.TUPLEN, + numReducers, + first, rest); + } + + /** + * Shared implementation of every public overload: tags each input's values with their position in an + * N-wide {@code TupleN}, unions and groups the tagged tables by key (using {@code numReducers} when it is + * positive), then builds one output tuple of per-input value collections with {@code tupleFactory}. + */ + private static <K, T extends Tuple> PTable<K, T> cogroup( + PType<T> outputType, + TupleFactory tupleFactory, + int numReducers, + PTable<K, ?> first, PTable<K, ?>... rest) { + PTypeFamily ptf = first.getTypeFamily(); + PType[] ptypes = new PType[1 + rest.length]; + ptypes[0] = first.getValueType(); + for (int i = 0; i < rest.length; i++) { + ptypes[i + 1] = rest[i].getValueType(); + } + PType<TupleN> itype = ptf.tuples(ptypes); + + PTable<K, TupleN> firstInter = first.mapValues("coGroupTag1", + new CogroupFn(0, 1 + rest.length), itype); - - PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), - ptf.collections(rightType)); - PTable<K, Pair<U, V>> both = cgLeft.union(cgRight); - PGroupedTable<K, Pair<U, V>> grouped = null; + PTable<K, TupleN>[] inter = new PTable[rest.length]; + for (int i = 0; i < rest.length; i++) { + inter[i] = rest[i].mapValues("coGroupTag" + (i + 2), + new CogroupFn(i + 1, 1 + rest.length), + itype); + } + + PTable<K, TupleN> union = firstInter.union(inter); + PGroupedTable<K, TupleN> grouped = null; if (numReducers > 0) { - grouped = both.groupByKey(numReducers); + grouped = union.groupByKey(numReducers); } else { - grouped = both.groupByKey(); + grouped = union.groupByKey(); } - return grouped.mapValues("cogroup", new PostGroupFn<U, V>(leftType, rightType), otype); + + return 
grouped.mapValues("cogroup", + new PostGroupFn<T>(tupleFactory, ptypes), + outputType); } - - private static class CogroupFn1<V, U> extends MapFn<V, Pair<V, U>> { - @Override - public Pair<V, U> map(V v) { - return Pair.of(v, null); + + /** Emits a {@code TupleN} of width {@code size} holding the input at {@code index} and null in every other slot. */ + private static class CogroupFn<T> extends MapFn<T, 
TupleN> { + private final int index; + private final int size; + + public CogroupFn(int index, int size) { + this.index = index; + this.size = size; } - } - private static class CogroupFn2<V, U> extends MapFn<U, Pair<V, U>> { @Override - public Pair<V, U> map(U u) { - return Pair.of(null, u); + public TupleN map(T input) { + Object[] v = new Object[size]; + v[index] = input; + return TupleN.of(v); } } - private static class PostGroupFn<V, U> extends - MapFn<Iterable<Pair<V, U>>, Pair<Collection<V>, Collection<U>>> { + /** Distributes grouped {@code TupleN}s into one list of detached values per input position, then builds the output tuple. */ + private static class PostGroupFn<T extends Tuple> extends + MapFn<Iterable<TupleN>, T> { - private PType<V> ptypeV; - private PType<U> ptypeU; + private final TupleFactory factory; + private final PType[] ptypes; - public PostGroupFn(PType<V> ptypeV, PType<U> ptypeU) { - this.ptypeV = ptypeV; - this.ptypeU = ptypeU; + public PostGroupFn(TupleFactory tf, PType... ptypes) { + this.factory = tf; + this.ptypes = ptypes; } @Override public void initialize() { super.initialize(); - ptypeV.initialize(getConfiguration()); - ptypeU.initialize(getConfiguration()); + for (PType pt : ptypes) { + pt.initialize(getConfiguration()); + } } @Override - public Pair<Collection<V>, Collection<U>> map(Iterable<Pair<V, U>> input) { - Collection<V> cv = Lists.newArrayList(); - Collection<U> cu = Lists.newArrayList(); - for (Pair<V, U> pair : input) { - if (pair.first() != null) { - cv.add(ptypeV.getDetachedValue(pair.first())); - } else if (pair.second() != null) { - cu.add(ptypeU.getDetachedValue(pair.second())); + public T map(Iterable<TupleN> input) { + Collection[] collections = new Collection[ptypes.length]; + for (int i = 0; i < ptypes.length; i++) { + collections[i] = Lists.newArrayList(); + } + for (TupleN t : input) { + for (int i = 0; i < ptypes.length; i++) { + if (t.get(i) != null) { + collections[i].add(ptypes[i].getDetachedValue(t.get(i))); + break; /* CogroupFn sets at most one slot per TupleN */ 
+ } + } } } - return Pair.of(cv, cu); + return (T) factory.makeTuple(collections); } }
