This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f984efb0c [#1894] fix(server): Fix NPE caused by app not found issue (#1915)
f984efb0c is described below
commit f984efb0c4681619182dc777c1cd365c98238406
Author: maobaolong <[email protected]>
AuthorDate: Wed Jul 17 14:13:43 2024 +0800
[#1894] fix(server): Fix NPE caused by app not found issue (#1915)
### What changes were proposed in this pull request?
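Fix a NullPointerException in the shuffle server that occurs when a request arrives for an appId whose ShuffleTaskInfo cannot be found (getShuffleTaskInfo returns null). Both the gRPC and Netty server handlers now reply with the new APP_NOT_FOUND status code and increment the total_app_not_found_num metric. The clients now treat APP_NOT_FOUND and INTERNAL_NOT_RETRY_ERROR, like NO_REGISTER, as non-retryable.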
### Why are the changes needed?
Fix: #1894
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested on our test cluster.
---
.../org/apache/uniffle/common/rpc/StatusCode.java | 2 ++
.../client/impl/grpc/ShuffleServerGrpcClient.java | 7 +++++--
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 4 ++--
proto/src/main/proto/Rss.proto | 2 ++
.../uniffle/server/ShuffleServerGrpcService.java | 20 ++++++++++++++++++++
.../apache/uniffle/server/ShuffleServerMetrics.java | 4 ++++
.../server/netty/ShuffleServerNettyHandler.java | 17 +++++++++++++++++
7 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
index ff8ac231c..4b891440d 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
@@ -36,6 +36,8 @@ public enum StatusCode {
INVALID_REQUEST(9),
NO_BUFFER_FOR_HUGE_PARTITION(10),
STAGE_RETRY_IGNORE(11),
+ APP_NOT_FOUND(13),
+ INTERNAL_NOT_RETRY_ERROR(14),
UNKNOWN(-1);
static final Map<Integer, StatusCode> VALUE_MAP =
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 14dbf2f60..988b7a7f0 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -130,6 +130,9 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
protected Random random = new Random();
protected static final int BACK_OFF_BASE = 2000;
+ static final List<StatusCode> NOT_RETRY_STATUS_CODES =
+ Lists.newArrayList(
+ StatusCode.NO_REGISTER, StatusCode.APP_NOT_FOUND,
StatusCode.INTERNAL_NOT_RETRY_ERROR);
@VisibleForTesting
public ShuffleServerGrpcClient(String host, int port) {
@@ -595,7 +598,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
+ ", errorMsg:"
+ response.getRetMsg();
failedStatusCode.set(StatusCode.fromCode(response.getStatus().getNumber()));
- if (response.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
+ if (NOT_RETRY_STATUS_CODES.contains(failedStatusCode.get())) {
throw new NotRetryException(msg);
} else {
throw new RssException(msg);
@@ -606,7 +609,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
null,
request.getRetryIntervalMax(),
maxRetryAttempts,
- t -> !(t instanceof OutOfMemoryError));
+ t -> !(t instanceof OutOfMemoryError) && !(t instanceof
NotRetryException));
} catch (Throwable throwable) {
LOG.warn("Failed to send shuffle data due to ", throwable);
isSuccessful = false;
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 26e53851d..6da8788d8 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -217,7 +217,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
+ rpcResponse.getStatusCode()
+ ", errorMsg:"
+ rpcResponse.getRetMessage();
- if (rpcResponse.getStatusCode() == StatusCode.NO_REGISTER) {
+ if
(NOT_RETRY_STATUS_CODES.contains(rpcResponse.getStatusCode())) {
throw new NotRetryException(msg);
} else {
throw new RssException(msg);
@@ -228,7 +228,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
null,
request.getRetryIntervalMax(),
maxRetryAttempts,
- t -> !(t instanceof OutOfMemoryError));
+ t -> !(t instanceof OutOfMemoryError) && !(t instanceof
NotRetryException));
} catch (Throwable throwable) {
LOG.warn("Failed to send shuffle data due to ", throwable);
isSuccessful = false;
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 97928bf20..5ce36df7e 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -308,6 +308,8 @@ enum StatusCode {
INVALID_REQUEST = 9;
NO_BUFFER_FOR_HUGE_PARTITION = 10;
STAGE_RETRY_IGNORE = 11;
+ APP_NOT_FOUND = 13;
+ INTERNAL_NOT_RETRY_ERROR = 14;
// add more status
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b6e37029f..aea43d24e 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -254,6 +254,26 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
long timestamp = req.getTimestamp();
int stageAttemptNumber = req.getStageAttemptNumber();
ShuffleTaskInfo taskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+ if (taskInfo == null) {
+ String errorMsg =
+ "APP_NOT_FOUND error, requireBufferId["
+ + requireBufferId
+ + "] for appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "]";
+ LOG.error(errorMsg);
+ ShuffleServerMetrics.counterAppNotFound.inc();
+ reply =
+ SendShuffleDataResponse.newBuilder()
+ .setStatus(StatusCode.APP_NOT_FOUND.toProto())
+ .setRetMsg(errorMsg)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ return;
+ }
Integer latestStageAttemptNumber =
taskInfo.getLatestStageAttemptNumber(shuffleId);
// The Stage retry occurred, and the task before StageNumber was simply
ignored and not
// processed if the task was being sent.
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index f97978c0f..8eada73c5 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -134,6 +134,7 @@ public class ShuffleServerMetrics {
private static final String TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM =
"total_expired_preAllocated_buffer_num";
+ private static final String TOTAL_APP_NOT_FOUND_NUM =
"total_app_not_found_num";
private static final String TOTAL_REMOVE_RESOURCE_TIME =
"total_remove_resource_time";
private static final String TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME =
@@ -237,6 +238,7 @@ public class ShuffleServerMetrics {
public static Counter counterLocalFileEventFlush;
public static Counter counterHadoopEventFlush;
public static Counter counterPreAllocatedBufferExpired;
+ public static Counter counterAppNotFound;
private static MetricsManager metricsManager;
private static boolean isRegister = false;
@@ -462,6 +464,8 @@ public class ShuffleServerMetrics {
counterPreAllocatedBufferExpired =
metricsManager.addCounter(TOTAL_EXPIRED_PRE_ALLOCATED_BUFFER_NUM);
+ counterAppNotFound = metricsManager.addCounter(TOTAL_APP_NOT_FOUND_NUM);
+
summaryTotalRemoveResourceTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_TIME);
summaryTotalRemoveResourceByShuffleIdsTime =
metricsManager.addSummary(TOTAL_REMOVE_RESOURCE_BY_SHUFFLE_IDS_TIME);
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index 448c12a23..d297603b9 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -105,6 +105,23 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
long timestamp = req.getTimestamp();
int stageAttemptNumber = req.getStageAttemptNumber();
ShuffleTaskInfo taskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+ if (taskInfo == null) {
+ rpcResponse =
+ new RpcResponse(
+ req.getRequestId(), StatusCode.APP_NOT_FOUND, "appId: " + appId
+ " not found");
+ String errorMsg =
+ "APP_NOT_FOUND error, requireBufferId["
+ + requireBufferId
+ + "] for appId["
+ + appId
+ + "], shuffleId["
+ + shuffleId
+ + "]";
+ LOG.error(errorMsg);
+ ShuffleServerMetrics.counterAppNotFound.inc();
+ client.getChannel().writeAndFlush(rpcResponse);
+ return;
+ }
Integer latestStageAttemptNumber =
taskInfo.getLatestStageAttemptNumber(shuffleId);
// The Stage retry occurred, and the task before StageNumber was simply
ignored and not
// processed if the task was being sent.