Copilot commented on code in PR #3590:
URL: https://github.com/apache/celeborn/pull/3590#discussion_r2719789693


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         shuffleKey: String,
         index: Int): Unit = {
       try {
-        fileWriter.write(body)
+        val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+          if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+            val (mapId, attemptId) = getMapAttempt(body)
+            val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)

Review Comment:
   Potential IndexOutOfBoundsException when reading from shuffleMapperAttempts.
   If mapId is negative or greater than or equal to the length of the
   AtomicIntegerArray, get(mapId) throws. Consider adding a bounds check before
   accessing the array, similar to how other parts of the codebase handle this.
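
   A minimal sketch of such a bounds check, assuming that an out-of-range mapId
   should be treated like "no ended attempt recorded" (-1). The object and
   method names are hypothetical and are not part of Celeborn; whether -1 is
   the right fallback is a design decision for the PR author.

   ```scala
   import java.util.concurrent.atomic.AtomicIntegerArray

   object AttemptLookup {
     // Hypothetical helper: returns the ended attempt recorded for mapId,
     // or -1 when the array is missing or mapId is outside its bounds.
     def endedAttemptOrUnknown(attempts: AtomicIntegerArray, mapId: Int): Int =
       if (attempts != null && mapId >= 0 && mapId < attempts.length()) {
         attempts.get(mapId)
       } else {
         -1 // assumption: unknown map ids fall back to the "not ended" sentinel
       }
   }
   ```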



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         shuffleKey: String,
         index: Int): Unit = {
       try {
-        fileWriter.write(body)
+        val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+          if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+            val (mapId, attemptId) = getMapAttempt(body)
+            val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
+            val toWriteAttempt = attemptId == endedAttemptId
+            (endedAttemptId, toWriteAttempt, mapId, attemptId)
+          } else (-1, true, -1, -1)
+        if (endedAttempt == -1 || toWrite) {

Review Comment:
   The logic for deciding whether to write is harder to follow than it needs to
   be. When endedAttemptId is -1 (meaning the map task has not completed yet),
   the check 'attemptId == endedAttemptId' is true only if attemptId is also
   -1, which is unlikely to be a valid attempt id. The condition on line 1532,
   'endedAttempt == -1 || toWrite', handles this correctly by allowing writes
   when endedAttempt is -1, but toWriteAttempt on its own is misleading.
   Consider restructuring: if endedAttemptId is -1, always write; otherwise
   write only if attemptId matches endedAttemptId.
   ```suggestion
               val toWriteAttempt =
                 if (endedAttemptId == -1) {
                   // Map task has not completed yet, always write
                   true
                 } else {
                   // Only write if the current attempt matches the ended attempt
                   attemptId == endedAttemptId
                 }
               (endedAttemptId, toWriteAttempt, mapId, attemptId)
             } else (-1, true, -1, -1)
           if (toWrite) {
   ```
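
   For reference, the same decision can be sketched as a single pure predicate,
   which is also easier to unit test. The object and method names here are
   hypothetical; only the -1 sentinel and the comparison come from the diff.

   ```scala
   object WriteDecision {
     // -1 is the value the diff uses for "no ended attempt recorded".
     val NoEndedAttempt: Int = -1

     // Write when the map task has not ended yet, or when the incoming data
     // belongs to the attempt that ended.
     def shouldWrite(endedAttemptId: Int, attemptId: Int): Boolean =
       endedAttemptId == NoEndedAttempt || attemptId == endedAttemptId
   }
   ```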



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         shuffleKey: String,
         index: Int): Unit = {
       try {
-        fileWriter.write(body)
+        val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+          if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+            val (mapId, attemptId) = getMapAttempt(body)
+            val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
+            val toWriteAttempt = attemptId == endedAttemptId
+            (endedAttemptId, toWriteAttempt, mapId, attemptId)
+          } else (-1, true, -1, -1)
+        if (endedAttempt == -1 || toWrite) {
+          fileWriter.write(body)
+        } else {
+          fileWriter.decrementPendingWrites()
+          logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")

Review Comment:
   The log message could be clearer about what is happening. It currently says
   "Shuffle $shuffleKey is committing", but this branch is reached because the
   map task has already ended with a different attempt. Consider rephrasing to
   something like "Skipping data from map $curMapId attempt $curMapAttempt
   because map already completed with attempt $endedAttempt".
   ```suggestion
             logInfo(
               s"Skipping data from map $curMapId attempt $curMapAttempt for shuffle $shuffleKey " +
                 s"because map already completed with attempt $endedAttempt")
   ```



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         shuffleKey: String,
         index: Int): Unit = {
       try {
-        fileWriter.write(body)
+        val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+          if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+            val (mapId, attemptId) = getMapAttempt(body)
+            val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
+            val toWriteAttempt = attemptId == endedAttemptId
+            (endedAttemptId, toWriteAttempt, mapId, attemptId)
+          } else (-1, true, -1, -1)
+        if (endedAttempt == -1 || toWrite) {
+          fileWriter.write(body)
+        } else {
+          fileWriter.decrementPendingWrites()
+          logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")
+        }

Review Comment:
   The new logic that skips data from attempts other than the ended one lacks
   test coverage. Consider adding unit tests to verify that: 1) data from the
   ended attempt is written successfully, 2) data from other attempts is
   skipped and decrementPendingWrites is called, 3) edge cases such as a mapId
   out of bounds are handled gracefully, and 4) data is still written when
   shuffleMapperAttempts does not contain the shuffle key.
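
   A rough sketch of how these four cases could be exercised, assuming the
   decision is modelled in isolation from the real handler. CountingWriter,
   handle, and the shuffle key "app-1-0" are hypothetical stand-ins, and the
   bounds check is an assumption that is not in the current diff. A real test
   would drive PushDataHandler through the project's own test harness.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicIntegerArray

   object SkipStaleAttemptSketch {
     // Hypothetical stand-in for the file writer; it only counts calls.
     final class CountingWriter {
       var writes = 0
       var skipped = 0
       def write(): Unit = writes += 1
       def decrementPendingWrites(): Unit = skipped += 1
     }

     // Simplified model of the branch in the diff, with a bounds check added.
     def handle(
         attemptsByShuffle: ConcurrentHashMap[String, AtomicIntegerArray],
         shuffleKey: String,
         mapId: Int,
         attemptId: Int,
         writer: CountingWriter): Unit = {
       val attempts = attemptsByShuffle.get(shuffleKey) // null if key is absent
       val ended =
         if (attempts != null && mapId >= 0 && mapId < attempts.length()) attempts.get(mapId)
         else -1
       if (ended == -1 || ended == attemptId) writer.write()
       else writer.decrementPendingWrites()
     }

     def main(args: Array[String]): Unit = {
       val attempts = new ConcurrentHashMap[String, AtomicIntegerArray]()
       // Map 0 ended with attempt 1; map 1 has not ended yet.
       attempts.put("app-1-0", new AtomicIntegerArray(Array(1, -1)))
       val w = new CountingWriter
       handle(attempts, "app-1-0", 0, 1, w) // 1) ended attempt: written
       handle(attempts, "app-1-0", 0, 0, w) // 2) other attempt: skipped
       handle(attempts, "app-1-0", 1, 0, w) //    map not ended: written
       handle(attempts, "app-1-0", 9, 0, w) // 3) mapId out of bounds: written
       handle(attempts, "missing", 0, 0, w) // 4) unknown shuffle key: written
       assert(w.writes == 4 && w.skipped == 1)
     }
   }
   ```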



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
