[
https://issues.apache.org/jira/browse/SPARK-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15387145#comment-15387145
]
Lianhui Wang edited comment on SPARK-2666 at 7/21/16 4:38 AM:
--------------------------------------------------------------
Thanks. I think what [~irashid] said is more about non-external shuffle. But in
our use cases we usually use Yarn-external shuffle service. for 0 --> 1 --> 2,
if it hitx a shuffle fetch failure while running stage 2, say on executor A. So
it needs to regenerate the map output for stage 1 that was on executor A. But
it don't rerun for stage 0 on executor A.
So i think we can firstly handle with FetchFailed on Yarn-external shuffle
service(maybe connection timeout, out of memory, etc). I think many users have
met FetchFailed on Yarn-external shuffle service.
as [~tgraves] said before, Now If the stages fails because FetchFailed, it
rerun 1) all the ones not succeeded yet in the failed stage (including the ones
that could still be running). So it cause many duplicate running tasks of
failed stage. Once there is a FetchFailed, it will rerun all the unsuccessful
tasks of the failed stage.
Until now, i think our first target is for Yarn-external shuffle service if the
stages fails because FetchFailed it should decrease the number of rerunning
tasks of the failed stage. As i pointed out before that the best way is like
Mapreduce we just resubmit the map stage of failed stage.
1. When FetchFailed has happened on task, the task don't be finished and
continue to fetch other results. It just report the ShuffleBlockId of
FetchFailed to DAGScheduler. other running tasks of this stage did like this
task.
2. DAGScheduler receive the ShuffleBlockId of FetchFailed and resubmit the task
for the ShuffleBlockId. Once the task has been finished, it will register the
map output to MapOutputTracker.
3. The task that has FetchFailed before get the map output of FetchFailed from
MapOutputTracker every hearbeat. Once step-2 is finished. The task can get the
map output of FetchFailed successfully and will fetch the results of
FetchFailed.
But there is a dead lock if the tasks of Step-2 can not be run because there is
no slots for it.Under this situation it should kill some running tasks for it.
In addition, i find that https://issues.apache.org/jira/browse/SPARK-14649 did
it for 2) it only run the failed ones and wait for the ones still running in
failed stage. The disadvantage of SPARK-14649 is that other running tasks of
the failed stage maybe need a long time to rerun when they spend time to fetch
other's results.
was (Author: lianhuiwang):
I think what [~irashid] said is more about non-external shuffle. But in our use
cases we usually use Yarn-external shuffle service. for 0 --> 1 --> 2, if it
hitx a shuffle fetch failure while running stage 2, say on executor A. So it
needs to regenerate the map output for stage 1 that was on executor A. But it
don't rerun for stage 0 on executor A.
So i think we can firstly handle with FetchFailed on Yarn-external shuffle
service(maybe connection timeout, out of memory, etc). I think many users have
met FetchFailed on Yarn-external shuffle service.
as [~tgraves] said before, Now If the stages fails because FetchFailed, it
rerun 1) all the ones not succeeded yet in the failed stage (including the ones
that could still be running). So it cause many duplicate running tasks of
failed stage. Once there is a FetchFailed, it will rerun all the unsuccessful
tasks of the failed stage.
Until now, i think our first target is for Yarn-external shuffle service if the
stages fails because FetchFailed it should decrease the number of rerunning
tasks of the failed stage. As i pointed out before that the best way is like
Mapreduce we just resubmit the map stage of failed stage.
1. When FetchFailed has happened on task, the task don't be finished and
continue to fetch other results. It just report the ShuffleBlockId of
FetchFailed to DAGScheduler. other running tasks of this stage did like this
task.
2. DAGScheduler receive the ShuffleBlockId of FetchFailed and resubmit the task
for the ShuffleBlockId. Once the task has been finished, it will register the
map output to MapOutputTracker.
3. The task that has FetchFailed before get the map output of FetchFailed from
MapOutputTracker every hearbeat. Once step-2 is finished. The task can get the
map output of FetchFailed successfully and will fetch the results of
FetchFailed.
But there is a dead lock if the tasks of Step-2 can not be run because there is
no slots for it.Under this situation it should kill some running tasks for it.
In addition, i find that https://issues.apache.org/jira/browse/SPARK-14649 did
it for 2) it only run the failed ones and wait for the ones still running in
failed stage. The disadvantage of SPARK-14649 is that other running tasks of
the failed stage maybe need a long time to rerun when they spend time to fetch
other's results.
> Always try to cancel running tasks when a stage is marked as zombie
> -------------------------------------------------------------------
>
> Key: SPARK-2666
> URL: https://issues.apache.org/jira/browse/SPARK-2666
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, Spark Core
> Reporter: Lianhui Wang
>
> There are some situations in which the scheduler can mark a task set as a
> "zombie" before the task set has completed all of its tasks. For example:
> (a) When a task fails b/c of a {{FetchFailed}}
> (b) When a stage completes because two different attempts create all the
> ShuffleMapOutput, though no attempt has completed all its tasks (at least,
> this *should* result in the task set being marked as zombie, see SPARK-10370)
> (there may be others, I'm not sure if this list is exhaustive.)
> Marking a taskset as zombie prevents any *additional* tasks from getting
> scheduled, however it does not cancel all currently running tasks. We should
> cancel all running to avoid wasting resources (and also to make the behavior
> a little more clear to the end user). Rather than canceling tasks in each
> case piecemeal, we should refactor the scheduler so that these two actions
> are always taken together -- canceling tasks should go hand-in-hand with
> marking the taskset as zombie.
> Some implementation notes:
> * We should change {{taskSetManager.isZombie}} to be private and put it
> behind a method like {{markZombie}} or something.
> * marking a stage as zombie before the all tasks have completed does *not*
> necessarily mean the stage attempt has failed. In case (a), the stage
> attempt has failed, but in stage (b) we are not canceling b/c of a failure,
> rather just b/c no more tasks are needed.
> * {{taskScheduler.cancelTasks}} always marks the task set as zombie.
> However, it also has some side-effects like logging that the stage has failed
> and creating a {{TaskSetFailed}} event, which we don't want eg. in case (b)
> when nothing has failed. So it may need some additional refactoring to go
> along w/ {{markZombie}}.
> * {{SchedulerBackend}}'s are free to not implement {{killTask}}, so we need
> to be sure to catch the {{UnsupportedOperationException}} s
> * Testing this *might* benefit from SPARK-10372
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]