This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dd091a9ab1a IGNITE-26719 Java client: fix async continuation executor 
(#6862)
dd091a9ab1a is described below

commit dd091a9ab1a70cae3f9ab0d38cc5b90c7293e2bc
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Oct 29 20:01:20 2025 +0200

    IGNITE-26719 Java client: fix async continuation executor (#6862)
    
    Fix `completeAsync` logic to always complete outer future using the 
`asyncContinuationExecutor`.
    
    Before that, we relied on `thenCompose` which could keep us in the Netty 
thread in some cases - `testDefaultAsyncContinuationExecutorIsForkJoinPool` was 
flaky (reproducible locally).
---
 .../ignite/internal/client/TcpClientChannel.java   | 69 ++++++++++++++--------
 1 file changed, 46 insertions(+), 23 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index bda28cdd2b9..9e19da2613c 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
@@ -424,11 +425,14 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             }
 
             // Handle the response in the async continuation pool with 
completeAsync.
-            return fut
-                    .thenCompose(unpacker -> completeAsync(payloadReader, 
notificationFut, unpacker))
-                    .exceptionally(err -> {
-                        throw 
sneakyThrow(ViewUtils.ensurePublicException(err));
-                    });
+            CompletableFuture<T> resFut = new CompletableFuture<>();
+
+            fut.handle((unpacker, err) -> {
+                completeAsync(payloadReader, notificationFut, unpacker, err, 
resFut);
+                return null;
+            });
+
+            return resFut;
         } catch (Throwable t) {
             log.warn("Failed to send request [id=" + id + ", op=" + opCode + 
", remoteAddress=" + cfg.getAddress() + "]: "
                     + t.getMessage(), t);
@@ -443,14 +447,28 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         }
     }
 
-    private <T> CompletableFuture<T> completeAsync(
+    private <T> void completeAsync(
             @Nullable PayloadReader<T> payloadReader,
             @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
-            ClientMessageUnpacker unpacker
+            ClientMessageUnpacker unpacker,
+            @Nullable Throwable err,
+            CompletableFuture<T> resFut
     ) {
-        try {
-            CompletableFuture<T> resFut = new CompletableFuture<>();
+        if (err != null) {
+            assert unpacker == null : "unpacker must be null if err is not 
null";
+
+            try {
+                asyncContinuationExecutor.execute(() -> 
resFut.completeExceptionally(ViewUtils.ensurePublicException(err)));
+            } catch (Throwable execError) {
+                // Executor error, complete directly.
+                execError.addSuppressed(err);
+                
resFut.completeExceptionally(ViewUtils.ensurePublicException(execError));
+            }
+
+            return;
+        }
 
+        try {
             // Use asyncContinuationExecutor explicitly to close unpacker if 
the executor throws.
             // With handleAsync et al we can't close the unpacker in that case.
             asyncContinuationExecutor.execute(() -> {
@@ -460,11 +478,11 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                     
resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
                 }
             });
-
-            return resFut;
-        } catch (Throwable t) {
+        } catch (Throwable execErr) {
             unpacker.close();
-            return failedFuture(ViewUtils.ensurePublicException(t));
+
+            // Executor error, complete directly.
+            
resFut.completeExceptionally(ViewUtils.ensurePublicException(execErr));
         }
     }
 
@@ -669,16 +687,21 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             }
         });
 
-        return fut
-                .thenCompose(unpacker -> completeAsync(r -> 
handshakeRes(r.in()), null, unpacker))
-                .exceptionally(err -> {
-                    if (err instanceof TimeoutException || err.getCause() 
instanceof TimeoutException) {
-                        metrics.handshakesFailedTimeoutIncrement();
-                        throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", 
endpoint(), err);
-                    }
-                    metrics.handshakesFailedIncrement();
-                    throw new IgniteClientConnectionException(CONNECTION_ERR, 
"Handshake error", endpoint(), err);
-                });
+        CompletableFuture<Object> resFut = new CompletableFuture<>();
+
+        fut.handle((unpacker, err) -> {
+            completeAsync(r -> handshakeRes(r.in()), null, unpacker, err, 
resFut);
+            return null;
+        });
+
+        return resFut.exceptionally(err -> {
+            if (unwrapRootCause(err) instanceof TimeoutException) {
+                metrics.handshakesFailedTimeoutIncrement();
+                throw new IgniteClientConnectionException(CONNECTION_ERR, 
"Handshake timeout", endpoint(), err);
+            }
+            metrics.handshakesFailedIncrement();
+            throw new IgniteClientConnectionException(CONNECTION_ERR, 
"Handshake error", endpoint(), err);
+        });
     }
 
     /**

Reply via email to