Updated Branches:
  refs/heads/master 9f7b9bb1c -> bf4e6131c

Ignore a task update status if the executor doesn't exist anymore.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a02eed68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a02eed68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a02eed68

Branch: refs/heads/master
Commit: a02eed68110f99c08d8ff379108c96546bbc16b0
Parents: 9f7b9bb
Author: Reynold Xin <[email protected]>
Authored: Tue Nov 5 18:46:38 2013 -0800
Committer: Reynold Xin <[email protected]>
Committed: Tue Nov 5 18:46:38 2013 -0800

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a02eed68/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 70f3f88..a45bee5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       case StatusUpdate(executorId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
-          freeCores(executorId) += 1
-          makeOffers(executorId)
+          if (executorActor.contains(executorId)) {
+            freeCores(executorId) += 1
+            makeOffers(executorId)
+          } else {
+            // Ignoring the update since we don't know about the executor.
+            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
+            logWarning(msg.format(taskId, state, sender, executorId))
+          }
         }
 
       case ReviveOffers =>
@@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
   }
 
-  private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  private val timeout = {
+    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  }
 
   def stopExecutors() {
     try {

Reply via email to