mridulm commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1702430013


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java:
##########
@@ -66,6 +67,26 @@ public class SparkShuffleManager implements ShuffleManager {
  private ExecutorShuffleIdTracker shuffleIdTracker = new ExecutorShuffleIdTracker();
 
   public SparkShuffleManager(SparkConf conf, boolean isDriver) {
+    int maxStageAttempts =
+        conf.getInt(
+            "spark.stage.maxConsecutiveAttempts",
+            DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS());
+    int maxTaskAttempts = (Integer) conf.get(package$.MODULE$.MAX_TASK_FAILURES());

Review Comment:
   nit: Move this into the common utils, so that both `getMapAttemptNumber` and `validateAttemptConfig` (or some such) live together. We can then use the same code for both Spark 2 and Spark 3.



##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
     }
   }
 
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));

Review Comment:
   nit: We can drop the `assert` - we are validating it during initialization.



##########
tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala:
##########
@@ -297,4 +297,45 @@ class CelebornFetchFailureSuite extends AnyFunSuite
       sparkSession.stop()
     }
   }
+
+  test("celeborn spark integration test - resubmit an unordered barrier stage") {

Review Comment:
   This specific test will pass once we support barrier stages, even without the changes in this PR.
   @RexXiong's changes did reproduce the issue - perhaps adapt that reproduction here?



##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
     }
   }
 
+  public static int getMapAttemptNumber(TaskContext context) {
+    assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+    return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+  }
+

Review Comment:
   Resolving this long thread :-)
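For readers of the resolved thread, a small illustrative round trip of the packed layout above (class and method names are hypothetical): the stage attempt occupies the bits above 16 and the task attempt the low 16 bits, so either side can be recovered with a shift or a mask:

```java
// Hypothetical demo of the (stageAttempt << 16) | taskAttempt encoding.
public class EncodingRoundTrip {

  public static int encode(int stageAttempt, int taskAttempt) {
    return (stageAttempt << 16) | taskAttempt;
  }

  // Unsigned right shift recovers the stage attempt from the high bits.
  public static int decodeStageAttempt(int encoded) {
    return encoded >>> 16;
  }

  // Masking the low 16 bits recovers the task attempt.
  public static int decodeTaskAttempt(int encoded) {
    return encoded & 0xFFFF;
  }

  public static void main(String[] args) {
    int encoded = encode(5, 7);
    System.out.println(decodeStageAttempt(encoded)); // prints 5
    System.out.println(decodeTaskAttempt(encoded)); // prints 7
  }
}
```

The earlier bounds check (`stageAttemptNumber < 2^15`, `attemptNumber < 2^16`) is what guarantees this packing is lossless and the result stays non-negative.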



##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java:
##########
@@ -273,7 +273,7 @@ private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throw
         shuffleClient.pushData(
             shuffleId,
             mapId,
-            taskContext.attemptNumber(),
+            SparkUtils.getMapAttemptNumber(taskContext),

Review Comment:
   Instead of `SparkUtils`, which is version specific, create a util in common and add `getMapAttemptNumber` there? This is going to be the same across Spark versions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
