Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 617105b3d -> a96e30892
CRUNCH-379: Fix unions of PTables and PCollections in crunch-spark Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a96e3089 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a96e3089 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a96e3089 Branch: refs/heads/apache-crunch-0.8 Commit: a96e30892596bb9914d85673288262f6f7d74d67 Parents: 617105b Author: Josh Wills <[email protected]> Authored: Mon Apr 21 08:28:26 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Apr 21 19:20:55 2014 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/SparkUnionResultsIT.java | 18 ++++++++++++++++++ .../impl/spark/collect/UnionCollection.java | 12 +++++++++++- .../crunch/impl/spark/collect/UnionTable.java | 10 +++++++++- 3 files changed, 38 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a96e3089/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java index db8509b..785f45a 100644 --- a/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java +++ b/crunch-spark/src/it/java/org/apache/crunch/SparkUnionResultsIT.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import org.apache.crunch.impl.spark.SparkPipeline; import org.apache.crunch.io.At; import org.apache.crunch.io.To; +import org.apache.crunch.lib.PTables; import org.apache.crunch.test.CrunchTestSupport; import org.apache.crunch.types.writable.Writables; import org.junit.Test; @@ -79,6 +80,23 @@ public class SparkUnionResultsIT extends CrunchTestSupport implements Serializab } @Test + public void testMultiGroupBy() throws 
Exception { + String inputPath = tempDir.copyResourceFileName("set1.txt"); + String inputPath2 = tempDir.copyResourceFileName("set2.txt"); + String output = tempDir.getFileName("output"); + + Pipeline pipeline = new SparkPipeline("local", "multigroupby"); + + PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings())); + PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(), + Writables.pairs(Writables.strings(), Writables.longs())); + PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count(); + PTables.asPTable(set2Counts.union(set1Lengths)).groupByKey().ungroup() + .write(At.sequenceFile(output, Writables.strings(), Writables.longs())); + pipeline.done(); + } + + @Test public void testMultiWrite() throws Exception { String inputPath = tempDir.copyResourceFileName("set1.txt"); String inputPath2 = tempDir.copyResourceFileName("set2.txt"); http://git-wip-us.apache.org/repos/asf/crunch/blob/a96e3089/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java index 4e8b25a..5c18665 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java @@ -17,12 +17,17 @@ */ package org.apache.crunch.impl.spark.collect; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.collect.BaseUnionCollection; import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.dist.collect.PTableBase; import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; +import 
org.apache.crunch.impl.spark.fn.FlatMapPairDoFn; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; +import org.apache.spark.api.java.function.Function; import org.apache.spark.storage.StorageLevel; import java.util.List; @@ -51,7 +56,12 @@ public class UnionCollection<S> extends BaseUnionCollection<S> implements SparkC List<PCollectionImpl<?>> parents = getParents(); JavaRDD[] rdds = new JavaRDD[parents.size()]; for (int i = 0; i < rdds.length; i++) { - rdds[i] = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); + if (parents.get(i) instanceof PTableBase) { + JavaPairRDD prdd = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); + rdds[i] = prdd.mapPartitions(new FlatMapPairDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext())); + } else { + rdds[i] = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); + } } return runtime.getSparkContext().union(rdds); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a96e3089/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java index 867a95d..b2776c5 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java @@ -17,12 +17,15 @@ */ package org.apache.crunch.impl.spark.collect; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.dist.collect.BaseUnionTable; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.dist.collect.PTableBase; import org.apache.crunch.impl.spark.SparkCollection; import org.apache.crunch.impl.spark.SparkRuntime; 
+import org.apache.crunch.impl.spark.fn.PairFlatMapDoFn; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.storage.StorageLevel; @@ -52,7 +55,12 @@ public class UnionTable<K, V> extends BaseUnionTable<K, V> implements SparkColle List<PCollectionImpl<?>> parents = getParents(); JavaPairRDD[] rdds = new JavaPairRDD[parents.size()]; for (int i = 0; i < rdds.length; i++) { - rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); + if (parents.get(i) instanceof PTableBase) { + rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); + } else { + JavaRDD rdd = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime); + rdds[i] = rdd.mapPartitions(new PairFlatMapDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext())); + } } return runtime.getSparkContext().union(rdds); }
