Updated Branches: refs/heads/apache-crunch-0.8 96e392b44 -> 1a34cd097
CRUNCH-338 Correct Cogroup TupleN output PType Fix the PType generated for TupleNs so that it is effectively a TupleN of Collections. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1a34cd09 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1a34cd09 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1a34cd09 Branch: refs/heads/apache-crunch-0.8 Commit: 1a34cd09758ea30d60e1d4f38cff3e60c1c2c12e Parents: 96e392b Author: Gabriel Reid <[email protected]> Authored: Fri Feb 7 00:14:55 2014 +0100 Committer: Gabriel Reid <[email protected]> Committed: Fri Feb 7 08:48:30 2014 +0100 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/CogroupIT.java | 74 +++++++++++++++----- .../java/org/apache/crunch/lib/Cogroup.java | 7 +- 2 files changed, 59 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/1a34cd09/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java index 1a0bfe9..191c737 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -17,25 +17,17 @@ */ package org.apache.crunch.lib; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.PCollection; -import org.apache.crunch.PTable; -import org.apache.crunch.Pair; -import org.apache.crunch.Tuple3; -import org.apache.crunch.Tuple4; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.crunch.*; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.test.Tests; import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; import org.apache.crunch.types.writable.WritableTypeFamily; @@ -44,9 +36,13 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; public class CogroupIT { @@ -101,7 +97,17 @@ public class CogroupIT { public void testCogroup4Avro() { runCogroup4(AvroTypeFamily.getInstance()); } - + + @Test + public void testCogroupNWritables() { + runCogroupN(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroupNAvro() { + runCogroupN(AvroTypeFamily.getInstance()); + } + public void runCogroup(PTypeFamily ptf) { PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); @@ -182,6 +188,38 @@ public class CogroupIT { assertThat(actual, is(expected)); } + + public void runCogroupN(PTypeFamily ptf) { + PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); + + PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt); + PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt); + + PTable<String, TupleN> cg = Cogroup.cogroup(kv1, new PTable[]{kv2}); + + Map<String, TupleN> result = cg.materializeToMap(); + Map<String, TupleN> actual = Maps.newHashMap(); + for (Map.Entry<String, TupleN> e : result.entrySet()) { + Collection<String> one = ImmutableSet.copyOf((Collection<? extends String>) e.getValue().get(0)); + Collection<String> two = ImmutableSet.copyOf((Collection<? extends String>)e.getValue().get(1)); + actual.put(e.getKey(), TupleN.of(one, two)); + } + Map<String, TupleN> expected = ImmutableMap.of( + "a", TupleN.of(coll("1-1", "1-4"), coll()), + "b", TupleN.of(coll("1-2"), coll("2-1")), + "c", TupleN.of(coll("1-3"), coll("2-2", "2-3")), + "d", TupleN.of(coll(), coll("2-4")) + ); + + assertThat(actual, is(expected)); + + PType<TupleN> tupleValueType = cg.getValueType(); + List<PType> expectedSubtypes = ImmutableList.<PType>of( + ptf.collections(ptf.strings()), + ptf.collections(ptf.strings())); + + assertThat(tupleValueType.getSubTypes(), is(expectedSubtypes)); + } private static class KeyValueSplit extends DoFn<String, Pair<String, String>> { @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/1a34cd09/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java index 8743a29..63d6f62 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -17,8 +17,7 @@ */ package org.apache.crunch.lib; -import java.util.Collection; - +import com.google.common.collect.Lists; import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; @@ -32,7 +31,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.TupleFactory; -import com.google.common.collect.Lists; +import java.util.Collection; public class Cogroup { @@ -192,7 +191,7 @@ public class Cogroup { PType[] components = new PType[1 + rest.length]; components[0] = tf.collections(first.getValueType()); for (int i = 0; i < rest.length; i++) { - components[i + 1] = rest[i].getValueType(); + components[i + 1] = tf.collections(rest[i].getValueType()); } return cogroup( tf.tuples(components),
