Imran Rashid created SPARK-8103:
-----------------------------------
Summary: DAGScheduler should not launch multiple concurrent attempts for one stage on fetch failures
Key: SPARK-8103
URL: https://issues.apache.org/jira/browse/SPARK-8103
Project: Spark
Issue Type: Bug
Components: Scheduler, Spark Core
Affects Versions: 1.4.0
Reporter: Imran Rashid
Assignee: Imran Rashid
When there is a fetch failure, {{DAGScheduler}} is supposed to fail the stage,
retry the necessary portions of the preceding shuffle stage which generated the
shuffle data, and eventually rerun the stage.
We generally expect to get multiple fetch failures together, but only want to
re-start the stage once. The code already makes an attempt to address this:
https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1108
{code}
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is possible
// the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
{code}
However, this logic is flawed because the stage may have been *resubmitted*
by the time we get these fetch failures. In that case,
{{runningStages.contains(failedStage)}} will be true, even though we've already
handled these failures.
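To make the race concrete, here is a minimal sketch using a toy model, not Spark's real classes. {{ToyScheduler}}, {{Stage}}, {{FetchFailed}}, {{handleNaive}} and {{handleGuarded}} are all hypothetical names, and resubmission is modelled as immediate for simplicity. {{handleNaive}} mirrors the {{runningStages.contains}} check quoted above; {{handleGuarded}} shows one possible guard, which is an assumption and not necessarily the eventual fix: also compare the failed task's stage attempt with the stage's latest attempt.

```scala
import scala.collection.mutable

// Toy stand-in for a stage: just an ID and the ID of its latest attempt.
final class Stage(val id: Int) {
  var latestAttemptId: Int = 0
}

// A fetch failure reported by a task that ran in a given attempt of a stage.
final case class FetchFailed(stageId: Int, stageAttemptId: Int)

final class ToyScheduler {
  val runningStages = mutable.Set.empty[Stage]
  var resubmissions = 0

  // Simplification: the stage starts running again right away under a new attempt ID.
  private def resubmit(stage: Stage): Unit = {
    stage.latestAttemptId += 1
    resubmissions += 1
    runningStages += stage
  }

  // Mirrors the quoted check: only asks whether the stage is currently running.
  def handleNaive(stage: Stage, failure: FetchFailed): Unit =
    if (runningStages.contains(stage)) {
      runningStages -= stage
      resubmit(stage)
    }

  // Hypothetical guard: ignore failures that come from an older attempt.
  def handleGuarded(stage: Stage, failure: FetchFailed): Unit =
    if (runningStages.contains(stage) &&
        failure.stageAttemptId == stage.latestAttemptId) {
      runningStages -= stage
      resubmit(stage)
    }
}

object FetchFailureDemo {
  // Delivers two fetch failures from attempt 0 and returns how often the stage was resubmitted.
  def run(guarded: Boolean): Int = {
    val scheduler = new ToyScheduler
    val stage = new Stage(3)
    scheduler.runningStages += stage
    val failures = Seq(FetchFailed(3, 0), FetchFailed(3, 0))
    failures.foreach { f =>
      if (guarded) scheduler.handleGuarded(stage, f)
      else scheduler.handleNaive(stage, f)
    }
    scheduler.resubmissions
  }
}
```

In this model {{FetchFailureDemo.run(guarded = false)}} counts two resubmissions for the two failures from attempt 0, while the guarded variant counts one. With only the {{runningStages}} check, the second, already-handled failure starts yet another attempt.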
This results in multiple concurrent non-zombie attempts for one stage. Besides
being very confusing and a waste of resources, this can also lead
to later stages being submitted before the previous stage has registered its
map output. This happens because
(a) when one attempt finishes all its tasks, it may not register its map output
because the stage still has pending tasks from other attempts:
https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1046
{code}
if (runningStages.contains(shuffleStage) &&
shuffleStage.pendingTasks.isEmpty) {
{code}
and (b) {{submitStage}} thinks the following stage is ready to go, because
{{getMissingParentStages}} thinks the stage is complete as long as it has all of
its map outputs:
https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L397
{code}
if (!mapStage.isAvailable) {
missing += mapStage
}
{code}
So the following stage is submitted repeatedly, but it is doomed to fail
because its shuffle output has never been registered with the map output
tracker. Here's an example failure in this case:
{noformat}
WARN TaskSetManager: Lost task 5.0 in stage 3.2 (TID 294, 192.168.1.104):
FetchFailed(null, shuffleId=0, mapId=-1, reduceId=5, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing output locations
for shuffle ...
{noformat}
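The interaction between (a) and (b) can be sketched with another toy model. All names here ({{ToyShuffleStage}}, {{ToyTracker}}, {{RegistrationDemo}}) are hypothetical, and pending tasks are represented as {{(attemptId, partition)}} pairs as a stand-in for real task objects. The sketch only illustrates how a stage can look available to its children while nothing has been registered with the tracker.

```scala
import scala.collection.mutable

// Toy shuffle map stage. Pending tasks are tracked per stage, so every
// concurrent attempt adds its tasks to the same set.
final class ToyShuffleStage(val numPartitions: Int) {
  val pendingTasks = mutable.Set.empty[(Int, Int)] // (attemptId, partition)
  // Output locations known to the stage itself, one slot per partition.
  val outputLocs: Array[Option[String]] = Array.fill[Option[String]](numPartitions)(None)
  // Analogue of the check in (b): complete as soon as every partition has an output.
  def isAvailable: Boolean = outputLocs.forall(_.isDefined)
}

// Toy stand-in for the map output tracker: records only whether outputs were registered.
final class ToyTracker {
  var registered: Boolean = false
}

object RegistrationDemo {
  def taskFinished(stage: ToyShuffleStage, tracker: ToyTracker,
                   attemptId: Int, partition: Int, host: String): Unit = {
    stage.pendingTasks -= ((attemptId, partition))
    stage.outputLocs(partition) = Some(host)
    // Analogue of the check in (a): register only once no task is pending,
    // regardless of which attempt the remaining tasks belong to.
    if (stage.pendingTasks.isEmpty) tracker.registered = true
  }

  // Returns (stage looks available to child stages, outputs registered with tracker).
  def run(): (Boolean, Boolean) = {
    val stage = new ToyShuffleStage(2)
    val tracker = new ToyTracker
    // Two concurrent attempts, each with one task per partition.
    for (attempt <- 0 to 1; p <- 0 until 2) stage.pendingTasks += ((attempt, p))
    // Attempt 1 finishes all of its tasks; attempt 0's tasks are still pending.
    taskFinished(stage, tracker, 1, 0, "hostA")
    taskFinished(stage, tracker, 1, 1, "hostB")
    (stage.isAvailable, tracker.registered)
  }
}
```

Here {{RegistrationDemo.run()}} yields a stage that is available but not registered, which is the state in which a child stage is submitted and then fails with missing output locations.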
Note that this is a subset of the problems originally described in SPARK-7308,
limited to just the issues affecting the DAGScheduler.