This is an automated email from the ASF dual-hosted git repository. xianjingfeng pushed a commit to branch branch-0.8 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit e7f446ea6257d12bb8fd75a4525c8c1b04a200a3 Author: Junfan Zhang <[email protected]> AuthorDate: Fri Dec 8 18:06:13 2023 +0800 [#1267] fix(client): fast fail without retry when oom occurs (#1344) ### What changes were proposed in this pull request? Fail fast, without retrying, when an OutOfMemoryError (OOM) occurs in the client. ### Why are the changes needed? Fix: #1267 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests (UTs) (cherry picked from commit eb88143856d9eb212ed31d3c167ed880f3cf2e2d) --- .../org/apache/uniffle/common/util/RetryUtils.java | 26 +++++++++++++++++----- .../apache/uniffle/common/util/RetryUtilsTest.java | 25 +++++++++++++++++++++ .../client/impl/grpc/ShuffleServerGrpcClient.java | 6 +++-- .../impl/grpc/ShuffleServerGrpcNettyClient.java | 6 +++-- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java index 3d45e5cca..d3134b48e 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java @@ -18,6 +18,7 @@ package org.apache.uniffle.common.util; import java.util.Set; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,23 +58,38 @@ public class RetryUtils { int retryTimes, Set<Class<? 
extends Throwable>> exceptionClasses) throws Throwable { + return retryWithCondition( + cmd, + callBack, + intervalMs, + retryTimes, + t -> + (exceptionClasses != null && isInstanceOf(exceptionClasses, t)) + || !(t instanceof NotRetryException)); + } + + public static <T> T retryWithCondition( + RetryCmd<T> cmd, + RetryCallBack callBack, + long intervalMs, + int retryTimes, + Function<Throwable, Boolean> isRetryFunc) + throws Throwable { int retry = 0; while (true) { try { return cmd.execute(); } catch (Throwable t) { retry++; - if ((exceptionClasses != null && !isInstanceOf(exceptionClasses, t)) - || retry >= retryTimes - || t instanceof NotRetryException) { - throw t; - } else { + if (isRetryFunc.apply(t) && retry < retryTimes) { LOG.info("Retry due to Throwable, " + t.getClass().getName() + " " + t.getMessage()); LOG.info("Waiting " + intervalMs + " milliseconds before next connection attempt."); Thread.sleep(intervalMs); if (callBack != null) { callBack.execute(); } + } else { + throw t; } } } diff --git a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java index fc5c8befd..d42fac20e 100644 --- a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java +++ b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java @@ -28,6 +28,31 @@ import org.apache.uniffle.common.exception.RssException; import static org.junit.jupiter.api.Assertions.assertEquals; public class RetryUtilsTest { + + @Test + public void testRetryWithCondition() { + AtomicInteger tryTimes = new AtomicInteger(); + AtomicInteger callbackTime = new AtomicInteger(); + try { + RetryUtils.retryWithCondition( + () -> { + tryTimes.incrementAndGet(); + throw new RssException(""); + }, + () -> { + callbackTime.incrementAndGet(); + }, + 10, + 3, + (t) -> true); + } catch (Throwable throwable) { + // ignore + } + + assertEquals(tryTimes.get(), 3); + assertEquals(callbackTime.get(), 2); + 
} + @Test public void testRetry() { AtomicInteger tryTimes = new AtomicInteger(); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index b7f976ead..5780e00ee 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -394,7 +394,7 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer final int allocateSize = size; final int finalBlockNum = blockNum; try { - RetryUtils.retry( + RetryUtils.retryWithCondition( () -> { long requireId = requirePreAllocation( @@ -450,8 +450,10 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer } return response; }, + null, request.getRetryIntervalMax(), - maxRetryAttempts); + maxRetryAttempts, + t -> !(t instanceof OutOfMemoryError)); } catch (Throwable throwable) { LOG.warn(throwable.getMessage()); isSuccessful = false; diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index 2fe15aae7..3d9ae4b98 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -90,7 +90,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { int allocateSize = size; int finalBlockNum = blockNum; try { - RetryUtils.retry( + RetryUtils.retryWithCondition( () -> { TransportClient transportClient = getTransportClient(); long requireId = @@ -147,8 +147,10 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { } return rpcResponse; 
}, + null, request.getRetryIntervalMax(), - maxRetryAttempts); + maxRetryAttempts, + t -> !(t instanceof OutOfMemoryError)); } catch (Throwable throwable) { LOG.warn(throwable.getMessage()); isSuccessful = false;
