Updated Branches: refs/heads/master daa16452c -> fd1fc3ad6
CRUNCH-30 Add cross join to Scrunch PTable and PCollection This commit makes the lib.join.Cartesian#cross function easily available to Scrunch. It also adds an accessor for PTable.materializeToMap(). A test is included. Signed-off-by: jwills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/fd1fc3ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/fd1fc3ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/fd1fc3ad Branch: refs/heads/master Commit: fd1fc3ad69334c25dfaee044785385ba95981f91 Parents: daa1645 Author: Brian Martin <[email protected]> Authored: Mon Aug 6 15:57:00 2012 -0700 Committer: jwills <[email protected]> Committed: Mon Aug 6 16:14:12 2012 -0700 ---------------------------------------------------------------------- .../scala/org/apache/scrunch/PCollection.scala | 10 ++- .../src/main/scala/org/apache/scrunch/PTable.scala | 14 +++- .../scala/org/apache/scrunch/CrossJoinTest.scala | 61 +++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fd1fc3ad/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala index d7ebee5..0924587 100644 --- a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala +++ b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConversions import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target} -import org.apache.crunch.lib.Aggregate +import org.apache.crunch.lib.{Aggregate, Cartesian} import org.apache.scrunch.Conversions._ import org.apache.scrunch.interpreter.InterpreterRunner @@ -54,6 +54,12 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol by(f).groupByKey } + def cross[S2](other: PCollection[S2]): PCollection[(S, S2)] = { + val inter = Cartesian.cross(this.native, other.native) + val f = (in: CPair[S, S2]) => (in.first(), in.second()) + inter.parallelDo(mapFn(f), getTypeFamily().tuple2(pType, other.pType)) + } + def materialize() = { InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) JavaConversions.iterableAsScalaIterable[S](native.materialize) @@ -67,6 +73,8 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol } def max() = wrap(Aggregate.max(native)) + + def pType = native.getPType() } trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fd1fc3ad/scrunch/src/main/scala/org/apache/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala index 984d9dc..b6d95a6 100644 --- a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala +++ b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConversions._ import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair} -import org.apache.crunch.lib.{Join, Aggregate, Cogroup, PTables} +import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables} import org.apache.scrunch.interpreter.InterpreterRunner class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] { @@ -100,6 +100,13 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] join[V2](Join.fullJoin[K, V, V2](_, _), other) } + def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = { + val ptf = getTypeFamily() + val inter = new PTable(Cartesian.cross(this.native, other.native)) + val f = (k: CPair[K,K2], v: CPair[V,V2]) => CPair.of((k.first(), k.second()), (v.first(), v.second())) + inter.parallelDo(mapFn(f), ptf.tableOf(ptf.tuple2(keyType, other.keyType), ptf.tuple2(valueType, other.valueType))) + } + def top(limit: Int, maximize: Boolean) = { wrap(Aggregate.top(this.native, limit, maximize)) } @@ -121,6 +128,11 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] native.materialize.view.map(x => (x.first, x.second)) } + def materializeToMap(): Map[K, V] = { + InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration()) + native.materializeToMap().view.toMap + } + def keyType() = native.getPTableType().getKeyType() def valueType() = native.getPTableType().getValueType() http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/fd1fc3ad/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala ---------------------------------------------------------------------- diff --git a/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala b/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala new file mode 100644 index 0000000..bbbf849 --- /dev/null +++ b/scrunch/src/test/scala/org/apache/scrunch/CrossJoinTest.scala @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.scrunch + +import org.scalatest.junit.JUnitSuite +import _root_.org.junit.Test + +class CrossJoinTest extends JUnitSuite { + + @Test + def testCrossCollection() { + val testCases = List(Array(1,2,3,4,5), Array(6,7,8), Array.empty[Int]) + val testCasePairs = testCases flatMap {test1 => testCases map {test2 => (test1,test2)}} + + for ((test1, test2) <- testCasePairs) { + val X = Mem.collectionOf(test1: _*) + val Y = Mem.collectionOf(test2: _*) + val cross = X.cross(Y) + + val crossSet = cross.materialize().toSet + + assert(crossSet.size == test1.size * test2.size) + assert(test1.flatMap(t1 => test2.map(t2 => crossSet.contains((t1, t2)))).forall(_ == true)) + + } + } + + @Test + def testCrossTable() { + val testCases = List(Array((1,2),(3,4),(5,6)), Array((7,8),(9,10)), Array.empty[(Int,Int)]) + val testCasePairs = testCases flatMap {test1 => testCases map {test2 => (test1,test2)}} + + for ((test1, test2) <- testCasePairs) { + val X = Mem.tableOf(test1) + val Y = Mem.tableOf(test2) + val cross = X.cross(Y) + + val crossSet = cross.materializeToMap().toSet + val actualCross = test1.flatMap(t1 => test2.map(t2 => ((t1._1, t2._1), (t1._2, t2._2)))) + + assert(crossSet.size == test1.size * test2.size) + assert(actualCross.map(crossSet.contains(_)).forall(_ == true)) + } + } + +}
