turboFei commented on code in PR #3118:
URL: https://github.com/apache/celeborn/pull/3118#discussion_r2039004058
##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch:
##########
@@ -146,15 +146,76 @@ index b950c07f3d8..2cb430c3c3d 100644
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
-@@ -1780,7 +1781,7 @@ private[spark] class DAGScheduler(
- failedStage.failedAttemptIds.add(task.stageAttemptId)
- val shouldAbortStage =
- failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
-- disallowStageRetryForTest
-+ disallowStageRetryForTest || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+@@ -1369,7 +1370,10 @@ private[spark] class DAGScheduler(
+ // The operation here can make sure for the partially completed intermediate stage,
+ // `findMissingPartitions()` returns all partitions every time.
+ stage match {
+- case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
++ case sms: ShuffleMapStage if (stage.isIndeterminate ||
++ CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) && !sms.isAvailable =>
++ logInfo(s"Unregistering shuffle output for stage ${stage.id}" +
++ s" shuffle ${sms.shuffleDep.shuffleId}")
+ mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
+ sms.shuffleDep.newShuffleMergeState()
+ case _ =>
+@@ -1689,7 +1693,15 @@ private[spark] class DAGScheduler(
+ // tasks complete, they still count and we can mark the corresponding partitions as
+ // finished. Here we notify the task scheduler to skip running tasks for the same partition,
+ // to save resource.
+- if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
++ // CELEBORN-1856, if stage is indeterminate or shuffleMapStage is skewed and read by
++ // Celeborn chunkOffsets, should not call notifyPartitionCompletion, otherwise will
++ // skip running tasks for the same partition because TaskSetManager.dequeueTaskFromList
++ // will skip running task which TaskSetManager.successful(taskIndex) is true.
++ // TODO: Suggest cherry-pick SPARK-45182 and SPARK-45498, ResultStage may has result commit and other issues
++ val isStageIndeterminate = stage.isInstanceOf[ShuffleMapStage] &&
++ CelebornShuffleState.isCelebornSkewedShuffle(
++ stage.asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId)
++ if (task.stageAttemptId < stage.latestInfo.attemptNumber() && !isStageIndeterminate) {
+ taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
+ }
+
+@@ -1772,6 +1784,14 @@ private[spark] class DAGScheduler(
+ val failedStage = stageIdToStage(task.stageId)
+ val mapStage = shuffleIdToMapStage(shuffleId)
+
++ // In Celeborn-1139 we support read skew partition by Celeborn chunkOffsets,
++ // it will make shuffle be indeterminate, so abort the ResultStage directly here.
++ if (failedStage.isInstanceOf[ResultStage] && CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
++ val shuffleFailedReason = s"ResultStage:${failedStage.id} fetch failed and the shuffle:$shuffleId " +
++ s"is skewed partition read by Celeborn, so abort it."
++ abortStage(failedStage, shuffleFailedReason, None)
++ }
++
+ if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
+ logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
+ s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
+@@ -1850,7 +1870,7 @@ private[spark] class DAGScheduler(
+ // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+ // guaranteed to be determinate, so the input data of the reducers will not change
+ // even if the map tasks are re-tried.
+- if (mapStage.isIndeterminate) {
++ if (mapStage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)) {
+ // It's a little tricky to find all the succeeding stages of `mapStage`, because
+ // each stage only know its parents not children. Here we traverse the stages from
+ // the leaf nodes (the result stages of active jobs), and rollback all the stages
+@@ -1861,7 +1881,15 @@ private[spark] class DAGScheduler(
- // It is likely that we receive multiple FetchFailed for a single stage (because we have
- // multiple tasks running concurrently on different executors). In that case, it is
+ def collectStagesToRollback(stageChain: List[Stage]): Unit = {
+ if (stagesToRollback.contains(stageChain.head)) {
+- stageChain.drop(1).foreach(s => stagesToRollback += s)
++ stageChain.drop(1).foreach(s => {
++ stagesToRollback += s
++ s match {
++ case currentMapStage: ShuffleMapStage =>
++ CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
Review Comment:
Hi @wangshengjie123, do we need some check before calling registerCelebornSkewedShuffle here?
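For example, something along these lines inside collectStagesToRollback, purely as a sketch of the kind of guard I mean (shouldRegister below is a placeholder for whatever condition we settle on, e.g. that the stage's shuffle is actually tracked by Celeborn; it is not an existing API):

```scala
// Sketch only: guard the registration so we do not mark every rolled-back
// ShuffleMapStage as a skewed Celeborn shuffle. `shouldRegister` is a
// hypothetical predicate, not part of CelebornShuffleState today.
stageChain.drop(1).foreach { s =>
  stagesToRollback += s
  s match {
    case currentMapStage: ShuffleMapStage
        if shouldRegister(currentMapStage.shuffleDep.shuffleId) =>
      CelebornShuffleState.registerCelebornSkewedShuffle(currentMapStage.shuffleDep.shuffleId)
    case _ => // stages whose shuffles should not be registered
  }
}
```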
cc @RexXiong @Z1Wu
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]