This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a7cc7fe448b IGNITE-24744 Java client: fix ClientFutureUtils.doWithRetryAsync logic (#6992)
a7cc7fe448b is described below
commit a7cc7fe448b93ff9ddac1d1d55a68c19c9e96c1e
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Nov 18 07:59:45 2025 +0200
IGNITE-24744 Java client: fix ClientFutureUtils.doWithRetryAsync logic (#6992)
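Remove `resultValidator` and simplify the retry logic to fix the
"doWithRetry failed without exception" error. Previously, a successful
result rejected by the validator recorded no error, so when the retry
predicate declined to retry there was no exception to report.
`getOrCreateChannelAsync` now returns a failed future ("Channel is closed")
instead of a null result, so the null-check validator is no longer needed.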
---
.../ignite/internal/client/ClientFutureUtils.java | 33 ++++++------
.../ignite/internal/client/ReliableChannel.java | 32 ++----------
.../org/apache/ignite/client/RetryPolicyTest.java | 59 +++++++++++++++++++++-
.../internal/client/ClientFutureUtilsTest.java | 17 -------
4 files changed, 78 insertions(+), 63 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
index 0656e725dd9..ca13ca608dd 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
@@ -37,55 +37,54 @@ class ClientFutureUtils {
static <T> CompletableFuture<T> doWithRetryAsync(
Supplier<CompletableFuture<T>> func,
- @Nullable Predicate<T> resultValidator,
Predicate<RetryContext> retryPredicate) {
CompletableFuture<T> resFut = new CompletableFuture<>();
RetryContext ctx = new RetryContext();
- doWithRetryAsync(func, resultValidator, retryPredicate, resFut, ctx);
+ doWithRetryAsync(func, retryPredicate, resFut, ctx);
return resFut;
}
private static <T> void doWithRetryAsync(
Supplier<CompletableFuture<T>> func,
- @Nullable Predicate<T> validator,
Predicate<RetryContext> retryPredicate,
CompletableFuture<T> resFut,
RetryContext ctx) {
func.get().whenComplete((res, err) -> {
try {
- if (err == null && (validator == null || validator.test(res)))
{
+ if (err == null) {
resFut.complete(res);
return;
}
- if (err != null) {
+ Throwable resErr = null;
+
+ // This code is executed by different threads, but not
concurrently.
+ // Use synchronized block to modify ctx for simplicity
(instead of volatile).
+ synchronized (ctx) {
if (ctx.errors == null) {
ctx.errors = new ArrayList<>();
}
ctx.errors.add(err);
- }
- if (retryPredicate.test(ctx)) {
- ctx.attempt++;
-
- doWithRetryAsync(func, validator, retryPredicate, resFut,
ctx);
- } else {
- if (ctx.errors == null || ctx.errors.isEmpty()) {
- // Should not happen.
- resFut.completeExceptionally(new
IllegalStateException("doWithRetry failed without exception"));
+ if (retryPredicate.test(ctx)) {
+ ctx.attempt++;
} else {
- var resErr = ctx.errors.get(0);
+ resErr = ctx.errors.get(0);
for (int i = 1; i < ctx.errors.size(); i++) {
resErr.addSuppressed(ctx.errors.get(i));
}
-
- resFut.completeExceptionally(resErr);
}
}
+
+ if (resErr != null) {
+ resFut.completeExceptionally(resErr);
+ } else {
+ doWithRetryAsync(func, retryPredicate, resFut, ctx);
+ }
} catch (Throwable t) {
resFut.completeExceptionally(t);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index cbe8b7a2160..8c556ef4f94 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.delayedExecutor;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.ExceptionUtils.hasCauseOrSuppressed;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -266,7 +267,6 @@ public final class ReliableChannel implements AutoCloseable
{
return ClientFutureUtils.doWithRetryAsync(
() -> getChannelAsync(preferredNodeName)
.thenCompose(ch -> serviceAsyncInternal(opCode,
payloadWriter, payloadReader, expectNotifications, ch)),
- null,
ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
}
@@ -292,7 +292,6 @@ public final class ReliableChannel implements AutoCloseable
{
return ClientFutureUtils.doWithRetryAsync(
() -> channelResolver.get()
.thenCompose(ch -> serviceAsyncInternal(opCode,
payloadWriter, payloadReader, expectNotifications, ch)),
- null,
ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
}
@@ -318,7 +317,6 @@ public final class ReliableChannel implements AutoCloseable
{
int opCode = opCodeFunc.applyAsInt(ch);
return serviceAsyncInternal(opCode, payloadWriter,
payloadReader, false, ch);
}),
- null,
ctx -> shouldRetry(retryOpType, ctx, null));
}
@@ -664,31 +662,9 @@ public final class ReliableChannel implements
AutoCloseable {
return hld.getOrCreateChannelAsync();
},
- Objects::nonNull,
ctx -> shouldRetry(ClientOperationType.CHANNEL_CONNECT, ctx,
null));
}
- private CompletableFuture<ClientChannel> getCurChannelAsync() {
- if (closed) {
- return CompletableFuture.failedFuture(new
IgniteClientConnectionException(CONNECTION_ERR, "ReliableChannel is closed",
null));
- }
-
- curChannelsGuard.readLock().lock();
-
- try {
- var hld = channels.get(defaultChIdx);
-
- if (hld == null) {
- return nullCompletedFuture();
- }
-
- CompletableFuture<ClientChannel> fut =
hld.getOrCreateChannelAsync();
- return fut == null ? nullCompletedFuture() : fut;
- } finally {
- curChannelsGuard.readLock().unlock();
- }
- }
-
/** Determines whether specified operation should be retried. */
private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext
ctx, @Nullable RetryPolicy retryPolicyOverride) {
ClientOperationType opType =
ClientUtils.opCodeToClientOperationType(opCode);
@@ -856,7 +832,8 @@ public final class ReliableChannel implements AutoCloseable
{
*/
private CompletableFuture<ClientChannel> getOrCreateChannelAsync() {
if (close) {
- return nullCompletedFuture();
+ return failedFuture(
+ new IgniteClientConnectionException(CONNECTION_ERR,
"Channel is closed", chCfg.getAddress().toString()));
}
var chFut0 = chFut;
@@ -867,7 +844,8 @@ public final class ReliableChannel implements AutoCloseable
{
synchronized (this) {
if (close) {
- return nullCompletedFuture();
+ return failedFuture(
+ new
IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed",
chCfg.getAddress().toString()));
}
chFut0 = chFut;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 573f0915b02..98e284e3d0f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.client;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -54,9 +55,13 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest {
private TestServer server;
+ private TestServer server2;
+
+ private TestServer server3;
+
@AfterEach
void tearDown() throws Exception {
- closeAll(server);
+ closeAll(server, server2, server3);
}
@Test
@@ -276,6 +281,52 @@ public class RetryPolicyTest extends
BaseIgniteAbstractTest {
}
}
+ @Test
+ public void testUnstableCluster() {
+ UUID clusterId = UUID.randomUUID();
+
+ server = createServer(reqId -> reqId % 2 == 0, clusterId);
+ server2 = createServer(reqId -> reqId % 2 != 0, clusterId);
+ server3 = createServer(reqId -> reqId % 3 == 0, clusterId);
+
+ var clientBuilder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + server.port(), "127.0.0.1:" +
server2.port(), "127.0.0.1:" + server3.port())
+ .loggerFactory(new
ConsoleLoggerFactory("testUnstableCluster"));
+
+ try (var client = clientBuilder.build()) {
+ for (int i = 0; i < ITER; i++) {
+ assertDoesNotThrow(() -> client.tables().tables());
+ }
+ }
+ }
+
+ @Test
+ public void testNodesLeave() {
+ UUID clusterId = UUID.randomUUID();
+
+ server = createServer(reqId -> false, clusterId);
+ server2 = createServer(reqId -> false, clusterId);
+ server3 = createServer(reqId -> false, clusterId);
+
+ var clientBuilder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + server.port(), "127.0.0.1:" +
server2.port(), "127.0.0.1:" + server3.port())
+ .loggerFactory(new
ConsoleLoggerFactory("testUnstableCluster"));
+
+ try (var client = clientBuilder.build()) {
+ for (int i = 0; i < ITER; i++) {
+ assertDoesNotThrow(() -> client.tables().tables());
+
+ if (i == ITER / 3) {
+ server2.close();
+ }
+
+ if (i == 2 * ITER / 3) {
+ server3.close();
+ }
+ }
+ }
+ }
+
private IgniteClient getClient(@Nullable RetryPolicy retryPolicy) {
return getClient(retryPolicy, null);
}
@@ -289,9 +340,13 @@ public class RetryPolicyTest extends
BaseIgniteAbstractTest {
}
private void initServer(Function<Integer, Boolean> shouldDropConnection) {
+ server = createServer(shouldDropConnection, UUID.randomUUID());
+ }
+
+ private static TestServer createServer(Function<Integer, Boolean>
shouldDropConnection, UUID clusterId) {
FakeIgnite ign = new FakeIgnite();
((FakeIgniteTables) ign.tables()).createTable("t");
- server = new TestServer(0, ign, shouldDropConnection, null, null,
UUID.randomUUID(), null, null);
+ return new TestServer(0, ign, shouldDropConnection, null, null,
clusterId, null, null);
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
index 6774963985f..6920b10fd04 100644
---
a/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.CompletableFuture;
@@ -45,32 +44,18 @@ public class ClientFutureUtilsTest {
public void testDoWithRetryAsyncWithCompletedFutureReturnsResult() {
var res = ClientFutureUtils.doWithRetryAsync(
() -> CompletableFuture.completedFuture("test"),
- null,
ctx -> false
).join();
assertEquals("test", res);
}
- @Test
- public void
testDoWithRetryAsyncWithResultValidatorRejectsAllThrowsIllegalState() {
- var fut = ClientFutureUtils.doWithRetryAsync(
- () -> CompletableFuture.completedFuture("test"),
- x -> false,
- ctx -> false
- );
-
- var ex = assertThrows(CompletionException.class, fut::join);
- assertSame(IllegalStateException.class, ex.getCause().getClass());
- }
-
@Test
public void
testDoWithRetryAsyncWithFailedFutureReturnsExceptionWithSuppressedList() {
var counter = new AtomicInteger();
var fut = ClientFutureUtils.doWithRetryAsync(
() -> CompletableFuture.failedFuture(new Exception("fail_" +
counter.get())),
- null,
ctx -> counter.incrementAndGet() < 3
);
@@ -92,7 +77,6 @@ public class ClientFutureUtilsTest {
() -> counter.getAndIncrement() < 3
? CompletableFuture.failedFuture(new Exception("fail"))
: CompletableFuture.completedFuture("test"),
- null,
ctx -> {
assertNotNull(ctx.lastError());
@@ -118,7 +102,6 @@ public class ClientFutureUtilsTest {
return CompletableFuture.failedFuture(new
Exception("fail2"));
}
},
- null,
ctx -> true
);