Updated Branches:
  refs/heads/master dfd40e9f6 -> 96e0fb463

Fix bug where scheduler could hang after task failure.

When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after reviveOffers() is called (there is a
race condition here), so when reviveOffers() is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched. This commit fixes the problem by calling
reviveOffers() from handleFailedTask, after the task set manager has
updated its state, for every task state other than KILLED.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b4546ba9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b4546ba9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b4546ba9

Branch: refs/heads/master
Commit: b4546ba9e694529c359b7ca5c26829ead2c07f1a
Parents: 1a4cfbe
Author: Kay Ousterhout <[email protected]>
Authored: Thu Nov 14 13:33:11 2013 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Thu Nov 14 13:55:03 2013 -0800

----------------------------------------------------------------------
 .../spark/scheduler/cluster/ClusterScheduler.scala     | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4546ba9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 53a5896..c1e65a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     taskState: TaskState,
     reason: Option[TaskEndReason]) = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }

Reply via email to