[ 
https://issues.apache.org/jira/browse/SPARK-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15386524#comment-15386524
 ] 

Imran Rashid commented on SPARK-2666:
-------------------------------------

[~tgraves] [~lianhuiwang].  When there is a fetch failure, Spark considers all 
shuffle output on that executor to be gone.  (The code is rather confusing -- 
first it removes just the one block whose fetch failed: 
https://github.com/apache/spark/blob/391e6be0ae883f3ea0fab79463eb8b618af79afb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1134
  but just after that, it removes everything on the executor: 
https://github.com/apache/spark/blob/391e6be0ae883f3ea0fab79463eb8b618af79afb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1184)
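
A rough sketch of those two steps, using made-up names (OutputRegistry, 
Location, removeOutputsOnExecutor; none of these are the real DAGScheduler or 
MapOutputTracker API), just to show the two granularities of removal described 
above:

{code:scala}
// Hypothetical, stripped-down model of the two removal steps on a fetch
// failure; not Spark's real classes or method names.
case class Location(executorId: String, host: String)

class OutputRegistry {
  // (shuffleId, mapId) -> where that map output currently lives
  protected val outputs =
    scala.collection.mutable.Map.empty[(Int, Int), Location]

  def register(shuffleId: Int, mapId: Int, loc: Location): Unit =
    outputs((shuffleId, mapId)) = loc

  // Step 1: drop only the single map output whose fetch failed.
  def unregisterOutput(shuffleId: Int, mapId: Int): Unit =
    outputs.remove((shuffleId, mapId))

  // Step 2: drop every output registered on the failed executor.
  def removeOutputsOnExecutor(executorId: String): Unit =
    outputs.retain((key, loc) => loc.executorId != executorId)

  // Partitions of a shuffle that a retry would have to recompute.
  def missingPartitions(shuffleId: Int, numMaps: Int): Seq[Int] =
    (0 until numMaps).filterNot(m => outputs.contains((shuffleId, m)))
}
{code}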


When a stage is retried, it reruns the tasks for whatever shuffle outputs are 
missing at the time of the retry.  Usually, that is just the map output that 
was on the executor the fetch failed from.  But it's not necessarily exactly 
the same set, as even more shuffle outputs can be lost before the stage retry 
kicks in.

* Suppose you had three stages in a row, 0 --> 1 --> 2, and you hit a shuffle 
fetch failure while running stage 2, say on executor A.  So you need to 
regenerate the map output for stage 1 that was on executor A.  But most likely 
Spark will discover that to regenerate that missing output, it needs some map 
output from stage 0, which was also on executor A.  So first it will go re-run 
the missing parts of stage 0, and only when it gets to stage 1 will the DAG 
scheduler look again at which map outputs are missing (see the sketch after 
this list).  So there is some extra time in there to discover more missing 
shuffle outputs.

* Spark only marks the shuffle output as missing for the *executor* that the 
shuffle data couldn't be read from, not for the entire node.  So if it's a 
hardware failure, you're likely to hit more fetch failures even after the 
first one comes in, since you probably can't read from any of the executors on 
that host.
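
To illustrate the cascading discovery in the first bullet, here is a toy walk 
over the stage DAG (ToyStage and stagesToRerun are invented for this sketch, 
not the DAGScheduler's real structures):

{code:scala}
// Toy model of what has to be re-run on a retry, assuming something like the
// registry sketched earlier supplies missingOutputs(stage).
case class ToyStage(id: Int, parents: Seq[ToyStage])

def stagesToRerun(stage: ToyStage,
                  missingOutputs: ToyStage => Seq[Int]): Seq[(ToyStage, Seq[Int])] = {
  // Missing parent output has to be regenerated before this stage can run,
  // and "missing" is only evaluated when the walk reaches each stage.
  val parentWork = stage.parents.flatMap(p => stagesToRerun(p, missingOutputs))
  val missingHere = missingOutputs(stage)
  if (missingHere.isEmpty) parentWork else parentWork :+ (stage -> missingHere)
}

// The 0 --> 1 --> 2 chain from the example: a fetch failure in stage 2 marks
// stage 1's output on executor A as missing, and the walk from stage 1 then
// discovers that some of stage 0's output is gone too.
val stage0 = ToyStage(0, Nil)
val stage1 = ToyStage(1, Seq(stage0))
val stage2 = ToyStage(2, Seq(stage1))
{code}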

Despite this, I don't think there is a very good reason to leave tasks running 
after there is a fetch failure.  If there is a hardware failure, then the rest 
of the retry process is also likely to discover this and remove those executors 
as well.  (Kay and I had discussed this earlier in the thread and we seemed to 
agree, though I dunno if we had thought through all the details at that time.)  
If anything, I wonder whether, when there is a fetch failure, we should mark 
all data on the entire node as missing, not just on the executor, but I don't 
think that is necessary.
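
If we ever did want that node-level behavior, it would amount to keying the 
invalidation by host instead of by executor; a hypothetical sketch on top of 
the toy registry above:

{code:scala}
// Hypothetical node-level variant of the toy OutputRegistry sketched earlier:
// a fetch failure pointing at any executor on a host invalidates the whole host.
class HostLevelRegistry extends OutputRegistry {
  def removeOutputsOnHost(host: String): Unit =
    outputs.retain((key, loc) => loc.host != host)
}
{code}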

> Always try to cancel running tasks when a stage is marked as zombie
> -------------------------------------------------------------------
>
>                 Key: SPARK-2666
>                 URL: https://issues.apache.org/jira/browse/SPARK-2666
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>            Reporter: Lianhui Wang
>
> There are some situations in which the scheduler can mark a task set as a 
> "zombie" before the task set has completed all of its tasks.  For example:
> (a) When a task fails b/c of a {{FetchFailed}}
> (b) When a stage completes because two different attempts create all the 
> ShuffleMapOutput, though no attempt has completed all its tasks (at least, 
> this *should* result in the task set being marked as zombie, see SPARK-10370)
> (there may be others, I'm not sure if this list is exhaustive.)
> Marking a taskset as zombie prevents any *additional* tasks from getting 
> scheduled; however, it does not cancel the currently running tasks.  We should 
> cancel all running tasks to avoid wasting resources (and also to make the 
> behavior a little clearer to the end user).  Rather than canceling tasks in each 
> case piecemeal, we should refactor the scheduler so that these two actions 
> are always taken together -- canceling tasks should go hand-in-hand with 
> marking the taskset as zombie.
> Some implementation notes:
> * We should change {{taskSetManager.isZombie}} to be private and put it 
> behind a method like {{markZombie}} or something.
> * marking a stage as zombie before all the tasks have completed does *not* 
> necessarily mean the stage attempt has failed.  In case (a), the stage 
> attempt has failed, but in case (b) we are not canceling b/c of a failure, 
> rather just b/c no more tasks are needed.
> * {{taskScheduler.cancelTasks}} always marks the task set as zombie.  
> However, it also has some side-effects like logging that the stage has failed 
> and creating a {{TaskSetFailed}} event, which we don't want, e.g. in case (b) 
> when nothing has failed.  So it may need some additional refactoring to go 
> along w/ {{markZombie}}.
> * {{SchedulerBackend}}s are free to not implement {{killTask}}, so we need 
> to be sure to catch the resulting {{UnsupportedOperationException}}
> * Testing this *might* benefit from SPARK-10372
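
A minimal sketch of the coupling proposed in the notes above; ToyBackend and 
ToyTaskSetManager are simplified stand-ins, not the real SchedulerBackend or 
TaskSetManager, and only the markZombie / killTask shape follows the notes:

{code:scala}
// Simplified stand-ins; only the shape of the proposed markZombie coupling is
// the point here, everything else is illustrative.
trait ToyBackend {
  // Backends are free to not support killing tasks at all.
  def killTask(taskId: Long, executorId: String, interrupt: Boolean): Unit
}

class ToyTaskSetManager(backend: ToyBackend) {
  private var zombie = false
  def isZombie: Boolean = zombie

  // Single entry point: going zombie always also tries to kill whatever is
  // still running, whether the attempt failed (case a) or not (case b).
  def markZombie(runningTasks: Map[Long, String]): Unit = {
    zombie = true
    runningTasks.foreach { case (taskId, execId) =>
      try backend.killTask(taskId, execId, interrupt = false)
      catch {
        // Some backends don't implement killTask; treat the kill as best effort.
        case _: UnsupportedOperationException => ()
      }
    }
  }
}
{code}

Keeping the kill loop inside markZombie means a caller cannot mark a taskset 
as zombie without at least attempting to cancel its running tasks, which is 
the invariant the description asks for.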


