Updated Branches: refs/heads/master 79e6c896b -> 2aa692e52
CRUNCH-297: Add parallelism options for joins/cogroups to the Scrunch API Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2aa692e5 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2aa692e5 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2aa692e5 Branch: refs/heads/master Commit: 2aa692e5299ee9d775218d4754ae73f3d58beed1 Parents: 79e6c89 Author: Josh Wills <[email protected]> Authored: Tue Nov 19 07:44:21 2013 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Nov 19 07:44:21 2013 -0800 ---------------------------------------------------------------------- .../org/apache/crunch/scrunch/PTable.scala | 34 ++++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/2aa692e5/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala index 5775262..7d5bd66 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala @@ -23,7 +23,8 @@ import scala.collection.JavaConversions._ import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair} -import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables} +import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables} +import org.apache.crunch.lib.join.{JoinStrategy, DefaultJoinStrategy, JoinType} import org.apache.crunch.scrunch.interpreter.InterpreterRunner import java.util @@ -64,8 +65,8 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] def values() = new PCollection[V](PTables.values(native)) - def cogroup[V2](other: PTable[K, V2]) = { - val jres = Cogroup.cogroup[K, V, V2](this.native, other.native) + def cogroup[V2](other: PTable[K, V2], parallelism: Int = 0) = { + val jres = Cogroup.cogroup[K, V, V2](parallelism, this.native, other.native) val ptf = getTypeFamily() val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres) inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] { @@ -75,10 +76,9 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] }, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType)))) } - type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]] - - protected def join[V2](joinFn: JoinFn[V2], other: PTable[K, V2]): PTable[K, (V, V2)] = { - val jres = joinFn(this.native, other.native) + protected def join[V2](other: PTable[K, V2], joinType: JoinType, parallelism: Int): PTable[K, (V, V2)] = { + val strategy = new DefaultJoinStrategy[K, V, V2](parallelism) + val jres = strategy.join(this.native, other.native, joinType) val ptf = getTypeFamily() val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType)) val inter = new PTable[K, CPair[V, V2]](jres) @@ -87,24 +87,24 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V] }, ptype) } - def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - innerJoin(other) + def join[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = { + innerJoin(other, parallelism) } - def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.innerJoin[K, V, V2](_, _), other) + def innerJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = { + join[V2](other, JoinType.INNER_JOIN, parallelism) } - def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.leftJoin[K, V, V2](_, _), other) + def leftJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = { + join[V2](other, JoinType.LEFT_OUTER_JOIN, parallelism) } - def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.rightJoin[K, V, V2](_, _), other) + def rightJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = { + join[V2](other, JoinType.RIGHT_OUTER_JOIN, parallelism) } - def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = { - join[V2](Join.fullJoin[K, V, V2](_, _), other) + def fullJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = { + join[V2](other, JoinType.FULL_OUTER_JOIN, parallelism) } def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {
