This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f4cda5b70 [#1456] improvement(client): Better exception handling when 
calling requireBuffer using GRPC (#1457)
f4cda5b70 is described below

commit f4cda5b70cfe10ad572f651c580426eeca672709
Author: RickyMa <[email protected]>
AuthorDate: Thu Jan 18 10:07:47 2024 +0800

    [#1456] improvement(client): Better exception handling when calling 
requireBuffer using GRPC (#1457)
    
    ### What changes were proposed in this pull request?
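    1. Refactor ShuffleServerGrpcClient.requirePreAllocation so that getBlockingStub().requireBuffer is invoked from a single call site inside the retry loop, instead of once before the loop and again at the end of each iteration.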
    2. Wrap the getBlockingStub().requireBuffer call in a try-catch; if it throws, log the error and return FAILED_REQUIRE_ID.
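    3. Improve the logging: log the initial buffer request at DEBUG level, and emit the "Can't require buffer ... sleep and try again" INFO message only when a retry is actually about to happen.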
    
    ### Why are the changes needed?
    For #1456
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests.
---
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 50 +++++++++++++++-------
 1 file changed, 35 insertions(+), 15 deletions(-)

diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 18144378b..6bae6f62c 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -237,23 +237,33 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
             .build();
 
     long start = System.currentTimeMillis();
-    RequireBufferResponse rpcResponse = 
getBlockingStub().requireBuffer(rpcRequest);
     int retry = 0;
     long result = FAILED_REQUIRE_ID;
     Random random = new Random();
     final int backOffBase = 2000;
-    LOG.info(
-        "Can't require buffer for appId: {}, shuffleId: {}, partitionIds: {} 
with {} bytes from {}:{} due to {}, sleep and try[{}] again",
-        appId,
-        shuffleId,
-        partitionIds,
-        requireSize,
-        host,
-        port,
-        rpcResponse.getStatus(),
-        retry);
-    while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER
-        || rpcResponse.getStatus() == 
RssProtos.StatusCode.NO_BUFFER_FOR_HUGE_PARTITION) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Requiring buffer for appId: {}, shuffleId: {}, partitionIds: {} 
with {} bytes from {}:{}",
+          appId,
+          shuffleId,
+          partitionIds,
+          requireSize,
+          host,
+          port);
+    }
+    RequireBufferResponse rpcResponse;
+    while (true) {
+      try {
+        rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
+      } catch (Exception e) {
+        LOG.error(
+            "Exception happened when requiring pre-allocated buffer from 
{}:{}", host, port, e);
+        return result;
+      }
+      if (rpcResponse.getStatus() != RssProtos.StatusCode.NO_BUFFER
+          && rpcResponse.getStatus() != 
RssProtos.StatusCode.NO_BUFFER_FOR_HUGE_PARTITION) {
+        break;
+      }
       if (retry >= retryMax) {
         LOG.warn(
             "ShuffleServer "
@@ -270,15 +280,25 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
         return result;
       }
       try {
+        LOG.info(
+            "Can't require buffer for appId: {}, shuffleId: {}, partitionIds: 
{} with {} bytes from {}:{} due to {}, sleep and try[{}] again",
+            appId,
+            shuffleId,
+            partitionIds,
+            requireSize,
+            host,
+            port,
+            rpcResponse.getStatus(),
+            retry);
         long backoffTime =
             Math.min(
                 retryIntervalMax,
                 backOffBase * (1L << Math.min(retry, 16)) + 
random.nextInt(backOffBase));
         Thread.sleep(backoffTime);
       } catch (Exception e) {
-        LOG.warn("Exception happened when require pre allocation from " + host 
+ ":" + port, e);
+        LOG.warn(
+            "Exception happened when requiring pre-allocated buffer from 
{}:{}", host, port, e);
       }
-      rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
       retry++;
     }
     if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {

Reply via email to