This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f4cda5b70 [#1456] improvement(client): Better exception handling when
calling requireBuffer using GRPC (#1457)
f4cda5b70 is described below
commit f4cda5b70cfe10ad572f651c580426eeca672709
Author: RickyMa <[email protected]>
AuthorDate: Thu Jan 18 10:07:47 2024 +0800
[#1456] improvement(client): Better exception handling when calling
requireBuffer using GRPC (#1457)
### What changes were proposed in this pull request?
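1. Refactor ShuffleServerGrpcClient.requirePreAllocation so that
getBlockingStub().requireBuffer is called from a single place inside the retry loop.
2. Catch any exception thrown by getBlockingStub().requireBuffer, log the error, and
return FAILED_REQUIRE_ID.
3. Rework the log messages: log the initial request at DEBUG level, and log the
"Can't require buffer ... sleep and try again" message at INFO level only right before an actual retry.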
### Why are the changes needed?
For #1456
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../client/impl/grpc/ShuffleServerGrpcClient.java | 50 +++++++++++++++-------
1 file changed, 35 insertions(+), 15 deletions(-)
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 18144378b..6bae6f62c 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -237,23 +237,33 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
.build();
long start = System.currentTimeMillis();
- RequireBufferResponse rpcResponse =
getBlockingStub().requireBuffer(rpcRequest);
int retry = 0;
long result = FAILED_REQUIRE_ID;
Random random = new Random();
final int backOffBase = 2000;
- LOG.info(
- "Can't require buffer for appId: {}, shuffleId: {}, partitionIds: {}
with {} bytes from {}:{} due to {}, sleep and try[{}] again",
- appId,
- shuffleId,
- partitionIds,
- requireSize,
- host,
- port,
- rpcResponse.getStatus(),
- retry);
- while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER
- || rpcResponse.getStatus() ==
RssProtos.StatusCode.NO_BUFFER_FOR_HUGE_PARTITION) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Requiring buffer for appId: {}, shuffleId: {}, partitionIds: {}
with {} bytes from {}:{}",
+ appId,
+ shuffleId,
+ partitionIds,
+ requireSize,
+ host,
+ port);
+ }
+ RequireBufferResponse rpcResponse;
+ while (true) {
+ try {
+ rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
+ } catch (Exception e) {
+ LOG.error(
+ "Exception happened when requiring pre-allocated buffer from
{}:{}", host, port, e);
+ return result;
+ }
+ if (rpcResponse.getStatus() != RssProtos.StatusCode.NO_BUFFER
+ && rpcResponse.getStatus() !=
RssProtos.StatusCode.NO_BUFFER_FOR_HUGE_PARTITION) {
+ break;
+ }
if (retry >= retryMax) {
LOG.warn(
"ShuffleServer "
@@ -270,15 +280,25 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
return result;
}
try {
+ LOG.info(
+ "Can't require buffer for appId: {}, shuffleId: {}, partitionIds:
{} with {} bytes from {}:{} due to {}, sleep and try[{}] again",
+ appId,
+ shuffleId,
+ partitionIds,
+ requireSize,
+ host,
+ port,
+ rpcResponse.getStatus(),
+ retry);
long backoffTime =
Math.min(
retryIntervalMax,
backOffBase * (1L << Math.min(retry, 16)) +
random.nextInt(backOffBase));
Thread.sleep(backoffTime);
} catch (Exception e) {
- LOG.warn("Exception happened when require pre allocation from " + host
+ ":" + port, e);
+ LOG.warn(
+ "Exception happened when requiring pre-allocated buffer from
{}:{}", host, port, e);
}
- rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
retry++;
}
if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {