[
https://issues.apache.org/jira/browse/SPARK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kay Ousterhout resolved SPARK-8103.
-----------------------------------
Resolution: Fixed
Fix Version/s: 1.5.0
> DAGScheduler should not launch multiple concurrent attempts for one stage on
> fetch failures
> -------------------------------------------------------------------------------------------
>
> Key: SPARK-8103
> URL: https://issues.apache.org/jira/browse/SPARK-8103
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, Spark Core
> Affects Versions: 1.4.0
> Reporter: Imran Rashid
> Assignee: Imran Rashid
> Fix For: 1.5.0
>
>
> When there is a fetch failure, {{DAGScheduler}} is supposed to fail the
> stage, retry the necessary portions of the preceding shuffle stage which
> generated the shuffle data, and eventually rerun the stage.
> We generally expect to get multiple fetch failures together, but only want to
> restart the stage once. The code already attempts to address this:
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1108
> {code}
> // It is likely that we receive multiple FetchFailed for a single stage (because we have
> // multiple tasks running concurrently on different executors). In that case, it is possible
> // the fetch failure has already been handled by the scheduler.
> if (runningStages.contains(failedStage)) {
> {code}
> However, this logic is flawed because the stage may have been *resubmitted*
> by the time we get these fetch failures. In that case,
> {{runningStages.contains(failedStage)}} will be true, but we've already
> handled these failures.
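One way to close this race is to tag every task with the stage attempt it belongs to and ignore fetch failures coming from stale attempts. A minimal sketch of that idea in Scala (`StageModel`, `FetchFailedEvent`, and `shouldResubmit` are hypothetical names invented for illustration, not Spark's actual classes):

```scala
// Illustrative model (not Spark's real scheduler state): a stage tracks
// its latest attempt id, and each task event carries the attempt it
// came from.
case class StageModel(id: Int, var latestAttemptId: Int)
case class FetchFailedEvent(stageId: Int, stageAttemptId: Int)

object FetchFailureGuard {
  // Decide whether a fetch failure should trigger a resubmission.
  // Checking only runningStages is insufficient: the stage may already
  // have been resubmitted, so it is "running" again even though this
  // failure belongs to a stale attempt. Comparing attempt ids closes
  // that window.
  def shouldResubmit(stage: StageModel,
                     event: FetchFailedEvent,
                     runningStages: Set[Int]): Boolean =
    runningStages.contains(stage.id) &&
      event.stageAttemptId == stage.latestAttemptId

  def main(args: Array[String]): Unit = {
    val stage = StageModel(3, latestAttemptId = 0)
    val running = Set(3)
    // First fetch failure from attempt 0: resubmit.
    assert(shouldResubmit(stage, FetchFailedEvent(3, 0), running))
    stage.latestAttemptId = 1 // stage resubmitted as attempt 1
    // A late fetch failure from the old attempt 0 is ignored, even
    // though the stage is back in runningStages.
    assert(!shouldResubmit(stage, FetchFailedEvent(3, 0), running))
  }
}
```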
> This results in multiple concurrent non-zombie attempts for one stage. In
> addition to being confusing and a waste of resources, this can also lead
> to later stages being submitted before the previous stage has registered
> its map output. This happens because
> (a) when one attempt finishes all its tasks, it may not register its map
> output, because the stage still has pending tasks from other attempts:
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1046
> {code}
> if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
> {code}
> and (b) {{submitStage}} thinks the following stage is ready to go, because
> {{getMissingParentStages}} considers the stage complete as long as it has all
> of its map outputs:
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L397
> {code}
> if (!mapStage.isAvailable) {
>   missing += mapStage
> }
> {code}
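The mismatch between (a) and (b) can be modeled directly: {{isAvailable}} is computed from the stage's own bookkeeping, while reduce tasks resolve block locations through a separate map-output tracker that is only populated when the scheduler marks the stage finished. A toy model of those two pieces of state (the class and field names below are invented for illustration; Spark's real `ShuffleMapStage` and `MapOutputTracker` are more involved):

```scala
import scala.collection.mutable

// Toy model, not Spark's real classes: the stage's own view of its
// outputs (what getMissingParentStages consults via isAvailable) and
// the shared tracker (what reduce tasks consult at fetch time) are
// two separate pieces of state that can diverge.
class ShuffleMapStageModel(val numPartitions: Int) {
  val outputs = mutable.Set.empty[Int] // partitions the stage has seen finish
  def isAvailable: Boolean = outputs.size == numPartitions
}

class MapOutputTrackerModel {
  private val registered = mutable.Map.empty[Int, Set[Int]] // shuffleId -> partitions
  def register(shuffleId: Int, partitions: Set[Int]): Unit =
    registered(shuffleId) = partitions
  def hasOutputs(shuffleId: Int): Boolean = registered.contains(shuffleId)
}

object DoomedStageDemo {
  def main(args: Array[String]): Unit = {
    val stage = new ShuffleMapStageModel(numPartitions = 2)
    val tracker = new MapOutputTrackerModel

    // All partitions finish, so the availability check in (b) passes...
    stage.outputs ++= Seq(0, 1)
    assert(stage.isAvailable)

    // ...but because the stage still had pending tasks from another
    // attempt, registration with the tracker was skipped, as in (a).
    // Any downstream stage submitted now will hit a missing-output
    // error at fetch time.
    assert(!tracker.hasOutputs(shuffleId = 0))
  }
}
```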
> So the following stage is submitted repeatedly, but it is doomed to fail
> because its shuffle output has never been registered with the map output
> tracker. Here's an example failure in this case:
> {noformat}
> WARN TaskSetManager: Lost task 5.0 in stage 3.2 (TID 294, 192.168.1.104): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=5, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing output locations for shuffle ...
> {noformat}
> Note that this is a subset of the problems originally described in
> SPARK-7308, limited to just the issues affecting the DAGScheduler.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)