[
https://issues.apache.org/jira/browse/SPARK-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15386524#comment-15386524
]
Imran Rashid commented on SPARK-2666:
-------------------------------------
[~tgraves] [~lianhuiwang]. When there is a fetch failure, spark considers all
shuffle output on that executor to be gone. (The code is rather confusing --
first it just removes the one block with the fetch failed:
https://github.com/apache/spark/blob/391e6be0ae883f3ea0fab79463eb8b618af79afb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1134
but just after that, it removes everything on the executor:
https://github.com/apache/spark/blob/391e6be0ae883f3ea0fab79463eb8b618af79afb/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1184)
When a stage is retried, it reruns all the tasks for the missing shuffle
outputs, at the time the stage is retried. Usually, this is just all of the
map output that was on the executor which had the fetch failed. But, its not
necessarily exactly the same, as even more shuffle outputs could be lost before
the stage retry kicks in.
* Suppose you had three stages in a row, 0 --> 1 --> 2, and you hit a shuffle
fetch failure while running stage 2, say on executor A. So you need to
regenerate the map output for stage 1 that was on executor A. But most likely
spark will discover that to regenerate that missing output, it needs some map
output from stage 0, which was on executor A. So first it will go re-run the
missing parts of stage 0, and then when it gets to stage 1, the dag scheduler
will look at what map outputs are beginning. So there is some extra time in
there to discover more missing shuffle outputs.
* Spark only marks the shuffle output as missing for the *executor* that
shuffle data couldn't be read from, not for the entire node. So if its a
hardware failure, you're likely to hit more failures even after the first fetch
failure comes in, since you probably can't read from any of the nodes on that
host.
Despite this, I don't think there is a very good reason to leave tasks running
after there is a fetch failure. If there is a hardware failure, then the rest
of the retry process is also likely to discover this and remove those executors
as well. (Kay and I had discussed this earlier in the thread and we seemed to
agree, though I dunno if we had thought through all the details at that time.)
If anything, I wonder if when there is a fetch failure, we should mark all data
as missing on the entire node, not just the executor, but I don't think that is
necessary.
> Always try to cancel running tasks when a stage is marked as zombie
> -------------------------------------------------------------------
>
> Key: SPARK-2666
> URL: https://issues.apache.org/jira/browse/SPARK-2666
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, Spark Core
> Reporter: Lianhui Wang
>
> There are some situations in which the scheduler can mark a task set as a
> "zombie" before the task set has completed all of its tasks. For example:
> (a) When a task fails b/c of a {{FetchFailed}}
> (b) When a stage completes because two different attempts create all the
> ShuffleMapOutput, though no attempt has completed all its tasks (at least,
> this *should* result in the task set being marked as zombie, see SPARK-10370)
> (there may be others, I'm not sure if this list is exhaustive.)
> Marking a taskset as zombie prevents any *additional* tasks from getting
> scheduled, however it does not cancel all currently running tasks. We should
> cancel all running to avoid wasting resources (and also to make the behavior
> a little more clear to the end user). Rather than canceling tasks in each
> case piecemeal, we should refactor the scheduler so that these two actions
> are always taken together -- canceling tasks should go hand-in-hand with
> marking the taskset as zombie.
> Some implementation notes:
> * We should change {{taskSetManager.isZombie}} to be private and put it
> behind a method like {{markZombie}} or something.
> * marking a stage as zombie before the all tasks have completed does *not*
> necessarily mean the stage attempt has failed. In case (a), the stage
> attempt has failed, but in stage (b) we are not canceling b/c of a failure,
> rather just b/c no more tasks are needed.
> * {{taskScheduler.cancelTasks}} always marks the task set as zombie.
> However, it also has some side-effects like logging that the stage has failed
> and creating a {{TaskSetFailed}} event, which we don't want eg. in case (b)
> when nothing has failed. So it may need some additional refactoring to go
> along w/ {{markZombie}}.
> * {{SchedulerBackend}}'s are free to not implement {{killTask}}, so we need
> to be sure to catch the {{UnsupportedOperationException}} s
> * Testing this *might* benefit from SPARK-10372
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]