Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22112#discussion_r212462874
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
                 failedStages += failedStage
                 failedStages += mapStage
                 if (noResubmitEnqueued) {
    +              // If the map stage is INDETERMINATE, which means the map tasks may return
    +              // different result when re-try, we need to re-try all the tasks of the failed
    +              // stage and its succeeding stages, because the input data will be changed after the
    +              // map tasks are re-tried.
    +              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
    +              // guaranteed to be idempotent, so the input data of the reducers will not change even
    +              // if the map tasks are re-tried.
    +              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
    +                def rollBackStage(stage: Stage): Unit = stage match {
    +                  case mapStage: ShuffleMapStage =>
    +                    val numMissingPartitions = mapStage.findMissingPartitions().length
    +                    if (numMissingPartitions < mapStage.numTasks) {
    +                      markStageAsFinished(
    +                        mapStage,
    +                        Some("preceding shuffle map stage with random output gets retried."),
    +                        willRetry = true)
    +                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
    +                      failedStages += mapStage
    +                    }
    +
    +                  case resultStage =>
    +                    val numMissingPartitions = resultStage.findMissingPartitions().length
    +                    if (numMissingPartitions < resultStage.numTasks) {
    +                      // TODO: support to rollback result tasks.
    +                      val errorMessage = "A shuffle map stage with random output was failed and " +
    +                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
    +                        "to re-process the input data, and has to fail this job. Please " +
    +                        "eliminate the randomness by checkpointing the RDD before " +
    +                        "repartition/zip and try again."
    +                      abortStage(failedStage, errorMessage, None)
    +                    }
    +                }
    +
    +                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
    +                  if (stageChain.head.id == failedStage.id) {
    +                    stageChain.foreach { stage =>
    +                      if (!failedStages.contains(stage)) rollBackStage(stage)
    +                    }
    +                  } else {
    +                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
    +                  }
    +                }
    --- End diff --
    
    
    Something like this sketch is what I meant:
    ```
    def rollbackSucceedingStages(stageChain: List[Stage], alreadyProcessed: Set[Int]): Set[Int] = {
      var processed = alreadyProcessed
      val stage = stageChain.head
    
      if (stage.id == failedStage.id) {
        stageChain.foreach { stage =>
          if (!failedStages.contains(stage)) rollBackStage(stage)
        }
      } else {
        stage.parents.foreach { s =>
          if (!processed.contains(s.id)) {
            processed = rollbackSucceedingStages(s :: stageChain, processed)
          }
        }
      }
    
      processed + failedStage.id
    }
    ```
    
    (Or perhaps use a mutable Set to make it simpler?)
    This reduces the need to reprocess stages we have already handled in large DAGs, where a stage subtree appears in multiple places in the DAG.
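
To make the memoization idea concrete, here is a minimal, self-contained sketch. It is not Spark code: `Stage` is a hypothetical stand-in with only an id and parent links, and `RollbackSketch.stagesToRollBack` is a hypothetical helper. Unlike the sketch above, it caches for each stage whether that stage depends on the failed stage, which is one possible way to make sure a shared subtree is walked only once while every dependent stage is still reported.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's Stage: only an id and parent links.
final case class Stage(id: Int, parents: List[Stage])

object RollbackSketch {
  // Walks parent links starting from `roots` and returns:
  //  - the ids of all visited stages that depend, directly or transitively,
  //    on the stage with id `failedId` (the failed stage itself included);
  //  - how many stages were actually expanded, to show the effect of caching.
  def stagesToRollBack(roots: List[Stage], failedId: Int): (Set[Int], Int) = {
    // Cache: stage id -> does this stage depend on the failed stage?
    val dependsOnFailed = mutable.Map.empty[Int, Boolean]
    var expanded = 0

    def visit(stage: Stage): Boolean = dependsOnFailed.get(stage.id) match {
      case Some(known) => known // already classified, do not walk it again
      case None =>
        expanded += 1
        val result =
          if (stage.id == failedId) true // ancestors of the failed stage are not needed
          else stage.parents.map(visit).contains(true) // classify every parent
        dependsOnFailed(stage.id) = result
        result
    }

    roots.foreach(visit)
    (dependsOnFailed.filter(_._2).keys.toSet, expanded)
  }
}
```

With a DAG in which one stage is the parent of two different children, the shared stage is expanded once and both children still end up in the returned set. The recursion is not stack-safe for very deep DAGs; that is left out to keep the sketch short.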

