Repository: crunch
Updated Branches:
  refs/heads/master 997f76476 -> 00bc96991
CRUNCH-397: Enrich PCollection and Pipeline APIs for Scrunch.

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/00bc9699
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/00bc9699
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/00bc9699

Branch: refs/heads/master
Commit: 00bc96991d9f8e0723af320930892fd1ce91da1a
Parents: 997f764
Author: Josh Wills <[email protected]>
Authored: Fri May 16 00:49:49 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon May 19 20:32:45 2014 -0700

----------------------------------------------------------------------
 .../crunch/scrunch/PageRankClassTest.scala      |   2 +-
 .../apache/crunch/scrunch/PageRankTest.scala    |   3 +-
 .../org/apache/crunch/scrunch/Conversions.scala |  18 ++
 .../org/apache/crunch/scrunch/PCollection.scala |  99 ++++++--
 .../apache/crunch/scrunch/PCollectionLike.scala | 223 ++++++++++++++++++-
 .../apache/crunch/scrunch/PGroupedTable.scala   | 119 +++++++---
 .../org/apache/crunch/scrunch/PTable.scala      | 119 ++++++++--
 .../org/apache/crunch/scrunch/Pipeline.scala    |   1 +
 .../apache/crunch/scrunch/PipelineLike.scala    |  50 ++++-
 9 files changed, 552 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
index 585c7aa..4cd9e84 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
@@ -64,7 +64,7 @@ class PageRankClassTest extends CrunchSuite {
   lazy val pipeline = Pipeline.mapReduce[PageRankTest](tempDir.getDefaultConfiguration)
 
   def initialInput(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
+    pipeline.read(from.textFile(fileName, Avros.strings))
       .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
       .groupByKey
       .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))


http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
index dbd14aa..541502f 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
@@ -52,7 +52,8 @@ class PageRankTest extends CrunchSuite {
 
   def initialInput(fileName: String) = {
     pipeline.read(from.textFile(fileName))
-      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .withPType(Avros.strings)
+      .mapWithContext((line, ctxt) => { ctxt.getConfiguration; val urls = line.split("\\t"); (urls(0), urls(1)) })
      .groupByKey
      .map((url, links) => (url, (1f, 0f, links.toList)))
   }
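The two test changes above exercise the new entry points this commit adds: reading a text file with an explicit PType, pinning a collection's PType with withPType, and mapWithContext, whose function also receives the task's TaskInputOutputContext. A minimal sketch of the same idioms in application code (the class name and paths are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.crunch.io.{From => from, To => to}
    import org.apache.crunch.scrunch.{Avros, Pipeline}

    class LinkCounts
    object LinkCounts {
      def main(args: Array[String]) {
        val pipeline = Pipeline.mapReduce[LinkCounts](new Configuration())
        pipeline.read(from.textFile("/tmp/links.txt"))       // hypothetical input path
          .withPType(Avros.strings)                          // pin the PType explicitly
          .mapWithContext((line, ctxt) => {
            ctxt.getCounter("parse", "lines").increment(1)   // task-side counter access
            val urls = line.split("\\t")
            (urls(0), urls(1))
          })
          .groupByKey
          .map((url, links) => (url, links.size))
          .write(to.textFile("/tmp/link-counts"))
        pipeline.done()
      }
    }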
http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
index 744685f..833e6d9 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer
 import scala.collection.Iterable
 import scala.reflect.ClassTag
 import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
 
 trait CanParallelTransform[El, To] {
   def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To
@@ -51,12 +52,29 @@ object CanParallelTransform extends LowPriorityParallelTransforms {
 
   def kvWrapFn[A, K, V](fn: DoFn[A, (K, V)]) = {
     new DoFn[A, CPair[K, V]] {
+
+      override def setContext(ctxt: TaskInputOutputContext[_, _, _, _]) {
+        super.setContext(ctxt)
+        fn.setContext(ctxt)
+      }
+
+      override def initialize() {
+        fn.initialize()
+      }
+
       override def process(input: A, emitFn: Emitter[CPair[K, V]]) {
         fn.process(input, new Emitter[(K, V)] {
           override def emit(kv: (K, V)) { emitFn.emit(CPair.of(kv._1, kv._2)) }
           override def flush() { emitFn.flush() }
         })
       }
+
+      override def cleanup(emitFn: Emitter[CPair[K, V]]) {
+        fn.cleanup(new Emitter[(K, V)] {
+          override def emit(kv: (K, V)) { emitFn.emit(CPair.of(kv._1, kv._2)) }
+          override def flush() { emitFn.flush() }
+        })
+      }
     }
   }
 }
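The kvWrapFn change above is a lifecycle fix: a DoFn that wraps another DoFn must forward setContext, initialize, and cleanup, or the wrapped function never sees the task context and its setup/teardown silently no-ops. The same pattern, sketched on a hypothetical wrapper that is not part of this commit:

    import org.apache.crunch.{DoFn, Emitter}
    import org.apache.hadoop.mapreduce.TaskInputOutputContext

    // Illustrative wrapper: uppercases its input before delegating, while
    // forwarding every lifecycle call to the wrapped DoFn.
    class UppercasingDoFn(fn: DoFn[String, String]) extends DoFn[String, String] {
      override def setContext(ctxt: TaskInputOutputContext[_, _, _, _]) {
        super.setContext(ctxt)
        fn.setContext(ctxt) // without this, fn.getContext is null at process time
      }
      override def initialize() { fn.initialize() }
      override def process(input: String, emitter: Emitter[String]) {
        fn.process(input.toUpperCase, emitter)
      }
      override def cleanup(emitter: Emitter[String]) { fn.cleanup(emitter) }
    }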
http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index 49ee6c0..2d4ed44 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -19,26 +19,32 @@ package org.apache.crunch.scrunch
 
 import scala.collection.JavaConversions
 
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{PCollection => JCollection, Pair => CPair}
+import org.apache.crunch.{PCollection => JCollection, Pair => CPair, _}
 import org.apache.crunch.lib.{Aggregate, Cartesian, Sample}
 import org.apache.crunch.scrunch.Conversions._
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
+import org.apache.crunch.types.PType
+import org.apache.crunch.fn.IdentityFn
 
 class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
   import PCollection._
 
-  def filter(f: S => Boolean): PCollection[S] = {
-    parallelDo(filterFn[S](f), native.getPType())
-  }
+  type FunctionType[T] = S => T
+  type CtxtFunctionType[T] = (S, TIOC) => T
 
-  def map[T, To](f: S => T)(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
-  }
+  protected def wrapFlatMapFn[T](fmt: S => TraversableOnce[T]) = flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: S => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: S => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (S, TIOC) => TraversableOnce[T]) = flatMapWithCtxtFn(fmt)
+  protected def wrapMapWithCtxtFn[T](fmt: (S, TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (S, TIOC) => Boolean) = filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[K, V](fmt: S => TraversableOnce[(K, V)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[K, V](fmt: S => (K, V)) = pairMapFn(fmt)
 
-  def flatMap[T, To](f: S => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  def withPType(pt: PType[S]): PCollection[S] = {
+    val ident: MapFn[S, S] = IdentityFn.getInstance()
+    wrap(native.parallelDo("withPType", ident, pt))
   }
 
   def union(others: PCollection[S]*) = {
@@ -65,7 +71,7 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     JavaConversions.iterableAsScalaIterable[S](native.materialize)
   }
 
-  def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
+  protected def wrap(newNative: JCollection[_]) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
 
   def count() = {
     val count = new PTable[S, java.lang.Long](Aggregate.count(native))
@@ -84,10 +90,10 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     wrap(Sample.sample(native, seed, acceptanceProbability))
   }
 
-  def pType = native.getPType()
+  def pType() = native.getPType()
 }
 
-trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
+trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, TraversableOnce[T]] {
   override def process(input: S, emitter: Emitter[T]) {
     for (v <- apply(input)) {
       emitter.emit(v)
@@ -95,12 +101,43 @@ trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
     }
   }
 }
 
+trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
+  override def map(input: S) = apply(input)
+}
+
 trait SFilterFn[T] extends FilterFn[T] with Function1[T, Boolean] {
   override def accept(input: T) = apply(input)
 }
 
-trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
-  override def map(input: S) = apply(input)
+class SDoWithCtxtFn[S, T](val f: (S, TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T]) extends DoFn[S, T] {
+  override def process(input: S, emitter: Emitter[T]) {
+    for (v <- f(input, getContext)) {
+      emitter.emit(v)
+    }
+  }
+}
+
+class SMapWithCtxtFn[S, T](val f: (S, TaskInputOutputContext[_, _, _, _]) => T) extends MapFn[S, T] {
+  override def map(input: S) = f(input, getContext)
+}
+
+class SFilterWithCtxtFn[T](val f: (T, TaskInputOutputContext[_, _, _, _]) => Boolean) extends FilterFn[T] {
+  override def accept(input: T) = f.apply(input, getContext)
+}
+
+trait SDoPairFn[S, K, V] extends DoFn[S, CPair[K, V]] with Function1[S, TraversableOnce[(K, V)]] {
+  override def process(input: S, emitter: Emitter[CPair[K, V]]) {
+    for (v <- apply(input)) {
+      emitter.emit(CPair.of(v._1, v._2))
+    }
+  }
+}
+
+trait SMapPairFn[S, K, V] extends MapFn[S, CPair[K, V]] with Function1[S, (K, V)] {
+  override def map(input: S): CPair[K, V] = {
+    val t = apply(input)
+    CPair.of(t._1, t._2)
+  }
 }
 
 trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
@@ -110,19 +147,41 @@ trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
 }
 
 object PCollection {
+  type TIOC = TaskInputOutputContext[_, _, _, _]
+
+  def flatMapFn[S, T](fn: S => TraversableOnce[T]) = {
+    new SDoFn[S, T] { def apply(s: S) = fn(s) }
+  }
+
+  def mapFn[S, T](fn: S => T) = {
+    new SMapFn[S, T] { def apply(s: S) = fn(s) }
+  }
+
   def filterFn[S](fn: S => Boolean) = {
     new SFilterFn[S] { def apply(x: S) = fn(x) }
   }
 
+  def flatMapWithCtxtFn[S, T](fn: (S, TIOC) => TraversableOnce[T]) = {
+    new SDoWithCtxtFn[S, T](fn)
+  }
+
+  def mapWithCtxtFn[S, T](fn: (S, TIOC) => T) = {
+    new SMapWithCtxtFn[S, T](fn)
+  }
+
+  def filterWithCtxtFn[S](fn: (S, TIOC) => Boolean) = {
+    new SFilterWithCtxtFn[S](fn)
+  }
+
   def mapKeyFn[S, K](fn: S => K) = {
     new SMapKeyFn[S, K] { def apply(x: S) = fn(x) }
   }
 
-  def mapFn[S, T](fn: S => T) = {
-    new SMapFn[S, T] { def apply(s: S) = fn(s) }
+  def pairMapFn[S, K, V](fn: S => (K, V)) = {
+    new SMapPairFn[S, K, V] { def apply(s: S) = fn(s) }
   }
 
-  def flatMapFn[S, T](fn: S => Traversable[T]) = {
-    new SDoFn[S, T] { def apply(s: S) = fn(s) }
+  def pairFlatMapFn[S, K, V](fn: S => TraversableOnce[(K, V)]) = {
+    new SDoPairFn[S, K, V] { def apply(s: S) = fn(s) }
   }
 }
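PCollection now routes its map/flatMap/filter variants through the wrap* hooks defined in PCollectionLike and gains context-aware forms. A sketch of the two additions most visible to users, filterWithContext and the tuple-returning map (names and paths here are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.crunch.io.{From => from, To => to}
    import org.apache.crunch.scrunch.{Avros, Pipeline}

    class MinLength
    object MinLength {
      def main(args: Array[String]) {
        val pipeline = Pipeline.mapReduce[MinLength](new Configuration())
        val words = pipeline.read(from.textFile("/tmp/words.txt", Avros.strings))

        val kept = words.filterWithContext((word, ctxt) => {
          // read the threshold from the job configuration on each task
          word.length >= ctxt.getConfiguration.getInt("app.min.length", 3)
        })

        // a Tuple2 result resolves CanParallelTransform to a PTable[String, Int]
        val lengths = kept.map(word => (word, word.length))
        lengths.write(to.textFile("/tmp/word-lengths"))
        pipeline.done()
      }
    }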
http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index c0162df..1e2e890 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -17,40 +17,238 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.DoFn
-import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target}
+import org.apache.crunch.{PCollection => JCollection, Pair => JPair, _}
 import org.apache.crunch.types.{PType, PTableType}
+import org.apache.crunch.types.writable.WritableTypeFamily
-import scala.reflect.ClassTag
 
+/**
+ * Base trait for PCollection-like entities in Scrunch, including PTables and PGroupedTables.
+ *
+ * @tparam S the data type of the underlying object contained in this instance
+ * @tparam FullType the Scrunch PCollection type of this object
+ * @tparam NativeType the corresponding Crunch PCollection type of this object
+ */
 trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
+  type FunctionType[T]
+  type CtxtFunctionType[T]
+
+  protected def wrapFlatMapFn[T](fmt: FunctionType[TraversableOnce[T]]): DoFn[S, T]
+  protected def wrapMapFn[T](fmt: FunctionType[T]): MapFn[S, T]
+  protected def wrapFilterFn(fmt: FunctionType[Boolean]): FilterFn[S]
+  protected def wrapFlatMapWithCtxtFn[T](fmt: CtxtFunctionType[TraversableOnce[T]]): DoFn[S, T]
+  protected def wrapMapWithCtxtFn[T](fmt: CtxtFunctionType[T]): MapFn[S, T]
+  protected def wrapFilterWithCtxtFn(fmt: CtxtFunctionType[Boolean]): FilterFn[S]
+  protected def wrapPairFlatMapFn[K, V](fmt: FunctionType[TraversableOnce[(K, V)]]): DoFn[S, JPair[K, V]]
+  protected def wrapPairMapFn[K, V](fmt: FunctionType[(K, V)]): MapFn[S, JPair[K, V]]
+
+  /**
+   * Returns the underlying PCollection wrapped by this instance.
+   */
   val native: NativeType
 
-  def wrap(newNative: AnyRef): FullType
+  protected def wrap(newNative: JCollection[_]): FullType
 
+  /**
+   * Write the data in this instance to the given target.
+   */
   def write(target: Target): FullType = wrap(native.write(target))
 
+  /**
+   * Write the data in this instance to the given target with the given {@link Target.WriteMode}.
+   */
   def write(target: Target, writeMode: Target.WriteMode): FullType = {
     wrap(native.write(target, writeMode))
   }
 
+  /**
+   * Apply a flatMap operation to this instance, returning a {@code PTable} if the return
+   * type of the function is a {@code TraversableOnce[Tuple2]} and a {@code PCollection} otherwise.
+   */
+  def flatMap[T, To](f: FunctionType[TraversableOnce[T]])
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapFlatMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /**
+   * Apply a map operation to this instance, returning a {@code PTable} if the return
+   * type of the function is a {@code Tuple2} and a {@code PCollection} otherwise.
+   */
+  def map[T, To](f: FunctionType[T])(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /**
+   * Apply the given filter function to the elements of this instance and return a new
+   * instance that contains the items that pass the filter.
+   */
+  def filter(f: FunctionType[Boolean]): FullType = {
+    wrap(native.filter(wrapFilterFn(f)))
+  }
+
+  def filter(name: String, f: FunctionType[Boolean]): FullType = {
+    wrap(native.filter(name, wrapFilterFn(f)))
+  }
+
+  def flatMapWithContext[T, To](f: CtxtFunctionType[TraversableOnce[T]])
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapFlatMapWithCtxtFn(f), pt.get(getTypeFamily()))
+  }
+
+  def mapWithContext[T, To](f: CtxtFunctionType[T])
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, wrapMapWithCtxtFn(f), pt.get(getTypeFamily()))
+  }
+
+  def filterWithContext(f: CtxtFunctionType[Boolean]): FullType = {
+    wrap(native.filter(wrapFilterWithCtxtFn(f)))
+  }
+
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
   def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
     new PCollection[T](native.parallelDo(fn, ptype))
   }
-
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
   def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T]) = {
     new PCollection[T](native.parallelDo(name, fn, ptype))
   }
 
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T], opts: ParallelDoOptions) = {
+    new PCollection[T](native.parallelDo(name, fn, ptype, opts))
+  }
+
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
   def parallelDo[K, V](fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
     new PTable[K, V](native.parallelDo(fn, ptype))
   }
 
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
   def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
     new PTable[K, V](native.parallelDo(name, fn, ptype))
   }
 
+  /**
+   * Applies the given doFn to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V], opts: ParallelDoOptions) = {
+    new PTable[K, V](native.parallelDo(name, fn, ptype, opts))
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def flatMap[T](f: FunctionType[TraversableOnce[T]], ptype: PType[T]) = {
+    parallelDo(wrapFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def flatMap[T](name: String, f: FunctionType[TraversableOnce[T]], ptype: PType[T]) = {
+    parallelDo(name, wrapFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def flatMap[T](name: String, f: FunctionType[TraversableOnce[T]], ptype: PType[T], opts: ParallelDoOptions) = {
+    parallelDo(name, wrapFlatMapFn(f), ptype, opts)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def flatMap[K, V](f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V]) = {
+    parallelDo(wrapPairFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def flatMap[K, V](name: String, f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V]) = {
+    parallelDo(name, wrapPairFlatMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given flatMap function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def flatMap[K, V](name: String, f: FunctionType[TraversableOnce[(K, V)]], ptype: PTableType[K, V],
+      opts: ParallelDoOptions) = {
+    parallelDo(name, wrapPairFlatMapFn(f), ptype, opts)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def map[T](f: FunctionType[T], ptype: PType[T]) = {
+    parallelDo(wrapMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def map[T](name: String, f: FunctionType[T], ptype: PType[T]) = {
+    parallelDo(name, wrapMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PCollection} that is the output of this processing.
+   */
+  def map[T](name: String, f: FunctionType[T], ptype: PType[T], opts: ParallelDoOptions) = {
+    parallelDo(name, wrapMapFn(f), ptype, opts)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def map[K, V](f: FunctionType[(K, V)], ptype: PTableType[K, V]) = {
+    parallelDo(wrapPairMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def map[K, V](name: String, f: FunctionType[(K, V)], ptype: PTableType[K, V]) = {
+    parallelDo(name, wrapPairMapFn(f), ptype)
+  }
+
+  /**
+   * Applies the given map function to the elements of this instance and
+   * returns a new {@code PTable} that is the output of this processing.
+   */
+  def map[K, V](name: String, f: FunctionType[(K, V)], ptype: PTableType[K, V],
+      opts: ParallelDoOptions) = {
+    parallelDo(name, wrapPairMapFn(f), ptype, opts)
+  }
+
   /**
    * Gets the number of elements represented by this PCollection.
    *
    * @return The number of elements in this PCollection.
@@ -59,9 +257,22 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
     PObject(native.length())
   }
 
+  /**
+   * Returns a {@code PObject} containing the elements of this instance as a {@code Seq}.
+   */
   def asSeq(): PObject[Seq[S]] = {
     PObject(native.asCollection())
   }
 
-  def getTypeFamily() = Avros
+  /**
+   * Returns the {@code PTypeFamily} of this instance.
+   */
+  def getTypeFamily() = {
+    if (native.getTypeFamily == WritableTypeFamily.getInstance()) {
+      Writables
+    } else {
+      Avros
+    }
+  }
 }
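Alongside the implicit-driven map/flatMap, the trait now exposes overloads that mirror the Java API by taking the result PType (and optionally a name and ParallelDoOptions) explicitly, which helps when implicit resolution is ambiguous or a custom PType is needed. A sketch, assuming Scrunch's Avros factory exposes strings/ints/tableOf analogous to the Java Avros class:

    import org.apache.crunch.scrunch.{Avros, PCollection, PTable}

    object Tokenize {
      // a named flatMap with an explicit PType, bypassing the PTypeH implicits
      def tokens(lines: PCollection[String]): PCollection[String] =
        lines.flatMap("tokenize", line => line.split("\\s+").toSeq, Avros.strings)

      // a pair-returning flatMap with an explicit PTableType yields a PTable directly
      def counts(lines: PCollection[String]): PTable[String, Int] =
        lines.flatMap(line => line.split("\\s+").toSeq.map(w => (w, 1)),
          Avros.tableOf(Avros.strings, Avros.ints))
    }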
http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
index 7d890de..c86611a 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PGroupedTable.scala
@@ -17,38 +17,43 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{CombineFn, PGroupedTable => JGroupedTable, PTable => JTable, Pair => CPair}
+import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, Pair => CPair, _}
 import java.lang.{Iterable => JIterable}
-import scala.collection.{Iterable, Iterator}
+import scala.collection.Iterable
 import scala.collection.JavaConversions._
-import Conversions._
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
 
 class PGroupedTable[K, V](val native: JGroupedTable[K, V])
     extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
   import PGroupedTable._
 
-  def filter(f: (K, Iterable[V]) => Boolean) = {
-    parallelDo(filterFn[K, V](f), native.getPType())
-  }
-
-  def map[T, To](f: (K, Iterable[V]) => T)
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
-  }
+  type FunctionType[T] = (K, Iterable[V]) => T
+  type CtxtFunctionType[T] = (K, Iterable[V], TIOC) => T
 
-  def flatMap[T, To](f: (K, Iterable[V]) => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  protected def wrapFlatMapFn[T](fmt: (K, Iterable[V]) => TraversableOnce[T]) = flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: (K, Iterable[V]) => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: (K, Iterable[V]) => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => TraversableOnce[T]) = {
+    flatMapWithCtxtFn(fmt)
   }
+  protected def wrapMapWithCtxtFn[T](fmt: (K, Iterable[V], TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (K, Iterable[V], TIOC) => Boolean) = filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[S, T](fmt: (K, Iterable[V]) => TraversableOnce[(S, T)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[S, T](fmt: (K, Iterable[V]) => (S, T)) = pairMapFn(fmt)
 
   def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f))
 
+  def combineValues(agg: Aggregator[V]) = new PTable[K, V](native.combineValues(agg))
+
+  def combineValues(combineAgg: Aggregator[V], reduceAgg: Aggregator[V]) = {
+    new PTable[K, V](native.combineValues(combineAgg, reduceAgg))
+  }
+
   def combineValues(fn: CombineFn[K, V]) = new PTable[K, V](native.combineValues(fn))
 
   def ungroup() = new PTable[K, V](native.ungroup())
 
-  def wrap(newNative: AnyRef): PGroupedTable[K, V] = {
+  protected def wrap(newNative: JCollection[_]): PGroupedTable[K, V] = {
     new PGroupedTable[K, V](newNative.asInstanceOf[JGroupedTable[K, V]])
   }
 }
@@ -59,11 +64,7 @@ class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
   }
 }
 
-trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
-  override def accept(input: CPair[K, JIterable[V]]) = apply(input.first(), iterableAsScalaIterable[V](input.second()))
-}
-
-trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], Traversable[T]] {
+trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], TraversableOnce[T]] {
   override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
     for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
       emitter.emit(v)
@@ -77,16 +78,84 @@ trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Funct
   }
 }
 
+trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
+  override def accept(input: CPair[K, JIterable[V]]) = {
+    apply(input.first(), iterableAsScalaIterable[V](input.second()))
+  }
+}
+
+class SDoGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T])
+    extends DoFn[CPair[K, JIterable[V]], T] {
+  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
+    for (v <- f(input.first(), iterableAsScalaIterable[V](input.second()), getContext)) {
+      emitter.emit(v)
+    }
+  }
+}
+
+class SMapGroupedWithCtxtFn[K, V, T](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => T)
+    extends MapFn[CPair[K, JIterable[V]], T] {
+  override def map(input: CPair[K, JIterable[V]]) = {
+    f(input.first(), iterableAsScalaIterable[V](input.second()), getContext)
+  }
+}
+
+class SFilterGroupedWithCtxtFn[K, V](val f: (K, Iterable[V], TaskInputOutputContext[_, _, _, _]) => Boolean)
+    extends FilterFn[CPair[K, JIterable[V]]] {
+  override def accept(input: CPair[K, JIterable[V]]) = {
+    f.apply(input.first(), iterableAsScalaIterable[V](input.second()), getContext)
+  }
+}
+
+trait SDoPairGroupedFn[K, V, S, T] extends DoFn[CPair[K, JIterable[V]], CPair[S, T]]
+    with Function2[K, Iterable[V], TraversableOnce[(S, T)]] {
+  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[CPair[S, T]]) {
+    for (v <- apply(input.first(), iterableAsScalaIterable[V](input.second()))) {
+      emitter.emit(CPair.of(v._1, v._2))
+    }
+  }
+}
+
+trait SMapPairGroupedFn[K, V, S, T] extends MapFn[CPair[K, JIterable[V]], CPair[S, T]]
+    with Function2[K, Iterable[V], (S, T)] {
+  override def map(input: CPair[K, JIterable[V]]) = {
+    val t = apply(input.first(), iterableAsScalaIterable[V](input.second()))
+    CPair.of(t._1, t._2)
+  }
+}
+
 object PGroupedTable {
-  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
-    new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  type TIOC = TaskInputOutputContext[_, _, _, _]
+
+  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => TraversableOnce[T]) = {
+    new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
   }
 
   def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = {
     new SMapGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
   }
 
-  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => Traversable[T]) = {
-    new SDoGroupedFn[K, V, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = {
+    new SFilterGroupedFn[K, V] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  }
+
+  def flatMapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => TraversableOnce[T]) = {
+    new SDoGroupedWithCtxtFn[K, V, T](fn)
+  }
+
+  def mapWithCtxtFn[K, V, T](fn: (K, Iterable[V], TIOC) => T) = {
+    new SMapGroupedWithCtxtFn[K, V, T](fn)
+  }
+
+  def filterWithCtxtFn[K, V](fn: (K, Iterable[V], TIOC) => Boolean) = {
+    new SFilterGroupedWithCtxtFn[K, V](fn)
+  }
+
+  def pairMapFn[K, V, S, T](fn: (K, Iterable[V]) => (S, T)) = {
+    new SMapPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
+  }
+
+  def pairFlatMapFn[K, V, S, T](fn: (K, Iterable[V]) => TraversableOnce[(S, T)]) = {
+    new SDoPairGroupedFn[K, V, S, T] { def apply(k: K, v: Iterable[V]) = fn(k, v) }
   }
 }
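The new combineValues overloads accept Crunch Aggregators directly, so stock aggregators can replace hand-written CombineFns; the two-argument form allows different logic on the combiner and reducer sides. A sketch using aggregators from org.apache.crunch.fn.Aggregators (the method names here are illustrative):

    import org.apache.crunch.fn.Aggregators
    import org.apache.crunch.scrunch.{PGroupedTable, PTable}

    object Sums {
      def totals(grouped: PGroupedTable[String, java.lang.Long]): PTable[String, java.lang.Long] =
        grouped.combineValues(Aggregators.SUM_LONGS())

      // sum on the combiner side, keep the maximum on the reducer side
      def sumThenMax(grouped: PGroupedTable[String, java.lang.Long]): PTable[String, java.lang.Long] =
        grouped.combineValues(Aggregators.SUM_LONGS(), Aggregators.MAX_LONGS())
    }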
http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 646aded..2f88b0c 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -21,22 +21,33 @@ import java.util.{Collection => JCollect}
 
 import scala.collection.JavaConversions._
 
-import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
-import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
+import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, _}
 import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
 import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import org.apache.crunch.types.{PTableType, PType}
+import scala.collection.Iterable
+import org.apache.hadoop.mapreduce.TaskInputOutputContext
+import org.apache.crunch.fn.IdentityFn
 
 class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
   import PTable._
 
-  def filter(f: (K, V) => Boolean): PTable[K, V] = {
-    parallelDo(filterFn[K, V](f), native.getPTableType())
-  }
-
-  def map[T, To](f: (K, V) => T)
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, mapFn(f), pt.get(getTypeFamily()))
+  type FunctionType[T] = (K, V) => T
+  type CtxtFunctionType[T] = (K, V, TIOC) => T
+
+  protected def wrapFlatMapFn[T](fmt: (K, V) => TraversableOnce[T]) = flatMapFn(fmt)
+  protected def wrapMapFn[T](fmt: (K, V) => T) = mapFn(fmt)
+  protected def wrapFilterFn(fmt: (K, V) => Boolean) = filterFn(fmt)
+  protected def wrapFlatMapWithCtxtFn[T](fmt: (K, V, TIOC) => TraversableOnce[T]) = flatMapWithCtxtFn(fmt)
+  protected def wrapMapWithCtxtFn[T](fmt: (K, V, TIOC) => T) = mapWithCtxtFn(fmt)
+  protected def wrapFilterWithCtxtFn(fmt: (K, V, TIOC) => Boolean) = filterWithCtxtFn(fmt)
+  protected def wrapPairFlatMapFn[S, T](fmt: (K, V) => TraversableOnce[(S, T)]) = pairFlatMapFn(fmt)
+  protected def wrapPairMapFn[S, T](fmt: (K, V) => (S, T)) = pairMapFn(fmt)
+
+  def withPType(pt: PTableType[K, V]): PTable[K, V] = {
+    val ident: MapFn[CPair[K, V], CPair[K, V]] = IdentityFn.getInstance()
+    wrap(native.parallelDo("withPType", ident, pt))
   }
 
   def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
@@ -45,15 +56,20 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     parallelDo(mapValuesFn[K, V, T](f), ptype)
   }
 
+  def mapValues[T](f: V => T, pt: PType[T]) = {
+    val ptype = pt.getFamily().tableOf(native.getKeyType(), pt)
+    parallelDo(mapValuesFn[K, V, T](f), ptype)
+  }
+
   def mapKeys[T](f: K => T)(implicit pt: PTypeH[T]) = {
     val ptf = getTypeFamily()
     val ptype = ptf.tableOf(pt.get(ptf), native.getValueType())
     parallelDo(mapKeysFn[K, V, T](f), ptype)
   }
 
-  def flatMap[T, To](f: (K, V) => Traversable[T])
-      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
-    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  def mapKeys[T](f: K => T, pt: PType[T]) = {
+    val ptype = pt.getFamily.tableOf(pt, native.getValueType())
+    parallelDo(mapKeysFn[K, V, T](f), ptype)
   }
 
   def union(others: PTable[K, V]*) = {
@@ -123,12 +139,10 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
 
   def groupByKey(options: GroupingOptions) = new PGroupedTable(native.groupByKey(options))
 
-  def wrap(newNative: AnyRef) = {
+  protected def wrap(newNative: JCollection[_]) = {
     new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
   }
 
-  def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
-
   def materialize(): Iterable[(K, V)] = {
     InterpreterRunner.addReplJarsToJob(native.getPipeline().getConfiguration())
     native.materialize.view.map(x => (x.first, x.second))
@@ -143,25 +157,61 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     PObject(native.asMap())
   }
 
+  def pType() = native.getPTableType()
+
   def keyType() = native.getPTableType().getKeyType()
 
   def valueType() = native.getPTableType().getValueType()
 }
 
+trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, TraversableOnce[T]] {
+  override def process(input: CPair[K, V], emitter: Emitter[T]) {
+    for (v <- apply(input.first(), input.second())) {
+      emitter.emit(v)
+    }
+  }
+}
+
+trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
+  override def map(input: CPair[K, V]) = apply(input.first(), input.second())
+}
+
 trait SFilterTableFn[K, V] extends FilterFn[CPair[K, V]] with Function2[K, V, Boolean] {
   override def accept(input: CPair[K, V]) = apply(input.first(), input.second())
 }
 
-trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, Traversable[T]] {
+class SDoTableWithCtxtFn[K, V, T](val f: (K, V, TaskInputOutputContext[_, _, _, _]) => TraversableOnce[T])
+    extends DoFn[CPair[K, V], T] {
   override def process(input: CPair[K, V], emitter: Emitter[T]) {
-    for (v <- apply(input.first(), input.second())) {
+    for (v <- f(input.first(), input.second(), getContext)) {
       emitter.emit(v)
     }
   }
 }
 
-trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
-  override def map(input: CPair[K, V]) = apply(input.first(), input.second())
+class SMapTableWithCtxtFn[K, V, T](val f: (K, V, TaskInputOutputContext[_, _, _, _]) => T)
+    extends MapFn[CPair[K, V], T] {
+  override def map(input: CPair[K, V]) = f(input.first(), input.second(), getContext)
+}
+
+class SFilterTableWithCtxtFn[K, V](val f: (K, V, TaskInputOutputContext[_, _, _, _]) => Boolean)
+    extends FilterFn[CPair[K, V]] {
+  override def accept(input: CPair[K, V]) = f.apply(input.first(), input.second(), getContext)
+}
+
+trait SDoPairTableFn[K, V, S, T] extends DoFn[CPair[K, V], CPair[S, T]] with Function2[K, V, TraversableOnce[(S, T)]] {
+  override def process(input: CPair[K, V], emitter: Emitter[CPair[S, T]]) {
+    for (v <- apply(input.first(), input.second())) {
+      emitter.emit(CPair.of(v._1, v._2))
+    }
+  }
+}
+
+trait SMapPairTableFn[K, V, S, T] extends MapFn[CPair[K, V], CPair[S, T]] with Function2[K, V, (S, T)] {
+  override def map(input: CPair[K, V]) = {
+    val t = apply(input.first(), input.second())
+    CPair.of(t._1, t._2)
+  }
 }
 
 trait SMapTableValuesFn[K, V, T] extends MapFn[CPair[K, V], CPair[K, T]] with Function1[V, T] {
@@ -173,10 +223,32 @@ trait SMapTableKeysFn[K, V, T] extends MapFn[CPair[K, V], CPair[T, V]] with Func
 }
 
 object PTable {
+  type TIOC = TaskInputOutputContext[_, _, _, _]
+
+  def flatMapFn[K, V, T](fn: (K, V) => TraversableOnce[T]) = {
+    new SDoTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  }
+
+  def mapFn[K, V, T](fn: (K, V) => T) = {
+    new SMapTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  }
+
   def filterFn[K, V](fn: (K, V) => Boolean) = {
     new SFilterTableFn[K, V] { def apply(k: K, v: V) = fn(k, v) }
   }
 
+  def flatMapWithCtxtFn[K, V, T](fn: (K, V, TIOC) => TraversableOnce[T]) = {
+    new SDoTableWithCtxtFn[K, V, T](fn)
+  }
+
+  def mapWithCtxtFn[K, V, T](fn: (K, V, TIOC) => T) = {
+    new SMapTableWithCtxtFn[K, V, T](fn)
+  }
+
+  def filterWithCtxtFn[K, V](fn: (K, V, TIOC) => Boolean) = {
+    new SFilterTableWithCtxtFn[K, V](fn)
+  }
+
   def mapValuesFn[K, V, T](fn: V => T) = {
     new SMapTableValuesFn[K, V, T] { def apply(v: V) = fn(v) }
   }
@@ -185,11 +257,12 @@ object PTable {
     new SMapTableKeysFn[K, V, T] { def apply(k: K) = fn(k) }
   }
 
-  def mapFn[K, V, T](fn: (K, V) => T) = {
-    new SMapTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  def pairMapFn[K, V, S, T](fn: (K, V) => (S, T)) = {
+    new SMapPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) }
   }
 
-  def flatMapFn[K, V, T](fn: (K, V) => Traversable[T]) = {
-    new SDoTableFn[K, V, T] { def apply(k: K, v: V) = fn(k, v) }
+  def pairFlatMapFn[K, V, S, T](fn: (K, V) => TraversableOnce[(S, T)]) = {
+    new SDoPairTableFn[K, V, S, T] { def apply(k: K, v: V) = fn(k, v) }
   }
+
 }


http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
index d250e9b..67c9b14 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
@@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.MRPipeline
 import org.apache.crunch.util.DistCache
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import scala.reflect.ClassTag
+import org.apache.crunch.types.{PTableType, PType}
 
 /**
  * Manages the state of a pipeline execution.
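On the PTable side above, mapValues and mapKeys gain overloads that take the result PType explicitly, and withPType re-asserts the table's PTableType. A sketch (again assuming the Scrunch Avros factory methods; names are illustrative):

    import org.apache.crunch.scrunch.{Avros, PTable}

    object Scores {
      def render(scores: PTable[String, Int]): PTable[String, String] =
        scores.mapValues(v => "%04d".format(v), Avros.strings)
          .withPType(Avros.tableOf(Avros.strings, Avros.strings))
    }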
http://git-wip-us.apache.org/repos/asf/crunch/blob/00bc9699/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index c183e5e..27c43a7 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -19,11 +19,9 @@ package org.apache.crunch.scrunch
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.crunch.{Pipeline => JPipeline}
-import org.apache.crunch.Source
-import org.apache.crunch.TableSource
-import org.apache.crunch.Target
+import org.apache.crunch.{Pipeline => JPipeline, _}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import org.apache.crunch.types.{PTableType, PType}
 
 trait PipelineLike {
   def jpipeline: JPipeline
@@ -34,6 +32,18 @@ trait PipelineLike {
   def getConfiguration(): Configuration = jpipeline.getConfiguration()
 
   /**
+   * Sets the configuration object associated with this pipeline.
+   */
+  def setConfiguration(conf: Configuration) {
+    jpipeline.setConfiguration(conf)
+  }
+
+  /**
+   * Returns the name of this pipeline instance.
+   */
+  def getName() = jpipeline.getName()
+
+  /**
    * Reads a source into a [[org.apache.crunch.scrunch.PCollection]]
    *
    * @param source The source to read from.
@@ -91,10 +101,29 @@ trait PipelineLike {
   }
 
   /**
+   * Creates an empty PCollection of the given PType.
+   */
+  def emptyPCollection[T](pt: PType[T]) = new PCollection[T](jpipeline.emptyPCollection(pt))
+
+  /**
+   * Creates an empty PTable of the given PTableType.
+   */
+  def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
+
+  /**
+   * Starts the pipeline asynchronously and returns a handler for monitoring and
+   * controlling the execution of the underlying MapReduce jobs.
+   */
+  def runAsync(): PipelineExecution = {
+    InterpreterRunner.addReplJarsToJob(getConfiguration())
+    jpipeline.runAsync()
+  }
+
+  /**
    * Constructs and executes a series of MapReduce jobs in order
    * to write data to the output targets.
    */
-  def run(): Unit = {
+  def run(): PipelineResult = {
     InterpreterRunner.addReplJarsToJob(getConfiguration())
     jpipeline.run()
   }
@@ -104,12 +133,21 @@ trait PipelineLike {
    * clean up any intermediate data files that were created in
    * this run or previous calls to `run`.
    */
-  def done(): Unit = {
+  def done(): PipelineResult = {
     InterpreterRunner.addReplJarsToJob(getConfiguration())
     jpipeline.done()
   }
 
   /**
+   * Cleans up any artifacts created as a result of {@link #run() running} the pipeline.
+   *
+   * @param force forces the cleanup even if all targets of the pipeline have not been completed.
+   */
+  def cleanup(force: Boolean): Unit = {
+    jpipeline.cleanup(force)
+  }
+
+  /**
   * Turn on debug logging for jobs that are run from this pipeline.
   */
  def debug(): Unit = jpipeline.enableDebug()
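Taken together, the PipelineLike additions make a pipeline's outcome observable: run() and done() now return a PipelineResult, runAsync() returns a PipelineExecution handle, and emptyPCollection/emptyPTable create typed placeholder collections. A closing sketch (class name and error handling are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.crunch.scrunch.{Avros, Pipeline}

    class Runner
    object Runner {
      def main(args: Array[String]) {
        val pipeline = Pipeline.mapReduce[Runner](new Configuration())

        // a typed, empty placeholder, e.g. as the identity for a union
        val fallback = pipeline.emptyPCollection(Avros.strings)

        val result = pipeline.run()           // now returns a PipelineResult
        if (!result.succeeded()) {
          System.err.println("pipeline failed: " + result.getStageResults())
        }
        pipeline.cleanup(false)               // keep artifacts for incomplete targets
      }
    }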
