RexXiong commented on code in PR #3118:
URL: https://github.com/apache/celeborn/pull/3118#discussion_r1982613357


##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -462,4 +462,18 @@ public static void addSparkListener(SparkListener listener) {
       sparkContext.addSparkListener(listener);
     }
   }
+
+  private static final DynMethods.UnboundMethod isCelebornSkewShuffle_METHOD =
+      DynMethods.builder("isCelebornSkewedShuffle")
          .hiddenImpl("org.apache.spark.celeborn.CelebornShuffleState", Integer.TYPE)
+          .orNoop()
+          .build();
+
+  public static boolean isCelebornSkewShuffleOrChildShuffle(int sparkShuffleId) {

Review Comment:
   Ditto: rename `sparkShuffleId` to `appShuffleId`.
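The `DynMethods` chain in this hunk looks up `isCelebornSkewedShuffle` on `org.apache.spark.celeborn.CelebornShuffleState` by reflection, and `.orNoop()` supplies a fallback when the method cannot be found, presumably so the client keeps working on a Spark build without the Celeborn patch. The sketch below reproduces that pattern with plain `java.lang.reflect` rather than `DynMethods`, assuming a missing method should mean "not skewed" (`false`). It uses the reviewer's suggested parameter name `appShuffleId` and does not model the "child shuffle" part of the PR's helper, which the quoted hunk does not show. `SkewShuffleCheck` and `FakeShuffleState` are hypothetical stand-ins, not Celeborn classes.

```java
import java.lang.reflect.Method;

class SkewShuffleCheck {
  // Hypothetical stand-in for org.apache.spark.celeborn.CelebornShuffleState,
  // which only exists on a Spark build that carries the Celeborn patch.
  public static class FakeShuffleState {
    public static boolean isCelebornSkewedShuffle(int appShuffleId) {
      return appShuffleId == 7; // pretend shuffle 7 is skewed
    }
  }

  // Resolved once, like the static final field in the diff; null if absent.
  private final Method isSkewedMethod;

  SkewShuffleCheck(String className) {
    Method m = null;
    try {
      m = Class.forName(className).getDeclaredMethod("isCelebornSkewedShuffle", int.class);
      m.setAccessible(true);
    } catch (ReflectiveOperationException e) {
      // Unpatched Spark: keep m == null, similar in spirit to orNoop().
    }
    this.isSkewedMethod = m;
  }

  boolean isCelebornSkewShuffle(int appShuffleId) {
    if (isSkewedMethod == null) {
      return false; // assumption: no patch means no skewed shuffles
    }
    try {
      // Static method, so the receiver is null; the int is boxed for invoke.
      Object result = isSkewedMethod.invoke(null, appShuffleId);
      return Boolean.TRUE.equals(result);
    } catch (ReflectiveOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    SkewShuffleCheck patched = new SkewShuffleCheck(FakeShuffleState.class.getName());
    SkewShuffleCheck unpatched = new SkewShuffleCheck("org.example.DoesNotExist");
    System.out.println(patched.isCelebornSkewShuffle(7));   // true
    System.out.println(patched.isCelebornSkewShuffle(3));   // false
    System.out.println(unpatched.isCelebornSkewShuffle(7)); // false
  }
}
```

Resolving the `Method` once and caching it mirrors the `static final` field in the diff, so the reflective lookup cost is paid only at class initialization.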



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1042,6 +1043,14 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
     }
   }
 
+  private def isCelebornSkewShuffleOrChildShuffle(shuffleId: Int): Boolean = {

Review Comment:
   Better to rename `shuffleId` to `appShuffleId`.



##########
assets/spark-patch/Celeborn-Optimize-Skew-Partitions-spark3_2.patch:
##########
@@ -146,15 +146,59 @@ index b950c07f3d8..2cb430c3c3d 100644
  import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
  import org.apache.spark.internal.Logging
  import org.apache.spark.internal.config
-@@ -1780,7 +1781,7 @@ private[spark] class DAGScheduler(
-           failedStage.failedAttemptIds.add(task.stageAttemptId)
-           val shouldAbortStage =
-             failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
--            disallowStageRetryForTest
-+            disallowStageRetryForTest || CelebornShuffleState.isCelebornSkewedShuffle(shuffleId)
+@@ -1369,7 +1370,10 @@ private[spark] class DAGScheduler(
+     // The operation here can make sure for the partially completed intermediate stage,
+     // `findMissingPartitions()` returns all partitions every time.
+     stage match {
+-      case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
++      case sms: ShuffleMapStage if (stage.isIndeterminate && !sms.isAvailable) ||

Review Comment:
   I think it should be `(stage.isIndeterminate || CelebornShuffleState.isCelebornSkewedShuffle(sms.shuffleDep.shuffleId)) && !sms.isAvailable`
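To see what the reviewer's grouping changes, the sketch below evaluates `(isIndeterminate || isSkewed) && !isAvailable` against one alternative over all eight input combinations. The quoted PR line is cut off after `||`, so the alternative `(isIndeterminate && !isAvailable) || isSkewed` is only an assumed shape for illustration. The booleans stand in for `stage.isIndeterminate`, `CelebornShuffleState.isCelebornSkewedShuffle(...)` and `sms.isAvailable`; `RollbackConditionDemo` is a hypothetical name.

```java
class RollbackConditionDemo {
  // Reviewer's suggestion: match only stages that are not fully available,
  // whether they are indeterminate or Celeborn-skewed.
  static boolean reviewerCondition(boolean indeterminate, boolean skewed, boolean available) {
    return (indeterminate || skewed) && !available;
  }

  // Hypothetical alternative (assumed shape of the truncated PR line):
  // the skew check is OR-ed in without the !available guard.
  static boolean alternativeCondition(boolean indeterminate, boolean skewed, boolean available) {
    return (indeterminate && !available) || skewed;
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    System.out.println("indeterminate skewed available | reviewer alternative");
    for (boolean ind : values) {
      for (boolean skew : values) {
        for (boolean avail : values) {
          boolean r = reviewerCondition(ind, skew, avail);
          boolean a = alternativeCondition(ind, skew, avail);
          // Flag the rows where the two guards disagree.
          System.out.printf("%-13b %-6b %-9b | %-8b %b%s%n",
              ind, skew, avail, r, a, r != a ? "  <- differs" : "");
        }
      }
    }
  }
}
```

Under these assumptions the two forms disagree only when the shuffle is skewed and the stage is already available: the reviewer's version leaves such a stage alone, while the alternative would still match it.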



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.