Copilot commented on code in PR #3590:
URL: https://github.com/apache/celeborn/pull/3590#discussion_r2719789693
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
shuffleKey: String,
index: Int): Unit = {
try {
- fileWriter.write(body)
+ val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ val (mapId, attemptId) = getMapAttempt(body)
+ val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
Review Comment:
Potential ArrayIndexOutOfBoundsException when indexing the AtomicIntegerArray
returned by shuffleMapperAttempts.get(shuffleKey). If mapId is negative or
greater than or equal to the array's length, get(mapId) will throw. Consider
adding a bounds check before accessing the array, similar to how other parts
of the codebase handle this.
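A minimal sketch of such a guard, assuming an out-of-range mapId should be
treated like "no ended attempt recorded" (-1) so the data is still written.
MapAttemptLookup and endedAttemptOf are hypothetical names, not existing
Celeborn code:

```scala
import java.util.concurrent.atomic.AtomicIntegerArray

object MapAttemptLookup {
  // Sentinel for "no ended attempt recorded", mirroring the -1 used in the PR.
  val NoEndedAttempt: Int = -1

  // Hypothetical helper: returns the ended attempt id stored for mapId, or
  // NoEndedAttempt when mapId is outside [0, length) instead of throwing.
  def endedAttemptOf(attempts: AtomicIntegerArray, mapId: Int): Int =
    if (mapId >= 0 && mapId < attempts.length()) attempts.get(mapId)
    else NoEndedAttempt
}
```

At the call site this would stand in for
shuffleMapperAttempts.get(shuffleKey).get(mapId). Whether an out-of-range
mapId should instead be logged as a warning or rejected outright is a design
choice for the PR author.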
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
shuffleKey: String,
index: Int): Unit = {
try {
- fileWriter.write(body)
+ val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ val (mapId, attemptId) = getMapAttempt(body)
+ val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
+ val toWriteAttempt = attemptId == endedAttemptId
+ (endedAttemptId, toWriteAttempt, mapId, attemptId)
+ } else (-1, true, -1, -1)
+ if (endedAttempt == -1 || toWrite) {
Review Comment:
The logic for deciding whether to write has a subtle issue. When
endedAttemptId is -1 (meaning the map task has not completed yet), the check
'attemptId == endedAttemptId' is true only if attemptId is also -1, which is
unlikely to be a valid scenario. The condition on line 1532,
'endedAttempt == -1 || toWrite', does handle this correctly by allowing
writes whenever endedAttempt is -1, but the intent is split across two
places. Consider restructuring so the rule lives in one place: if
endedAttemptId is -1, always write; otherwise write only if attemptId
matches endedAttemptId.
```suggestion
val toWriteAttempt =
  if (endedAttemptId == -1) {
    // Map task has not completed yet, always write
    true
  } else {
    // Only write if the current attempt matches the ended attempt
    attemptId == endedAttemptId
  }
(endedAttemptId, toWriteAttempt, mapId, attemptId)
} else (-1, true, -1, -1)
if (toWrite) {
```
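For reference, the rule this suggestion encodes fits in a small pure
function, which keeps the handler's tuple simple and is easy to unit test.
This is a sketch only: WriteDecision and shouldWrite are hypothetical names,
and -1 is assumed to be the only "not yet ended" sentinel, as in the PR.

```scala
object WriteDecision {
  // Sentinel used in the PR for "map task has not ended yet".
  val NotEnded: Int = -1

  // tracked:        whether shuffleMapperAttempts contains the shuffle key
  // endedAttemptId: attempt recorded as ended for this map, or NotEnded
  // attemptId:      attempt id parsed from the pushed block
  def shouldWrite(tracked: Boolean, endedAttemptId: Int, attemptId: Int): Boolean =
    if (!tracked) true                        // shuffle not tracked: keep previous behavior
    else if (endedAttemptId == NotEnded) true // map still running: always write
    else attemptId == endedAttemptId          // map ended: keep only the ended attempt
}
```

Folding the untracked case into the same function means the caller no
longer needs a separate 'endedAttempt == -1' check.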
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
shuffleKey: String,
index: Int): Unit = {
try {
- fileWriter.write(body)
+ val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ val (mapId, attemptId) = getMapAttempt(body)
+ val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
+ val toWriteAttempt = attemptId == endedAttemptId
+ (endedAttemptId, toWriteAttempt, mapId, attemptId)
+ } else (-1, true, -1, -1)
+ if (endedAttempt == -1 || toWrite) {
+ fileWriter.write(body)
+ } else {
+ fileWriter.decrementPendingWrites()
+ logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")
Review Comment:
The log message could be clearer about what is happening. It currently says
"Shuffle $shuffleKey is committing", but at this point the shuffle has
already been committed (the map task has ended). Consider rephrasing to
something like "Skipping data from map $curMapId attempt $curMapAttempt
because map already completed with attempt $endedAttempt".
```suggestion
logInfo(
  s"Skipping data from map $curMapId attempt $curMapAttempt for shuffle $shuffleKey " +
    s"because map already completed with attempt $endedAttempt")
```
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1522,7 +1522,19 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
shuffleKey: String,
index: Int): Unit = {
try {
- fileWriter.write(body)
+ val (endedAttempt, toWrite, curMapId, curMapAttempt) =
+ if (shuffleMapperAttempts.containsKey(shuffleKey)) {
+ val (mapId, attemptId) = getMapAttempt(body)
+ val endedAttemptId = shuffleMapperAttempts.get(shuffleKey).get(mapId)
+ val toWriteAttempt = attemptId == endedAttemptId
+ (endedAttemptId, toWriteAttempt, mapId, attemptId)
+ } else (-1, true, -1, -1)
+ if (endedAttempt == -1 || toWrite) {
+ fileWriter.write(body)
+ } else {
+ fileWriter.decrementPendingWrites()
+ logInfo(s"Shuffle $shuffleKey is committing, map $curMapId, ignore attemptId $curMapAttempt data and endedAttempt $endedAttempt")
+ }
Review Comment:
The new logic for skipping data from map attempts other than the ended one
lacks test coverage. Consider adding unit tests to verify that: 1) data from
the ended attempt is written, 2) data from other attempts is skipped and
decrementPendingWrites is called, 3) edge cases such as an out-of-bounds
mapId are handled gracefully, and 4) data is still written when
shuffleMapperAttempts does not contain the shuffle key.
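The four cases can be exercised without a running worker by isolating the
decision in a small model. The sketch below is not the real PushDataHandler:
FakeFileWriter, SkipModel, SkipModelChecks, the shuffle keys, and the use of
a plain Map in place of shuffleMapperAttempts are all hypothetical stand-ins,
and it uses plain assert rather than Celeborn's test framework.

```scala
import java.util.concurrent.atomic.AtomicIntegerArray
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the worker's file writer: records written blocks
// and counts pending writes that were released without writing.
class FakeFileWriter {
  val written: ArrayBuffer[String] = ArrayBuffer.empty[String]
  var decremented: Int = 0
  def write(body: String): Unit = { written += body; () }
  def decrementPendingWrites(): Unit = decremented += 1
}

// Simplified model of the new branch. The real handler parses (mapId, attemptId)
// from the pushed buffer via getMapAttempt; here they are passed in directly.
object SkipModel {
  def handle(
      attemptsByShuffle: Map[String, AtomicIntegerArray],
      shuffleKey: String,
      mapId: Int,
      attemptId: Int,
      body: String,
      writer: FakeFileWriter): Unit = {
    val ended = attemptsByShuffle.get(shuffleKey) match {
      case Some(arr) if mapId >= 0 && mapId < arr.length() => arr.get(mapId)
      case _ => -1 // untracked shuffle or out-of-range mapId: treat as not ended
    }
    if (ended == -1 || attemptId == ended) writer.write(body)
    else writer.decrementPendingWrites()
  }
}

// The four scenarios from the comment above, as plain assertions.
object SkipModelChecks {
  def run(): Unit = {
    // Map 0 ended with attempt 1; map 1 has not ended yet.
    val attempts = Map("shuffle-1" -> new AtomicIntegerArray(Array(1, -1)))
    val w = new FakeFileWriter

    // 1) Data from the ended attempt is written.
    SkipModel.handle(attempts, "shuffle-1", 0, 1, "a", w)
    assert(w.written.toList == List("a"))

    // 2) Data from another attempt is skipped and the pending write is released.
    SkipModel.handle(attempts, "shuffle-1", 0, 0, "b", w)
    assert(w.written.toList == List("a") && w.decremented == 1)

    // 3) An out-of-range mapId does not throw and is treated as "not ended".
    SkipModel.handle(attempts, "shuffle-1", 5, 0, "c", w)
    assert(w.written.toList == List("a", "c"))

    // 4) A shuffle key missing from the map keeps the always-write behavior.
    SkipModel.handle(attempts, "shuffle-2", 0, 7, "d", w)
    assert(w.written.toList == List("a", "c", "d") && w.decremented == 1)
  }
}
```

In a real test the same four cases would drive the handler through its
actual entry point rather than this model.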
--
This is an automated message from the Apache Git Service.