turboFei commented on code in PR #3118:
URL: https://github.com/apache/celeborn/pull/3118#discussion_r2039004058


##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch:
##########
@@ -146,15 +146,76 @@ index b950c07f3d8..2cb430c3c3d 100644
  import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
  import org.apache.spark.internal.Logging
  import org.apache.spark.internal.config
-@@ -1780,7 +1781,7 @@ private[spark] class DAGScheduler(
-           failedStage.failedAttemptIds.add(task.stageAttemptId)
-           val shouldAbortStage =
-             failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts 
||
--            disallowStageRetryForTest
-+            disallowStageRetryForTest || 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+@@ -1369,7 +1370,10 @@ private[spark] class DAGScheduler(
+     // The operation here can make sure for the partially completed 
intermediate stage,
+     // `findMissingPartitions()` returns all partitions every time.
+     stage match {
+-      case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable 
=>
++      case sms: ShuffleMapStage if (stage.isIndeterminate ||
++        
CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) && 
!sms.isAvailable =>
++        logInfo(s"Unregistering shuffle output for stage ${stage.id}" +
++          s" shuffle ${sms.shuffleDep.shuffleId}")
+         
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
+         sms.shuffleDep.newShuffleMergeState()
+       case _ =>
+@@ -1689,7 +1693,15 @@ private[spark] class DAGScheduler(
+         // tasks complete, they still count and we can mark the corresponding 
partitions as
+         // finished. Here we notify the task scheduler to skip running tasks 
for the same partition,
+         // to save resource.
+-        if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
+        // CELEBORN-1856, if the stage is indeterminate or the shuffleMapStage is 
skewed and read by
++        // Celeborn chunkOffsets, we should not call notifyPartitionCompletion, 
otherwise it will
++        // skip running tasks for the same partition because 
TaskSetManager.dequeueTaskFromList
++        // will skip running task which TaskSetManager.successful(taskIndex) 
is true.
++        // TODO: Suggest cherry-picking SPARK-45182 and SPARK-45498, ResultStage 
may have result commit and other issues
++        val isStageIndeterminate = stage.isInstanceOf[ShuffleMapStage] &&
++          CelebornShuffleState.isCelebornSkewedShuffle(
++            stage.asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId)
++        if (task.stageAttemptId < stage.latestInfo.attemptNumber() && 
!isStageIndeterminate) {
+           taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
+         }
+ 
+@@ -1772,6 +1784,14 @@ private[spark] class DAGScheduler(
+         val failedStage = stageIdToStage(task.stageId)
+         val mapStage = shuffleIdToMapStage(shuffleId)
+ 
++        // In CELEBORN-1139 we support reading skew partitions by Celeborn 
chunkOffsets,
++        // which makes the shuffle indeterminate, so abort the ResultStage 
directly here.
++        if (failedStage.isInstanceOf[ResultStage] && 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++          val shuffleFailedReason = s"ResultStage:${failedStage.id} fetch 
failed and the shuffle:$shuffleId " +
++            s"is skewed partition read by Celeborn, so abort it."
++          abortStage(failedStage, shuffleFailedReason, None)
++        }
++
+         if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
+           logInfo(s"Ignoring fetch failure from $task as it's from 
$failedStage attempt" +
+             s" ${task.stageAttemptId} and there is a more recent attempt for 
that stage " +
+@@ -1850,7 +1870,7 @@ private[spark] class DAGScheduler(
+               // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
+               // guaranteed to be determinate, so the input data of the 
reducers will not change
+               // even if the map tasks are re-tried.
+-              if (mapStage.isIndeterminate) {
++              if (mapStage.isIndeterminate || 
CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
+                 // It's a little tricky to find all the succeeding stages of 
`mapStage`, because
+                 // each stage only know its parents not children. Here we 
traverse the stages from
+                 // the leaf nodes (the result stages of active jobs), and 
rollback all the stages
+@@ -1861,7 +1881,15 @@ private[spark] class DAGScheduler(
  
-           // It is likely that we receive multiple FetchFailed for a single 
stage (because we have
-           // multiple tasks running concurrently on different executors). In 
that case, it is
+                 def collectStagesToRollback(stageChain: List[Stage]): Unit = {
+                   if (stagesToRollback.contains(stageChain.head)) {
+-                    stageChain.drop(1).foreach(s => stagesToRollback += s)
++                    stageChain.drop(1).foreach(s => {
++                      stagesToRollback += s
++                      s match {
++                        case currentMapStage: ShuffleMapStage =>
++                          
CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)

Review Comment:
   Hi @wangshengjie123, do we need to add some check before calling 
registerCelebornSkewedShuffle?
   
   cc @RexXiong @Z1Wu  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to