Repository: spark
Updated Branches:
  refs/heads/master 6ecedf39b -> 695dbc816


[SPARK-14485][CORE] ignore task finished for executor lost and removed by driver

Currently, when the driver removes an executor because of a heartbeat timeout, 
it re-queues the tasks running on that executor and sends a kill command to the 
cluster to kill the executor.
However, a task running on that executor may finish and return its result to 
the driver before the kill command takes effect. In that situation, the driver 
accepts the task-finished event and ignores the speculative and re-queued 
copies of the task.
But because the driver has already removed this executor, its BlockManagerId 
has also been removed from BlockManagerMaster, so the result of the finished 
task cannot be saved in the driver. The result data of the stage is therefore 
incomplete, which then causes a fetch failure. For more details, see 
[SPARK-14485](https://issues.apache.org/jira/browse/SPARK-14485).
This PR introduces a mechanism to ignore this kind of task-finished event.
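
The check described above can be sketched outside of Spark as follows. This is only an illustration, not the code in this commit: `MiniScheduler`, `launch`, `taskFinished`, and the `Outcome` values are hypothetical names, and the sketch assumes that removing an executor drops its entry from `executorIdToTaskCount`, which is what the `contains` check in the patch relies on.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the scheduler's bookkeeping.
// None of these types are Spark classes.
object IgnoreFinishedTaskSketch {
  sealed trait Outcome
  case object Enqueued extends Outcome // result would be handed to the result getter
  case object Ignored extends Outcome  // result dropped: executor already removed

  class MiniScheduler {
    private val taskIdToExecutorId = mutable.HashMap.empty[Long, String]
    private val executorIdToTaskCount = mutable.HashMap.empty[String, Int]

    def launch(tid: Long, execId: String): Unit = {
      taskIdToExecutorId(tid) = execId
      executorIdToTaskCount(execId) = executorIdToTaskCount.getOrElse(execId, 0) + 1
    }

    // Assumption: models the driver removing an executor after a heartbeat
    // timeout by forgetting its task count.
    def removeExecutor(execId: String): Unit = {
      executorIdToTaskCount.remove(execId)
    }

    // Mirrors the shape of the patched branch for a successfully finished task.
    def taskFinished(tid: Long): Outcome = {
      val executorId = taskIdToExecutorId.remove(tid)
      executorId.foreach { execId =>
        if (executorIdToTaskCount.contains(execId)) {
          executorIdToTaskCount(execId) -= 1
        }
      }
      executorId match {
        case Some(execId) if !executorIdToTaskCount.contains(execId) => Ignored
        case _ => Enqueued
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new MiniScheduler
    scheduler.launch(1L, "exec-1")
    scheduler.launch(2L, "exec-2")
    scheduler.removeExecutor("exec-1")  // driver gives up on exec-1
    println(scheduler.taskFinished(1L)) // late result from the removed executor
    println(scheduler.taskFinished(2L)) // result from a live executor
  }
}
```

In this sketch the late result from `exec-1` is ignored because the executor is no longer tracked, while the result from `exec-2` is accepted as usual.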

N/A

Author: zhonghaihua <[email protected]>

Closes #12258 from zhonghaihua/ignoreTaskFinishForExecutorLostAndRemovedByDriver.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/695dbc81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/695dbc81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/695dbc81

Branch: refs/heads/master
Commit: 695dbc816a6d70289abeb145cb62ff4e62b3f49b
Parents: 6ecedf3
Author: zhonghaihua <[email protected]>
Authored: Tue Jun 7 16:30:58 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Tue Jun 7 16:32:27 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSchedulerImpl.scala    | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/695dbc81/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index c3adc28..8e1d957 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -352,9 +352,11 @@ private[spark] class TaskSchedulerImpl(
         }
         taskIdToTaskSetManager.get(tid) match {
           case Some(taskSet) =>
+            var executorId: String = null
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
               taskIdToExecutorId.remove(tid).foreach { execId =>
+                executorId = execId
                 if (executorIdToTaskCount.contains(execId)) {
                   executorIdToTaskCount(execId) -= 1
                 }
@@ -362,7 +364,17 @@ private[spark] class TaskSchedulerImpl(
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
-              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              // In some case, executor has already been removed by driver for heartbeats timeout,
+              // but at sometime, before executor killed by cluster, the task of running on this
+              // executor is finished and return task success state to driver. However, this kinds
+              // of task should be ignored, because the task on this executor is already re-queued
+              // by driver. For more details, can check in SPARK-14485.
+              if (executorId != null && !executorIdToTaskCount.contains(executorId)) {
+                logInfo(s"Ignoring update with state $state for TID $tid because its executor " +
+                  s"has already been removed by driver")
+              } else {
+                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              }
             } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
               taskSet.removeRunningTask(tid)
               taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)


