This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a7cc7fe448b IGNITE-24744 Java client: fix ClientFutureUtils.doWithRetryAsync logic (#6992)
a7cc7fe448b is described below

commit a7cc7fe448b93ff9ddac1d1d55a68c19c9e96c1e
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Nov 18 07:59:45 2025 +0200

    IGNITE-24744 Java client: fix ClientFutureUtils.doWithRetryAsync logic (#6992)
    
    Remove `resultValidator` and simplify retry logic to fix "doWithRetry failed without exception" error.
---
 .../ignite/internal/client/ClientFutureUtils.java  | 33 ++++++------
 .../ignite/internal/client/ReliableChannel.java    | 32 ++----------
 .../org/apache/ignite/client/RetryPolicyTest.java  | 59 +++++++++++++++++++++-
 .../internal/client/ClientFutureUtilsTest.java     | 17 -------
 4 files changed, 78 insertions(+), 63 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
index 0656e725dd9..ca13ca608dd 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
@@ -37,55 +37,54 @@ class ClientFutureUtils {
 
     static <T> CompletableFuture<T> doWithRetryAsync(
             Supplier<CompletableFuture<T>> func,
-            @Nullable Predicate<T> resultValidator,
             Predicate<RetryContext> retryPredicate) {
         CompletableFuture<T> resFut = new CompletableFuture<>();
         RetryContext ctx = new RetryContext();
 
-        doWithRetryAsync(func, resultValidator, retryPredicate, resFut, ctx);
+        doWithRetryAsync(func, retryPredicate, resFut, ctx);
 
         return resFut;
     }
 
     private static <T> void doWithRetryAsync(
             Supplier<CompletableFuture<T>> func,
-            @Nullable Predicate<T> validator,
             Predicate<RetryContext> retryPredicate,
             CompletableFuture<T> resFut,
             RetryContext ctx) {
         func.get().whenComplete((res, err) -> {
             try {
-                if (err == null && (validator == null || validator.test(res))) 
{
+                if (err == null) {
                     resFut.complete(res);
                     return;
                 }
 
-                if (err != null) {
+                Throwable resErr = null;
+
+                // This code is executed by different threads, but not 
concurrently.
+                // Use synchronized block to modify ctx for simplicity 
(instead of volatile).
+                synchronized (ctx) {
                     if (ctx.errors == null) {
                         ctx.errors = new ArrayList<>();
                     }
 
                     ctx.errors.add(err);
-                }
 
-                if (retryPredicate.test(ctx)) {
-                    ctx.attempt++;
-
-                    doWithRetryAsync(func, validator, retryPredicate, resFut, 
ctx);
-                } else {
-                    if (ctx.errors == null || ctx.errors.isEmpty()) {
-                        // Should not happen.
-                        resFut.completeExceptionally(new 
IllegalStateException("doWithRetry failed without exception"));
+                    if (retryPredicate.test(ctx)) {
+                        ctx.attempt++;
                     } else {
-                        var resErr = ctx.errors.get(0);
+                        resErr = ctx.errors.get(0);
 
                         for (int i = 1; i < ctx.errors.size(); i++) {
                             resErr.addSuppressed(ctx.errors.get(i));
                         }
-
-                        resFut.completeExceptionally(resErr);
                     }
                 }
+
+                if (resErr != null) {
+                    resFut.completeExceptionally(resErr);
+                } else {
+                    doWithRetryAsync(func, retryPredicate, resFut, ctx);
+                }
             } catch (Throwable t) {
                 resFut.completeExceptionally(t);
             }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index cbe8b7a2160..8c556ef4f94 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.delayedExecutor;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.ExceptionUtils.hasCauseOrSuppressed;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -266,7 +267,6 @@ public final class ReliableChannel implements AutoCloseable 
{
         return ClientFutureUtils.doWithRetryAsync(
                 () -> getChannelAsync(preferredNodeName)
                         .thenCompose(ch -> serviceAsyncInternal(opCode, 
payloadWriter, payloadReader, expectNotifications, ch)),
-                null,
                 ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
     }
 
@@ -292,7 +292,6 @@ public final class ReliableChannel implements AutoCloseable 
{
         return ClientFutureUtils.doWithRetryAsync(
                 () -> channelResolver.get()
                         .thenCompose(ch -> serviceAsyncInternal(opCode, 
payloadWriter, payloadReader, expectNotifications, ch)),
-                null,
                 ctx -> shouldRetry(opCode, ctx, retryPolicyOverride));
     }
 
@@ -318,7 +317,6 @@ public final class ReliableChannel implements AutoCloseable 
{
                             int opCode = opCodeFunc.applyAsInt(ch);
                             return serviceAsyncInternal(opCode, payloadWriter, 
payloadReader, false, ch);
                         }),
-                null,
                 ctx -> shouldRetry(retryOpType, ctx, null));
     }
 
@@ -664,31 +662,9 @@ public final class ReliableChannel implements 
AutoCloseable {
 
                     return hld.getOrCreateChannelAsync();
                 },
-                Objects::nonNull,
                 ctx -> shouldRetry(ClientOperationType.CHANNEL_CONNECT, ctx, 
null));
     }
 
-    private CompletableFuture<ClientChannel> getCurChannelAsync() {
-        if (closed) {
-            return CompletableFuture.failedFuture(new 
IgniteClientConnectionException(CONNECTION_ERR, "ReliableChannel is closed", 
null));
-        }
-
-        curChannelsGuard.readLock().lock();
-
-        try {
-            var hld = channels.get(defaultChIdx);
-
-            if (hld == null) {
-                return nullCompletedFuture();
-            }
-
-            CompletableFuture<ClientChannel> fut = 
hld.getOrCreateChannelAsync();
-            return fut == null ? nullCompletedFuture() : fut;
-        } finally {
-            curChannelsGuard.readLock().unlock();
-        }
-    }
-
     /** Determines whether specified operation should be retried. */
     private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext 
ctx, @Nullable RetryPolicy retryPolicyOverride) {
         ClientOperationType opType = 
ClientUtils.opCodeToClientOperationType(opCode);
@@ -856,7 +832,8 @@ public final class ReliableChannel implements AutoCloseable 
{
          */
         private CompletableFuture<ClientChannel> getOrCreateChannelAsync() {
             if (close) {
-                return nullCompletedFuture();
+                return failedFuture(
+                        new IgniteClientConnectionException(CONNECTION_ERR, 
"Channel is closed", chCfg.getAddress().toString()));
             }
 
             var chFut0 = chFut;
@@ -867,7 +844,8 @@ public final class ReliableChannel implements AutoCloseable 
{
 
             synchronized (this) {
                 if (close) {
-                    return nullCompletedFuture();
+                    return failedFuture(
+                            new 
IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed", 
chCfg.getAddress().toString()));
                 }
 
                 chFut0 = chFut;
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 573f0915b02..98e284e3d0f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.client;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -54,9 +55,13 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest {
 
     private TestServer server;
 
+    private TestServer server2;
+
+    private TestServer server3;
+
     @AfterEach
     void tearDown() throws Exception {
-        closeAll(server);
+        closeAll(server, server2, server3);
     }
 
     @Test
@@ -276,6 +281,52 @@ public class RetryPolicyTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    public void testUnstableCluster() {
+        UUID clusterId = UUID.randomUUID();
+
+        server = createServer(reqId -> reqId % 2 == 0, clusterId);
+        server2 = createServer(reqId -> reqId % 2 != 0, clusterId);
+        server3 = createServer(reqId -> reqId % 3 == 0, clusterId);
+
+        var clientBuilder = IgniteClient.builder()
+                .addresses("127.0.0.1:" + server.port(), "127.0.0.1:" + 
server2.port(), "127.0.0.1:" + server3.port())
+                .loggerFactory(new 
ConsoleLoggerFactory("testUnstableCluster"));
+
+        try (var client = clientBuilder.build()) {
+            for (int i = 0; i < ITER; i++) {
+                assertDoesNotThrow(() -> client.tables().tables());
+            }
+        }
+    }
+
+    @Test
+    public void testNodesLeave() {
+        UUID clusterId = UUID.randomUUID();
+
+        server = createServer(reqId -> false, clusterId);
+        server2 = createServer(reqId -> false, clusterId);
+        server3 = createServer(reqId -> false, clusterId);
+
+        var clientBuilder = IgniteClient.builder()
+                .addresses("127.0.0.1:" + server.port(), "127.0.0.1:" + 
server2.port(), "127.0.0.1:" + server3.port())
+                .loggerFactory(new 
ConsoleLoggerFactory("testUnstableCluster"));
+
+        try (var client = clientBuilder.build()) {
+            for (int i = 0; i < ITER; i++) {
+                assertDoesNotThrow(() -> client.tables().tables());
+
+                if (i == ITER / 3) {
+                    server2.close();
+                }
+
+                if (i == 2 * ITER / 3) {
+                    server3.close();
+                }
+            }
+        }
+    }
+
     private IgniteClient getClient(@Nullable RetryPolicy retryPolicy) {
         return getClient(retryPolicy, null);
     }
@@ -289,9 +340,13 @@ public class RetryPolicyTest extends 
BaseIgniteAbstractTest {
     }
 
     private void initServer(Function<Integer, Boolean> shouldDropConnection) {
+        server = createServer(shouldDropConnection, UUID.randomUUID());
+    }
+
+    private static TestServer createServer(Function<Integer, Boolean> 
shouldDropConnection, UUID clusterId) {
         FakeIgnite ign = new FakeIgnite();
         ((FakeIgniteTables) ign.tables()).createTable("t");
 
-        server = new TestServer(0, ign, shouldDropConnection, null, null, 
UUID.randomUUID(), null, null);
+        return new TestServer(0, ign, shouldDropConnection, null, null, 
clusterId, null, null);
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
 
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
index 6774963985f..6920b10fd04 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.concurrent.CompletableFuture;
@@ -45,32 +44,18 @@ public class ClientFutureUtilsTest {
     public void testDoWithRetryAsyncWithCompletedFutureReturnsResult() {
         var res = ClientFutureUtils.doWithRetryAsync(
                 () -> CompletableFuture.completedFuture("test"),
-                null,
                 ctx -> false
         ).join();
 
         assertEquals("test", res);
     }
 
-    @Test
-    public void 
testDoWithRetryAsyncWithResultValidatorRejectsAllThrowsIllegalState() {
-        var fut = ClientFutureUtils.doWithRetryAsync(
-                () -> CompletableFuture.completedFuture("test"),
-                x -> false,
-                ctx -> false
-        );
-
-        var ex = assertThrows(CompletionException.class, fut::join);
-        assertSame(IllegalStateException.class, ex.getCause().getClass());
-    }
-
     @Test
     public void 
testDoWithRetryAsyncWithFailedFutureReturnsExceptionWithSuppressedList() {
         var counter = new AtomicInteger();
 
         var fut = ClientFutureUtils.doWithRetryAsync(
                 () -> CompletableFuture.failedFuture(new Exception("fail_" + 
counter.get())),
-                null,
                 ctx -> counter.incrementAndGet() < 3
         );
 
@@ -92,7 +77,6 @@ public class ClientFutureUtilsTest {
                 () -> counter.getAndIncrement() < 3
                         ? CompletableFuture.failedFuture(new Exception("fail"))
                         : CompletableFuture.completedFuture("test"),
-                null,
                 ctx -> {
                     assertNotNull(ctx.lastError());
 
@@ -118,7 +102,6 @@ public class ClientFutureUtilsTest {
                         return CompletableFuture.failedFuture(new 
Exception("fail2"));
                     }
                 },
-                null,
                 ctx -> true
         );
 

Reply via email to