[ https://issues.apache.org/jira/browse/SPARK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14635663#comment-14635663 ]

Apache Spark commented on SPARK-8103:
-------------------------------------

User 'markhamstra' has created a pull request for this issue:
https://github.com/apache/spark/pull/7572

> DAGScheduler should not launch multiple concurrent attempts for one stage on 
> fetch failures
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8103
>                 URL: https://issues.apache.org/jira/browse/SPARK-8103
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Imran Rashid
>            Assignee: Imran Rashid
>             Fix For: 1.5.0
>
>
> When there is a fetch failure, {{DAGScheduler}} is supposed to fail the 
> stage, retry the necessary portions of the preceding shuffle stage that 
> generated the shuffle data, and eventually rerun the stage.  
> We generally expect to get multiple fetch failures together, but only want to 
> restart the stage once.  The code already attempts to address this: 
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1108
> {code}
>         // It is likely that we receive multiple FetchFailed for a single stage (because we have
>         // multiple tasks running concurrently on different executors). In that case, it is possible
>         // the fetch failure has already been handled by the scheduler.
>         if (runningStages.contains(failedStage)) {
> {code}
> However, this logic is flawed because the stage may have been *resubmitted* 
> by the time we get these fetch failures.  In that case, 
> {{runningStages.contains(failedStage)}} will be true, but we've already 
> handled these failures.
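> A minimal sketch of a guard keyed on the failing stage attempt rather than on 
> {{runningStages}} membership; the names here are illustrative only, not the 
> actual {{DAGScheduler}} fields or the eventual fix:
> {code}
> import scala.collection.mutable
> 
> // Illustrative stand-in for per-stage bookkeeping; not real DAGScheduler code.
> class StageFailureTracker {
>   // stage attempts whose fetch failures have already triggered a resubmit
>   private val handledAttempts = mutable.HashSet[Int]()
> 
>   // Returns true only the first time a given attempt reports a fetch failure,
>   // so a burst of FetchFailed events from one attempt resubmits the map stage
>   // once, and late events from an already-handled attempt are ignored even if
>   // the stage is back in runningStages after being resubmitted.
>   def shouldHandle(failedAttemptId: Int): Boolean = handledAttempts.add(failedAttemptId)
> }
> 
> val tracker = new StageFailureTracker
> assert(tracker.shouldHandle(0))    // first FetchFailed from attempt 0 -> handle it
> assert(!tracker.shouldHandle(0))   // duplicate from attempt 0 -> ignore
> assert(tracker.shouldHandle(1))    // first FetchFailed from attempt 1 -> handle again
> {code}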
> The flawed check results in multiple concurrent non-zombie attempts for one 
> stage.  In addition to being very confusing and a waste of resources, this can 
> also lead to later stages being submitted before the previous stage has 
> registered its map output.  This happens because
> (a) when one attempt finishes all its tasks, it may not register its map 
> output because the stage still has pending tasks from other attempts (see the 
> sketch below): 
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1046
> {code}
>             if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
> {code}
> and (b) {{submitStage}} thinks the following stage is ready to go, because 
> {{getMissingParentStages}} thinks the stage is complete as long as it has all 
> of its map outputs: 
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L397
> {code}
>                 if (!mapStage.isAvailable) {
>                   missing += mapStage
>                 }
> {code}
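> A minimal sketch of (a), using a plain set in place of the real {{Stage}} and 
> {{Task}} bookkeeping: because {{pendingTasks}} holds tasks from every live 
> attempt, one attempt finishing all of its own tasks does not empty the set, so 
> the registration guarded by {{shuffleStage.pendingTasks.isEmpty}} is skipped:
> {code}
> import scala.collection.mutable
> 
> // Simplified model: a pending task is identified by (attemptId, partition).
> val pendingTasks = mutable.HashSet[(Int, Int)]()
> 
> pendingTasks ++= Seq((0, 0), (0, 1))   // tasks from the original attempt 0
> pendingTasks ++= Seq((1, 0), (1, 1))   // tasks from the resubmitted attempt 1
> 
> // Attempt 0 finishes all of its tasks...
> pendingTasks --= Seq((0, 0), (0, 1))
> 
> // ...but tasks from attempt 1 are still pending, so pendingTasks.isEmpty is
> // false and the stage's map outputs are never registered with the tracker.
> assert(pendingTasks.nonEmpty)
> {code}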
> So the following stage is submitted repeatedly, but it is doomed to fail 
> because the shuffle output it depends on has never been registered with the 
> map output tracker.  Here's an example failure in this case:
> {noformat}
> WARN TaskSetManager: Lost task 5.0 in stage 3.2 (TID 294, 192.168.1.104): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=5, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing output locations for shuffle ...
> {noformat}
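> A minimal sketch of that mismatch, with made-up names standing in for the 
> scheduler-side stage bookkeeping and for the {{MapOutputTracker}}: the 
> scheduler's view says the map stage is available, but the tracker that reduce 
> tasks actually query has no locations, so every resubmission of the following 
> stage fails the same way:
> {code}
> // Illustrative only; these are not the real Spark types or fields.
> case class MapStageView(numPartitions: Int, outputsRecorded: Int) {
>   def isAvailable: Boolean = outputsRecorded == numPartitions
> }
> 
> // Scheduler-side view: every partition has an output recorded, so
> // getMissingParentStages would not report this stage as missing.
> val mapStage = MapStageView(numPartitions = 2, outputsRecorded = 2)
> 
> // Tracker-side view: registration was skipped (see (a) above), so there are
> // no locations for shuffle 0.
> val trackerLocations = Map.empty[Int, Seq[String]]   // shuffleId -> hosts
> 
> assert(mapStage.isAvailable)                // the following stage gets submitted
> assert(trackerLocations.get(0).isEmpty)     // reducers hit MetadataFetchFailedException
> {code}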
> Note that this is a subset of the problems originally described in 
> SPARK-7308, limited to just the issues affecting the DAGScheduler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
