More logging changes (task killing for local cluster doesn't work yet).
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f19984da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f19984da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f19984da

Branch: refs/heads/master
Commit: f19984dafe2154e150076b3d2b335d08a4bb073d
Parents: 85a0dff
Author: Reynold Xin <reyno...@gmail.com>
Authored: Thu Sep 19 18:14:51 2013 -0700
Committer: Reynold Xin <reyno...@gmail.com>
Committed: Thu Sep 19 18:14:51 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  4 +++-
 .../scheduler/cluster/ClusterScheduler.scala    | 18 +++++++--------
 .../spark/scheduler/local/LocalScheduler.scala  | 24 ++++++++++++++++----
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f19984da/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7a2f8c5..b3ba65f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -160,6 +160,7 @@ private[spark] class Executor(
     @volatile private var task: Task[Any] = _
 
     def kill() {
+      logInfo("Executor is trying to kill task " + taskId)
       killed = true
       if (task != null) {
         task.kill()
@@ -188,7 +189,7 @@ private[spark] class Executor(
       // If this task has been killed before we deserialized it, let's quit now. Otherwise,
       // continue executing the task.
       if (killed) {
-        logInfo("Task " + taskId + " was killed before it had a chance to run.")
+        logInfo("Executor killed task " + taskId)
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
       }
 
@@ -203,6 +204,7 @@ private[spark] class Executor(
 
       // If the task has been killed, let's fail it.
       if (task.killed) {
+        logInfo("Executor killed task " + taskId)
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
       }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f19984da/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 3d7ddd7..93ff12a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -166,16 +166,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     backend.reviveOffers()
   }
 
-  override def killTasks(stageId: Int) {
-    synchronized {
-      schedulableBuilder.getTaskSetManagers(stageId).foreach { t =>
-        // Notify the executors to kill the tasks.
-        val ts = t.asInstanceOf[TaskSetManager].taskSet
-        val taskIds = taskSetTaskIds(ts.id)
-        taskIds.foreach { tid =>
-          val execId = taskIdToExecutorId(tid)
-          backend.killTask(tid, execId)
-        }
+  override def killTasks(stageId: Int): Unit = synchronized {
+    schedulableBuilder.getTaskSetManagers(stageId).foreach { t =>
+      // Notify the executors to kill the tasks.
+      val ts = t.asInstanceOf[TaskSetManager].taskSet
+      val taskIds = taskSetTaskIds(ts.id)
+      taskIds.foreach { tid =>
+        val execId = taskIdToExecutorId(tid)
+        backend.killTask(tid, execId)
       }
     }
   }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f19984da/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 6b6d1eb..cec051b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -42,22 +42,29 @@ import org.apache.spark.util.Utils
  * testing fault recovery.
  */
 
-private[spark]
+private[local]
 case class LocalReviveOffers()
 
-private[spark]
+private[local]
 case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
 
+private[local]
+case class KillTask(taskId: Long)
+
 private[spark]
 class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
 
   def receive = {
     case LocalReviveOffers =>
       launchTask(localScheduler.resourceOffer(freeCores))
+
     case LocalStatusUpdate(taskId, state, serializeData) =>
       freeCores += 1
       localScheduler.statusUpdate(taskId, state, serializeData)
       launchTask(localScheduler.resourceOffer(freeCores))
+
+    case KillTask(taskId) =>
+      killTask(taskId)
   }
 
   def launchTask(tasks : Seq[TaskDescription]) {
@@ -70,6 +77,10 @@ class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Act
       })
     }
   }
+
+  def killTask(taskId: Long) {
+
+  }
 }
 
 private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
@@ -128,9 +139,12 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     }
   }
 
-  override def killTasks(stageId: Int) = synchronized {
+  override def killTasks(stageId: Int): Unit = synchronized {
     schedulableBuilder.getTaskSetManagers(stageId).foreach { sched =>
-      sched.asInstanceOf[TaskSetManager].taskSet.kill()
+      val taskIds = taskSetTaskIds(sched.asInstanceOf[TaskSetManager].taskSet.id)
+      for (tid <- taskIds) {
+        localActor ! KillTask(tid)
+      }
     }
   }
 
@@ -183,7 +197,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     var attemptedTask: Option[Task[_]] = None
     val start = System.currentTimeMillis()
     var taskStart: Long = 0
-    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
    val startGCTime = getTotalGCTime
 
     try {
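
Note: as the commit message says, task killing for the local cluster does not work yet. The new KillTask message is routed through LocalActor.receive, but the killTask(taskId) body added above is left empty. Purely as a hedged, standalone illustration of the cooperative-cancellation pattern the executor path already uses (set a killed flag that the running task polls), here is a minimal sketch; the LocalKillSketch and RunningTask names are invented for this example and are not part of this commit or of Spark.

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical sketch only: track running local tasks so a KillTask(taskId)
    // handler could flip a per-task "killed" flag, analogous to Executor.kill().
    object LocalKillSketch {
      final class RunningTask(val taskId: Long) {
        val killed = new AtomicBoolean(false)   // flag the task loop polls
      }

      private val running = new ConcurrentHashMap[Long, RunningTask]()
      private val pool = Executors.newFixedThreadPool(2)

      // Register the task before running it, so a later kill can find it.
      def launch(taskId: Long)(body: RunningTask => Unit): Unit = {
        val task = new RunningTask(taskId)
        running.put(taskId, task)
        pool.submit(new Runnable {
          def run(): Unit = try body(task) finally running.remove(taskId)
        })
      }

      // What a KillTask(taskId) handler might do: mark the task as killed and
      // let the running loop notice the flag and report a KILLED status.
      def killTask(taskId: Long): Unit = {
        val task = running.get(taskId)
        if (task != null) {
          task.killed.set(true)
        }
      }

      def main(args: Array[String]): Unit = {
        launch(1L) { task =>
          var i = 0
          while (!task.killed.get && i < 100) {
            Thread.sleep(10)
            i += 1
          }
          println("task " + task.taskId + " stopped, killed = " + task.killed.get)
        }
        Thread.sleep(50)
        killTask(1L)
        pool.shutdown()
        pool.awaitTermination(2, TimeUnit.SECONDS)
      }
    }

The cooperative flag mirrors how the cluster path handles kills above; forcibly interrupting the task thread instead would need extra care around cleanup, which is presumably why the local-mode implementation is deferred here.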