Updated Branches: refs/heads/master 1c95647be -> b0165faae
CRUNCH-88 - route grouped table to multiple outputs Fix issue where the contents of a PGroupedTableImpl could only successfully be sent to a single PCollection down the pipeline. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b0165faa Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b0165faa Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b0165faa Branch: refs/heads/master Commit: b0165faaea013a0d6124308154039c33c7b85002 Parents: 1c95647 Author: Gabriel Reid <[email protected]> Authored: Thu Oct 4 22:40:59 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Sat Oct 6 09:08:08 2012 +0200 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MRPipelineIT.java | 28 +++++++++++++++ .../crunch/impl/mr/collect/PCollectionImpl.java | 13 ++++++- .../crunch/impl/mr/collect/PGroupedTableImpl.java | 11 +++++- 3 files changed, 48 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java index e4ff91d..0865820 100644 --- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -17,13 +17,19 @@ */ package org.apache.crunch; +import static org.junit.Assert.assertTrue; + import java.io.File; +import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.To; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; import org.junit.Rule; import org.junit.Test; @@ -51,5 +57,27 @@ public class MRPipelineIT implements Serializable { write.materialize(); pipeline.run(); } + + + + @Test + public void testPGroupedTableToMultipleOutputs() throws IOException{ + Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration()); + PGroupedTable<String, String> groupedLineTable = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")).by(IdentityFn.<String>getInstance(), Writables.strings()).groupByKey(); + + PTable<String, String> ungroupedTableA = groupedLineTable.ungroup(); + PTable<String, String> ungroupedTableB = groupedLineTable.ungroup(); + + File outputDirA = tmpDir.getFile("output_a"); + File outputDirB = tmpDir.getFile("output_b"); + + pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath()); + pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath()); + pipeline.done(); + + // Verify that output from a single PGroupedTable can be sent to multiple collections + assertTrue(new File(outputDirA, "part-r-00000").exists()); + assertTrue(new File(outputDirB, "part-r-00000").exists()); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index d4948c0..f0d8187 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -86,7 +86,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { @Override public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) { - return new DoCollectionImpl<T>(name, this, fn, type); + return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type); } @Override @@ -97,7 +97,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { @Override public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) { - return new DoTableImpl<K, V>(name, this, fn, type); + return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type); } public PCollection<S> write(Target target) { @@ -254,4 +254,13 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { } protected abstract long getSizeInternal(); + + /** + * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline. + * @return The PCollectionImpl instance to be chained + */ + protected PCollectionImpl<S> getChainingCollection(){ + return this; + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java index 5a40413..fee381d 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java @@ -42,7 +42,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V> private final PTableBase<K, V> parent; private final GroupingOptions groupingOptions; private final PGroupedTableType<K, V> ptype; - + PGroupedTableImpl(PTableBase<K, V> parent) { this(parent, null); } @@ -79,7 +79,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V> } public PTable<K, V> combineValues(CombineFn<K, V> combineFn) { - return new DoTableImpl<K, V>("combine", this, combineFn, parent.getPTableType()); + return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType()); } private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> { @@ -113,4 +113,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V> public DoNode getGroupingNode() { return DoNode.createGroupingNode("", ptype); } + + @Override + protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() { + // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs + // TODO This should be implemented in a cleaner way in the planner + return new PGroupedTableImpl<K, V>(parent, groupingOptions); + } }
