Merge branch 'master' into scala-2.10

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a60620b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a60620b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a60620b7

Branch: refs/heads/master
Commit: a60620b76a98e236f1e4ffda7a2f289e7917b957
Parents: 0f2e3c6 2054c61
Author: Raymond Liu <[email protected]>
Authored: Thu Nov 14 12:44:19 2013 +0800
Committer: Raymond Liu <[email protected]>
Committed: Thu Nov 14 12:44:19 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   9 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  21 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   2 +
 .../apache/spark/scheduler/DAGScheduler.scala   | 107 +++----
 .../scheduler/cluster/ClusterScheduler.scala    |  24 +-
 .../cluster/SimrSchedulerBackend.scala          |  12 +
 .../spark/serializer/KryoSerializer.scala       |  52 ++--
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  31 ++-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  11 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   3 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 docs/running-on-yarn.md                         |   1 +
 ec2/spark_ec2.py                                |  66 +++--
 pom.xml                                         |   6 +
 project/SparkBuild.scala                        |   5 +-
 project/plugins.sbt                             |   2 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala |  36 ++-
 spark-class                                     |  13 +
 spark-class2.cmd                                |   7 +
 .../streaming/dstream/NetworkInputDStream.scala |   4 +-
 .../spark/streaming/InputStreamsSuite.scala     |  83 +++++-
 yarn/pom.xml                                    |  50 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |   2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 276 ++++++++++---------
 .../yarn/ClientDistributedCacheManager.scala    | 228 +++++++++++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      |  42 +--
 .../ClientDistributedCacheManagerSuite.scala    | 220 +++++++++++++++
 28 files changed, 985 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ab7b3a2,42bb388..7b4fc6b
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -19,11 -19,11 +19,13 @@@ package org.apache.spark.schedule
  
  import java.io.NotSerializableException
  import java.util.Properties
- import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
  import java.util.concurrent.atomic.AtomicInteger
++import scala.concurrent.duration._
++import scala.concurrent.ExecutionContext.Implicits.global
  
+ import akka.actor._
 -import akka.util.duration._
  import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 +import scala.reflect.ClassTag
  
  import org.apache.spark._
  import org.apache.spark.rdd.RDD
@@@ -104,13 -104,36 +106,36 @@@ class DAGScheduler
    // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
    // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
    // as more failure events come in
--  val RESUBMIT_TIMEOUT = 50L
++  val RESUBMIT_TIMEOUT = 50.milliseconds
  
    // The time, in millis, to wake up between polls of the completion queue in order to potentially
    // resubmit failed stages
    val POLL_TIMEOUT = 10L
  
-   private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
+   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+     override def preStart() {
 -      context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
++      context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) {
+         if (failed.size > 0) {
+           resubmitFailedStages()
+         }
+       }
+     }
+ 
+     /**
+      * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+      * events and responds by launching tasks. This runs in a dedicated thread and receives events
+      * via the eventQueue.
+      */
+     def receive = {
+       case event: DAGSchedulerEvent =>
+         logDebug("Got event of type " + event.getClass.getName)
+ 
+         if (!processEvent(event))
+           submitWaitingStages()
+         else
+           context.stop(self)
+     }
+   }))
  
    private[scheduler] val nextJobId = new AtomicInteger(0)
  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 8503395,53a5896..2d8a0a6
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@@ -25,6 -25,8 +25,9 @@@ import scala.collection.mutable.ArrayBu
  import scala.collection.mutable.HashMap
  import scala.collection.mutable.HashSet
  
 -import akka.util.duration._
++import scala.concurrent.duration._
++import scala.concurrent.ExecutionContext.Implicits.global
+ 
  import org.apache.spark._
  import org.apache.spark.TaskState.TaskState
  import org.apache.spark.scheduler._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 72b9549,edcc3b3..4be9d3a
--- a/pom.xml
+++ b/pom.xml
@@@ -391,9 -385,15 +391,15 @@@
          <version>3.1</version>
          <scope>test</scope>
        </dependency>
+      <dependency>
+         <groupId>org.mockito</groupId>
+         <artifactId>mockito-all</artifactId>
+         <version>1.8.5</version>
+         <scope>test</scope>
+       </dependency>
        <dependency>
          <groupId>org.scalacheck</groupId>
 -        <artifactId>scalacheck_2.9.3</artifactId>
 +        <artifactId>scalacheck_${scala-short.version}</artifactId>
          <version>1.10.0</version>
          <scope>test</scope>
        </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/project/SparkBuild.scala
----------------------------------------------------------------------
diff --cc project/SparkBuild.scala
index b71e1b3,bccf36c..9a3cbbe
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@@ -166,16 -166,13 +166,17 @@@ object SparkBuild extends Build 
  
  
      libraryDependencies ++= Seq(
 -      "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
 -      "org.scalatest" %% "scalatest" % "1.9.1" % "test",
 -      "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
 -      "com.novocode" % "junit-interface" % "0.9" % "test",
 -      "org.easymock" % "easymock" % "3.1" % "test",
 -      "org.mockito" % "mockito-all" % "1.8.5" % "test"
 +        "io.netty"          % "netty-all"       % "4.0.0.CR1",
 +        "org.eclipse.jetty" % "jetty-server"    % "7.6.8.v20121106",
 +        "org.scalatest"    %% "scalatest"       % "1.9.1"  % "test",
 +        "org.scalacheck"   %% "scalacheck"      % "1.10.0" % "test",
 +        "com.novocode"      % "junit-interface" % "0.9"    % "test",
 +        "org.easymock"      % "easymock"        % "3.1"    % "test",
++        "org.mockito"       % "mockito-all"     % "1.8.5"  % "test",
 +        "commons-io"        % "commons-io"      % "2.4"    % "test"
      ),
 +
 +    parallelExecution := true,
      /* Workaround for issue #206 (fixed after SBT 0.11.0) */
     watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
       const(std.TaskExtra.constant(Nil)), aggregate = true, includeRoot = true) apply { _.join.map(_.flatten) },

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --cc repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index fccb6e6,6e4504d..418c31e
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@@ -43,15 -63,44 +46,44 @@@ class ReplSuite extends FunSuite 
  
    def assertDoesNotContain(message: String, output: String) {
      assert(!output.contains(message),
 -           "Interpreter output contained '" + message + "':\n" + output)
 +      "Interpreter output contained '" + message + "':\n" + output)
    }
  
-   test("simple foreach with accumulator") {
+   test("propagation of local properties") {
+     // A mock ILoop that doesn't install the SIGINT handler.
+     class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
+       settings = new scala.tools.nsc.Settings
+       settings.usejavacp.value = true
+       org.apache.spark.repl.Main.interp = this
+       override def createInterpreter() {
+         intp = new SparkILoopInterpreter
+         intp.setContextClassLoader()
+       }
+     }
+ 
+     val out = new StringWriter()
+     val interp = new ILoop(new PrintWriter(out))
+     interp.sparkContext = new SparkContext("local", "repl-test")
+     interp.createInterpreter()
+     interp.intp.initialize()
+     interp.sparkContext.setLocalProperty("someKey", "someValue")
+ 
+     // Make sure the value we set in the caller to interpret is propagated in the thread that
+     // interprets the command.
+     interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")")
+     assert(out.toString.contains("someValue"))
+ 
+     interp.sparkContext.stop()
+     System.clearProperty("spark.driver.port")
+     System.clearProperty("spark.hostPort")
+   }
+ 
+   test ("simple foreach with accumulator") {
      val output = runInterpreter("local", """
 -      val accum = sc.accumulator(0)
 -      sc.parallelize(1 to 10).foreach(x => accum += x)
 -      accum.value
 -      """)
 +                                           |val accum = sc.accumulator(0)
 +                                           |sc.parallelize(1 to 10).foreach(x => accum += x)
 +                                           |accum.value
 +                                         """.stripMargin)
      assertDoesNotContain("error:", output)
      assertDoesNotContain("Exception", output)
      assertContains("res1: Int = 55", output)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/spark-class
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a60620b7/yarn/pom.xml
----------------------------------------------------------------------

Reply via email to