Merge branch 'master' into scala-2.10
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a60620b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a60620b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a60620b7

Branch: refs/heads/master
Commit: a60620b76a98e236f1e4ffda7a2f289e7917b957
Parents: 0f2e3c6 2054c61
Author: Raymond Liu <[email protected]>
Authored: Thu Nov 14 12:44:19 2013 +0800
Committer: Raymond Liu <[email protected]>
Committed: Thu Nov 14 12:44:19 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   9 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  21 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   2 +
 .../apache/spark/scheduler/DAGScheduler.scala   | 107 +++---
 .../scheduler/cluster/ClusterScheduler.scala    |  24 +-
 .../cluster/SimrSchedulerBackend.scala          |  12 +
 .../spark/serializer/KryoSerializer.scala       |  52 ++--
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  31 ++-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  11 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   3 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 docs/running-on-yarn.md                         |   1 +
 ec2/spark_ec2.py                                |  66 +++--
 pom.xml                                         |   6 +
 project/SparkBuild.scala                        |   5 +-
 project/plugins.sbt                             |   2 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala |  36 ++-
 spark-class                                     |  13 +
 spark-class2.cmd                                |   7 +
 .../streaming/dstream/NetworkInputDStream.scala |   4 +-
 .../spark/streaming/InputStreamsSuite.scala     |  83 +++++-
 yarn/pom.xml                                    |  50 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |   2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 276 ++++++++++---------
 .../yarn/ClientDistributedCacheManager.scala    | 228 +++++++++++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      |  42 +--
 .../ClientDistributedCacheManagerSuite.scala    | 220 +++++++++++++++
 28 files changed, 985 insertions(+), 334 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ab7b3a2,42bb388..7b4fc6b
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -19,11 -19,11 +19,13 @@@ package org.apache.spark.schedule
  import java.io.NotSerializableException
  import java.util.Properties
- import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
  import java.util.concurrent.atomic.AtomicInteger

++import scala.concurrent.duration._
++import scala.concurrent.ExecutionContext.Implicits.global
+ import akka.actor._
 -import akka.util.duration._
  import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 +import scala.reflect.ClassTag

  import org.apache.spark._
  import org.apache.spark.rdd.RDD
@@@ -104,13 -104,36 +106,36 @@@ class DAGScheduler
    // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
    // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
    // as more failure events come in
--  val RESUBMIT_TIMEOUT = 50L
++  val RESUBMIT_TIMEOUT = 50.milliseconds

    // The time, in millis, to wake up between polls of the completion queue in order to potentially
    // resubmit failed stages
    val POLL_TIMEOUT = 10L

-   private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
+   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+     override def preStart() {
-       context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
++      context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) {
+         if (failed.size > 0) {
+           resubmitFailedStages()
+         }
+       }
+     }
+
+     /**
+      * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+      * events and responds by launching tasks. This runs in a dedicated thread and receives events
+      * via the eventQueue.
+      */
+     def receive = {
+       case event: DAGSchedulerEvent =>
+         logDebug("Got event of type " + event.getClass.getName)
+
+         if (!processEvent(event))
+           submitWaitingStages()
+         else
+           context.stop(self)
+     }
+   }))

    private[scheduler] val nextJobId = new AtomicInteger(0)
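
The hunk above replaces DAGScheduler's hand-rolled LinkedBlockingQueue poll loop with an Akka actor: events now arrive in the actor's mailbox instead of a queue, and the periodic resubmit check moves into a scheduler tick registered in preStart(). Below is a minimal, self-contained sketch of that pattern, not Spark's code: the Event/Work/Shutdown types and the println bodies are invented for illustration, and it assumes Akka 2.1+ on Scala 2.10 (where scheduler.schedule takes FiniteDurations plus an implicit ExecutionContext).

    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global
    import akka.actor._

    // Hypothetical stand-in for DAGSchedulerEvent.
    sealed trait Event
    case object Work extends Event
    case object Shutdown extends Event

    object EventLoopSketch extends App {
      val system = ActorSystem("sketch")

      val eventProcessor = system.actorOf(Props(new Actor {
        val resubmitTimeout = 50.milliseconds  // plays the role of RESUBMIT_TIMEOUT

        override def preStart() {
          // Periodic tick replaces polling the queue every POLL_TIMEOUT millis;
          // the FiniteDuration goes straight into schedule(), no unit juggling.
          context.system.scheduler.schedule(resubmitTimeout, resubmitTimeout) {
            // a real scheduler would check for failed stages and resubmit here
          }
        }

        def receive = {
          case Work     => println("processing one event from the mailbox")
          case Shutdown => context.stop(self); context.system.shutdown()
        }
      }))

      eventProcessor ! Work
      eventProcessor ! Shutdown
    }

The design point the merge picks up: with an actor, thread management, wake-ups, and queueing all come from the mailbox, so the dedicated event-loop thread and its two timeout constants largely disappear.
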
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 8503395,53a5896..2d8a0a6
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@@ -25,6 -25,8 +25,9 @@@ import scala.collection.mutable.ArrayBu
  import scala.collection.mutable.HashMap
  import scala.collection.mutable.HashSet

 -import akka.util.duration._
++import scala.concurrent.duration._
++import scala.concurrent.ExecutionContext.Implicits.global
+
  import org.apache.spark._
  import org.apache.spark.TaskState.TaskState
  import org.apache.spark.scheduler._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 72b9549,edcc3b3..4be9d3a
--- a/pom.xml
+++ b/pom.xml
@@@ -391,9 -385,15 +391,15 @@@
        <version>3.1</version>
        <scope>test</scope>
      </dependency>
+     <dependency>
+       <groupId>org.mockito</groupId>
+       <artifactId>mockito-all</artifactId>
+       <version>1.8.5</version>
+       <scope>test</scope>
+     </dependency>
      <dependency>
        <groupId>org.scalacheck</groupId>
-       <artifactId>scalacheck_2.9.3</artifactId>
+       <artifactId>scalacheck_${scala-short.version}</artifactId>
        <version>1.10.0</version>
        <scope>test</scope>
      </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/project/SparkBuild.scala
----------------------------------------------------------------------
diff --cc project/SparkBuild.scala
index b71e1b3,bccf36c..9a3cbbe
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@@ -166,16 -166,13 +166,17 @@@ object SparkBuild extends Build

      libraryDependencies ++= Seq(
-       "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
-       "org.scalatest" %% "scalatest" % "1.9.1" % "test",
-       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
-       "com.novocode" % "junit-interface" % "0.9" % "test",
-       "org.easymock" % "easymock" % "3.1" % "test",
-       "org.mockito" % "mockito-all" % "1.8.5" % "test"
+       "io.netty" % "netty-all" % "4.0.0.CR1",
+       "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
+       "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+       "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
+       "com.novocode" % "junit-interface" % "0.9" % "test",
+       "org.easymock" % "easymock" % "3.1" % "test",
++      "org.mockito" % "mockito-all" % "1.8.5" % "test",
+       "commons-io" % "commons-io" % "2.4" % "test"
      ),
+
+     parallelExecution := true,

      /* Workaround for issue #206 (fixed after SBT 0.11.0) */
      watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task, const(std.TaskExtra.constant(Nil)), aggregate = true, includeRoot = true) apply { _.join.map(_.flatten) },
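
Both pom.xml and SparkBuild.scala above pick up mockito-all 1.8.5 in test scope. For anyone new to the dependency, here is a tiny stub-and-verify sketch in ScalaTest style; the Clock trait and the values are hypothetical, invented only to exercise the API, and are not from Spark's test suites.

    import org.mockito.Mockito._
    import org.scalatest.FunSuite

    // Hypothetical collaborator; Mockito can mock it because a trait
    // with only abstract methods compiles to a plain Java interface.
    trait Clock { def millis(): Long }

    class MockitoSketchSuite extends FunSuite {
      test("stub and verify with mockito-all") {
        val clock = mock(classOf[Clock])
        when(clock.millis()).thenReturn(42L)   // stub the call
        assert(clock.millis() === 42L)         // stubbed value comes back
        verify(clock, times(1)).millis()       // and it was invoked exactly once
      }
    }
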
"org.mockito" % "mockito-all" % "1.8.5" % "test" + "io.netty" % "netty-all" % "4.0.0.CR1", + "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", + "org.scalatest" %% "scalatest" % "1.9.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", + "com.novocode" % "junit-interface" % "0.9" % "test", + "org.easymock" % "easymock" % "3.1" % "test", ++ "org.mockito" % "mockito-all" % "1.8.5" % "test", + "commons-io" % "commons-io" % "2.4" % "test" ), + + parallelExecution := true, /* Workaround for issue #206 (fixed after SBT 0.11.0) */ watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task, const(std.TaskExtra.constant(Nil)), aggregate = true, includeRoot = true) apply { _.join.map(_.flatten) }, http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala ---------------------------------------------------------------------- diff --cc repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index fccb6e6,6e4504d..418c31e --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@@ -43,15 -63,44 +46,44 @@@ class ReplSuite extends FunSuite def assertDoesNotContain(message: String, output: String) { assert(!output.contains(message), - "Interpreter output contained '" + message + "':\n" + output) + "Interpreter output contained '" + message + "':\n" + output) } - test("simple foreach with accumulator") { + test("propagation of local properties") { + // A mock ILoop that doesn't install the SIGINT handler. + class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) { + settings = new scala.tools.nsc.Settings + settings.usejavacp.value = true + org.apache.spark.repl.Main.interp = this + override def createInterpreter() { + intp = new SparkILoopInterpreter + intp.setContextClassLoader() + } + } + + val out = new StringWriter() + val interp = new ILoop(new PrintWriter(out)) + interp.sparkContext = new SparkContext("local", "repl-test") + interp.createInterpreter() + interp.intp.initialize() + interp.sparkContext.setLocalProperty("someKey", "someValue") + + // Make sure the value we set in the caller to interpret is propagated in the thread that + // interprets the command. + interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")") + assert(out.toString.contains("someValue")) + + interp.sparkContext.stop() + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + } + + test ("simple foreach with accumulator") { val output = runInterpreter("local", """ - val accum = sc.accumulator(0) - sc.parallelize(1 to 10).foreach(x => accum += x) - accum.value - """) + |val accum = sc.accumulator(0) + |sc.parallelize(1 to 10).foreach(x => accum += x) + |accum.value + """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) assertContains("res1: Int = 55", output) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/spark-class ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/yarn/pom.xml ----------------------------------------------------------------------

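Finally, the import swap seen in both scheduler files above (akka.util.duration._ out, scala.concurrent.duration._ in) is the Scala 2.10 idiom: the standard library now ships the duration literals Akka used to provide. A standalone sketch of what `RESUBMIT_TIMEOUT = 50.milliseconds` buys over a bare `50L`; the names and printlns are illustrative only.

    import scala.concurrent.duration._

    object DurationSketch extends App {
      // A FiniteDuration carries its unit in the type, unlike a raw Long,
      // so APIs like scheduler.schedule can't be handed the wrong unit.
      val resubmitTimeout: FiniteDuration = 50.milliseconds
      println(resubmitTimeout)           // 50 milliseconds
      println(resubmitTimeout * 2)       // 100 milliseconds
      println(resubmitTimeout.toMillis)  // 50
    }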