Merge remote branch 'upstream/master' into rename
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/acc7638f Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/acc7638f Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/acc7638f Branch: refs/heads/master Commit: acc7638f7c8fc9e1c355332f8db91ba508e85a5c Parents: fdc52b2 3249e0e Author: Kay Ousterhout <kayousterh...@gmail.com> Authored: Tue Oct 15 14:43:56 2013 -0700 Committer: Kay Ousterhout <kayousterh...@gmail.com> Committed: Tue Oct 15 14:43:56 2013 -0700 ---------------------------------------------------------------------- assembly/pom.xml | 18 +- bagel/pom.xml | 8 +- bin/stop-slaves.sh | 2 - core/pom.xml | 19 +- .../spark/network/netty/FileClientHandler.java | 3 +- .../spark/network/netty/FileServerHandler.java | 6 +- .../scala/org/apache/spark/Aggregator.scala | 49 ++- .../apache/spark/BlockStoreShuffleFetcher.scala | 23 +- .../scala/org/apache/spark/CacheManager.scala | 29 +- .../scala/org/apache/spark/FutureAction.scala | 250 +++++++++++ .../apache/spark/InterruptibleIterator.scala | 30 ++ .../org/apache/spark/MapOutputTracker.scala | 4 +- .../scala/org/apache/spark/ShuffleFetcher.scala | 5 +- .../scala/org/apache/spark/SparkContext.scala | 105 ++++- .../scala/org/apache/spark/TaskContext.scala | 23 +- .../scala/org/apache/spark/TaskEndReason.scala | 2 + .../spark/api/python/PythonPartitioner.scala | 10 +- .../org/apache/spark/api/python/PythonRDD.scala | 6 +- .../spark/broadcast/BitTorrentBroadcast.scala | 4 +- .../apache/spark/broadcast/HttpBroadcast.scala | 13 +- .../apache/spark/broadcast/TreeBroadcast.scala | 8 +- .../org/apache/spark/deploy/DeployMessage.scala | 29 +- .../spark/deploy/ExecutorDescription.scala | 34 ++ .../spark/deploy/FaultToleranceTest.scala | 420 +++++++++++++++++++ .../org/apache/spark/deploy/JsonProtocol.scala | 6 +- .../apache/spark/deploy/LocalSparkCluster.scala | 7 +- 
.../apache/spark/deploy/SparkHadoopUtil.scala | 12 +- .../org/apache/spark/deploy/client/Client.scala | 84 +++- .../spark/deploy/client/ClientListener.scala | 4 + .../apache/spark/deploy/client/TestClient.scala | 7 +- .../spark/deploy/master/ApplicationInfo.scala | 53 ++- .../spark/deploy/master/ApplicationSource.scala | 2 +- .../spark/deploy/master/ApplicationState.scala | 4 +- .../spark/deploy/master/ExecutorInfo.scala | 7 +- .../master/FileSystemPersistenceEngine.scala | 90 ++++ .../deploy/master/LeaderElectionAgent.scala | 45 ++ .../org/apache/spark/deploy/master/Master.scala | 228 ++++++++-- .../spark/deploy/master/MasterMessages.scala | 46 ++ .../spark/deploy/master/MasterSource.scala | 6 +- .../spark/deploy/master/PersistenceEngine.scala | 53 +++ .../spark/deploy/master/RecoveryState.scala | 26 ++ .../deploy/master/SparkZooKeeperSession.scala | 203 +++++++++ .../apache/spark/deploy/master/WorkerInfo.scala | 42 +- .../spark/deploy/master/WorkerState.scala | 6 +- .../master/ZooKeeperLeaderElectionAgent.scala | 136 ++++++ .../master/ZooKeeperPersistenceEngine.scala | 85 ++++ .../spark/deploy/worker/ExecutorRunner.scala | 13 +- .../org/apache/spark/deploy/worker/Worker.scala | 175 +++++--- .../spark/deploy/worker/WorkerArguments.scala | 8 +- .../spark/deploy/worker/WorkerSource.scala | 10 +- .../spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- .../executor/CoarseGrainedExecutorBackend.scala | 10 +- .../org/apache/spark/executor/Executor.scala | 122 ++++-- .../apache/spark/executor/ExecutorSource.scala | 18 +- .../spark/executor/MesosExecutorBackend.scala | 18 +- .../apache/spark/network/netty/FileHeader.scala | 22 +- .../spark/network/netty/ShuffleCopier.scala | 27 +- .../spark/network/netty/ShuffleSender.scala | 8 +- .../org/apache/spark/rdd/AsyncRDDActions.scala | 122 ++++++ .../scala/org/apache/spark/rdd/BlockRDD.scala | 6 +- .../org/apache/spark/rdd/CheckpointRDD.scala | 2 +- .../org/apache/spark/rdd/CoGroupedRDD.scala | 26 +- 
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 174 +++++--- .../spark/rdd/MapPartitionsWithContextRDD.scala | 41 ++ .../spark/rdd/MapPartitionsWithIndexRDD.scala | 41 -- .../org/apache/spark/rdd/NewHadoopRDD.scala | 79 ++-- .../org/apache/spark/rdd/PairRDDFunctions.scala | 16 +- .../spark/rdd/ParallelCollectionRDD.scala | 5 +- .../main/scala/org/apache/spark/rdd/RDD.scala | 95 +++-- .../org/apache/spark/rdd/ShuffledRDD.scala | 2 +- .../org/apache/spark/rdd/SubtractedRDD.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 128 +++--- .../spark/scheduler/DAGSchedulerEvent.scala | 24 +- .../spark/scheduler/DAGSchedulerSource.scala | 12 +- .../org/apache/spark/scheduler/JobWaiter.scala | 62 +-- .../scala/org/apache/spark/scheduler/Pool.scala | 5 +- .../org/apache/spark/scheduler/ResultTask.scala | 48 ++- .../spark/scheduler/SchedulableBuilder.scala | 3 + .../apache/spark/scheduler/ShuffleMapTask.scala | 50 ++- .../apache/spark/scheduler/SparkListener.scala | 2 +- .../spark/scheduler/SparkListenerBus.scala | 18 + .../scala/org/apache/spark/scheduler/Task.scala | 63 ++- .../org/apache/spark/scheduler/TaskResult.scala | 3 +- .../apache/spark/scheduler/TaskScheduler.scala | 3 + .../org/apache/spark/scheduler/TaskSet.scala | 4 + .../scheduler/cluster/ClusterScheduler.scala | 37 +- .../cluster/ClusterTaskSetManager.scala | 87 ++-- .../cluster/CoarseGrainedClusterMessage.scala | 2 + .../cluster/CoarseGrainedSchedulerBackend.scala | 7 + .../scheduler/cluster/SchedulerBackend.scala | 6 +- .../cluster/SparkDeploySchedulerBackend.scala | 14 +- .../spark/scheduler/local/LocalScheduler.scala | 190 +++------ .../scheduler/local/LocalTaskSetManager.scala | 13 +- .../spark/serializer/KryoSerializer.scala | 30 +- .../apache/spark/storage/BlockException.scala | 2 +- .../spark/storage/BlockFetcherIterator.scala | 24 +- .../org/apache/spark/storage/BlockId.scala | 96 +++++ .../org/apache/spark/storage/BlockManager.scala | 130 +++--- .../spark/storage/BlockManagerMaster.scala | 8 
+- .../spark/storage/BlockManagerMasterActor.scala | 21 +- .../spark/storage/BlockManagerMessages.scala | 16 +- .../spark/storage/BlockManagerSource.scala | 8 +- .../spark/storage/BlockManagerWorker.scala | 4 +- .../org/apache/spark/storage/BlockMessage.scala | 38 +- .../spark/storage/BlockMessageArray.scala | 7 +- .../spark/storage/BlockObjectWriter.scala | 2 +- .../org/apache/spark/storage/BlockStore.scala | 14 +- .../org/apache/spark/storage/DiskStore.scala | 37 +- .../org/apache/spark/storage/MemoryStore.scala | 34 +- .../spark/storage/ShuffleBlockManager.scala | 16 +- .../org/apache/spark/storage/StorageUtils.scala | 47 +-- .../apache/spark/storage/ThreadingTest.scala | 6 +- .../apache/spark/ui/UIWorkloadGenerator.scala | 2 +- .../org/apache/spark/ui/storage/RDDPage.scala | 23 +- .../org/apache/spark/util/AppendOnlyMap.scala | 230 ++++++++++ .../org/apache/spark/util/MetadataCleaner.scala | 36 +- .../scala/org/apache/spark/util/Utils.scala | 13 + .../org/apache/spark/CacheManagerSuite.scala | 21 +- .../org/apache/spark/CheckpointSuite.scala | 10 +- .../org/apache/spark/DistributedSuite.scala | 16 +- .../scala/org/apache/spark/JavaAPISuite.java | 2 +- .../org/apache/spark/JobCancellationSuite.scala | 177 ++++++++ .../apache/spark/deploy/JsonProtocolSuite.scala | 7 +- .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 176 ++++++++ .../spark/rdd/PairRDDFunctionsSuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 27 +- .../spark/scheduler/SparkListenerSuite.scala | 30 +- .../spark/scheduler/cluster/FakeTask.scala | 5 +- .../cluster/TaskResultGetterSuite.scala | 3 +- .../scheduler/local/LocalSchedulerSuite.scala | 28 +- .../spark/serializer/KryoSerializerSuite.scala | 21 + .../org/apache/spark/storage/BlockIdSuite.scala | 114 +++++ .../spark/storage/BlockManagerSuite.scala | 102 ++--- .../scala/org/apache/spark/ui/UISuite.scala | 7 +- .../apache/spark/util/AppendOnlyMapSuite.scala | 154 +++++++ .../org/apache/spark/util/UtilsSuite.scala | 11 + 
docker/README.md | 5 + docker/build | 22 + docker/spark-test/README.md | 10 + docker/spark-test/base/Dockerfile | 38 ++ docker/spark-test/build | 22 + docker/spark-test/master/Dockerfile | 21 + docker/spark-test/master/default_cmd | 22 + docker/spark-test/worker/Dockerfile | 22 + docker/spark-test/worker/default_cmd | 22 + docs/_layouts/global.html | 4 +- docs/mllib-guide.md | 24 +- docs/python-programming-guide.md | 2 +- docs/running-on-yarn.md | 9 +- docs/spark-standalone.md | 75 ++++ docs/streaming-programming-guide.md | 5 +- docs/tuning.md | 2 +- ec2/spark_ec2.py | 4 +- examples/pom.xml | 30 +- .../org/apache/spark/examples/SparkKMeans.scala | 2 - mllib/pom.xml | 8 +- .../apache/spark/mllib/recommendation/ALS.scala | 199 +++++++-- .../mllib/recommendation/JavaALSSuite.java | 85 +++- .../spark/mllib/recommendation/ALSSuite.scala | 75 +++- pom.xml | 43 +- project/SparkBuild.scala | 14 +- python/pyspark/rdd.py | 70 +++- python/pyspark/serializers.py | 4 + repl-bin/pom.xml | 10 +- repl/pom.xml | 20 +- streaming/pom.xml | 9 +- .../spark/streaming/NetworkInputTracker.scala | 11 +- .../streaming/dstream/NetworkInputDStream.scala | 14 +- .../streaming/dstream/RawInputDStream.scala | 4 +- .../streaming/receivers/ActorReceiver.scala | 4 +- tools/pom.xml | 8 +- yarn/pom.xml | 6 +- .../spark/deploy/yarn/ApplicationMaster.scala | 55 ++- .../org/apache/spark/deploy/yarn/Client.scala | 160 ++++++- .../spark/deploy/yarn/ClientArguments.scala | 25 +- .../spark/deploy/yarn/WorkerRunnable.scala | 58 ++- 176 files changed, 5608 insertions(+), 1449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala 
---------------------------------------------------------------------- diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 7d74786,0000000..52b1c49 mode 100644,000000..100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@@ -1,109 -1,0 +1,117 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.executor
+
+import java.nio.ByteBuffer
+
+import akka.actor.{ActorRef, Actor, Props, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util.{Utils, AkkaUtils}
+
+
++/**
++ * Executor-side actor that registers with the driver at `driverUrl`, then launches and kills
++ * tasks on a local [[Executor]] in response to driver messages and reports task status back.
++ *
++ * The process exits with status 1 when registration fails, when a task command arrives before
++ * the executor has been created, or when the driver terminates or disconnects.
++ *
++ * @param driverUrl  actor URL of the driver, resolved with `context.actorFor`
++ * @param executorId ID sent to the driver in `RegisterExecutor` and in every `StatusUpdate`
++ * @param hostPort   "host:port" of this backend; validated by `Utils.checkHostPort`
++ * @param cores      core count reported to the driver on registration
++ */
+private[spark] class CoarseGrainedExecutorBackend(
+    driverUrl: String,
+    executorId: String,
+    hostPort: String,
+    cores: Int)
+  extends Actor
+  with ExecutorBackend
+  with Logging {
+
+  Utils.checkHostPort(hostPort, "Expected hostport")
+
++  // Both stay null until set: `driver` in preStart(), `executor` on RegisteredExecutor.
+  var executor: Executor = null
+  var driver: ActorRef = null
+
++  /** Looks up the driver, sends the registration request and subscribes to disconnect events. */
+  override def preStart() {
+    logInfo("Connecting to driver: " + driverUrl)
+    driver = context.actorFor(driverUrl)
+    driver ! RegisterExecutor(executorId, hostPort, cores)
+    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+    context.watch(driver) // Doesn't work with remote actors, but useful for testing
+  }
+
++  /** Handles registration replies, task launch/kill commands and driver-loss notifications. */
+  override def receive = {
+    case RegisteredExecutor(sparkProperties) =>
+      logInfo("Successfully registered with driver")
+      // Make this host instead of hostPort ?
+      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+
+    case RegisterExecutorFailed(message) =>
+      logError("Slave registration failed: " + message)
+      System.exit(1)
+
+    case LaunchTask(taskDesc) =>
+      logInfo("Got assigned task " + taskDesc.taskId)
+      if (executor == null) {
-        logError("Received launchTask but executor was null")
++        logError("Received LaunchTask command but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
+
++    // The second field of KillTask (the executor ID) is ignored here.
++    case KillTask(taskId, _) =>
++      if (executor == null) {
++        logError("Received KillTask command but executor was null")
++        System.exit(1)
++      } else {
++        executor.killTask(taskId)
++      }
++
+    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+      logError("Driver terminated or disconnected! Shutting down.")
+      System.exit(1)
+  }
+
++  /** Forwards a task status change to the driver, tagged with this backend's executor ID. */
+  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+    driver ! StatusUpdate(executorId, taskId, state, data)
+  }
+}
+
++/** Entry point that hosts a [[CoarseGrainedExecutorBackend]] in its own actor system. */
+private[spark] object CoarseGrainedExecutorBackend {
++  /**
++   * Creates the "sparkExecutor" actor system on `hostname` (port 0 is passed and the bound port
++   * is read back), publishes "host:boundPort" as the `spark.hostPort` system property, starts
++   * the backend actor under the name "Executor" and blocks until the actor system terminates.
++   */
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+    // Debug code
+    Utils.checkHost(hostname)
+
+    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+    // before getting started with all our system properties, etc
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+    // set it
+    val sparkHostPort = hostname + ":" + boundPort
+    System.setProperty("spark.hostPort", sparkHostPort)
+    val actor = actorSystem.actorOf(
+      Props(new CoarseGrainedExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+      name = "Executor")
+    actorSystem.awaitTermination()
+  }
+
++  /**
++   * Expects `<driverUrl> <executorId> <hostname> <cores>`; prints usage and exits with status 1
++   * when fewer than four arguments are given. Arguments after the fourth are accepted but unused.
++   */
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      //the reason we allow the last frameworkId argument is to make it easy to kill rogue executors
+      System.err.println(
+        "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
+        "[<appid>]")
+      System.exit(1)
+    }
+    run(args(0), args(1), args(2), args(3).toInt)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index d017725,0000000..3d26206
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@@ -1,63 -1,0 +1,65 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.util.{Utils, SerializableBuffer}
+
+
++/** Common parent of every message defined in [[CoarseGrainedClusterMessages]]. */
+private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
+
++/**
++ * Messages exchanged between the driver and coarse-grained executors, grouped by direction,
++ * plus messages the driver actor handles internally.
++ */
+private[spark] object CoarseGrainedClusterMessages {
+
+  // Driver to executors
+  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+
++  // Asks the executor with ID `executor` to kill task `taskId`. It extends the sealed trait
++  // declared in this file; the previous parent, `StandaloneClusterMessage`, is neither declared
++  // nor imported here.
++  case class KillTask(taskId: Long, executor: String) extends CoarseGrainedClusterMessage
++
+  case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+    extends CoarseGrainedClusterMessage
+
+  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
+
+  // Executors to driver
+  case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+    extends CoarseGrainedClusterMessage {
+    Utils.checkHostPort(hostPort, "Expected host port")
+  }
+
+  case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
+    data: SerializableBuffer) extends CoarseGrainedClusterMessage
+
+  object StatusUpdate {
+    /** Alternate factory method that takes a ByteBuffer directly for the data field */
+    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
+      : StatusUpdate = {
+      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+    }
+  }
+
+  // Internal messages in driver
+  case object ReviveOffers extends CoarseGrainedClusterMessage
+
+  case object StopDriver extends CoarseGrainedClusterMessage
+
+  case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 11ed0e9,0000000..c0f1c6d
mode 100644,000000..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@@ -1,203 -1,0 +1,210 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import akka.actor._
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.util.Duration
+import akka.util.duration._
+
+import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util.Utils
+
+/**
+ * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
+ * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
+ * executors whenever a task is done and asking the scheduler to launch a new executor for
+ * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
+ * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
+ * (spark.deploy.*).
+ */
+private[spark]
+class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+  extends SchedulerBackend with Logging
+{
+  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+  var totalCoreCount = new AtomicInteger(0)
+
++  /**
++   * Driver-side actor that keeps the registry of connected executors (actor reference, address,
++   * host and free cores per executor ID, plus reverse lookups by actor and by address) and
++   * turns scheduler resource offers into LaunchTask messages.
++   *
++   * @param sparkProperties properties sent back to each executor in RegisteredExecutor
++   */
+  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+    private val executorActor = new HashMap[String, ActorRef]
+    private val executorAddress = new HashMap[String, Address]
+    private val executorHost = new HashMap[String, String]
+    private val freeCores = new HashMap[String, Int]
+    private val actorToExecutorId = new HashMap[ActorRef, String]
+    private val addressToExecutorId = new HashMap[Address, String]
+
+    override def preStart() {
+      // Listen for remote client disconnection events, since they don't go through Akka's watch()
+      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+
+      // Periodically revive offers to allow delay scheduling to work
+      val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+      context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
+    }
+
+    def receive = {
+      case RegisterExecutor(executorId, hostPort, cores) =>
+        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
+        if (executorActor.contains(executorId)) {
+          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
+        } else {
+          logInfo("Registered executor: " + sender + " with ID " + executorId)
+          sender ! RegisteredExecutor(sparkProperties)
+          context.watch(sender)
+          executorActor(executorId) = sender
+          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+          freeCores(executorId) = cores
+          executorAddress(executorId) = sender.path.address
+          actorToExecutorId(sender) = executorId
+          addressToExecutorId(sender.path.address) = executorId
+          totalCoreCount.addAndGet(cores)
+          makeOffers()
+        }
+
+      case StatusUpdate(executorId, taskId, state, data) =>
+        scheduler.statusUpdate(taskId, state, data.value)
+        if (TaskState.isFinished(state)) {
++          // A final status can arrive after the executor was removed; freeCores(executorId)
++          // would then throw NoSuchElementException inside receive, so only known executors
++          // get their core returned and a fresh offer made.
++          if (executorActor.contains(executorId)) {
+            freeCores(executorId) += 1
+            makeOffers(executorId)
++          } else {
++            logInfo("Ignored task status update (" + taskId + " state " + state +
++              ") from unknown executor " + executorId)
++          }
+        }
+
+      case ReviveOffers =>
+        makeOffers()
+
++      case KillTask(taskId, executorId) =>
++        // The target executor may already have been removed; executorActor(executorId) would
++        // then throw NoSuchElementException inside receive, so look it up safely and drop the
++        // request when the executor is unknown.
++        executorActor.get(executorId) match {
++          case Some(actor) => actor ! KillTask(taskId, executorId)
++          case None =>
++            logInfo("Ignoring KillTask for task " + taskId + ": unknown executor " + executorId)
++        }
++
+      case StopDriver =>
+        sender ! true
+        context.stop(self)
+
+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
+        sender ! true
+
+      case Terminated(actor) =>
+        actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+
+      case RemoteClientDisconnected(transport, address) =>
+        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
+
+      case RemoteClientShutdown(transport, address) =>
+        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
+    }
+
+    // Make fake resource offers on all executors
+    def makeOffers() {
+      launchTasks(scheduler.resourceOffers(
+        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+    }
+
+    // Make fake resource offers on just one executor
+    def makeOffers(executorId: String) {
+      launchTasks(scheduler.resourceOffers(
+        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+    }
+
+    // Launch tasks returned by a set of resource offers
+    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+      for (task <- tasks.flatten) {
+        freeCores(task.executorId) -= 1
+        executorActor(task.executorId) ! LaunchTask(task)
+      }
+    }
+
+    // Remove a disconnected slave from the cluster
+    def removeExecutor(executorId: String, reason: String) {
+      if (executorActor.contains(executorId)) {
+        logInfo("Executor " + executorId + " disconnected, so removing it")
+        val numCores = freeCores(executorId)
+        actorToExecutorId -= executorActor(executorId)
+        addressToExecutorId -= executorAddress(executorId)
+        executorActor -= executorId
+        executorHost -= executorId
+        freeCores -= executorId
+        totalCoreCount.addAndGet(-numCores)
+        scheduler.executorLost(executorId, SlaveLost(reason))
+      }
+    }
+  }
+
+  var driverActor: ActorRef = null
+  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
++  // Starts the driver actor, handing it every system property whose key starts with "spark."
++  // except "spark.hostPort".
+  override def start() {
+    val properties = new ArrayBuffer[(String, String)]
+    val iterator = System.getProperties.entrySet.iterator
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+        properties += ((key, value))
+      }
+    }
+    driverActor = actorSystem.actorOf(
+      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+  }
+
++  // Ask timeout in seconds, read from "spark.akka.askTimeout" (default "10").
+  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+
++  // Asks the driver actor to stop and waits for its reply; does nothing if start() never ran.
+  override def stop() {
+    try {
+      if (driverActor != null) {
+        val future = driverActor.ask(StopDriver)(timeout)
+        Await.result(future, timeout)
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
+    }
+  }
+
+  override def reviveOffers() {
+    driverActor ! ReviveOffers
+  }
+
++  // Fire-and-forget: the driver actor forwards the request to the executor if it is registered.
++  override def killTask(taskId: Long, executorId: String) {
++    driverActor ! KillTask(taskId, executorId)
++  }
++
++  // "spark.default.parallelism" if set, otherwise the registered core count with a floor of 2.
+  override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+      .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+
+  // Called by subclasses when notified of a lost worker
+  def removeExecutor(executorId: String, reason: String) {
+    try {
+      val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+      Await.result(future, timeout)
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+    }
+  }
+}
+
+private[spark] object CoarseGrainedSchedulerBackend {
+  val ACTOR_NAME = "CoarseGrainedScheduler"
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/acc7638f/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 226ac59,cb88159..cefa970
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@@ -26,9 -26,9 +26,9 @@@ import org.apache.spark.util.Util
  private[spark] class SparkDeploySchedulerBackend(
      scheduler: ClusterScheduler,
      sc: SparkContext,
-     master: String,
+     masters: Array[String],
      appName: String)
-   extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
    with ClientListener
    with Logging {