This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new c1175f89f [CELEBORN-1686][FOLLOWUP] Avoid NPE when clean up data pusher
c1175f89f is described below
commit c1175f89f279e1a5a1cf030db9c4f054f7f62254
Author: wuziyi <[email protected]>
AuthorDate: Tue Nov 19 14:35:21 2024 +0800
[CELEBORN-1686][FOLLOWUP] Avoid NPE when clean up data pusher
### What changes were proposed in this pull request?
When the code executes along the following path, an unexpected NPE occurs.
1. Write data:
```java
// org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#write
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  boolean needCleanupPusher = true;
  try {
    ....
    // write data
    close();
    needCleanupPusher = false;
  } catch (InterruptedException e) {
    TaskInterruptedHelper.throwTaskKillException();
  } finally {
    if (needCleanupPusher) {
      cleanupPusher();
    }
  }
}
```
2. Close the data pusher. An exception may be thrown while pushing merged data after `idleQueue` has already been set to null:
```java
// org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#close
private void close() throws IOException, InterruptedException {
  long pushMergedDataTime = System.nanoTime();
  dataPusher.waitOnTermination();
  // ** `idleQueue` is set to null here
  sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
  shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
  closeWrite();
  // ** an exception occurs while pushing merged data
  shuffleClient.pushMergedData(shuffleId, mapId, encodedAttemptId);
  .....
}
```
3. Clean up the `DataPusher` in the `finally` block:
```java
// org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#write{finally_block}
//   -> org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#cleanupPusher
//   -> org.apache.celeborn.client.write.DataPusher#waitOnTermination
//   -> org.apache.celeborn.client.write.DataPusher#waitIdleQueueFullWithLock
private void waitIdleQueueFullWithLock() throws InterruptedException {
  try {
    while (idleQueue.remainingCapacity() > 0 // ** where the NPE occurs **
        && exceptionRef.get() == null
        && (pushThread != null && pushThread.isAlive())) {
      idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
    }
  } catch (InterruptedException e) {
    logger.error("Thread interrupted while waitIdleQueueFullWithLock.", e);
    throw e;
  } finally {
    idleLock.unlock();
  }
}
```
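The three steps can be reproduced in a single thread. The sketch below is a hypothetical model, not Celeborn code: `NpeOnCleanupDemo` and its nested `Pusher` only echo the names in the snippets, the queue type `LinkedBlockingQueue<byte[]>` is an assumption, and the wait loop is reduced to its null-sensitive part. `close()` resets the idle queue and then fails, so the `finally` block calls the wait loop on a null queue. Under plain Java semantics, an exception thrown from `finally` replaces the one already in flight, so in this model the caller sees the NPE rather than the original push failure.

```java
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded model of the path above (not the real classes).
public class NpeOnCleanupDemo {

  static class Pusher {
    // Assumed queue type; only remainingCapacity() matters for this model.
    LinkedBlockingQueue<byte[]> idleQueue = new LinkedBlockingQueue<>(4);
    Thread pushThread = null; // the push thread has already exited

    // Step 2: hand the queue back to the caller and drop the reference.
    LinkedBlockingQueue<byte[]> getAndResetIdleQueue() {
      LinkedBlockingQueue<byte[]> queue = idleQueue;
      idleQueue = null;
      return queue;
    }

    // Step 3: original loop ordering, dereferencing idleQueue first.
    void waitOnTermination() {
      while (idleQueue.remainingCapacity() > 0
          && (pushThread != null && pushThread.isAlive())) {
        Thread.yield(); // the real code awaits a Condition with a timeout
      }
    }
  }

  final Pusher dataPusher = new Pusher();

  // Step 1: same cleanup-flag pattern as HashBasedShuffleWriter#write.
  void write() throws IOException {
    boolean needCleanupPusher = true;
    try {
      close();
      needCleanupPusher = false;
    } finally {
      if (needCleanupPusher) {
        dataPusher.waitOnTermination(); // stands in for cleanupPusher()
      }
    }
  }

  // Step 2: reset the idle queue, then fail while "pushing merged data".
  private void close() throws IOException {
    dataPusher.waitOnTermination();
    dataPusher.getAndResetIdleQueue();
    throw new IOException("pushMergedData failed");
  }

  public static void main(String[] args) {
    try {
      new NpeOnCleanupDemo().write();
    } catch (IOException e) {
      System.out.println("IOException: " + e.getMessage());
    } catch (NullPointerException e) {
      System.out.println("NullPointerException from cleanup");
    }
  }
}
```

Running `main` prints `NullPointerException from cleanup`: the `IOException` raised in `close()` never reaches the caller.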
[Image: error stack trace]

### Why are the changes needed?
Avoid a potential NPE when cleaning up the data pusher.
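The fix relies on `&&` short-circuiting: with `idleQueue != null` as the first operand, a reset queue ends the wait before `remainingCapacity()` is ever called. The sketch below is a simplified, hypothetical model of `waitIdleQueueFullWithLock`. Field names follow the snippet, but the field types and the `WAIT_TIME_NANOS` value are assumptions. The original method appears to expect its caller to hold `idleLock`; here each method takes the lock itself so the example runs standalone.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of DataPusher#waitIdleQueueFullWithLock, before and after the fix.
public class WaitIdleQueueDemo {
  // Assumed wait slice; the real constant's value is not shown in the patch.
  private static final long WAIT_TIME_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

  private final ReentrantLock idleLock = new ReentrantLock();
  private final Condition idleFull = idleLock.newCondition();
  private final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
  LinkedBlockingQueue<byte[]> idleQueue = new LinkedBlockingQueue<>(4);
  Thread pushThread = null; // push thread already finished

  // Original ordering: idleQueue is dereferenced before any other check.
  void waitUnguarded() throws InterruptedException {
    idleLock.lock();
    try {
      while (idleQueue.remainingCapacity() > 0
          && exceptionRef.get() == null
          && (pushThread != null && pushThread.isAlive())) {
        idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
      }
    } finally {
      idleLock.unlock();
    }
  }

  // Patched ordering: a null queue short-circuits the whole condition.
  void waitGuarded() throws InterruptedException {
    idleLock.lock();
    try {
      while (idleQueue != null
          && idleQueue.remainingCapacity() > 0
          && exceptionRef.get() == null
          && (pushThread != null && pushThread.isAlive())) {
        idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
      }
    } finally {
      idleLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    WaitIdleQueueDemo pusher = new WaitIdleQueueDemo();
    pusher.idleQueue = null; // state left behind by getAndResetIdleQueue()
    try {
      pusher.waitUnguarded();
    } catch (NullPointerException e) {
      System.out.println("unguarded: NPE");
    }
    pusher.waitGuarded();
    System.out.println("guarded: returned normally");
  }
}
```

Placing the null check first, rather than wrapping the loop in a separate `if`, keeps the change to a single added line and leaves the other exit conditions untouched.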
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes #2926 from Z1Wu/fix/npe_shuffle_write.
Authored-by: wuziyi <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
client/src/main/java/org/apache/celeborn/client/write/DataPusher.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index db8b6ab50..311a8da99 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -213,7 +213,8 @@ public class DataPusher {
private void waitIdleQueueFullWithLock() throws InterruptedException {
try {
- while (idleQueue.remainingCapacity() > 0
+ while (idleQueue != null
+ && idleQueue.remainingCapacity() > 0
&& exceptionRef.get() == null
&& (pushThread != null && pushThread.isAlive())) {
idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);