This is an automated email from the ASF dual-hosted git repository.

binjieyang pushed a commit to branch CELEBORN-1445
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 4a822e0f5be4ecacbef4b4e624b4bd5eaf19a1b0
Author: binjie yang <[email protected]>
AuthorDate: Fri May 31 20:38:21 2024 +0800

    [CELEBORN-1445] handle NPE when open stream
---
 .../celeborn/common/util/ExceptionUtils.java       |  4 ++--
 .../worker/storage/MapPartitionDataReader.java     |  2 +-
 .../service/deploy/worker/FetchHandler.scala       | 26 +++++++++++++++++-----
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
index 6652e99ef..2811c8275 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java
@@ -36,8 +36,8 @@ public class ExceptionUtils {
     }
   }
 
-  public static Throwable wrapIOExceptionToUnRetryable(Throwable throwable) {
-    if (throwable instanceof IOException) {
+  public static Throwable wrapExceptionToUnRetryable(Throwable throwable) {
+    if (throwable instanceof IOException || throwable instanceof NullPointerException) {
       return new PartitionUnRetryAbleException(throwable.getMessage(), throwable);
     } else {
       return throwable;
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 6f9fa1871..117dd9644 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -419,7 +419,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader
       // And do not close channel because multiple streams are using the very same channel.
       // wrapIOException to PartitionUnRetryAbleException, client may choose regenerate the data.
       this.associatedChannel.writeAndFlush(
-          new TransportableError(streamId, ExceptionUtils.wrapIOExceptionToUnRetryable(throwable)));
+          new TransportableError(streamId, ExceptionUtils.wrapExceptionToUnRetryable(throwable)));
     }
   }
 
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 934a4e967..2db266dd9 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -317,6 +317,11 @@ class FetchHandler(
             client.getChannel)}, Exception: ${e.getMessage}"
         PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue)
           .setErrorMsg(msg).build()
+      case npe: NullPointerException =>
+        val msg = s"Fail to find the shuffle key $shuffleKey from ${NettyUtils.getRemoteAddress(
+          client.getChannel)}, Exception: ${npe.getMessage}"
+        PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue)
+          .setErrorMsg(msg).build()
     } finally {
       workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
     }
@@ -379,6 +384,9 @@ class FetchHandler(
       case e: IOException =>
         workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
         handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback)
+      case npe: NullPointerException =>
+        workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
+        handleRpcNullPointerException(client, rpcRequestId, shuffleKey, npe, callback)
     } finally {
       workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
     }
@@ -433,15 +441,23 @@ class FetchHandler(
     logError(
       s"Read file: $fileName with shuffleKey: $shuffleKey error from ${NettyUtils.getRemoteAddress(client.getChannel)}",
       ioe)
-    handleRpcException(client, requestId, ioe, rpcCallback)
+    handleRpcException(ioe, rpcCallback)
   }
 
-  private def handleRpcException(
+  private def handleRpcNullPointerException(
       client: TransportClient,
       requestId: Long,
-      ioe: IOException,
-      rpcResponseCallback: RpcResponseCallback): Unit = {
-    rpcResponseCallback.onFailure(ExceptionUtils.wrapIOExceptionToUnRetryable(ioe))
+      shuffleKey: String,
+      npe: NullPointerException,
+      rpcCallback: RpcResponseCallback): Unit = {
+    logError(
+      s"Fail to find the shuffle key $shuffleKey from ${NettyUtils.getRemoteAddress(
+        client.getChannel)}, Exception: ${npe.getMessage}")
+    handleRpcException(npe, rpcCallback)
+  }
+
+  private def handleRpcException(e: Exception, rpcResponseCallback: RpcResponseCallback): Unit = {
+    rpcResponseCallback.onFailure(ExceptionUtils.wrapExceptionToUnRetryable(e))
   }
 
   def handleEndStreamFromClient(client: TransportClient, streamId: Long): Unit = {

Reply via email to