This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new c1175f89f [CELEBORN-1686][FOLLOWUP] Avoid NPE when clean up data pusher
c1175f89f is described below

commit c1175f89f279e1a5a1cf030db9c4f054f7f62254
Author: wuziyi <[email protected]>
AuthorDate: Tue Nov 19 14:35:21 2024 +0800

    [CELEBORN-1686][FOLLOWUP] Avoid NPE when clean up data pusher
    
    ### What changes were proposed in this pull request?
    
    When the code executes along the path below, an unexpected NPE occurs:
    
    1. Write data.
    ```java
    // org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#write
    public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
      boolean needCleanupPusher = true;
      try {
        // ....
        // write data
        close();
        needCleanupPusher = false;
      } catch (InterruptedException e) {
        TaskInterruptedHelper.throwTaskKillException();
      } finally {
        if (needCleanupPusher) {
          cleanupPusher();
        }
      }
    }
    ```
    2. Close the data pusher. An exception may be thrown while pushing merged data after `idleQueue` has already been set to null.
    ```java
    // org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#close
    private void close() throws IOException, InterruptedException {
      long pushMergedDataTime = System.nanoTime();
      dataPusher.waitOnTermination();
      // ** `idleQueue` is set to null here
      sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
      shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
      closeWrite();
      // ** an exception occurs while pushing merged data
      shuffleClient.pushMergedData(shuffleId, mapId, encodedAttemptId);
      // .....
    }
    ```
    3. Clean up the `DataPusher` in the `finally` block.
    ```java
    // org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#write{finally_block}
    //   -> org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter#cleanupPusher
    //     -> org.apache.celeborn.client.write.DataPusher#waitOnTermination
    //       -> org.apache.celeborn.client.write.DataPusher#waitIdleQueueFullWithLock
    private void waitIdleQueueFullWithLock() throws InterruptedException {
      try {
        while (idleQueue.remainingCapacity() > 0 // ** where the NPE occurs **
            && exceptionRef.get() == null
            && (pushThread != null && pushThread.isAlive())) {
          idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
        }
      } catch (InterruptedException e) {
        logger.error("Thread interrupted while waitIdleQueueFullWithLock.", e);
        throw e;
      } finally {
        idleLock.unlock();
      }
    }
    ```
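
    The three steps above can be condensed into a small, single-threaded model. This is a hypothetical sketch, not Celeborn code: `NpeReproduction`, `MiniPusher` and `escapingException` are made-up names, the push thread and locks are omitted, and the thrown `IOException` merely stands in for a failure in `pushMergedData`. It shows that once `close()` has reset `idleQueue` and then throws, the cleanup in `finally` dereferences the null field. Because an exception thrown from a `finally` block replaces the one already propagating, the caller sees the `NullPointerException` rather than the original push failure.

    ```java
    import java.io.IOException;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical single-threaded model of the failure path; not Celeborn code.
    // Field and method names mirror the commit, but locks, the push thread and
    // real network I/O are omitted.
    class NpeReproduction {
      static class MiniPusher {
        LinkedBlockingQueue<Object> idleQueue = new LinkedBlockingQueue<>(2);

        // Step 2: hand the queue back to the caller and clear the field.
        LinkedBlockingQueue<Object> getAndResetIdleQueue() {
          LinkedBlockingQueue<Object> ret = idleQueue;
          idleQueue = null;
          return ret;
        }

        // Pre-fix condition: dereferences idleQueue without a null check.
        void waitIdleQueueFull() {
          while (idleQueue.remainingCapacity() > 0) { // NPE once idleQueue is null
            idleQueue.offer(new Object()); // simulate tasks returning to the idle queue
          }
        }
      }

      static void close(MiniPusher pusher) throws IOException {
        pusher.waitIdleQueueFull();
        pusher.getAndResetIdleQueue(); // idleQueue becomes null
        throw new IOException("simulated pushMergedData failure");
      }

      // Steps 1 and 3: cleanup runs in finally only if close() did not complete.
      static void write(MiniPusher pusher) throws IOException {
        boolean needCleanupPusher = true;
        try {
          close(pusher);
          needCleanupPusher = false;
        } finally {
          if (needCleanupPusher) {
            pusher.waitIdleQueueFull(); // stands in for cleanupPusher() -> waitOnTermination()
          }
        }
      }

      // Returns the simple class name of whatever exception escapes write().
      static String escapingException() {
        try {
          write(new MiniPusher());
          return "none";
        } catch (Exception e) {
          return e.getClass().getSimpleName();
        }
      }

      public static void main(String[] args) {
        System.out.println(escapingException()); // prints NullPointerException
      }
    }
    ```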
    
    Error stack trace:

    ![image](https://github.com/user-attachments/assets/b05b3483-cf90-4a7d-b4cd-114ea1621508)
    
    ### Why are the changes needed?
    
    To avoid a potential NPE when cleaning up the data pusher after `idleQueue` has already been reset to null.
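
    A minimal sketch of the same single-threaded model with the patched condition, again assuming hypothetical names (`GuardedCleanup`, `MiniPusher`, `escapingException`) and no real threading or locking: `idleQueue != null` is evaluated first and `&&` short-circuits, so `remainingCapacity()` is never called on a null reference. In this model the cleanup returns immediately and the original `IOException` from the simulated push failure is what reaches the caller.

    ```java
    import java.io.IOException;
    import java.util.concurrent.LinkedBlockingQueue;

    // Hypothetical single-threaded model of the patched condition; not Celeborn code.
    class GuardedCleanup {
      static class MiniPusher {
        LinkedBlockingQueue<Object> idleQueue = new LinkedBlockingQueue<>(2);

        LinkedBlockingQueue<Object> getAndResetIdleQueue() {
          LinkedBlockingQueue<Object> ret = idleQueue;
          idleQueue = null;
          return ret;
        }

        // Patched condition: the null check short-circuits the && chain.
        void waitIdleQueueFull() {
          while (idleQueue != null && idleQueue.remainingCapacity() > 0) {
            idleQueue.offer(new Object()); // simulate tasks returning to the idle queue
          }
        }
      }

      static void close(MiniPusher pusher) throws IOException {
        pusher.waitIdleQueueFull();
        pusher.getAndResetIdleQueue(); // idleQueue becomes null
        throw new IOException("simulated pushMergedData failure");
      }

      static void write(MiniPusher pusher) throws IOException {
        boolean needCleanupPusher = true;
        try {
          close(pusher);
          needCleanupPusher = false;
        } finally {
          if (needCleanupPusher) {
            pusher.waitIdleQueueFull(); // now returns immediately instead of throwing
          }
        }
      }

      // Returns the simple class name of whatever exception escapes write().
      static String escapingException() {
        try {
          write(new MiniPusher());
          return "none";
        } catch (Exception e) {
          return e.getClass().getSimpleName();
        }
      }

      public static void main(String[] args) {
        System.out.println(escapingException()); // prints IOException
      }
    }
    ```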
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Closes #2926 from Z1Wu/fix/npe_shuffle_write.
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 client/src/main/java/org/apache/celeborn/client/write/DataPusher.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index db8b6ab50..311a8da99 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -213,7 +213,8 @@ public class DataPusher {
 
   private void waitIdleQueueFullWithLock() throws InterruptedException {
     try {
-      while (idleQueue.remainingCapacity() > 0
+      while (idleQueue != null
+          && idleQueue.remainingCapacity() > 0
           && exceptionRef.get() == null
           && (pushThread != null && pushThread.isAlive())) {
         idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
