This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new eb8814385 [#1267] fix(client): fast fail without retry when oom occurs (#1344)
eb8814385 is described below
commit eb88143856d9eb212ed31d3c167ed880f3cf2e2d
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Dec 8 18:06:13 2023 +0800
[#1267] fix(client): fast fail without retry when oom occurs (#1344)
### What changes were proposed in this pull request?
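Fail fast, without retrying, when an `OutOfMemoryError` is thrown. This adds `RetryUtils.retryWithCondition`, which decides whether to retry through a caller-supplied predicate (`retry` now delegates to it). It also switches the retry loops in `ShuffleServerGrpcClient` and `ShuffleServerGrpcNettyClient` to a predicate that rejects `OutOfMemoryError`.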
### Why are the changes needed?
Fixes #1267.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests, plus the new `RetryUtilsTest#testRetryWithCondition`.
---
.../org/apache/uniffle/common/util/RetryUtils.java | 26 +++++++++++++++++-----
.../apache/uniffle/common/util/RetryUtilsTest.java | 25 +++++++++++++++++++++
.../client/impl/grpc/ShuffleServerGrpcClient.java | 6 +++--
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 6 +++--
4 files changed, 54 insertions(+), 9 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
index 3d45e5cca..d3134b48e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RetryUtils.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.common.util;
import java.util.Set;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,23 +58,38 @@ public class RetryUtils {
int retryTimes,
Set<Class<? extends Throwable>> exceptionClasses)
throws Throwable {
+ return retryWithCondition(
+ cmd,
+ callBack,
+ intervalMs,
+ retryTimes,
+ t ->
+ (exceptionClasses != null && isInstanceOf(exceptionClasses, t))
+ || !(t instanceof NotRetryException));
+ }
+
+ public static <T> T retryWithCondition(
+ RetryCmd<T> cmd,
+ RetryCallBack callBack,
+ long intervalMs,
+ int retryTimes,
+ Function<Throwable, Boolean> isRetryFunc)
+ throws Throwable {
int retry = 0;
while (true) {
try {
return cmd.execute();
} catch (Throwable t) {
retry++;
- if ((exceptionClasses != null && !isInstanceOf(exceptionClasses, t))
- || retry >= retryTimes
- || t instanceof NotRetryException) {
- throw t;
- } else {
+ if (isRetryFunc.apply(t) && retry < retryTimes) {
LOG.info("Retry due to Throwable, " + t.getClass().getName() + " " +
t.getMessage());
LOG.info("Waiting " + intervalMs + " milliseconds before next
connection attempt.");
Thread.sleep(intervalMs);
if (callBack != null) {
callBack.execute();
}
+ } else {
+ throw t;
}
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
index fc5c8befd..d42fac20e 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RetryUtilsTest.java
@@ -28,6 +28,31 @@ import org.apache.uniffle.common.exception.RssException;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RetryUtilsTest {
+
+ @Test
+ public void testRetryWithCondition() { // always-retryable predicate: expect 3 attempts and 2 callbacks (callback runs only between attempts)
+ AtomicInteger tryTimes = new AtomicInteger(); // how many times the command ran
+ AtomicInteger callbackTime = new AtomicInteger(); // how many times the retry callback ran
+ try {
+ RetryUtils.retryWithCondition(
+ () -> {
+ tryTimes.incrementAndGet();
+ throw new RssException(""); // every attempt fails
+ },
+ () -> {
+ callbackTime.incrementAndGet();
+ },
+ 10, // intervalMs: sleep between attempts
+ 3, // retryTimes: total attempts before the last failure is rethrown
+ (t) -> true); // treat every failure as retryable, so only retryTimes ends the loop
+ } catch (Throwable throwable) {
+ // ignore: after the final attempt the RssException is rethrown; only the counters below are checked
+ }
+
+ assertEquals(tryTimes.get(), 3); // NOTE(review): JUnit's assertEquals takes (expected, actual); arguments are swapped here
+ assertEquals(callbackTime.get(), 2); // no callback after the last attempt, since the exception is rethrown instead
+ }
+
@Test
public void testRetry() {
AtomicInteger tryTimes = new AtomicInteger();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index e36d18528..debb7ed24 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -398,7 +398,7 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
final int allocateSize = size;
final int finalBlockNum = blockNum;
try {
- RetryUtils.retry(
+ RetryUtils.retryWithCondition(
() -> {
long requireId =
requirePreAllocation(
@@ -456,8 +456,10 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
}
return response;
},
+ null,
request.getRetryIntervalMax(),
- maxRetryAttempts);
+ maxRetryAttempts,
+ t -> !(t instanceof OutOfMemoryError));
} catch (Throwable throwable) {
LOG.warn(throwable.getMessage());
isSuccessful = false;
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 9230a49e5..3a98b40e0 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -90,7 +90,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
int allocateSize = size;
int finalBlockNum = blockNum;
try {
- RetryUtils.retry(
+ RetryUtils.retryWithCondition(
() -> {
TransportClient transportClient = getTransportClient();
long requireId =
@@ -149,8 +149,10 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
}
return rpcResponse;
},
+ null,
request.getRetryIntervalMax(),
- maxRetryAttempts);
+ maxRetryAttempts,
+ t -> !(t instanceof OutOfMemoryError));
} catch (Throwable throwable) {
LOG.warn(throwable.getMessage());
isSuccessful = false;