Imran Rashid created SPARK-8103:
-----------------------------------

             Summary: DAGScheduler should not launch multiple concurrent attempts for one stage on fetch failures
                 Key: SPARK-8103
                 URL: https://issues.apache.org/jira/browse/SPARK-8103
             Project: Spark
          Issue Type: Bug
          Components: Scheduler, Spark Core
    Affects Versions: 1.4.0
            Reporter: Imran Rashid
            Assignee: Imran Rashid


When there is a fetch failure, {{DAGScheduler}} is supposed to fail the stage, retry the necessary portions of the preceding shuffle stage that generated the shuffle data, and eventually rerun the stage.

We generally expect to get multiple fetch failures together, but only want to re-start the stage once.  The code already attempts to address this:
https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1108

{code}
        // It is likely that we receive multiple FetchFailed for a single stage (because we have
        // multiple tasks running concurrently on different executors). In that case, it is possible
        // the fetch failure has already been handled by the scheduler.
        if (runningStages.contains(failedStage)) {
{code}

However, this logic is flawed because the stage may have been *resubmitted* by the time we get these fetch failures.  In that case, {{runningStages.contains(failedStage)}} will be true, but we've already handled these failures.
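One way to make the guard robust (a hypothetical sketch, not Spark's actual API: {{FetchFailure}}, {{latestAttemptId}}, and {{shouldHandle}} are invented names) would be to also compare the attempt id that produced the failure against the stage's latest attempt, so stale failures from a superseded attempt are ignored:

{code}
// Hypothetical sketch (invented names, not Spark's actual API): track the
// stage's latest attempt id and ignore fetch failures from older attempts.
case class FetchFailure(stageId: Int, stageAttemptId: Int)

class Stage(val id: Int) {
  var latestAttemptId: Int = 0   // bumped each time the stage is resubmitted
}

def shouldHandle(failure: FetchFailure, failedStage: Stage,
                 runningStages: Set[Stage]): Boolean =
  // A failure is only "new" if the stage is still running *and* the failure
  // came from the stage's most recent attempt; otherwise it is stale.
  runningStages.contains(failedStage) &&
    failure.stageAttemptId == failedStage.latestAttemptId
{code}

With a check like this, a fetch failure from attempt 0 that arrives after the stage has been resubmitted as attempt 1 is dropped, even though {{runningStages.contains(failedStage)}} is again true.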

This results in multiple concurrent non-zombie attempts for one stage.  In addition to being very confusing and a waste of resources, this can also lead to later stages being submitted before the previous stage has registered its map output.  This happens because

(a) when one attempt finishes all its tasks, it may not register its map output, because the stage still has pending tasks from other attempts:
https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1046

{code}
            if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
{code}

and (b) {{submitStage}} thinks the following stage is ready to go, because {{getMissingParentStages}} considers the stage complete as long as it has all of its map outputs:
https://github.com/apache/spark/blob/10ba1880878d0babcdc5c9b688df5458ea131531/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L397

{code}
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
{code}
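Taken together, (a) and (b) create a mismatch between the stage's per-partition bookkeeping and the map output tracker.  A toy model (all names hypothetical, heavily simplified from the real {{DAGScheduler}} structures) of how a stage can look "available" without its outputs ever being registered:

{code}
// Toy model (hypothetical names) of the two checks interacting badly.
class ShuffleStage(numPartitions: Int) {
  private val outputLocs = Array.fill[Option[String]](numPartitions)(None)
  var pendingTasks: Set[Int] = Set.empty   // task ids across ALL attempts

  def recordOutput(partition: Int, loc: String): Unit =
    outputLocs(partition) = Some(loc)

  // (b): getMissingParentStages effectively asks this -- true once every
  // partition has some output, no matter which attempt produced it.
  def isAvailable: Boolean = outputLocs.forall(_.isDefined)
}

class MapOutputTracker {
  private var registered = Map.empty[Int, Seq[String]]
  // (a): this bulk registration only runs when pendingTasks is empty,
  // which never happens while another attempt's tasks are outstanding.
  def registerMapOutputs(shuffleId: Int, locs: Seq[String]): Unit =
    registered += shuffleId -> locs
  def hasOutputs(shuffleId: Int): Boolean = registered.contains(shuffleId)
}
{code}

If one attempt records outputs for every partition while a second attempt's tasks are still pending, {{isAvailable}} is true but the tracker has nothing for that shuffle, which is exactly the state that produces the {{MetadataFetchFailedException}} shown in the example failure.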


So the following stage is submitted repeatedly, but it is doomed to fail because the shuffle output it depends on has never been registered with the map output tracker.  Here's an example failure in this case:
{noformat}
WARN TaskSetManager: Lost task 5.0 in stage 3.2 (TID 294, 192.168.1.104): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=5, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing output locations for shuffle ...
{noformat}


Note that this is a subset of the problems originally described in SPARK-7308, limited to just the issues affecting the DAGScheduler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
