Merge remote branch 'upstream/master' into rename

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/acc7638f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/acc7638f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/acc7638f

Branch: refs/heads/master
Commit: acc7638f7c8fc9e1c355332f8db91ba508e85a5c
Parents: fdc52b2 3249e0e
Author: Kay Ousterhout <kayousterh...@gmail.com>
Authored: Tue Oct 15 14:43:56 2013 -0700
Committer: Kay Ousterhout <kayousterh...@gmail.com>
Committed: Tue Oct 15 14:43:56 2013 -0700

----------------------------------------------------------------------
 assembly/pom.xml                                |  18 +-
 bagel/pom.xml                                   |   8 +-
 bin/stop-slaves.sh                              |   2 -
 core/pom.xml                                    |  19 +-
 .../spark/network/netty/FileClientHandler.java  |   3 +-
 .../spark/network/netty/FileServerHandler.java  |   6 +-
 .../scala/org/apache/spark/Aggregator.scala     |  49 ++-
 .../apache/spark/BlockStoreShuffleFetcher.scala |  23 +-
 .../scala/org/apache/spark/CacheManager.scala   |  29 +-
 .../scala/org/apache/spark/FutureAction.scala   | 250 +++++++++++
 .../apache/spark/InterruptibleIterator.scala    |  30 ++
 .../org/apache/spark/MapOutputTracker.scala     |   4 +-
 .../scala/org/apache/spark/ShuffleFetcher.scala |   5 +-
 .../scala/org/apache/spark/SparkContext.scala   | 105 ++++-
 .../scala/org/apache/spark/TaskContext.scala    |  23 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |   2 +
 .../spark/api/python/PythonPartitioner.scala    |  10 +-
 .../org/apache/spark/api/python/PythonRDD.scala |   6 +-
 .../spark/broadcast/BitTorrentBroadcast.scala   |   4 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  13 +-
 .../apache/spark/broadcast/TreeBroadcast.scala  |   8 +-
 .../org/apache/spark/deploy/DeployMessage.scala |  29 +-
 .../spark/deploy/ExecutorDescription.scala      |  34 ++
 .../spark/deploy/FaultToleranceTest.scala       | 420 +++++++++++++++++++
 .../org/apache/spark/deploy/JsonProtocol.scala  |   6 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |   7 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  12 +-
 .../org/apache/spark/deploy/client/Client.scala |  84 +++-
 .../spark/deploy/client/ClientListener.scala    |   4 +
 .../apache/spark/deploy/client/TestClient.scala |   7 +-
 .../spark/deploy/master/ApplicationInfo.scala   |  53 ++-
 .../spark/deploy/master/ApplicationSource.scala |   2 +-
 .../spark/deploy/master/ApplicationState.scala  |   4 +-
 .../spark/deploy/master/ExecutorInfo.scala      |   7 +-
 .../master/FileSystemPersistenceEngine.scala    |  90 ++++
 .../deploy/master/LeaderElectionAgent.scala     |  45 ++
 .../org/apache/spark/deploy/master/Master.scala | 228 ++++++++--
 .../spark/deploy/master/MasterMessages.scala    |  46 ++
 .../spark/deploy/master/MasterSource.scala      |   6 +-
 .../spark/deploy/master/PersistenceEngine.scala |  53 +++
 .../spark/deploy/master/RecoveryState.scala     |  26 ++
 .../deploy/master/SparkZooKeeperSession.scala   | 203 +++++++++
 .../apache/spark/deploy/master/WorkerInfo.scala |  42 +-
 .../spark/deploy/master/WorkerState.scala       |   6 +-
 .../master/ZooKeeperLeaderElectionAgent.scala   | 136 ++++++
 .../master/ZooKeeperPersistenceEngine.scala     |  85 ++++
 .../spark/deploy/worker/ExecutorRunner.scala    |  13 +-
 .../org/apache/spark/deploy/worker/Worker.scala | 175 +++++---
 .../spark/deploy/worker/WorkerArguments.scala   |   8 +-
 .../spark/deploy/worker/WorkerSource.scala      |  10 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  10 +-
 .../org/apache/spark/executor/Executor.scala    | 122 ++++--
 .../apache/spark/executor/ExecutorSource.scala  |  18 +-
 .../spark/executor/MesosExecutorBackend.scala   |  18 +-
 .../apache/spark/network/netty/FileHeader.scala |  22 +-
 .../spark/network/netty/ShuffleCopier.scala     |  27 +-
 .../spark/network/netty/ShuffleSender.scala     |   8 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 122 ++++++
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |   6 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |   2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  26 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 174 +++++---
 .../spark/rdd/MapPartitionsWithContextRDD.scala |  41 ++
 .../spark/rdd/MapPartitionsWithIndexRDD.scala   |  41 --
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  79 ++--
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  16 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |   5 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  95 +++--
 .../org/apache/spark/rdd/ShuffledRDD.scala      |   2 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 128 +++---
 .../spark/scheduler/DAGSchedulerEvent.scala     |  24 +-
 .../spark/scheduler/DAGSchedulerSource.scala    |  12 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |  62 +--
 .../scala/org/apache/spark/scheduler/Pool.scala |   5 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  48 ++-
 .../spark/scheduler/SchedulableBuilder.scala    |   3 +
 .../apache/spark/scheduler/ShuffleMapTask.scala |  50 ++-
 .../apache/spark/scheduler/SparkListener.scala  |   2 +-
 .../spark/scheduler/SparkListenerBus.scala      |  18 +
 .../scala/org/apache/spark/scheduler/Task.scala |  63 ++-
 .../org/apache/spark/scheduler/TaskResult.scala |   3 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   3 +
 .../org/apache/spark/scheduler/TaskSet.scala    |   4 +
 .../scheduler/cluster/ClusterScheduler.scala    |  37 +-
 .../cluster/ClusterTaskSetManager.scala         |  87 ++--
 .../cluster/CoarseGrainedClusterMessage.scala   |   2 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |   7 +
 .../scheduler/cluster/SchedulerBackend.scala    |   6 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |  14 +-
 .../spark/scheduler/local/LocalScheduler.scala  | 190 +++------
 .../scheduler/local/LocalTaskSetManager.scala   |  13 +-
 .../spark/serializer/KryoSerializer.scala       |  30 +-
 .../apache/spark/storage/BlockException.scala   |   2 +-
 .../spark/storage/BlockFetcherIterator.scala    |  24 +-
 .../org/apache/spark/storage/BlockId.scala      |  96 +++++
 .../org/apache/spark/storage/BlockManager.scala | 130 +++---
 .../spark/storage/BlockManagerMaster.scala      |   8 +-
 .../spark/storage/BlockManagerMasterActor.scala |  21 +-
 .../spark/storage/BlockManagerMessages.scala    |  16 +-
 .../spark/storage/BlockManagerSource.scala      |   8 +-
 .../spark/storage/BlockManagerWorker.scala      |   4 +-
 .../org/apache/spark/storage/BlockMessage.scala |  38 +-
 .../spark/storage/BlockMessageArray.scala       |   7 +-
 .../spark/storage/BlockObjectWriter.scala       |   2 +-
 .../org/apache/spark/storage/BlockStore.scala   |  14 +-
 .../org/apache/spark/storage/DiskStore.scala    |  37 +-
 .../org/apache/spark/storage/MemoryStore.scala  |  34 +-
 .../spark/storage/ShuffleBlockManager.scala     |  16 +-
 .../org/apache/spark/storage/StorageUtils.scala |  47 +--
 .../apache/spark/storage/ThreadingTest.scala    |   6 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |   2 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |  23 +-
 .../org/apache/spark/util/AppendOnlyMap.scala   | 230 ++++++++++
 .../org/apache/spark/util/MetadataCleaner.scala |  36 +-
 .../scala/org/apache/spark/util/Utils.scala     |  13 +
 .../org/apache/spark/CacheManagerSuite.scala    |  21 +-
 .../org/apache/spark/CheckpointSuite.scala      |  10 +-
 .../org/apache/spark/DistributedSuite.scala     |  16 +-
 .../scala/org/apache/spark/JavaAPISuite.java    |   2 +-
 .../org/apache/spark/JobCancellationSuite.scala | 177 ++++++++
 .../apache/spark/deploy/JsonProtocolSuite.scala |   7 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 176 ++++++++
 .../spark/rdd/PairRDDFunctionsSuite.scala       |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  27 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  30 +-
 .../spark/scheduler/cluster/FakeTask.scala      |   5 +-
 .../cluster/TaskResultGetterSuite.scala         |   3 +-
 .../scheduler/local/LocalSchedulerSuite.scala   |  28 +-
 .../spark/serializer/KryoSerializerSuite.scala  |  21 +
 .../org/apache/spark/storage/BlockIdSuite.scala | 114 +++++
 .../spark/storage/BlockManagerSuite.scala       | 102 ++---
 .../scala/org/apache/spark/ui/UISuite.scala     |   7 +-
 .../apache/spark/util/AppendOnlyMapSuite.scala  | 154 +++++++
 .../org/apache/spark/util/UtilsSuite.scala      |  11 +
 docker/README.md                                |   5 +
 docker/build                                    |  22 +
 docker/spark-test/README.md                     |  10 +
 docker/spark-test/base/Dockerfile               |  38 ++
 docker/spark-test/build                         |  22 +
 docker/spark-test/master/Dockerfile             |  21 +
 docker/spark-test/master/default_cmd            |  22 +
 docker/spark-test/worker/Dockerfile             |  22 +
 docker/spark-test/worker/default_cmd            |  22 +
 docs/_layouts/global.html                       |   4 +-
 docs/mllib-guide.md                             |  24 +-
 docs/python-programming-guide.md                |   2 +-
 docs/running-on-yarn.md                         |   9 +-
 docs/spark-standalone.md                        |  75 ++++
 docs/streaming-programming-guide.md             |   5 +-
 docs/tuning.md                                  |   2 +-
 ec2/spark_ec2.py                                |   4 +-
 examples/pom.xml                                |  30 +-
 .../org/apache/spark/examples/SparkKMeans.scala |   2 -
 mllib/pom.xml                                   |   8 +-
 .../apache/spark/mllib/recommendation/ALS.scala | 199 +++++++--
 .../mllib/recommendation/JavaALSSuite.java      |  85 +++-
 .../spark/mllib/recommendation/ALSSuite.scala   |  75 +++-
 pom.xml                                         |  43 +-
 project/SparkBuild.scala                        |  14 +-
 python/pyspark/rdd.py                           |  70 +++-
 python/pyspark/serializers.py                   |   4 +
 repl-bin/pom.xml                                |  10 +-
 repl/pom.xml                                    |  20 +-
 streaming/pom.xml                               |   9 +-
 .../spark/streaming/NetworkInputTracker.scala   |  11 +-
 .../streaming/dstream/NetworkInputDStream.scala |  14 +-
 .../streaming/dstream/RawInputDStream.scala     |   4 +-
 .../streaming/receivers/ActorReceiver.scala     |   4 +-
 tools/pom.xml                                   |   8 +-
 yarn/pom.xml                                    |   6 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  55 ++-
 .../org/apache/spark/deploy/yarn/Client.scala   | 160 ++++++-
 .../spark/deploy/yarn/ClientArguments.scala     |  25 +-
 .../spark/deploy/yarn/WorkerRunnable.scala      |  58 ++-
 176 files changed, 5608 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 7d74786,0000000..52b1c49
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -1,109 -1,0 +1,117 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.executor
 +
 +import java.nio.ByteBuffer
 +
 +import akka.actor.{ActorRef, Actor, Props, Terminated}
 +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, 
RemoteClientDisconnected}
 +
 +import org.apache.spark.{Logging, SparkEnv}
 +import org.apache.spark.TaskState.TaskState
 +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 +import org.apache.spark.util.{Utils, AkkaUtils}
 +
 +
 +private[spark] class CoarseGrainedExecutorBackend(
 +    driverUrl: String,
 +    executorId: String,
 +    hostPort: String,
 +    cores: Int)
 +  extends Actor
 +  with ExecutorBackend
 +  with Logging {
 +
 +  Utils.checkHostPort(hostPort, "Expected hostport")
 +
 +  var executor: Executor = null
 +  var driver: ActorRef = null
 +
 +  override def preStart() {
 +    logInfo("Connecting to driver: " + driverUrl)
 +    driver = context.actorFor(driverUrl)
 +    driver ! RegisterExecutor(executorId, hostPort, cores)
 +    context.system.eventStream.subscribe(self, 
classOf[RemoteClientLifeCycleEvent])
 +    context.watch(driver) // Doesn't work with remote actors, but useful for testing
 +  }
 +
 +  override def receive = {
 +    case RegisteredExecutor(sparkProperties) =>
 +      logInfo("Successfully registered with driver")
 +      // Make this host instead of hostPort ?
 +      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, 
sparkProperties)
 +
 +    case RegisterExecutorFailed(message) =>
 +      logError("Slave registration failed: " + message)
 +      System.exit(1)
 +
 +    case LaunchTask(taskDesc) =>
 +      logInfo("Got assigned task " + taskDesc.taskId)
 +      if (executor == null) {
-         logError("Received launchTask but executor was null")
++        logError("Received LaunchTask command but executor was null")
 +        System.exit(1)
 +      } else {
 +        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
 +      }
 +
++    case KillTask(taskId, _) =>
++      if (executor == null) {
++        logError("Received KillTask command but executor was null")
++        System.exit(1)
++      } else {
++        executor.killTask(taskId)
++      }
++
 +    case Terminated(_) | RemoteClientDisconnected(_, _) | 
RemoteClientShutdown(_, _) =>
 +      logError("Driver terminated or disconnected! Shutting down.")
 +      System.exit(1)
 +  }
 +
 +  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) 
{
 +    driver ! StatusUpdate(executorId, taskId, state, data)
 +  }
 +}
 +
 +private[spark] object CoarseGrainedExecutorBackend {
 +  def run(driverUrl: String, executorId: String, hostname: String, cores: 
Int) {
 +    // Debug code
 +    Utils.checkHost(hostname)
 +
 +    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
 +    // before getting started with all our system properties, etc
 +    val (actorSystem, boundPort) = 
AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
 +    // set it
 +    val sparkHostPort = hostname + ":" + boundPort
 +    System.setProperty("spark.hostPort", sparkHostPort)
 +    val actor = actorSystem.actorOf(
 +      Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, 
sparkHostPort, cores)),
 +      name = "Executor")
 +    actorSystem.awaitTermination()
 +  }
 +
 +  def main(args: Array[String]) {
 +    if (args.length < 4) {
 +      //the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
 +      System.err.println(
 +        "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
 +        "[<appid>]")
 +      System.exit(1)
 +    }
 +    run(args(0), args(1), args(2), args(3).toInt)
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index d017725,0000000..3d26206
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@@ -1,63 -1,0 +1,65 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.scheduler.cluster
 +
 +import java.nio.ByteBuffer
 +
 +import org.apache.spark.TaskState.TaskState
 +import org.apache.spark.scheduler.TaskDescription
 +import org.apache.spark.util.{Utils, SerializableBuffer}
 +
 +
 +private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 +
 +private[spark] object CoarseGrainedClusterMessages {
 +
 +  // Driver to executors
 +  case class LaunchTask(task: TaskDescription) extends 
CoarseGrainedClusterMessage
 +
++  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
++
 +  case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
 +    extends CoarseGrainedClusterMessage
 +
 +  case class RegisterExecutorFailed(message: String) extends 
CoarseGrainedClusterMessage
 +
 +  // Executors to driver
 +  case class RegisterExecutor(executorId: String, hostPort: String, cores: 
Int)
 +    extends CoarseGrainedClusterMessage {
 +    Utils.checkHostPort(hostPort, "Expected host port")
 +  }
 +
 +  case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
 +    data: SerializableBuffer) extends CoarseGrainedClusterMessage
 +
 +  object StatusUpdate {
 +    /** Alternate factory method that takes a ByteBuffer directly for the 
data field */
 +    def apply(executorId: String, taskId: Long, state: TaskState, data: 
ByteBuffer)
 +      : StatusUpdate = {
 +      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
 +    }
 +  }
 +
 +  // Internal messages in driver
 +  case object ReviveOffers extends CoarseGrainedClusterMessage
 +
 +  case object StopDriver extends CoarseGrainedClusterMessage
 +
 +  case class RemoveExecutor(executorId: String, reason: String) extends 
CoarseGrainedClusterMessage
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 11ed0e9,0000000..c0f1c6d
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@@ -1,203 -1,0 +1,210 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.spark.scheduler.cluster
 +
 +import java.util.concurrent.atomic.AtomicInteger
 +
 +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 +
 +import akka.actor._
 +import akka.dispatch.Await
 +import akka.pattern.ask
 +import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, 
RemoteClientLifeCycleEvent}
 +import akka.util.Duration
 +import akka.util.duration._
 +
 +import org.apache.spark.{SparkException, Logging, TaskState}
 +import org.apache.spark.scheduler.TaskDescription
 +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 +import org.apache.spark.util.Utils
 +
 +/**
 + * A scheduler backend that waits for coarse grained executors to connect to 
it through Akka.
 + * This backend holds onto each executor for the duration of the Spark job 
rather than relinquishing
 + * executors whenever a task is done and asking the scheduler to launch a new 
executor for
 + * each new task. Executors may be launched in a variety of ways, such as 
Mesos tasks for the
 + * coarse-grained Mesos mode or standalone processes for Spark's standalone 
deploy mode
 + * (spark.deploy.*).
 + */
 +private[spark]
 +class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: 
ActorSystem)
 +  extends SchedulerBackend with Logging
 +{
 +  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
 +  var totalCoreCount = new AtomicInteger(0)
 +
 +  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
 +    private val executorActor = new HashMap[String, ActorRef]
 +    private val executorAddress = new HashMap[String, Address]
 +    private val executorHost = new HashMap[String, String]
 +    private val freeCores = new HashMap[String, Int]
 +    private val actorToExecutorId = new HashMap[ActorRef, String]
 +    private val addressToExecutorId = new HashMap[Address, String]
 +
 +    override def preStart() {
 +      // Listen for remote client disconnection events, since they don't go through Akka's watch()
 +      context.system.eventStream.subscribe(self, 
classOf[RemoteClientLifeCycleEvent])
 +
 +      // Periodically revive offers to allow delay scheduling to work
 +      val reviveInterval = 
System.getProperty("spark.scheduler.revive.interval", "1000").toLong
 +      context.system.scheduler.schedule(0.millis, reviveInterval.millis, 
self, ReviveOffers)
 +    }
 +
 +    def receive = {
 +      case RegisterExecutor(executorId, hostPort, cores) =>
 +        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
 +        if (executorActor.contains(executorId)) {
 +          sender ! RegisterExecutorFailed("Duplicate executor ID: " + 
executorId)
 +        } else {
 +          logInfo("Registered executor: " + sender + " with ID " + executorId)
 +          sender ! RegisteredExecutor(sparkProperties)
 +          context.watch(sender)
 +          executorActor(executorId) = sender
 +          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
 +          freeCores(executorId) = cores
 +          executorAddress(executorId) = sender.path.address
 +          actorToExecutorId(sender) = executorId
 +          addressToExecutorId(sender.path.address) = executorId
 +          totalCoreCount.addAndGet(cores)
 +          makeOffers()
 +        }
 +
 +      case StatusUpdate(executorId, taskId, state, data) =>
 +        scheduler.statusUpdate(taskId, state, data.value)
 +        if (TaskState.isFinished(state)) {
 +          freeCores(executorId) += 1
 +          makeOffers(executorId)
 +        }
 +
 +      case ReviveOffers =>
 +        makeOffers()
 +
++      case KillTask(taskId, executorId) =>
++        executorActor(executorId) ! KillTask(taskId, executorId)
++
 +      case StopDriver =>
 +        sender ! true
 +        context.stop(self)
 +
 +      case RemoveExecutor(executorId, reason) =>
 +        removeExecutor(executorId, reason)
 +        sender ! true
 +
 +      case Terminated(actor) =>
 +        actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 +
 +      case RemoteClientDisconnected(transport, address) =>
 +        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
 +
 +      case RemoteClientShutdown(transport, address) =>
 +        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
 +    }
 +
 +    // Make fake resource offers on all executors
 +    def makeOffers() {
 +      launchTasks(scheduler.resourceOffers(
 +        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, 
host, freeCores(id))}))
 +    }
 +
 +    // Make fake resource offers on just one executor
 +    def makeOffers(executorId: String) {
 +      launchTasks(scheduler.resourceOffers(
 +        Seq(new WorkerOffer(executorId, executorHost(executorId), 
freeCores(executorId)))))
 +    }
 +
 +    // Launch tasks returned by a set of resource offers
 +    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
 +      for (task <- tasks.flatten) {
 +        freeCores(task.executorId) -= 1
 +        executorActor(task.executorId) ! LaunchTask(task)
 +      }
 +    }
 +
 +    // Remove a disconnected slave from the cluster
 +    def removeExecutor(executorId: String, reason: String) {
 +      if (executorActor.contains(executorId)) {
 +        logInfo("Executor " + executorId + " disconnected, so removing it")
 +        val numCores = freeCores(executorId)
 +        actorToExecutorId -= executorActor(executorId)
 +        addressToExecutorId -= executorAddress(executorId)
 +        executorActor -= executorId
 +        executorHost -= executorId
 +        freeCores -= executorId
 +        totalCoreCount.addAndGet(-numCores)
 +        scheduler.executorLost(executorId, SlaveLost(reason))
 +      }
 +    }
 +  }
 +
 +  var driverActor: ActorRef = null
 +  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
 +
 +  override def start() {
 +    val properties = new ArrayBuffer[(String, String)]
 +    val iterator = System.getProperties.entrySet.iterator
 +    while (iterator.hasNext) {
 +      val entry = iterator.next
 +      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
 +      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
 +        properties += ((key, value))
 +      }
 +    }
 +    driverActor = actorSystem.actorOf(
 +      Props(new DriverActor(properties)), name = 
CoarseGrainedSchedulerBackend.ACTOR_NAME)
 +  }
 +
 +  private val timeout = 
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, 
"seconds")
 +
 +  override def stop() {
 +    try {
 +      if (driverActor != null) {
 +        val future = driverActor.ask(StopDriver)(timeout)
 +        Await.result(future, timeout)
 +      }
 +    } catch {
 +      case e: Exception =>
 +        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
 +    }
 +  }
 +
 +  override def reviveOffers() {
 +    driverActor ! ReviveOffers
 +  }
 +
++  override def killTask(taskId: Long, executorId: String) {
++    driverActor ! KillTask(taskId, executorId)
++  }
++
 +  override def defaultParallelism() = 
Option(System.getProperty("spark.default.parallelism"))
 +      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
 +
 +  // Called by subclasses when notified of a lost worker
 +  def removeExecutor(executorId: String, reason: String) {
 +    try {
 +      val future = driverActor.ask(RemoveExecutor(executorId, 
reason))(timeout)
 +      Await.result(future, timeout)
 +    } catch {
 +      case e: Exception =>
 +        throw new SparkException("Error notifying standalone scheduler's driver actor", e)
 +    }
 +  }
 +}
 +
 +private[spark] object CoarseGrainedSchedulerBackend {
 +  val ACTOR_NAME = "CoarseGrainedScheduler"
 +}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 226ac59,cb88159..cefa970
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@@ -26,9 -26,9 +26,9 @@@ import org.apache.spark.util.Util
  private[spark] class SparkDeploySchedulerBackend(
      scheduler: ClusterScheduler,
      sc: SparkContext,
-     master: String,
+     masters: Array[String],
      appName: String)
 -  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
 +  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    with ClientListener
    with Logging {
  

Reply via email to