[ https://issues.apache.org/jira/browse/SPARK-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14635663#comment-14635663 ]
Apache Spark commented on SPARK-8103:
-------------------------------------

User 'markhamstra' has created a pull request for this issue:
https://github.com/apache/spark/pull/7572

> DAGScheduler should not launch multiple concurrent attempts for one stage on
> fetch failures
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8103
>                 URL: https://issues.apache.org/jira/browse/SPARK-8103
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Imran Rashid
>            Assignee: Imran Rashid
>             Fix For: 1.5.0
>
> When there is a fetch failure, {{DAGScheduler}} is supposed to fail the
> stage, retry the necessary portions of the preceding shuffle stage which
> generated the shuffle data, and eventually rerun the stage.
> We generally expect to get multiple fetch failures together, but we only want
> to restart the stage once. The code already makes an attempt to address this:
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1108
> {code}
>     // It is likely that we receive multiple FetchFailed for a single stage (because we have
>     // multiple tasks running concurrently on different executors). In that case, it is possible
>     // the fetch failure has already been handled by the scheduler.
>     if (runningStages.contains(failedStage)) {
> {code}
> However, this logic is flawed because the stage may have been *resubmitted*
> by the time we get these fetch failures. In that case,
> {{runningStages.contains(failedStage)}} will be true, but we've already
> handled these failures.
> This results in multiple concurrent non-zombie attempts for one stage. In
> addition to being very confusing and a waste of resources, this can also
> lead to later stages being submitted before the previous stage has registered
> its map output.
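A minimal sketch of the flaw described above, assuming a toy model of the scheduler state (the names `StageModel`, `handleByMembership`, and `handleByAttempt` are invented for illustration and are not Spark's real classes or methods). It contrasts the membership-only guard with an attempt-aware guard of the kind the linked pull request moves toward:

```scala
// Hypothetical, simplified model; not Spark's actual DAGScheduler code.
object FetchFailureDedup {
  final case class StageModel(id: Int, var latestAttemptId: Int)

  // The flawed guard: membership in runningStages alone. After the stage is
  // resubmitted it is running again, so a stale FetchFailed from the old
  // attempt still passes the check and triggers yet another resubmission.
  def handleByMembership(running: Set[Int], failedStageId: Int): Boolean =
    running.contains(failedStageId)

  // A stricter guard: additionally require that the failing task came from
  // the stage's latest attempt, so stale failures are ignored.
  def handleByAttempt(running: Set[Int], stage: StageModel, taskAttemptId: Int): Boolean =
    running.contains(stage.id) && stage.latestAttemptId == taskAttemptId

  def main(args: Array[String]): Unit = {
    val stage = StageModel(id = 3, latestAttemptId = 0)
    val running = Set(3)

    // First FetchFailed from attempt 0: both guards correctly say "handle".
    assert(handleByMembership(running, 3))
    assert(handleByAttempt(running, stage, taskAttemptId = 0))

    // The scheduler resubmits the stage as attempt 1; it is running again.
    stage.latestAttemptId = 1

    // A second, stale FetchFailed from attempt 0 arrives:
    assert(handleByMembership(running, 3))      // flawed guard handles it again
    assert(!handleByAttempt(running, stage, 0)) // attempt-aware guard ignores it
    println("ok")
  }
}
```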
> This happens because
> (a) when one attempt finishes all its tasks, it may not register its map
> output, because the stage still has pending tasks from other attempts:
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1046
> {code}
>     if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
> {code}
> and (b) {{submitStage}} thinks the following stage is ready to go, because
> {{getMissingParentStages}} thinks the stage is complete as long as it has all
> of its map outputs:
> https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L397
> {code}
>     if (!mapStage.isAvailable) {
>       missing += mapStage
>     }
> {code}
> So the following stage is submitted repeatedly, but it is doomed to fail
> because its shuffle output has never been registered with the map output
> tracker. Here's an example failure in this case:
> {noformat}
> WARN TaskSetManager: Lost task 5.0 in stage 3.2 (TID 294, 192.168.1.104):
> FetchFailed(null, shuffleId=0, mapId=-1, reduceId=5, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing output
> locations for shuffle ...
> {noformat}
> Note that this is a subset of the problems originally described in
> SPARK-7308, limited to just the issues affecting the DAGScheduler.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
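The interaction of (a) and (b) can be sketched with a toy model, assuming invented names (`outputLocs`, `registeredWithTracker`, and the task-id strings are illustrative only, not Spark's real state). It shows how stage-local completion bookkeeping can report the stage as available while registration with the global map output tracker was skipped:

```scala
import scala.collection.mutable

// Hypothetical, simplified model; not Spark's actual DAGScheduler code.
object MapOutputRace {
  def main(args: Array[String]): Unit = {
    val numPartitions = 2

    // Stage-local bookkeeping: which partitions have some completed map task.
    val outputLocs = Array.fill(numPartitions)(false)
    def isAvailable: Boolean = outputLocs.forall(identity)

    // Global tracker state that reducers actually query; set only when the
    // registration guard below fires.
    var registeredWithTracker = false

    // pendingTasks mixes tasks from two concurrent attempts of one stage.
    val pendingTasks = mutable.Set("attempt0-p0", "attempt0-p1",
                                   "attempt1-p0", "attempt1-p1")
    val runningStages = Set("shuffleStage")

    // Attempt 0 finishes all of its tasks.
    for (p <- 0 until numPartitions) {
      outputLocs(p) = true
      pendingTasks -= s"attempt0-p$p"
    }

    // (a) The registration guard is skipped: attempt 1's tasks still pend.
    if (runningStages.contains("shuffleStage") && pendingTasks.isEmpty) {
      registeredWithTracker = true
    }

    // (b) A getMissingParentStages-style check sees the stage as complete,
    // so the following stage gets submitted...
    assert(isAvailable)
    // ...but the tracker has nothing, so its reducers hit
    // MetadataFetchFailedException: Missing output locations for shuffle.
    assert(!registeredWithTracker)
    println("ok")
  }
}
```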