Repository: spark
Updated Branches:
  refs/heads/master 6ecedf39b -> 695dbc816
[SPARK-14485][CORE] ignore task finished for executor lost and removed by driver

Currently, when the driver removes an executor because of a heartbeat timeout, it re-queues the tasks that were running on that executor and sends a kill command to the cluster to kill the executor. However, a task running on that executor can finish and return its result to the driver before the kill command takes effect. In that case the driver accepts the task-finished event and ignores the speculative and re-queued copies of the task. But because the driver has already removed the executor, the result of the finished task cannot be saved on the driver: the executor's BlockManagerId has also been removed from BlockManagerMaster. The result data of the stage is therefore incomplete, which later causes a fetch failure.

For more details, see [SPARK-14485](https://issues.apache.org/jira/browse/SPARK-14485).

This PR introduces a mechanism to ignore this kind of task-finished event.

N/A

Author: zhonghaihua <[email protected]>

Closes #12258 from zhonghaihua/ignoreTaskFinishForExecutorLostAndRemovedByDriver.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695dbc81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695dbc81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695dbc81

Branch: refs/heads/master
Commit: 695dbc816a6d70289abeb145cb62ff4e62b3f49b
Parents: 6ecedf3
Author: zhonghaihua <[email protected]>
Authored: Tue Jun 7 16:30:58 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Jun 7 16:32:27 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/695dbc81/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index c3adc28..8e1d957 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -352,9 +352,11 @@ private[spark] class TaskSchedulerImpl(
         }
         taskIdToTaskSetManager.get(tid) match {
           case Some(taskSet) =>
+            var executorId: String = null
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
               taskIdToExecutorId.remove(tid).foreach { execId =>
+                executorId = execId
                 if (executorIdToTaskCount.contains(execId)) {
                   executorIdToTaskCount(execId) -= 1
                 }
@@ -362,7 +364,17 @@ private[spark] class TaskSchedulerImpl(
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
-              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              // In some case, executor has already been removed by driver for heartbeats timeout,
+              // but at sometime, before executor killed by cluster, the task of running on this
+              // executor is finished and return task success state to driver. However, this kinds
+              // of task should be ignored, because the task on this executor is already re-queued
+              // by driver. For more details, can check in SPARK-14485.
+              if (executorId != null && !executorIdToTaskCount.contains(executorId)) {
+                logInfo(s"Ignoring update with state $state for TID $tid because its executor " +
+                  s"has already been removed by driver")
+              } else {
+                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              }
             } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
               taskSet.removeRunningTask(tid)
               taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
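
The check added by this patch can be tried in isolation with a small stand-alone model. The sketch below is not Spark code: `MiniScheduler`, `launch`, `taskFinished`, `acceptedResults`, and `ignoredResults` are hypothetical names made up for illustration, and only the two maps named in the diff (`taskIdToExecutorId`, `executorIdToTaskCount`) are modelled. It assumes that removing an executor drops it from `executorIdToTaskCount` while its entries in `taskIdToExecutorId` stay in place, which is the condition the patch tests for.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the scheduler state touched by the patch.
// It is an illustration of the check only, not the real TaskSchedulerImpl.
class MiniScheduler {
  val taskIdToExecutorId = mutable.HashMap[Long, String]()
  val executorIdToTaskCount = mutable.HashMap[String, Int]()

  // Stand-ins for "result handed to the result getter" and "result dropped".
  val acceptedResults = mutable.ArrayBuffer[Long]()
  val ignoredResults = mutable.ArrayBuffer[Long]()

  // Record that task `tid` is running on executor `execId`.
  def launch(tid: Long, execId: String): Unit = {
    taskIdToExecutorId(tid) = execId
    executorIdToTaskCount(execId) = executorIdToTaskCount.getOrElse(execId, 0) + 1
  }

  // Assumption: removal (e.g. after a heartbeat timeout) only drops the
  // executor from executorIdToTaskCount; task-to-executor entries remain.
  def removeExecutor(execId: String): Unit = {
    executorIdToTaskCount.remove(execId)
  }

  // Handle a FINISHED update for task `tid`.
  def taskFinished(tid: Long): Unit = {
    val executorId = taskIdToExecutorId.remove(tid)
    executorId.foreach { execId =>
      if (executorIdToTaskCount.contains(execId)) {
        executorIdToTaskCount(execId) -= 1
      }
    }
    // Same condition as the patch: the task had an executor, and that
    // executor is no longer known to the scheduler.
    if (executorId.exists(id => !executorIdToTaskCount.contains(id))) {
      ignoredResults += tid
    } else {
      acceptedResults += tid
    }
  }
}
```

With this model, launching task 1 on `exec-1` and task 2 on `exec-2`, removing `exec-1`, and then reporting both tasks as finished leaves task 1 in `ignoredResults` and task 2 in `acceptedResults`. The sketch uses an `Option` where the patch uses a nullable `var`; the condition tested is the same.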
