Repository: spark
Updated Branches:
  refs/heads/master 1169db44b -> 295db8259


[SPARK-17769][CORE][SCHEDULER] Some FetchFailure refactoring

## What changes were proposed in this pull request?

Readability rewrites.
Changed the evaluation order of `failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)` and `disallowStageRetryForTest`.
Changed the stage resubmission guard condition from `failedStages.isEmpty` to `!failedStages.contains(failedStage)`.
Log all resubmissions of stages.

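
The change in evaluation order is observable only if `failedOnFetchAndShouldAbort` has a side effect. The sketch below assumes that it records the failed attempt before comparing against a limit. `StageModel`, `recordedAttempts`, and the limit of 4 are hypothetical stand-ins for illustration, not Spark's classes or constants.

```scala
import scala.collection.mutable

object EvaluationOrderSketch {
  // Hypothetical stand-in for a stage; this is not Spark's Stage class.
  final class StageModel(maxFailures: Int) {
    private val failedAttempts = mutable.HashSet.empty[Int]

    // Assumed behavior: record the attempt, then report whether the limit is reached.
    def failedOnFetchAndShouldAbort(attemptId: Int): Boolean = {
      failedAttempts += attemptId
      failedAttempts.size >= maxFailures
    }

    def recordedAttempts: Int = failedAttempts.size
  }

  def main(args: Array[String]): Unit = {
    val disallowStageRetryForTest = true

    // Old order: the test flag is checked first, so the stage method is never called.
    val oldOrder = new StageModel(maxFailures = 4)
    val abortOld = disallowStageRetryForTest || oldOrder.failedOnFetchAndShouldAbort(0)

    // New order: the stage method always runs, so the attempt is always recorded.
    val newOrder = new StageModel(maxFailures = 4)
    val abortNew = newOrder.failedOnFetchAndShouldAbort(0) || disallowStageRetryForTest

    println(s"old order: abort=$abortOld recorded=${oldOrder.recordedAttempts}")
    println(s"new order: abort=$abortNew recorded=${newOrder.recordedAttempts}")
  }
}
```

Under that assumption, both orders reach the same abort decision, but only the new order records the failed attempt when the testing flag is set.
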
## How was this patch tested?

existing tests

Author: Mark Hamstra

Closes #15335 from markhamstra/SPARK-17769.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/295db825
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/295db825
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/295db825

Branch: refs/heads/master
Commit: 295db8259b307cc0e7d9de44f5638c1aa7ef6047
Parents: 1169db4
Author: Mark Hamstra
Authored: Fri Dec 16 12:46:32 2016 -0800
Committer: Marcelo Vanzin
Committed: Fri Dec 16 12:46:32 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 55 +++++++++++++-------
 1 file changed, 37 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/295db825/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9378f15..0a1c500 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1256,27 +1256,46 @@ class DAGScheduler(
               s"longer running")
           }
 
-          if (disallowStageRetryForTest) {
-            abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
-              None)
-          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
-              s"has failed the maximum allowable number of " +
-              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-              s"Most recent failure reason: ${failureMessage}", None)
-          } else {
-            if (failedStages.isEmpty) {
-              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-              // in that case the event will already have been scheduled.
-              // TODO: Cancel running tasks in the stage
-              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-                s"$failedStage (${failedStage.name}) due to fetch failure")
-              messageScheduler.schedule(new Runnable {
-                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+          val shouldAbortStage =
+            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+            disallowStageRetryForTest
+
+          if (shouldAbortStage) {
+            val abortMessage = if (disallowStageRetryForTest) {
+              "Fetch failure will not retry stage due to testing config"
+            } else {
+              s"""$failedStage (${failedStage.name})
+                 |has failed the maximum allowable number of
+                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+                 |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
             }
+            abortStage(failedStage, abortMessage, None)
+          } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+            // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+            val noResubmitEnqueued = !failedStages.contains(failedStage)
             failedStages += failedStage
             failedStages += mapStage
+            if (noResubmitEnqueued) {
+              // We expect one executor failure to trigger many FetchFailures in rapid succession,
+              // but all of those task failures can typically be handled by a single resubmission of
+              // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
+              // messages by checking whether a resubmit is already in the event queue for the
+              // failed stage.  If there is already a resubmit enqueued for a different failed
+              // stage, that event would also be sufficient to handle the current failed stage, but
+              // producing a resubmit for each failed stage makes debugging and logging a little
+              // simpler while not producing an overwhelming number of scheduler events.
+              logInfo(
+                s"Resubmitting $mapStage (${mapStage.name}) and " +
+                s"$failedStage (${failedStage.name}) due to fetch failure"
+              )
+              messageScheduler.schedule(
+                new Runnable {
+                  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+                },
+                DAGScheduler.RESUBMIT_TIMEOUT,
+                TimeUnit.MILLISECONDS
+              )
+            }
           }
           // Mark the map whose fetch failed as broken in the map stage
           if (mapId != -1) {
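

The guard change in this hunk can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions: stages are plain strings, scheduling a resubmit just increments a counter, and the set of failed stages is never cleared between failures. `SchedulerModel`, `onFetchFailure`, and the two counters are hypothetical names, not part of DAGScheduler.

```scala
import scala.collection.mutable

object ResubmitGuardSketch {
  // Hypothetical model of the bookkeeping around the guard; not DAGScheduler itself.
  final class SchedulerModel {
    val failedStages = mutable.HashSet.empty[String]
    var oldGuardResubmits = 0 // counts events under the old `failedStages.isEmpty` guard
    var newGuardResubmits = 0 // counts events under `!failedStages.contains(failedStage)`

    def onFetchFailure(failedStage: String, mapStage: String): Unit = {
      // Both guards are evaluated before the set is mutated, as in the patch.
      val wasEmpty = failedStages.isEmpty
      val noResubmitEnqueued = !failedStages.contains(failedStage)
      failedStages += failedStage
      failedStages += mapStage
      if (wasEmpty) oldGuardResubmits += 1
      if (noResubmitEnqueued) newGuardResubmits += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val model = new SchedulerModel
    model.onFetchFailure("reduce-1", "map-0")
    model.onFetchFailure("reduce-1", "map-0") // repeated failure of the same stage
    model.onFetchFailure("reduce-2", "map-0") // a different failed stage
    println(s"old guard: ${model.oldGuardResubmits}, new guard: ${model.newGuardResubmits}")
  }
}
```

In this model the old guard enqueues one resubmit for the whole burst, while the new guard enqueues one per distinct failed stage, which matches the trade-off described in the code comment: slightly more events in exchange for simpler logging and debugging.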

