Updated Branches:
  refs/heads/branch-0.8 96670e716 -> 0f6278641

Merge pull request #143 from rxin/scheduler-hang

Ignore a task status update if the executor doesn't exist anymore.

Otherwise, if the scheduler receives a task status update after the executor
has been removed, the scheduler would hang.

It is pretty hard to add unit tests for these right now because it is hard to 
mock the cluster scheduler. We should do that once @kayousterhout finishes 
merging the local scheduler and the cluster scheduler.

(cherry picked from commit bf4e6131cceef4fe00fb5693117c0732f181dbd9)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0f627864
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0f627864
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0f627864

Branch: refs/heads/branch-0.8
Commit: 0f627864171c83d32b73394b361d1b4aa8dac95b
Parents: 96670e7
Author: Reynold Xin <[email protected]>
Authored: Tue Nov 5 23:14:09 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Tue Nov 5 23:14:28 2013 -0800

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f627864/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 70f3f88..a45bee5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
ClusterScheduler, actorSystem: Ac
       case StatusUpdate(executorId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
-          freeCores(executorId) += 1
-          makeOffers(executorId)
+          if (executorActor.contains(executorId)) {
+            freeCores(executorId) += 1
+            makeOffers(executorId)
+          } else {
+            // Ignoring the update since we don't know about the executor.
+            val msg = "Ignored task status update (%d state %s) from unknown 
executor %s with ID %s"
+            logWarning(msg.format(taskId, state, sender, executorId))
+          }
         }
 
       case ReviveOffers =>
@@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
ClusterScheduler, actorSystem: Ac
       Props(new DriverActor(properties)), name = 
CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }
 
-  private val timeout = 
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, 
"seconds")
+  private val timeout = {
+    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, 
"seconds")
+  }
 
   def stopExecutors() {
     try {

Reply via email to