[ https://issues.apache.org/jira/browse/SPARK-17610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15509712#comment-15509712 ]
Tao Wang commented on SPARK-17610:
----------------------------------

As the reason mentioned in https://github.com/apache/spark/pull/15176 explains, this is not a bug, so closing this.

> The failed stage caused by FetchFailed may never be resubmitted
> ---------------------------------------------------------------
>
>                 Key: SPARK-17610
>                 URL: https://issues.apache.org/jira/browse/SPARK-17610
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 2.0.0
>            Reporter: Tao Wang
>            Priority: Critical
>
> We have a problem in our environment in which a failed stage is never
> resubmitted. Because it is caused by a FetchFailed exception, I took a
> look at the corresponding code segment and found some issues:
> In DAGScheduler.handleTaskCompletion, the handler first checks whether
> `failedStages` is empty, and if it is, performs two steps:
> 1. send `ResubmitFailedStages` to `eventProcessLoop`
> 2. add the failed stages to `failedStages`
> When `eventProcessLoop` handles `ResubmitFailedStages`, it takes all
> elements of `failedStages`, resubmits them, and then clears the set.
> If the events happen in the following order, there may be a problem
> (assume t1 < t2 < t3):
> at t1, failed stage 1 is handled and `ResubmitFailedStages` is sent to
> `eventProcessLoop`
> at t2, `eventProcessLoop` handles `ResubmitFailedStages` and clears the
> still-empty `failedStages`
> at t3, failed stage 1 is added to `failedStages`
> Failed stage 1 has now missed its resubmission. After t3, `failedStages`
> will never be empty again, so even when new FetchFailed failures come in,
> no new `ResubmitFailedStages` event is scheduled, because `failedStages`
> (which still contains failed stage 1) is not empty.
> The code is below:
> {code}
>         } else if (failedStages.isEmpty) {
>           // Don't schedule an event to resubmit failed stages if failed
>           // isn't empty, because in that case the event will already
>           // have been scheduled.
>           // TODO: Cancel running tasks in the stage
>           logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
>             s"$failedStage (${failedStage.name}) due to fetch failure")
>           messageScheduler.schedule(new Runnable {
>             override def run(): Unit =
>               eventProcessLoop.post(ResubmitFailedStages)
>           }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
>         }
>         failedStages += failedStage
>         failedStages += mapStage
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
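The interleaving the reporter hypothesizes can be made concrete with a toy single-threaded simulation (Python rather than the Scala shown above; all names such as `Scheduler`, `maybe_schedule_resubmit`, and `record_failure` are hypothetical stand-ins, not actual Spark code). It shows why, *if* the resubmit event could run between step 1 and step 2, the stage would be stuck forever; the issue was closed as not-a-bug because in real Spark both steps run back-to-back on the single DAGScheduler event-loop thread, so this interleaving cannot occur.

```python
class Scheduler:
    """Toy model of the failed-stage bookkeeping described in the report
    (hypothetical names; this is not DAGScheduler code)."""

    def __init__(self):
        self.failed_stages = set()
        self.pending_resubmit = False  # stands in for the posted event
        self.resubmitted = []

    def maybe_schedule_resubmit(self):
        # Step 1 of the handler: only post ResubmitFailedStages when the
        # set is currently empty (mirrors `failedStages.isEmpty`).
        if not self.failed_stages:
            self.pending_resubmit = True

    def record_failure(self, stage):
        # Step 2 of the handler: record the failed stage.
        self.failed_stages.add(stage)

    def process_resubmit(self):
        # Event-loop handling of ResubmitFailedStages: resubmit everything
        # currently in the set, then clear it.
        self.pending_resubmit = False
        self.resubmitted.extend(sorted(self.failed_stages))
        self.failed_stages.clear()


sched = Scheduler()
sched.maybe_schedule_resubmit()   # t1: set is empty, so the event is posted
sched.process_resubmit()          # t2: the event drains a still-empty set
sched.record_failure("stage 1")   # t3: stage 1 is added after the drain

# Stage 1 is now stuck: it was never resubmitted, and because the set is
# no longer empty, later failures will not post another resubmit event.
sched.maybe_schedule_resubmit()
print(sched.resubmitted, sched.pending_resubmit, sched.failed_stages)
# → [] False {'stage 1'}
```

Because the real handler performs both steps atomically on the event-loop thread (and the resubmit is additionally delayed by `RESUBMIT_TIMEOUT` via `messageScheduler`), t2 can never fall between t1 and t3 in practice.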