This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 5341de54d [CELEBORN-1728] Fix NPE when failing to connect to celeborn worker
5341de54d is described below

commit 5341de54dc52723716a90aafa5bf3efe68202866
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 21 16:23:26 2024 +0800

    [CELEBORN-1728] Fix NPE when failing to connect to celeborn worker
    
    ### What changes were proposed in this pull request?
    Fix an NPE. When the client fails to connect to a Celeborn worker, `currentReader` might be `null`, so its location is now resolved null-safely.
    
    ### Why are the changes needed?
    
    I am testing https://github.com/apache/celeborn/pull/2921 in a Celeborn cluster.
    
    I set `celeborn.data.io.connectionTimeout` to 30s for fetch-failure testing. The client failed to connect to the Celeborn worker 3 times, after which `currentReader` was null.
    
    <img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff">
    
    
    https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672
    
    ```
    24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550)
    java.lang.NullPointerException
            at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672)
            at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
            at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
            at java.io.DataInputStream.read(DataInputStream.java:149)
            at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
            at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
            at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
            at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
            at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
            at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
            at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
            at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
            at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:131)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    GA.
    
    Closes #2933 from turboFei/npe_reader.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 094fe2813debbfd28e51fa7e035fa57f962fcf7a)
    Signed-off-by: mingji <[email protected]>
---
 .../java/org/apache/celeborn/client/read/CelebornInputStream.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 6e840135f..88de7a5e3 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -663,7 +663,7 @@ public abstract class CelebornInputStream extends InputStream {
             appShuffleId,
             shuffleId,
             partitionId,
-            currentReader.getLocation(),
+            Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
             e);
         IOException ioe;
         if (e instanceof IOException) {
@@ -691,7 +691,7 @@ public abstract class CelebornInputStream extends InputStream {
             appShuffleId,
             shuffleId,
             partitionId,
-            currentReader.getLocation(),
+            Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
             e);
         throw e;
       }
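
For readers unfamiliar with the idiom used in both hunks, the following is a minimal, self-contained sketch of the null-safe lookup. It is not the Celeborn source: `PartitionReader` here is a hypothetical stand-in interface whose `getLocation()` returns a `String` (an assumption made to keep the example short), and `locationOrNull` and the sample address are names invented for illustration.

```java
import java.util.Optional;

public class NullSafeLocationSketch {
    // Hypothetical stand-in for the reader type; only the accessor
    // referenced in the patch is modeled, and String is an assumed return type.
    interface PartitionReader {
        String getLocation();
    }

    // Same expression shape as the patched lines: yields null instead of
    // throwing a NullPointerException when no reader was ever created.
    static String locationOrNull(PartitionReader currentReader) {
        return Optional.ofNullable(currentReader)
            .map(PartitionReader::getLocation)
            .orElse(null);
    }

    public static void main(String[] args) {
        PartitionReader connected = () -> "worker-1:9097"; // made-up address
        System.out.println(locationOrNull(connected));
        // Simulates the failure case where the connection never succeeded.
        System.out.println(locationOrNull(null));
    }
}
```

With this shape, the code that reports the fetch failure can still run when the reader is missing, so the original exception `e` is what surfaces rather than a secondary NPE.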
