This is an automated email from the ASF dual-hosted git repository. binjieyang pushed a commit to branch CELEBORN-1445 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 4a822e0f5be4ecacbef4b4e624b4bd5eaf19a1b0 Author: binjie yang <[email protected]> AuthorDate: Fri May 31 20:38:21 2024 +0800 [CELEBORN-1445] handle NPE when open stream --- .../celeborn/common/util/ExceptionUtils.java | 4 ++-- .../worker/storage/MapPartitionDataReader.java | 2 +- .../service/deploy/worker/FetchHandler.scala | 26 +++++++++++++++++----- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java index 6652e99ef..2811c8275 100644 --- a/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java +++ b/common/src/main/java/org/apache/celeborn/common/util/ExceptionUtils.java @@ -36,8 +36,8 @@ public class ExceptionUtils { } } - public static Throwable wrapIOExceptionToUnRetryable(Throwable throwable) { - if (throwable instanceof IOException) { + public static Throwable wrapExceptionToUnRetryable(Throwable throwable) { + if (throwable instanceof IOException || throwable instanceof NullPointerException) { return new PartitionUnRetryAbleException(throwable.getMessage(), throwable); } else { return throwable; diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java index 6f9fa1871..117dd9644 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java @@ -419,7 +419,7 @@ public class MapPartitionDataReader implements Comparable<MapPartitionDataReader // And do not close channel because multiple streams are using the very same channel. // wrapIOException to PartitionUnRetryAbleException, client may choose regenerate the data. this.associatedChannel.writeAndFlush( - new TransportableError(streamId, ExceptionUtils.wrapIOExceptionToUnRetryable(throwable))); + new TransportableError(streamId, ExceptionUtils.wrapExceptionToUnRetryable(throwable))); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 934a4e967..2db266dd9 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -317,6 +317,11 @@ class FetchHandler( client.getChannel)}, Exception: ${e.getMessage}" PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue) .setErrorMsg(msg).build() + case npe: NullPointerException => + val msg = s"Fail to find the shuffle key $shuffleKey from ${NettyUtils.getRemoteAddress( + client.getChannel)}, Exception: ${npe.getMessage}" + PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue) + .setErrorMsg(msg).build() } finally { workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) } @@ -379,6 +384,9 @@ class FetchHandler( case e: IOException => workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT) handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback) + case npe: NullPointerException => + workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT) + handleRpcNullPointerException(client, rpcRequestId, shuffleKey, npe, callback) } finally { workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey) } @@ -433,15 +441,23 @@ class FetchHandler( logError( s"Read file: $fileName with shuffleKey: $shuffleKey error from ${NettyUtils.getRemoteAddress(client.getChannel)}", ioe) - handleRpcException(client, requestId, ioe, rpcCallback) + handleRpcException(ioe, rpcCallback) } - private def handleRpcException( + private def handleRpcNullPointerException( client: TransportClient, requestId: Long, - ioe: IOException, - rpcResponseCallback: RpcResponseCallback): Unit = { - rpcResponseCallback.onFailure(ExceptionUtils.wrapIOExceptionToUnRetryable(ioe)) + shuffleKey: String, + npe: NullPointerException, + rpcCallback: RpcResponseCallback): Unit = { + logError( + s"Fail to find the shuffle key $shuffleKey from ${NettyUtils.getRemoteAddress( + client.getChannel)}, Exception: ${npe.getMessage}") + handleRpcException(npe, rpcCallback) + } + + private def handleRpcException(e: Exception, rpcResponseCallback: RpcResponseCallback): Unit = { + rpcResponseCallback.onFailure(ExceptionUtils.wrapExceptionToUnRetryable(e)) } def handleEndStreamFromClient(client: TransportClient, streamId: Long): Unit = {
