Repository: crunch
Updated Branches:
  refs/heads/master ebb1b2e32 -> e84751081
CRUNCH-497: Add union methods to Scrunch's PipelineLike

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e8475108
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e8475108
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e8475108

Branch: refs/heads/master
Commit: e84751081ec6d8f995b6a41f0c4488f0ff86daf1
Parents: ebb1b2e
Author: Josh Wills <[email protected]>
Authored: Sat Jan 31 09:13:48 2015 -0800
Committer: Josh Wills <[email protected]>
Committed: Sat Jan 31 09:13:48 2015 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/scrunch/PipelineLike.scala | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/e8475108/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index e948904..b66d289 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -23,6 +23,7 @@ import org.apache.crunch.{Pipeline => JPipeline, _}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import org.apache.crunch.types.{PTableType, PType}
 
+import scala.collection.JavaConversions
 import scala.collection.JavaConversions.asJavaCollection
 
 trait PipelineLike {
@@ -165,6 +166,24 @@ trait PipelineLike {
   }
 
   /**
+   * Creates a new PCollection as the union of the given elements.
+   */
+  def union[S](elements: Seq[PCollection[S]]) = {
+    val natives = elements.map(pc => pc.native)
+    val jpc = jpipeline.union(JavaConversions.seqAsJavaList(natives))
+    new PCollection[S](jpc)
+  }
+
+  /**
+   * Creates a new PTable as the union of the given elements.
+   */
+  def unionTables[K, V](elements: Seq[PTable[K, V]]) = {
+    val natives = elements.map(pc => pc.native)
+    val jpt = jpipeline.unionTables(JavaConversions.seqAsJavaList(natives))
+    new PTable[K, V](jpt)
+  }
+
+  /**
    * Adds the given {@code SeqDoFn} to the pipeline execution and returns its output.
    */
   def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)
