Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22112#discussion_r212197939
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
                 failedStages += failedStage
                 failedStages += mapStage
                 if (noResubmitEnqueued) {
    +              // If the map stage is INDETERMINATE, meaning the map tasks may return
    +              // different results when retried, we need to retry all the tasks of the
    +              // failed stage and its succeeding stages, because the input data will
    +              // change after the map tasks are retried.
    +              // Note that if the map stage is UNORDERED, we are fine. The shuffle
    +              // partitioner is guaranteed to be idempotent, so the input data of the
    +              // reducers will not change even if the map tasks are retried.
    +              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
    +                def rollBackStage(stage: Stage): Unit = stage match {
    +                  case mapStage: ShuffleMapStage =>
    +                    val numMissingPartitions = mapStage.findMissingPartitions().length
    +                    if (numMissingPartitions < mapStage.numTasks) {
    +                      markStageAsFinished(
    +                        mapStage,
    +                        Some("preceding shuffle map stage with random output gets retried."),
    +                        willRetry = true)
    +                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
    +                      failedStages += mapStage
    +                    }
    +
    +                  case resultStage =>
    +                    val numMissingPartitions = resultStage.findMissingPartitions().length
    +                    if (numMissingPartitions < resultStage.numTasks) {
    +                      // TODO: support rolling back result tasks.
    +                      val errorMessage = "A shuffle map stage with random output failed and " +
    +                        "was retried. However, Spark cannot roll back the result stage " +
    +                        s"$resultStage to re-process the input data, and has to fail this " +
    +                        "job. Please eliminate the randomness by checkpointing the RDD " +
    +                        "before repartition/zip and try again."
    +                      abortStage(failedStage, errorMessage, None)
    +                    }
    +                }
    +
    +                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
    +                  if (stageChain.head.id == failedStage.id) {
    +                    stageChain.foreach { stage =>
    +                      if (!failedStages.contains(stage)) rollBackStage(stage)
    +                    }
    +                  } else {
    +                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
    +                  }
    +                }
    --- End diff ---
    
    This method looks expensive for large DAGs; memoization should help reduce 
the cost. Compute the set of stages to roll back, then roll back the stages 
found.
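    
    For illustration, a minimal sketch of that idea in Scala, outside the real 
DAGScheduler: Stage is reduced here to an id plus parent links, and 
stagesToRollback / reachesFailed are hypothetical names. The memo table 
caches, per stage, whether the failed stage is reachable through its parents, 
so each stage is evaluated once rather than once per path through the DAG:
    
    import scala.collection.mutable
    
    // Hypothetical stand-ins, just enough to show the traversal; the real
    // Stage lives in org.apache.spark.scheduler.
    case class Stage(id: Int, parents: Seq[Stage])
    
    def stagesToRollback(finalStage: Stage, failedStage: Stage): Seq[Stage] = {
      // Memo table: stage id -> whether failedStage is reachable via parents.
      val memo = mutable.Map.empty[Int, Boolean]
    
      def reachesFailed(stage: Stage): Boolean = memo.get(stage.id) match {
        case Some(cached) => cached
        case None =>
          val result =
            stage.id == failedStage.id || stage.parents.exists(reachesFailed)
          memo(stage.id) = result
          result
      }
    
      val seen = mutable.Set.empty[Int]
      val toRollback = mutable.ArrayBuffer.empty[Stage]
    
      // One walk from the final stage, keeping every stage that lies on some
      // path back to the failed stage; seen ensures each stage is visited once.
      def collect(stage: Stage): Unit =
        if (reachesFailed(stage) && seen.add(stage.id)) {
          toRollback += stage
          stage.parents.foreach(collect)
        }
    
      collect(finalStage)
      toRollback.toSeq
    }
    
    With the set computed up front, rollBackStage can then be applied to each 
stage found, and the cost stays linear in the number of stages and edges 
instead of growing with the number of distinct paths.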


