Support job cancellation in multi-pool scheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ddf64f01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ddf64f01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ddf64f01 Branch: refs/heads/master Commit: ddf64f019fa98010e0a59e6e1559f4e3f8b25b5f Parents: 3bd2890 Author: Reynold Xin <r...@apache.org> Authored: Thu Oct 10 13:20:27 2013 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Thu Oct 10 13:20:27 2013 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/Pool.scala | 2 +- .../spark/scheduler/SchedulableBuilder.scala | 19 +++++++++++++++++-- .../scheduler/cluster/ClusterScheduler.scala | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ddf64f01/core/src/main/scala/org/apache/spark/scheduler/Pool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 9eb8d48..8b33319 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -43,7 +43,7 @@ private[spark] class Pool( var runningTasks = 0 var priority = 0 - var stageId = 0 + var stageId = -1 var name = poolName var parent: Pool = null http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ddf64f01/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index c4f555b..a4e8653 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -36,8 +36,23 @@ private[spark] trait SchedulableBuilder { def addTaskSetManager(manager: Schedulable, properties: Properties) - def getTaskSetManagers(stageId: Int): Iterable[Schedulable] = { - rootPool.schedulableQueue.filter(_.stageId == stageId) + /** + * Find the TaskSetManager for the given stage. In fair scheduler, this function examines + * all the pools to find the TaskSetManager. + */ + def getTaskSetManagers(stageId: Int): Option[TaskSetManager] = { + def getTsm(pool: Pool): Option[TaskSetManager] = { + pool.schedulableQueue.foreach { + case tsm: TaskSetManager => + if (tsm.stageId == stageId) { + return Some(tsm) + } + case pool: Pool => + getTsm(pool) + } + return None + } + getTsm(rootPool) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ddf64f01/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 031d0b1..250dec5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -172,7 +172,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) override def cancelTasks(stageId: Int): Unit = synchronized { logInfo("Cancelling stage " + stageId) - schedulableBuilder.getTaskSetManagers(stageId).foreach { case tsm: TaskSetManager => + schedulableBuilder.getTaskSetManagers(stageId).foreach { tsm => // There are two possible cases here: // 1. The task set manager has been created and some tasks have been scheduled. // In this case, send a kill signal to the executors to kill the task.