Repository: spark Updated Branches: refs/heads/master 1169db44b -> 295db8259
[SPARK-17769][CORE][SCHEDULER] Some FetchFailure refactoring ## What changes were proposed in this pull request? Readability rewrites. Changed order of `failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)` and `disallowStageRetryForTest` evaluation. Stage resubmission guard condition changed from `failedStages.isEmpty` to `!failedStages.contains(failedStage)` Log all resubmission of stages ## How was this patch tested? existing tests Author: Mark Hamstra <[email protected]> Closes #15335 from markhamstra/SPARK-17769. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/295db825 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/295db825 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/295db825 Branch: refs/heads/master Commit: 295db8259b307cc0e7d9de44f5638c1aa7ef6047 Parents: 1169db4 Author: Mark Hamstra <[email protected]> Authored: Fri Dec 16 12:46:32 2016 -0800 Committer: Marcelo Vanzin <[email protected]> Committed: Fri Dec 16 12:46:32 2016 -0800 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 55 +++++++++++++------- 1 file changed, 37 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/295db825/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9378f15..0a1c500 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1256,27 +1256,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { - abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { - abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { - if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + - s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = + failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || + disallowStageRetryForTest + + if (shouldAbortStage) { + val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" + } else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } + abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued + // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064 + val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage + if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( + s"Resubmitting $mapStage (${mapStage.name}) and " + + s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( + new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, + DAGScheduler.RESUBMIT_TIMEOUT, + TimeUnit.MILLISECONDS + ) + } } // Mark the map whose fetch failed as broken in the map stage if (mapId != -1) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
