This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new eb8814385 [#1267] fix(client): fast fail without retry when oom occurs (#1344)
eb8814385 is described below

commit eb88143856d9eb212ed31d3c167ed880f3cf2e2d
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Dec 8 18:06:13 2023 +0800

    [#1267] fix(client): fast fail without retry when oom occurs (#1344)
    
    ### What changes were proposed in this pull request?
    
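    Fail fast, without retrying, when an `OutOfMemoryError` is thrown inside the
    `RetryUtils` retry loops of `ShuffleServerGrpcClient` and `ShuffleServerGrpcNettyClient`.
    To support this, add `RetryUtils.retryWithCondition`, which accepts a caller-supplied
    retry predicate.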
    
    ### Why are the changes needed?
    
    Fix: #1267
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs, plus the new `RetryUtilsTest#testRetryWithCondition`.
---
 .../org/apache/uniffle/common/util/RetryUtils.java | 26 +++++++++++++++++-----
 .../apache/uniffle/common/util/RetryUtilsTest.java | 25 +++++++++++++++++++++
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |  6 +++--
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    |  6 +++--
 4 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
index 3d45e5cca..d3134b48e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.common.util;
 
 import java.util.Set;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,23 +58,38 @@ public class RetryUtils {
       int retryTimes,
       Set<Class<? extends Throwable>> exceptionClasses)
       throws Throwable {
+    return retryWithCondition(
+        cmd,
+        callBack,
+        intervalMs,
+        retryTimes,
+        t ->
+            (exceptionClasses == null || isInstanceOf(exceptionClasses, t))
+                && !(t instanceof NotRetryException));
+  }
+
+  public static <T> T retryWithCondition(
+      RetryCmd<T> cmd,
+      RetryCallBack callBack,
+      long intervalMs,
+      int retryTimes,
+      Function<Throwable, Boolean> isRetryFunc)
+      throws Throwable {
     int retry = 0;
     while (true) {
       try {
         return cmd.execute();
       } catch (Throwable t) {
         retry++;
-        if ((exceptionClasses != null && !isInstanceOf(exceptionClasses, t))
-            || retry >= retryTimes
-            || t instanceof NotRetryException) {
-          throw t;
-        } else {
+        if (isRetryFunc.apply(t) && retry < retryTimes) {
           LOG.info("Retry due to Throwable, " + t.getClass().getName() + " " + t.getMessage());
           LOG.info("Waiting " + intervalMs + " milliseconds before next connection attempt.");
           Thread.sleep(intervalMs);
           if (callBack != null) {
             callBack.execute();
           }
+        } else {
+          throw t;
         }
       }
     }
diff --git a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
index fc5c8befd..d42fac20e 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
@@ -28,6 +28,31 @@ import org.apache.uniffle.common.exception.RssException;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class RetryUtilsTest {
+
+  @Test
+  public void testRetryWithCondition() {
+    AtomicInteger tryTimes = new AtomicInteger();
+    AtomicInteger callbackTime = new AtomicInteger();
+    try {
+      RetryUtils.retryWithCondition(
+          () -> {
+            tryTimes.incrementAndGet();
+            throw new RssException("");
+          },
+          () -> {
+            callbackTime.incrementAndGet();
+          },
+          10,
+          3,
+          t -> true);
+    } catch (Throwable ignored) {
+      // expected: every attempt throws, so the last failure is rethrown after 3 tries
+    }
+
+    assertEquals(3, tryTimes.get());
+    assertEquals(2, callbackTime.get());
+  }
+
   @Test
   public void testRetry() {
     AtomicInteger tryTimes = new AtomicInteger();
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index e36d18528..debb7ed24 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -398,7 +398,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
       final int allocateSize = size;
       final int finalBlockNum = blockNum;
       try {
-        RetryUtils.retry(
+        RetryUtils.retryWithCondition(
             () -> {
               long requireId =
                   requirePreAllocation(
@@ -456,8 +456,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
               }
               return response;
             },
+            null,
             request.getRetryIntervalMax(),
-            maxRetryAttempts);
+            maxRetryAttempts,
+            t -> !(t instanceof OutOfMemoryError));
       } catch (Throwable throwable) {
         LOG.warn(throwable.getMessage());
         isSuccessful = false;
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 9230a49e5..3a98b40e0 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -90,7 +90,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
       int allocateSize = size;
       int finalBlockNum = blockNum;
       try {
-        RetryUtils.retry(
+        RetryUtils.retryWithCondition(
             () -> {
               TransportClient transportClient = getTransportClient();
               long requireId =
@@ -149,8 +149,10 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
               }
               return rpcResponse;
             },
+            null,
             request.getRetryIntervalMax(),
-            maxRetryAttempts);
+            maxRetryAttempts,
+            t -> !(t instanceof OutOfMemoryError));
       } catch (Throwable throwable) {
         LOG.warn(throwable.getMessage());
         isSuccessful = false;

Reply via email to