Repository: spark
Updated Branches:
  refs/heads/branch-2.0 eb1d746c4 -> de56ea9bf
[SPARK-15518][CORE][FOLLOW-UP] Rename LocalSchedulerBackendEndpoint -> LocalSchedulerBackend

## What changes were proposed in this pull request?

This patch is a follow-up to https://github.com/apache/spark/pull/13288 completing the renaming:
 - LocalScheduler -> LocalSchedulerBackend~~Endpoint~~

## How was this patch tested?

Updated test cases to reflect the name change.

Author: Liwei Lin <[email protected]>

Closes #13683 from lw-lin/rename-backend.

(cherry picked from commit 9b234b55d1b5e4a7c80e482b3e297bfb8b583a56)
Signed-off-by: Reynold Xin <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de56ea9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de56ea9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de56ea9b

Branch: refs/heads/branch-2.0
Commit: de56ea9bfa7dc5ac12a838ee64f435d5b146c10d
Parents: eb1d746
Author: Liwei Lin <[email protected]>
Authored: Wed Jun 15 11:52:36 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Jun 15 11:52:41 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   8 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   4 +-
 .../scheduler/local/LocalSchedulerBackend.scala | 166 +++++++++++++++++++
 .../local/LocalSchedulerBackendEndpoint.scala   | 166 -------------------
 .../SparkContextSchedulerCreationSuite.scala    |  14 +-
 5 files changed, 179 insertions(+), 179 deletions(-)
----------------------------------------------------------------------
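As a minimal sketch (not taken from this patch) of what the renamed class serves: every `local` master URL is handled by `LocalSchedulerBackend`, with the thread count and task-failure limit parsed from the master string, exactly as in the `SparkContext.createTaskScheduler` hunks below. The app name and numbers in this sketch are arbitrary.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Master strings routed to the (renamed) LocalSchedulerBackend:
//   "local"       -> 1 thread,  maxTaskFailures = 1
//   "local[4]"    -> 4 threads, maxTaskFailures = 1
//   "local[*]"    -> one thread per available core, maxTaskFailures = 1
//   "local[4, 2]" -> 4 threads, maxTaskFailures = 2
val conf = new SparkConf().setAppName("local-backend-demo").setMaster("local[4, 2]")
val sc = new SparkContext(conf)
try {
  // Driver, TaskSchedulerImpl, LocalSchedulerBackend and the single local
  // Executor all run inside this one JVM.
  val sum = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2).reduce(_ + _)
  println(s"sum = $sum")
} finally {
  sc.stop()
}
```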
http://git-wip-us.apache.org/repos/asf/spark/blob/de56ea9b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3c54987..d56946e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -58,7 +58,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -2429,7 +2429,7 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, 1)
+        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
         (backend, scheduler)
@@ -2441,7 +2441,7 @@ object SparkContext extends Logging {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount)
+        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
@@ -2451,7 +2451,7 @@ object SparkContext extends Logging {
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
-        val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount)
+        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)

http://git-wip-us.apache.org/repos/asf/spark/blob/de56ea9b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 01e85ca..7dd4f6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -33,13 +33,13 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
-import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}

 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
- * It can also work with a local setup by using a [[LocalSchedulerBackendEndpoint]] and setting
+ * It can also work with a local setup by using a [[LocalSchedulerBackend]] and setting
  * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking
  * up to launch speculative tasks, etc.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/de56ea9b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
new file mode 100644
index 0000000..e386052
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.local
+
+import java.io.File
+import java.net.URL
+import java.nio.ByteBuffer
+
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
+private case class ReviveOffers()
+
+private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
+
+private case class KillTask(taskId: Long, interruptThread: Boolean)
+
+private case class StopExecutor()
+
+/**
+ * Calls to [[LocalSchedulerBackend]] are all serialized through LocalEndpoint. Using an
+ * RpcEndpoint makes the calls on [[LocalSchedulerBackend]] asynchronous, which is necessary
+ * to prevent deadlock between [[LocalSchedulerBackend]] and the [[TaskSchedulerImpl]].
+ */
+private[spark] class LocalEndpoint(
+    override val rpcEnv: RpcEnv,
+    userClassPath: Seq[URL],
+    scheduler: TaskSchedulerImpl,
+    executorBackend: LocalSchedulerBackend,
+    private val totalCores: Int)
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  private var freeCores = totalCores
+
+  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+  val localExecutorHostname = "localhost"
+
+  private val executor = new Executor(
+    localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case ReviveOffers =>
+      reviveOffers()
+
+    case StatusUpdate(taskId, state, serializedData) =>
+      scheduler.statusUpdate(taskId, state, serializedData)
+      if (TaskState.isFinished(state)) {
+        freeCores += scheduler.CPUS_PER_TASK
+        reviveOffers()
+      }
+
+    case KillTask(taskId, interruptThread) =>
+      executor.killTask(taskId, interruptThread)
+  }
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case StopExecutor =>
+      executor.stop()
+      context.reply(true)
+  }
+
+  def reviveOffers() {
+    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
+    for (task <- scheduler.resourceOffers(offers).flatten) {
+      freeCores -= scheduler.CPUS_PER_TASK
+      executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
+        task.name, task.serializedTask)
+    }
+  }
+}
+
+/**
+ * Used when running a local version of Spark where the executor, backend, and master all run in
+ * the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single
+ * Executor (created by the [[LocalSchedulerBackend]]) running locally.
+ */
+private[spark] class LocalSchedulerBackend(
+    conf: SparkConf,
+    scheduler: TaskSchedulerImpl,
+    val totalCores: Int)
+  extends SchedulerBackend with ExecutorBackend with Logging {
+
+  private val appId = "local-" + System.currentTimeMillis
+  private var localEndpoint: RpcEndpointRef = null
+  private val userClassPath = getUserClasspath(conf)
+  private val listenerBus = scheduler.sc.listenerBus
+  private val launcherBackend = new LauncherBackend() {
+    override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
+  }
+
+  /**
+   * Returns a list of URLs representing the user classpath.
+   *
+   * @param conf Spark configuration.
+   */
+  def getUserClasspath(conf: SparkConf): Seq[URL] = {
+    val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
+    userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
+  }
+
+  launcherBackend.connect()
+
+  override def start() {
+    val rpcEnv = SparkEnv.get.rpcEnv
+    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
+    localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
+    listenerBus.post(SparkListenerExecutorAdded(
+      System.currentTimeMillis,
+      executorEndpoint.localExecutorId,
+      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
+    launcherBackend.setAppId(appId)
+    launcherBackend.setState(SparkAppHandle.State.RUNNING)
+  }
+
+  override def stop() {
+    stop(SparkAppHandle.State.FINISHED)
+  }
+
+  override def reviveOffers() {
+    localEndpoint.send(ReviveOffers)
+  }
+
+  override def defaultParallelism(): Int =
+    scheduler.conf.getInt("spark.default.parallelism", totalCores)
+
+  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
+    localEndpoint.send(KillTask(taskId, interruptThread))
+  }
+
+  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
+    localEndpoint.send(StatusUpdate(taskId, state, serializedData))
+  }
+
+  override def applicationId(): String = appId
+
+  private def stop(finalState: SparkAppHandle.State): Unit = {
+    localEndpoint.ask(StopExecutor)
+    try {
+      launcherBackend.setState(finalState)
+    } finally {
+      launcherBackend.close()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/de56ea9b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala
deleted file mode 100644
index ee06588..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackendEndpoint.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.local
-
-import java.io.File
-import java.net.URL
-import java.nio.ByteBuffer
-
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState}
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.internal.Logging
-import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
-private case class ReviveOffers()
-
-private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
-
-private case class KillTask(taskId: Long, interruptThread: Boolean)
-
-private case class StopExecutor()
-
-/**
- * Calls to [[LocalSchedulerBackendEndpoint]] are all serialized through LocalEndpoint. Using an
- * RpcEndpoint makes the calls on [[LocalSchedulerBackendEndpoint]] asynchronous, which is necessary
- * to prevent deadlock between [[LocalSchedulerBackendEndpoint]] and the [[TaskSchedulerImpl]].
- */
-private[spark] class LocalEndpoint(
-    override val rpcEnv: RpcEnv,
-    userClassPath: Seq[URL],
-    scheduler: TaskSchedulerImpl,
-    executorBackend: LocalSchedulerBackendEndpoint,
-    private val totalCores: Int)
-  extends ThreadSafeRpcEndpoint with Logging {
-
-  private var freeCores = totalCores
-
-  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
-  val localExecutorHostname = "localhost"
-
-  private val executor = new Executor(
-    localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
-
-  override def receive: PartialFunction[Any, Unit] = {
-    case ReviveOffers =>
-      reviveOffers()
-
-    case StatusUpdate(taskId, state, serializedData) =>
-      scheduler.statusUpdate(taskId, state, serializedData)
-      if (TaskState.isFinished(state)) {
-        freeCores += scheduler.CPUS_PER_TASK
-        reviveOffers()
-      }
-
-    case KillTask(taskId, interruptThread) =>
-      executor.killTask(taskId, interruptThread)
-  }
-
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case StopExecutor =>
-      executor.stop()
-      context.reply(true)
-  }
-
-  def reviveOffers() {
-    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    for (task <- scheduler.resourceOffers(offers).flatten) {
-      freeCores -= scheduler.CPUS_PER_TASK
-      executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
-        task.name, task.serializedTask)
-    }
-  }
-}
-
-/**
- * Used when running a local version of Spark where the executor, backend, and master all run in
- * the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single
- * Executor (created by the [[LocalSchedulerBackendEndpoint]]) running locally.
- */
-private[spark] class LocalSchedulerBackendEndpoint(
-    conf: SparkConf,
-    scheduler: TaskSchedulerImpl,
-    val totalCores: Int)
-  extends SchedulerBackend with ExecutorBackend with Logging {
-
-  private val appId = "local-" + System.currentTimeMillis
-  private var localEndpoint: RpcEndpointRef = null
-  private val userClassPath = getUserClasspath(conf)
-  private val listenerBus = scheduler.sc.listenerBus
-  private val launcherBackend = new LauncherBackend() {
-    override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
-  }
-
-  /**
-   * Returns a list of URLs representing the user classpath.
-   *
-   * @param conf Spark configuration.
-   */
-  def getUserClasspath(conf: SparkConf): Seq[URL] = {
-    val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
-    userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
-  }
-
-  launcherBackend.connect()
-
-  override def start() {
-    val rpcEnv = SparkEnv.get.rpcEnv
-    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
-    localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
-    listenerBus.post(SparkListenerExecutorAdded(
-      System.currentTimeMillis,
-      executorEndpoint.localExecutorId,
-      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
-    launcherBackend.setAppId(appId)
-    launcherBackend.setState(SparkAppHandle.State.RUNNING)
-  }
-
-  override def stop() {
-    stop(SparkAppHandle.State.FINISHED)
-  }
-
-  override def reviveOffers() {
-    localEndpoint.send(ReviveOffers)
-  }
-
-  override def defaultParallelism(): Int =
-    scheduler.conf.getInt("spark.default.parallelism", totalCores)
-
-  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
-    localEndpoint.send(KillTask(taskId, interruptThread))
-  }
-
-  override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
-    localEndpoint.send(StatusUpdate(taskId, state, serializedData))
-  }
-
-  override def applicationId(): String = appId
-
-  private def stop(finalState: SparkAppHandle.State): Unit = {
-    localEndpoint.ask(StopExecutor)
-    try {
-      launcherBackend.setState(finalState)
-    } finally {
-      launcherBackend.close()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/de56ea9b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 6e56554..7d75a93 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
 import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
-import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
+import org.apache.spark.scheduler.local.LocalSchedulerBackend

 class SparkContextSchedulerCreationSuite
@@ -58,7 +58,7 @@ class SparkContextSchedulerCreationSuite
   test("local") {
     val sched = createTaskScheduler("local")
     sched.backend match {
-      case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 1)
+      case s: LocalSchedulerBackend => assert(s.totalCores === 1)
       case _ => fail()
     }
   }
@@ -66,7 +66,7 @@ class SparkContextSchedulerCreationSuite
   test("local-*") {
     val sched = createTaskScheduler("local[*]")
     sched.backend match {
-      case s: LocalSchedulerBackendEndpoint =>
+      case s: LocalSchedulerBackend =>
         assert(s.totalCores === Runtime.getRuntime.availableProcessors())
       case _ => fail()
     }
@@ -76,7 +76,7 @@ class SparkContextSchedulerCreationSuite
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)
     sched.backend match {
-      case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 5)
+      case s: LocalSchedulerBackend => assert(s.totalCores === 5)
       case _ => fail()
     }
   }
@@ -85,7 +85,7 @@ class SparkContextSchedulerCreationSuite
     val sched = createTaskScheduler("local[* ,2]")
    assert(sched.maxTaskFailures === 2)
     sched.backend match {
-      case s: LocalSchedulerBackendEndpoint =>
+      case s: LocalSchedulerBackend =>
         assert(s.totalCores === Runtime.getRuntime.availableProcessors())
       case _ => fail()
     }
@@ -95,7 +95,7 @@ class SparkContextSchedulerCreationSuite
     val sched = createTaskScheduler("local[4, 2]")
     assert(sched.maxTaskFailures === 2)
     sched.backend match {
-      case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 4)
+      case s: LocalSchedulerBackend => assert(s.totalCores === 4)
       case _ => fail()
     }
   }
@@ -119,7 +119,7 @@ class SparkContextSchedulerCreationSuite
     val sched = createTaskScheduler("local", "client", conf)
     sched.backend match {
-      case s: LocalSchedulerBackendEndpoint => assert(s.defaultParallelism() === 16)
+      case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16)
       case _ => fail()
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
