More logging changes (task killing for local cluster doesn't work yet).

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f19984da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f19984da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f19984da

Branch: refs/heads/master
Commit: f19984dafe2154e150076b3d2b335d08a4bb073d
Parents: 85a0dff
Author: Reynold Xin <reyno...@gmail.com>
Authored: Thu Sep 19 18:14:51 2013 -0700
Committer: Reynold Xin <reyno...@gmail.com>
Committed: Thu Sep 19 18:14:51 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  4 +++-
 .../scheduler/cluster/ClusterScheduler.scala    | 18 +++++++--------
 .../spark/scheduler/local/LocalScheduler.scala  | 24 ++++++++++++++++----
 3 files changed, 30 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f19984da/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7a2f8c5..b3ba65f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -160,6 +160,7 @@ private[spark] class Executor(
     @volatile private var task: Task[Any] = _
 
     def kill() {
+      logInfo("Executor is trying to kill task " + taskId)
       killed = true
       if (task != null) {
         task.kill()
@@ -188,7 +189,7 @@ private[spark] class Executor(
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.
         if (killed) {
-          logInfo("Task " + taskId + " was killed before it had a chance to 
run.")
+          logInfo("Executor killed task " + taskId)
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
 
@@ -203,6 +204,7 @@ private[spark] class Executor(
 
         // If the task has been killed, let's fail it.
         if (task.killed) {
+          logInfo("Executor killed task " + taskId)
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
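
For context, the TaskRunner changes above follow a cooperative-cancellation pattern: kill() only sets a volatile flag (and forwards the kill to the task), and the run path checks that flag both before the task is deserialized and after it finishes, reporting TaskState.KILLED instead of a result. A minimal standalone sketch of the same idea, with simplified names that are not the actual Spark classes:

  // Illustrative only: cooperative cancellation via a volatile flag.
  class Runner(taskId: Long) extends Runnable {
    @volatile private var killed = false

    def kill(): Unit = {
      println("Trying to kill task " + taskId)
      killed = true                       // the running thread observes this at its check points
    }

    override def run(): Unit = {
      if (killed) {                       // killed before any real work started
        println("Task " + taskId + " killed before it had a chance to run")
        return
      }
      // ... deserialize and run the actual task here ...
      if (killed) {                       // killed while running: report it instead of a result
        println("Task " + taskId + " killed while running")
      }
    }
  }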
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f19984da/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 3d7ddd7..93ff12a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -166,16 +166,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     backend.reviveOffers()
   }
 
-  override def killTasks(stageId: Int) {
-    synchronized {
-      schedulableBuilder.getTaskSetManagers(stageId).foreach { t =>
-        // Notify the executors to kill the tasks.
-        val ts = t.asInstanceOf[TaskSetManager].taskSet
-        val taskIds = taskSetTaskIds(ts.id)
-        taskIds.foreach { tid =>
-          val execId = taskIdToExecutorId(tid)
-          backend.killTask(tid, execId)
-        }
+  override def killTasks(stageId: Int): Unit = synchronized {
+    schedulableBuilder.getTaskSetManagers(stageId).foreach { t =>
+      // Notify the executors to kill the tasks.
+      val ts = t.asInstanceOf[TaskSetManager].taskSet
+      val taskIds = taskSetTaskIds(ts.id)
+      taskIds.foreach { tid =>
+        val execId = taskIdToExecutorId(tid)
+        backend.killTask(tid, execId)
       }
     }
   }
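
The ClusterScheduler change above is purely structural: writing the method as "def killTasks(stageId: Int): Unit = synchronized { ... }" locks on the same monitor (this) as the old synchronized block nested inside the body; it only removes one level of nesting. A small self-contained illustration of the two equivalent forms (not Spark code):

  class Counter {
    private var n = 0

    // old shape: synchronized block nested inside the method body
    def incrementNested(): Unit = {
      synchronized { n += 1 }
    }

    // new shape: the synchronized block is the method body; same lock, less nesting
    def increment(): Unit = synchronized { n += 1 }
  }
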

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f19984da/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 6b6d1eb..cec051b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -42,22 +42,29 @@ import org.apache.spark.util.Utils
  * testing fault recovery.
  */
 
-private[spark]
+private[local]
 case class LocalReviveOffers()
 
-private[spark]
+private[local]
 case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
 
+private[local]
+case class KillTask(taskId: Long)
+
 private[spark]
 class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
 
   def receive = {
     case LocalReviveOffers =>
       launchTask(localScheduler.resourceOffer(freeCores))
+
     case LocalStatusUpdate(taskId, state, serializeData) =>
       freeCores += 1
       localScheduler.statusUpdate(taskId, state, serializeData)
       launchTask(localScheduler.resourceOffer(freeCores))
+
+    case KillTask(taskId) =>
+      killTask(taskId)
   }
 
   def launchTask(tasks : Seq[TaskDescription]) {
@@ -70,6 +77,10 @@ class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Act
       })
     }
   }
+
+  def killTask(taskId: Long) {
+
+  }
 }
 
 private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
@@ -128,9 +139,12 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     }
   }
 
-  override def killTasks(stageId: Int) = synchronized {
+  override def killTasks(stageId: Int): Unit = synchronized {
     schedulableBuilder.getTaskSetManagers(stageId).foreach { sched =>
-      sched.asInstanceOf[TaskSetManager].taskSet.kill()
+      val taskIds = taskSetTaskIds(sched.asInstanceOf[TaskSetManager].taskSet.id)
+      for (tid <- taskIds) {
+        localActor ! KillTask(tid)
+      }
     }
   }
 
@@ -183,7 +197,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     var attemptedTask: Option[Task[_]] = None   
     val start = System.currentTimeMillis()
     var taskStart: Long = 0
-    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
+    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
     val startGCTime = getTotalGCTime
 
     try {
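
As the commit message says, task killing for the local scheduler is not functional yet: LocalScheduler.killTasks now sends KillTask(tid) to the actor, but LocalActor.killTask is still an empty stub. One hypothetical way the stub could eventually be filled in is sketched below, assuming the local scheduler keeps a handle on whatever is running each task; the Killable trait and runningTasks map are illustrative names and are not part of this commit:

  import java.util.concurrent.ConcurrentHashMap

  // Stand-in for whatever runs a task locally and supports a cooperative kill.
  trait Killable { def kill(): Unit }

  class LocalKillSupport {
    private val runningTasks = new ConcurrentHashMap[Long, Killable]()

    def register(taskId: Long, runner: Killable): Unit = runningTasks.put(taskId, runner)

    def killTask(taskId: Long): Unit = {
      val runner = runningTasks.get(taskId)
      if (runner != null) {
        runner.kill()   // flips the task's killed flag, like Executor.TaskRunner.kill()
      }
    }
  }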
