Updated Branches: refs/heads/master dfd40e9f6 -> 96e0fb463
Fix bug where scheduler could hang after task failure. When a task fails, we need to call reviveOffers() so that the task can be rescheduled on a different machine. In the current code, the state in ClusterTaskSetManager indicating which tasks are pending may be updated after reviveOffers() is called (there's a race condition here), so when reviveOffers() is called, the task set manager does not yet realize that there are failed tasks that need to be relaunched. Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b4546ba9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b4546ba9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b4546ba9 Branch: refs/heads/master Commit: b4546ba9e694529c359b7ca5c26829ead2c07f1a Parents: 1a4cfbe Author: Kay Ousterhout <[email protected]> Authored: Thu Nov 14 13:33:11 2013 -0800 Committer: Kay Ousterhout <[email protected]> Committed: Thu Nov 14 13:55:03 2013 -0800 ---------------------------------------------------------------------- .../spark/scheduler/cluster/ClusterScheduler.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b4546ba9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 53a5896..c1e65a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def statusUpdate(tid: Long, state: 
TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None - var taskFailed = false synchronized { try { if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { @@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } taskIdToExecutorId.remove(tid) } - if (state == TaskState.FAILED) { - taskFailed = true - } activeTaskSets.get(taskSetId).foreach { taskSet => if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) @@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } - if (taskFailed) { - // Also revive offers if a task had failed for some reason other than host lost - backend.reviveOffers() - } } def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) { @@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) taskState: TaskState, reason: Option[TaskEndReason]) = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) - if (taskState == TaskState.FINISHED) { - // The task finished successfully but the result was lost, so we should revive offers. + if (taskState != TaskState.KILLED) { + // Need to revive offers again now that the task set manager state has been updated to + // reflect failed tasks that need to be re-run. backend.reviveOffers() } }
