This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dd091a9ab1a IGNITE-26719 Java client: fix async continuation executor
(#6862)
dd091a9ab1a is described below
commit dd091a9ab1a70cae3f9ab0d38cc5b90c7293e2bc
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Oct 29 20:01:20 2025 +0200
IGNITE-26719 Java client: fix async continuation executor (#6862)
Fix `completeAsync` logic to always complete the outer future using the
`asyncContinuationExecutor`.
Before that, we relied on `thenCompose`, which could keep us in the Netty
thread in some cases; as a result,
`testDefaultAsyncContinuationExecutorIsForkJoinPool` was flaky (reproducible
locally).
---
.../ignite/internal/client/TcpClientChannel.java | 69 ++++++++++++++--------
1 file changed, 46 insertions(+), 23 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index bda28cdd2b9..9e19da2613c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.ExceptionUtils.copyExceptionWithCause;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
@@ -424,11 +425,14 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
// Handle the response in the async continuation pool with
completeAsync.
- return fut
- .thenCompose(unpacker -> completeAsync(payloadReader,
notificationFut, unpacker))
- .exceptionally(err -> {
- throw
sneakyThrow(ViewUtils.ensurePublicException(err));
- });
+ CompletableFuture<T> resFut = new CompletableFuture<>();
+
+ fut.handle((unpacker, err) -> {
+ completeAsync(payloadReader, notificationFut, unpacker, err,
resFut);
+ return null;
+ });
+
+ return resFut;
} catch (Throwable t) {
log.warn("Failed to send request [id=" + id + ", op=" + opCode +
", remoteAddress=" + cfg.getAddress() + "]: "
+ t.getMessage(), t);
@@ -443,14 +447,28 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
}
- private <T> CompletableFuture<T> completeAsync(
+ private <T> void completeAsync(
@Nullable PayloadReader<T> payloadReader,
@Nullable CompletableFuture<PayloadInputChannel> notificationFut,
- ClientMessageUnpacker unpacker
+ ClientMessageUnpacker unpacker,
+ @Nullable Throwable err,
+ CompletableFuture<T> resFut
) {
- try {
- CompletableFuture<T> resFut = new CompletableFuture<>();
+ if (err != null) {
+ assert unpacker == null : "unpacker must be null if err is not
null";
+
+ try {
+ asyncContinuationExecutor.execute(() ->
resFut.completeExceptionally(ViewUtils.ensurePublicException(err)));
+ } catch (Throwable execError) {
+ // Executor error, complete directly.
+ execError.addSuppressed(err);
+
resFut.completeExceptionally(ViewUtils.ensurePublicException(execError));
+ }
+
+ return;
+ }
+ try {
// Use asyncContinuationExecutor explicitly to close unpacker if
the executor throws.
// With handleAsync et al we can't close the unpacker in that case.
asyncContinuationExecutor.execute(() -> {
@@ -460,11 +478,11 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
}
});
-
- return resFut;
- } catch (Throwable t) {
+ } catch (Throwable execErr) {
unpacker.close();
- return failedFuture(ViewUtils.ensurePublicException(t));
+
+ // Executor error, complete directly.
+
resFut.completeExceptionally(ViewUtils.ensurePublicException(execErr));
}
}
@@ -669,16 +687,21 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
});
- return fut
- .thenCompose(unpacker -> completeAsync(r ->
handshakeRes(r.in()), null, unpacker))
- .exceptionally(err -> {
- if (err instanceof TimeoutException || err.getCause()
instanceof TimeoutException) {
- metrics.handshakesFailedTimeoutIncrement();
- throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout",
endpoint(), err);
- }
- metrics.handshakesFailedIncrement();
- throw new IgniteClientConnectionException(CONNECTION_ERR,
"Handshake error", endpoint(), err);
- });
+ CompletableFuture<Object> resFut = new CompletableFuture<>();
+
+ fut.handle((unpacker, err) -> {
+ completeAsync(r -> handshakeRes(r.in()), null, unpacker, err,
resFut);
+ return null;
+ });
+
+ return resFut.exceptionally(err -> {
+ if (unwrapRootCause(err) instanceof TimeoutException) {
+ metrics.handshakesFailedTimeoutIncrement();
+ throw new IgniteClientConnectionException(CONNECTION_ERR,
"Handshake timeout", endpoint(), err);
+ }
+ metrics.handshakesFailedIncrement();
+ throw new IgniteClientConnectionException(CONNECTION_ERR,
"Handshake error", endpoint(), err);
+ });
}
/**