CRUNCH-73. PipelineApp no longer extends DelayedInit, so that Scala closures are serialized properly.
Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/4e0ead87 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/4e0ead87 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/4e0ead87 Branch: refs/heads/master Commit: 4e0ead87d77be4991bc776b3938f27ed26bbf5e3 Parents: a65feb5 Author: Kiyan Ahmadizadeh <[email protected]> Authored: Thu Sep 20 17:00:37 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Fri Sep 21 15:45:12 2012 -0700 ---------------------------------------------------------------------- crunch-scrunch/src/it/resources/tens.txt | 100 +++++++++++++++ .../crunch/scrunch/PipelineAppClosureTest.scala | 58 +++++++++ .../apache/crunch/scrunch/PipelineAppTest.scala | 8 +- .../src/main/examples/ClassyPageRank.scala | 30 +++-- crunch-scrunch/src/main/examples/PageRank.scala | 26 ++-- crunch-scrunch/src/main/examples/WordCount.scala | 6 +- .../org/apache/crunch/scrunch/PipelineApp.scala | 24 ++-- 7 files changed, 207 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/it/resources/tens.txt ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/resources/tens.txt b/crunch-scrunch/src/it/resources/tens.txt new file mode 100644 index 0000000..d3bfd02 --- /dev/null +++ b/crunch-scrunch/src/it/resources/tens.txt @@ -0,0 +1,100 @@ +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 +10 
+10 +10 +10 +10 http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala new file mode 100644 index 0000000..e0702e4 --- /dev/null +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppClosureTest.scala @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.scrunch + +import org.apache.crunch.test.CrunchTestSupport + +import org.scalatest.junit.JUnitSuite +import _root_.org.junit.Assert._ +import _root_.org.junit.Test + +/** + * Test that verifies that a Scala PipelineApp can properly send some side data as part of a + * function closure. + */ +class PipelineClosureAppTest extends CrunchTestSupport with JUnitSuite { + + /** + * A simple pipeline application that divides each element of a PCollection of numbers by + * 10. The PCollection of numbers used as input is just the number 10 repeated 100 times. 
+ * Thus the resulting PCollection should be the number 1 repeated 100 times. + */ + object Divider extends PipelineApp { + + /** + * Runs the Pipeline for this test and verifies it has the desired effect of transforming a + * PCollection of 10s into a PCollection of 1s. + */ + override def run(args: Array[String]) { + val divisor = 10 + val tens = read(from.textFile(args(0))) + val ones = tens.map { n => Integer.valueOf(n) / divisor } + val countOfOnes = ones.count().materializeToMap() + assertEquals(100, countOfOnes(1)) + } + } + + @Test def run { + val args = new Array[String](1) + args(0) = tempDir.copyResourceFileName("tens.txt") + tempDir.overridePathProperties(Divider.configuration) + Divider.main(args) + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala index 4947b40..db49c92 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala @@ -32,9 +32,11 @@ object WordCount extends PipelineApp { words.count } - val w1 = countWords(args(0)) - val w2 = countWords(args(1)) - cogroup(w1, w2).write(to.textFile(args(2))) + override def run(args: Array[String]) { + val w1 = countWords(args(0)) + val w2 = countWords(args(1)) + cogroup(w1, w2).write(to.textFile(args(2))) + } } class PipelineAppTest extends CrunchTestSupport with JUnitSuite { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/examples/ClassyPageRank.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/examples/ClassyPageRank.scala 
b/crunch-scrunch/src/main/examples/ClassyPageRank.scala index 540aaca..1bc705d 100644 --- a/crunch-scrunch/src/main/examples/ClassyPageRank.scala +++ b/crunch-scrunch/src/main/examples/ClassyPageRank.scala @@ -22,9 +22,9 @@ case class UrlData(pageRank: Float, oldPageRank: Float, links: List[String]) { def this() = this(1.0f, 0.0f, Nil) def this(links: String*) = this(1.0f, 0.0f, List(links:_*)) - + def this(links: Iterable[String]) = this(1.0f, 0.0f, links.toList) - + def delta = math.abs(pageRank - oldPageRank) def next(newPageRank: Float) = new UrlData(newPageRank, pageRank, links) @@ -55,17 +55,19 @@ object ClassyPageRank extends PipelineApp { }) } - var index = 0 - var delta = 10.0f - fs.mkdirs("prank/") - var curr = initialize(args(0)) - while (delta > 1.0f) { - index = index + 1 - curr = update(curr, 0.5f) - write(curr, to.avroFile("prank/" + index)) - delta = curr.values.map(_.delta).max.materialize.head - println("Current delta = " + delta) + override def run(args: Array[String]) { + var index = 0 + var delta = 10.0f + fs.mkdirs("prank/") + var curr = initialize(args(0)) + while (delta > 1.0f) { + index = index + 1 + curr = update(curr, 0.5f) + write(curr, to.avroFile("prank/" + index)) + delta = curr.values.map(_.delta).max.value() + println("Current delta = " + delta) + } + fs.rename("prank/" + index, args(1)) + fs.delete("prank/", true) } - fs.rename("prank/" + index, args(1)) - fs.delete("prank/", true) } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/examples/PageRank.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/examples/PageRank.scala b/crunch-scrunch/src/main/examples/PageRank.scala index e0ed3bc..5e08169 100644 --- a/crunch-scrunch/src/main/examples/PageRank.scala +++ b/crunch-scrunch/src/main/examples/PageRank.scala @@ -45,17 +45,19 @@ object PageRank extends PipelineApp { }) } - var index = 0 - var delta = 10.0f - fs.mkdirs("prank/") 
- var curr = initialize(args(0)) - while (delta > 1.0f) { - index = index + 1 - curr = update(curr, 0.5f) - write(curr, to.avroFile("prank/" + index)) - delta = curr.values.map(v => math.abs(v._1 - v._2)).max.materialize.head - println("Current delta = " + delta) + override def run(args: Array[String]) { + var index = 0 + var delta = 10.0f + fs.mkdirs("prank/") + var curr = initialize(args(0)) + while (delta > 1.0f) { + index = index + 1 + curr = update(curr, 0.5f) + write(curr, to.avroFile("prank/" + index)) + delta = curr.values.map(v => math.abs(v._1 - v._2)).max.value() + println("Current delta = " + delta) + } + fs.rename("prank/" + index, args(1)) + fs.delete("prank/", true) } - fs.rename("prank/" + index, args(1)) - fs.delete("prank/", true) } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/examples/WordCount.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/examples/WordCount.scala b/crunch-scrunch/src/main/examples/WordCount.scala index 59cd1c5..18ee9d6 100644 --- a/crunch-scrunch/src/main/examples/WordCount.scala +++ b/crunch-scrunch/src/main/examples/WordCount.scala @@ -25,6 +25,8 @@ object WordCount extends PipelineApp { .count } - val counts = join(countWords(args(0)), countWords(args(1))) - write(counts, to.textFile(args(2))) + override def run(args: Array[String]) { + val counts = join(countWords(args(0)), countWords(args(1))) + write(counts, to.textFile(args(2))) + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/4e0ead87/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala index 3a1a2bd..11395d3 100644 --- 
a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineApp.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.util.GenericOptionsParser import org.apache.crunch.{Source, TableSource, Target} -trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit { +trait PipelineApp extends MREmbeddedPipeline with PipelineHelper { implicit def _string2path(str: String) = new Path(str) /** Contains factory methods used to create `Source`s. */ @@ -39,26 +39,22 @@ trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedIni /** Contains factory methods used to create `SourceTarget`s. */ val at = At - private val initCode = new ListBuffer[() => Unit] - - private var _args: Array[String] = _ - - /** Command-line arguments passed to this application. */ - protected def args: Array[String] = _args - def configuration: Configuration = pipeline.getConfiguration /** Gets the distributed filesystem associated with this application's configuration. */ def fs: FileSystem = FileSystem.get(configuration) - override def delayedInit(body: => Unit) { - initCode += (() => body) - } + /** + * The entry-point for pipeline applications. Clients should override this method to implement + * the logic of their pipeline application. + * + * @param args The command-line arguments passed to the pipeline application. + */ + def run(args: Array[String]): Unit - def main(args: Array[String]) = { + final def main(args: Array[String]) = { val parser = new GenericOptionsParser(configuration, args) - _args = parser.getRemainingArgs() - for (proc <- initCode) proc() + run(parser.getRemainingArgs()) done } }
