Repository: crunch Updated Branches: refs/heads/master 006cd72a3 -> cbb1b7e75
CRUNCH-492: Add create methods to Scrunch Pipeline APIs Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cbb1b7e7 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cbb1b7e7 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cbb1b7e7 Branch: refs/heads/master Commit: cbb1b7e757a7302b6b656ceaf1170033435df23c Parents: 006cd72 Author: Josh Wills <[email protected]> Authored: Tue Jan 27 22:08:13 2015 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Jan 27 22:08:13 2015 -0800 ---------------------------------------------------------------------- .../apache/crunch/scrunch/DeepCopyTest.scala | 36 ++------------ .../org/apache/crunch/scrunch/PTableTest.scala | 7 +-- .../apache/crunch/scrunch/PipelineLike.scala | 51 ++++++++++++++++++++ 3 files changed, 57 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala index 25febc2..1659bf0 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala @@ -57,11 +57,9 @@ class DeepCopyTest extends CrunchSuite { val ones = Seq(BBRec(bb1, Array(bb4, bb2)), BBRec(bb2, Array(bb1, bb3))) val twos = Seq(BBRec(bb3, Array(bb1, bb2)), BBRec(bb4, Array(bb3, bb4))) - writeCollection(new Path(prefix + "/ones"), ones) - writeCollection(new Path(prefix + "/twos"), twos) - val oneF = pipe.read(from.avroFile(prefix + "/ones", Avros.reflects[BBRec])) - val twoF = pipe.read(from.avroFile(prefix + "/twos", Avros.reflects[BBRec])) + val oneF = 
pipe.create(ones, Avros.reflects[BBRec]) + val twoF = pipe.create(twos, Avros.reflects[BBRec]) val m = oneF.flatMap(getIterator(_)).leftJoin(twoF.flatMap(getIterator(_))) .keys @@ -77,13 +75,9 @@ class DeepCopyTest extends CrunchSuite { val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), Rec2(1, "b", 0.7), Rec2(2, "c", 9.9)) val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6)) - writeCollection(new Path(prefix + "/ones"), ones) - writeCollection(new Path(prefix + "/twos"), twos) - writeCollection(new Path(prefix + "/threes"), threes) - - val oneF = pipe.read(from.avroFile(prefix + "/ones", A.reflects(classOf[Rec1]))) - val twoF = pipe.read(from.avroFile(prefix + "/twos", A.reflects(classOf[Rec2]))) - val threeF = pipe.read(from.avroFile(prefix + "/threes", A.reflects(classOf[Rec3]))) + val oneF = pipe.create(ones, A.reflects(classOf[Rec1])) + val twoF = pipe.create(twos, A.reflects(classOf[Rec2])) + val threeF = pipe.create(threes, A.reflects(classOf[Rec3])) val res = (oneF.by(_.k) cogroup (twoF.by(_.k2) @@ -102,24 +96,4 @@ class DeepCopyTest extends CrunchSuite { assertEquals(res.map(_._2.toSet), Seq(e12, e22)) pipe.done() } - - private def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) { - writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records) - } - - @SuppressWarnings(Array("rawtypes", "unchecked")) - private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]) { - val r: AnyRef = records.iterator.next() - val factory = new ScalaReflectDataFactory() - val schema = factory.getData().getSchema(r.getClass) - val writer = factory.getWriter[T](schema) - val dataFileWriter = new DataFileWriter(writer) - dataFileWriter.create(schema, outputStream) - - for (record <- records) { - dataFileWriter.append(record) - } - dataFileWriter.close() - outputStream.close() - } } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala index bae1b4e..150774d 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala @@ -17,8 +17,6 @@ */ package org.apache.crunch.scrunch -import org.apache.crunch.io.{From => from, To => to} - import _root_.org.junit.Assert._ import _root_.org.junit.Test @@ -34,10 +32,7 @@ class PTableTest extends CrunchSuite { */ private def tensCollection: PCollection[Int] = { val pipeline = Pipeline.mapReduce[PTableTest](tempDir.getDefaultConfiguration) - val input = tempDir.copyResourceFileName("tens.txt") - pipeline.read(from.textFile(input)).map { line => - Integer.parseInt(line) - } + pipeline.create(List.fill(100)(10), Avros.ints) } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/cbb1b7e7/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala index 08b4697..e948904 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala @@ -23,6 +23,8 @@ import org.apache.crunch.{Pipeline => JPipeline, _} import org.apache.crunch.scrunch.interpreter.InterpreterRunner import org.apache.crunch.types.{PTableType, PType} +import scala.collection.JavaConversions.asJavaCollection + trait PipelineLike { def jpipeline: JPipeline @@ -56,6 +58,16 @@ trait PipelineLike { def 
read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source)) /** + * Reads a source into a [[org.apache.crunch.scrunch.PCollection]] + * + * @param source The source to read from. + * @param named A short name to use for the returned PCollection. + * @tparam T The type of the values being read. + * @return A PCollection containing data read from the specified source. + */ + def read[T](source: Source[T], named: String): PCollection[T] = new PCollection(jpipeline.read(source, named)) + + /** * Reads a source into a [[org.apache.crunch.scrunch.PTable]] * * @param source The source to read from. @@ -66,6 +78,17 @@ trait PipelineLike { def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source)) /** + * Reads a source into a [[org.apache.crunch.scrunch.PTable]] + * + * @param source The source to read from. + * @param named A short name to use for the returned PTable. + * @tparam K The type of the keys being read. + * @tparam V The type of the values being read. + * @return A PTable containing data read from the specified source. + */ + def read[K, V](source: TableSource[K, V], named: String): PTable[K, V] = new PTable(jpipeline.read(source, named)) + + /** * Writes a parallel collection to a target. * * @param collection The collection to write. @@ -114,6 +137,34 @@ trait PipelineLike { def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt)) /** + * Creates a new PCollection from the given elements. + */ + def create[T](elements: Iterable[T], pt: PType[T]) = { + new PCollection[T](jpipeline.create(asJavaCollection(elements), pt)) + } + + /** + * Creates a new PCollection from the given elements, using the given CreateOptions. + */ + def create[T](elements: Iterable[T], pt: PType[T], options: CreateOptions) = { + new PCollection[T](jpipeline.create(asJavaCollection(elements), pt, options)) + } + + /** + * Creates a new PTable from the given elements.
+ */ + def create[K, V](elements: Iterable[(K, V)], pt: PTableType[K, V]) = { + new PTable[K, V](jpipeline.create(asJavaCollection(elements.map(t => Pair.of(t._1, t._2))), pt)) + } + + /** + * Creates a new PTable from the given elements, using the given CreateOptions. + */ + def create[K, V](elements: Iterable[(K, V)], pt: PTableType[K, V], options: CreateOptions) = { + new PTable[K, V](jpipeline.create(asJavaCollection(elements.map(t => Pair.of(t._1, t._2))), pt, options)) + } + + /** * Adds the given {@code SeqDoFn} to the pipeline execution and returns its output. */ def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)
