This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 99c91ec013 IGNITE-15357 Java client async startup (#1660)
99c91ec013 is described below

commit 99c91ec01319ee5e51bb0212846a48fe363c258d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 10 17:02:44 2023 +0200

    IGNITE-15357 Java client async startup (#1660)
    
    Implement fully non-blocking async startup / connect / reconnect logic for 
Java client.
---
 .../ignite/internal/client/ClientFutureUtils.java  | 106 ++++++
 .../apache/ignite/internal/client/ClientUtils.java |   3 +
 .../ignite/internal/client/ReliableChannel.java    | 369 ++++++++++-----------
 .../ignite/internal/client/TcpClientChannel.java   |  91 +++--
 .../ignite/internal/client/TcpIgniteClient.java    |   7 +-
 .../client/io/ClientConnectionMultiplexer.java     |   3 +-
 .../io/netty/NettyClientConnectionMultiplexer.java |  34 +-
 .../apache/ignite/client/ConfigurationTest.java    |  13 +-
 .../org/apache/ignite/client/ConnectionTest.java   |   4 +-
 .../org/apache/ignite/client/HeartbeatTest.java    |  19 +-
 .../org/apache/ignite/client/MultiClusterTest.java |   9 +-
 .../org/apache/ignite/client/ReconnectTest.java    |   2 +-
 .../org/apache/ignite/client/RetryPolicyTest.java  |   7 +-
 .../internal/client/ClientFutureUtilsTest.java     | 127 +++++++
 14 files changed, 538 insertions(+), 256 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
new file mode 100644
index 0000000000..0656e725dd
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future utils.
+ */
+class ClientFutureUtils {
+    static <T> @Nullable T getNowSafe(CompletableFuture<T> fut) {
+        try {
+            return fut.getNow(null);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+    static <T> CompletableFuture<T> doWithRetryAsync(
+            Supplier<CompletableFuture<T>> func,
+            @Nullable Predicate<T> resultValidator,
+            Predicate<RetryContext> retryPredicate) {
+        CompletableFuture<T> resFut = new CompletableFuture<>();
+        RetryContext ctx = new RetryContext();
+
+        doWithRetryAsync(func, resultValidator, retryPredicate, resFut, ctx);
+
+        return resFut;
+    }
+
+    private static <T> void doWithRetryAsync(
+            Supplier<CompletableFuture<T>> func,
+            @Nullable Predicate<T> validator,
+            Predicate<RetryContext> retryPredicate,
+            CompletableFuture<T> resFut,
+            RetryContext ctx) {
+        func.get().whenComplete((res, err) -> {
+            try {
+                if (err == null && (validator == null || validator.test(res))) 
{
+                    resFut.complete(res);
+                    return;
+                }
+
+                if (err != null) {
+                    if (ctx.errors == null) {
+                        ctx.errors = new ArrayList<>();
+                    }
+
+                    ctx.errors.add(err);
+                }
+
+                if (retryPredicate.test(ctx)) {
+                    ctx.attempt++;
+
+                    doWithRetryAsync(func, validator, retryPredicate, resFut, 
ctx);
+                } else {
+                    if (ctx.errors == null || ctx.errors.isEmpty()) {
+                        // Should not happen.
+                        resFut.completeExceptionally(new 
IllegalStateException("doWithRetry failed without exception"));
+                    } else {
+                        var resErr = ctx.errors.get(0);
+
+                        for (int i = 1; i < ctx.errors.size(); i++) {
+                            resErr.addSuppressed(ctx.errors.get(i));
+                        }
+
+                        resFut.completeExceptionally(resErr);
+                    }
+                }
+            } catch (Throwable t) {
+                resFut.completeExceptionally(t);
+            }
+        });
+    }
+
+    static class RetryContext {
+        int attempt;
+
+        @Nullable ArrayList<Throwable> errors;
+
+        @Nullable Throwable lastError() {
+            return errors == null || errors.isEmpty()
+                    ? null
+                    : errors.get(errors.size() - 1);
+        }
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 3e0ba81d86..bb91080436 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.client.ClientOperationType;
 import org.apache.ignite.client.IgniteClientConfiguration;
@@ -47,6 +48,8 @@ public class ClientUtils {
             throw IgniteException.wrap(e);
         } catch (ExecutionException e) {
             throw IgniteException.wrap(e);
+        } catch (CompletionException e) {
+            throw IgniteException.wrap(e.getCause());
         }
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 04d80f52a0..d3542a7c98 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.client;
 import static 
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -63,7 +62,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public final class ReliableChannel implements AutoCloseable {
     /** Channel factory. */
-    private final BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory;
+    private final BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory;
 
     /** Client channel holders for each configured address. */
     private volatile List<ClientChannelHolder> channels;
@@ -113,7 +112,7 @@ public final class ReliableChannel implements AutoCloseable 
{
      * @param chFactory Channel factory.
      * @param clientCfg Client config.
      */
-    ReliableChannel(BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory,
+    ReliableChannel(BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory,
             IgniteClientConfiguration clientCfg) {
         this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
         this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
@@ -148,10 +147,14 @@ public final class ReliableChannel implements 
AutoCloseable {
         List<ClusterNode> res = new ArrayList<>(channels.size());
 
         for (var holder : nodeChannelsByName.values()) {
-            var ch = holder.ch;
+            var chFut = holder.chFut;
 
-            if (ch != null) {
-                res.add(ch.protocolContext().clusterNode());
+            if (chFut != null) {
+                var ch = ClientFutureUtils.getNowSafe(chFut);
+
+                if (ch != null) {
+                    res.add(ch.protocolContext().clusterNode());
+                }
             }
         }
 
@@ -178,12 +181,11 @@ public final class ReliableChannel implements 
AutoCloseable {
             @Nullable String preferredNodeName,
             @Nullable String preferredNodeId
     ) {
-        CompletableFuture<T> fut = new CompletableFuture<>();
-
-        // Use the only one attempt to avoid blocking async method.
-        handleServiceAsync(fut, opCode, payloadWriter, payloadReader, 
preferredNodeName, preferredNodeId, null, 0);
-
-        return fut;
+        return ClientFutureUtils.doWithRetryAsync(
+                () -> getChannelAsync(preferredNodeName, preferredNodeId)
+                        .thenCompose(ch -> serviceAsyncInternal(opCode, 
payloadWriter, payloadReader, ch)),
+                null,
+                ctx -> shouldRetry(opCode, ctx));
     }
 
     /**
@@ -215,15 +217,19 @@ public final class ReliableChannel implements 
AutoCloseable {
         return serviceAsync(opCode, null, payloadReader, null, null);
     }
 
-    private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+    private <T> CompletableFuture<T> serviceAsyncInternal(
             int opCode,
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader,
-            @Nullable String preferredNodeName,
-            @Nullable String preferredNodeId,
-            @Nullable IgniteClientConnectionException failure,
-            int attempt) {
-        ClientChannel ch = null;
+            ClientChannel ch) {
+        return ch.serviceAsync(opCode, payloadWriter, 
payloadReader).whenComplete((res, err) -> {
+            if (err != null && unwrapConnectionException(err) != null) {
+                onChannelFailure(ch);
+            }
+        });
+    }
+
+    private CompletableFuture<ClientChannel> getChannelAsync(@Nullable String 
preferredNodeName, @Nullable String preferredNodeId) {
         ClientChannelHolder holder = null;
 
         if (preferredNodeName != null) {
@@ -233,86 +239,16 @@ public final class ReliableChannel implements 
AutoCloseable {
         }
 
         if (holder != null) {
-            try {
-                ch = holder.getOrCreateChannel();
-            } catch (Throwable ignored) {
-                // Ignore.
-            }
-        }
-
-        if (ch == null) {
-            try {
-                ch = getDefaultChannel();
-            } catch (Throwable ex) {
-                if (failure != null) {
-                    failure.addSuppressed(ex);
-
-                    fut.completeExceptionally(failure);
-
-                    return;
+            return holder.getOrCreateChannelAsync().thenCompose(ch -> {
+                if (ch != null) {
+                    return CompletableFuture.completedFuture(ch);
+                } else {
+                    return getDefaultChannelAsync();
                 }
-
-                fut.completeExceptionally(ex);
-
-                return;
-            }
+            });
         }
 
-        final ClientChannel ch0 = ch;
-
-        ch0
-                .serviceAsync(opCode, payloadWriter, payloadReader)
-                .handle((res, err) -> {
-                    if (err == null) {
-                        fut.complete(res);
-
-                        return null;
-                    }
-
-                    while (err instanceof CompletionException && 
err.getCause() != null) {
-                        err = err.getCause();
-                    }
-
-                    IgniteClientConnectionException failure0 = failure;
-
-                    if (err instanceof IgniteClientConnectionException) {
-                        var connectionErr = (IgniteClientConnectionException) 
err;
-
-                        try {
-                            // Will try to reinit channels if topology changed.
-                            onChannelFailure(ch0);
-                        } catch (Throwable ex) {
-                            fut.completeExceptionally(ex);
-
-                            return null;
-                        }
-
-                        if (failure0 == null) {
-                            failure0 = connectionErr;
-                        } else {
-                            failure0.addSuppressed(err);
-                        }
-
-                        if (shouldRetry(opCode, attempt, connectionErr, 
failure0)) {
-                            log.debug("Going to retry request because of error 
[opCode={}, currentAttempt={}, errMsg={}]",
-                                    failure0, opCode, attempt, 
failure0.getMessage());
-
-                            handleServiceAsync(fut, opCode, payloadWriter, 
payloadReader, null, null, failure0, attempt + 1);
-
-                            return null;
-                        }
-                    } else {
-                        fut.completeExceptionally(err instanceof 
IgniteException
-                                ? new CompletionException(err)
-                                : new IgniteException(UNKNOWN_ERR, 
err.getMessage(), err));
-
-                        return null;
-                    }
-
-                    fut.completeExceptionally(failure0);
-
-                    return null;
-                });
+        return getDefaultChannelAsync();
     }
 
     /**
@@ -385,11 +321,7 @@ public final class ReliableChannel implements 
AutoCloseable {
     /**
      * On channel of the specified holder failure.
      */
-    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
-        if (ch != null && ch == hld.ch) {
-            hld.closeChannel();
-        }
-
+    private void onChannelFailure(ClientChannelHolder hld, @Nullable 
ClientChannel ch) {
         chFailLsnrs.forEach(Runnable::run);
 
         // Roll current channel even if a topology changes. To help find 
working channel faster.
@@ -421,7 +353,7 @@ public final class ReliableChannel implements AutoCloseable 
{
      *
      * @return boolean wheter channels was reinited.
      */
-    synchronized boolean initChannelHolders() {
+    private synchronized boolean initChannelHolders() {
         List<ClientChannelHolder> holders = channels;
 
         // Enable parallel threads to schedule new init of channel holders.
@@ -529,86 +461,93 @@ public final class ReliableChannel implements 
AutoCloseable {
     /**
      * Init channel holders, establish connection to default channel.
      */
-    CompletableFuture<Void> channelsInitAsync() {
+    CompletableFuture<ClientChannel> channelsInitAsync() {
         // Do not establish connections if interrupted.
         if (!initChannelHolders()) {
             return CompletableFuture.completedFuture(null);
         }
 
         // Establish default channel connection.
-        getDefaultChannel();
+        var fut = getDefaultChannelAsync();
 
         // Establish secondary connections in the background.
-        initAllChannelsAsync();
+        fut.thenAccept(unused -> initAllChannelsAsync());
 
-        // TODO: Async startup IGNITE-15357.
-        return CompletableFuture.completedFuture(null);
+        return fut;
     }
 
     /**
      * Gets the default channel, reconnecting if necessary.
      */
-    private ClientChannel getDefaultChannel() {
-        IgniteClientConnectionException failure = null;
+    private CompletableFuture<ClientChannel> getDefaultChannelAsync() {
+        return ClientFutureUtils.doWithRetryAsync(
+                () -> {
+                    curChannelsGuard.readLock().lock();
 
-        for (int attempt = 0; ; attempt++) {
-            ClientChannelHolder hld = null;
-            ClientChannel c = null;
+                    ClientChannelHolder hld;
 
-            try {
-                if (closed) {
-                    throw new IgniteClientConnectionException(CONNECTION_ERR, 
"Channel is closed");
-                }
-
-                curChannelsGuard.readLock().lock();
+                    try {
+                        hld = channels.get(curChIdx);
+                    } finally {
+                        curChannelsGuard.readLock().unlock();
+                    }
 
-                try {
-                    hld = channels.get(curChIdx);
-                } finally {
-                    curChannelsGuard.readLock().unlock();
-                }
+                    return hld.getOrCreateChannelAsync();
+                },
+                Objects::nonNull,
+                ctx -> shouldRetry(ClientOperationType.CHANNEL_CONNECT, ctx));
+    }
 
-                c = hld.getOrCreateChannel();
+    private CompletableFuture<ClientChannel> getCurChannelAsync() {
+        if (closed) {
+            return CompletableFuture.failedFuture(new 
IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed"));
+        }
 
-                if (c != null) {
-                    return c;
-                }
-            } catch (IgniteClientConnectionException e) {
-                if (failure == null) {
-                    failure = e;
-                } else {
-                    failure.addSuppressed(e);
-                }
+        curChannelsGuard.readLock().lock();
 
-                onChannelFailure(hld, c);
+        try {
+            var hld = channels.get(curChIdx);
 
-                if (!shouldRetry(ClientOperationType.CHANNEL_CONNECT, attempt, 
e, failure)) {
-                    break;
-                }
+            if (hld == null) {
+                return CompletableFuture.completedFuture(null);
             }
-        }
 
-        throw new IgniteClientConnectionException(CONNECTION_ERR, "Failed to 
connect", failure);
+            CompletableFuture<ClientChannel> fut = 
hld.getOrCreateChannelAsync();
+            return fut == null ? CompletableFuture.completedFuture(null) : fut;
+        } finally {
+            curChannelsGuard.readLock().unlock();
+        }
     }
 
     /** Determines whether specified operation should be retried. */
-    private boolean shouldRetry(int opCode, int iteration, 
IgniteClientConnectionException exception,
-                                IgniteClientConnectionException 
aggregateException) {
+    private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext 
ctx) {
         ClientOperationType opType = 
ClientUtils.opCodeToClientOperationType(opCode);
 
-        return shouldRetry(opType, iteration, exception, aggregateException);
+        return shouldRetry(opType, ctx);
     }
 
     /** Determines whether specified operation should be retried. */
-    private boolean shouldRetry(ClientOperationType opType, int iteration, 
IgniteClientConnectionException exception,
-                                IgniteClientConnectionException 
aggregateException) {
+    private boolean shouldRetry(@Nullable ClientOperationType opType, 
ClientFutureUtils.RetryContext ctx) {
+        var err = ctx.lastError();
+
+        if (err == null) {
+            // Closed channel situation - no error, but connection should be 
retried.
+            return opType == ClientOperationType.CHANNEL_CONNECT;
+        }
+
+        IgniteClientConnectionException exception = 
unwrapConnectionException(err);
+
+        if (exception == null) {
+            return false;
+        }
+
         if (exception.code() == CLUSTER_ID_MISMATCH_ERR) {
             return false;
         }
 
         if (opType == null) {
             // System operation.
-            return iteration < RetryLimitPolicy.DFLT_RETRY_LIMIT;
+            return ctx.attempt < RetryLimitPolicy.DFLT_RETRY_LIMIT;
         }
 
         RetryPolicy plc = clientCfg.retryPolicy();
@@ -617,14 +556,17 @@ public final class ReliableChannel implements 
AutoCloseable {
             return false;
         }
 
-        RetryPolicyContext ctx = new RetryPolicyContextImpl(clientCfg, opType, 
iteration, exception);
+        RetryPolicyContext retryPolicyContext = new 
RetryPolicyContextImpl(clientCfg, opType, ctx.attempt, exception);
 
-        try {
-            return plc.shouldRetry(ctx);
-        } catch (Throwable t) {
-            aggregateException.addSuppressed(t);
-            return false;
+        // Exception in shouldRetry will be handled by 
ClientFutureUtils.doWithRetryAsync
+        boolean shouldRetry = plc.shouldRetry(retryPolicyContext);
+
+        if (shouldRetry) {
+            log.debug("Going to retry operation because of error [op={}, 
currentAttempt={}, errMsg={}]",
+                    exception, opType, ctx.attempt, exception.getMessage());
         }
+
+        return shouldRetry;
     }
 
     /**
@@ -641,7 +583,7 @@ public final class ReliableChannel implements AutoCloseable 
{
                         }
 
                         try {
-                            hld.getOrCreateChannel(true);
+                            hld.getOrCreateChannelAsync(true);
                         } catch (Exception e) {
                             log.warn("Failed to establish connection to " + 
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
                         }
@@ -655,7 +597,9 @@ public final class ReliableChannel implements AutoCloseable 
{
         // This could be solved with a cluster-wide AssignmentVersion, but we 
don't have that.
         // So we only react to updates from the default channel. When no 
user-initiated operations are performed on the default
         // channel, heartbeat messages will trigger updates.
-        if (clientChannel == channels.get(curChIdx).ch) {
+        CompletableFuture<ClientChannel> ch = channels.get(curChIdx).chFut;
+
+        if (ch != null && clientChannel == ClientFutureUtils.getNowSafe(ch)) {
             assignmentVersion.incrementAndGet();
         }
     }
@@ -669,6 +613,19 @@ public final class ReliableChannel implements 
AutoCloseable {
         return assignmentVersion.get();
     }
 
+    @Nullable
+    private static IgniteClientConnectionException 
unwrapConnectionException(Throwable err) {
+        while (err instanceof CompletionException) {
+            err = err.getCause();
+        }
+
+        if (!(err instanceof IgniteClientConnectionException)) {
+            return null;
+        }
+
+        return (IgniteClientConnectionException) err;
+    }
+
     /**
      * Channels holder.
      */
@@ -678,7 +635,7 @@ public final class ReliableChannel implements AutoCloseable 
{
         private final ClientChannelConfiguration chCfg;
 
         /** Channel. */
-        private volatile ClientChannel ch;
+        private volatile @Nullable CompletableFuture<ClientChannel> chFut;
 
         /** The last server node that channel is or was connected to. */
         private volatile ClusterNode serverNode;
@@ -729,46 +686,55 @@ public final class ReliableChannel implements 
AutoCloseable {
         /**
          * Get or create channel.
          */
-        private ClientChannel getOrCreateChannel() {
-            return getOrCreateChannel(false);
+        private CompletableFuture<ClientChannel> getOrCreateChannelAsync() {
+            return getOrCreateChannelAsync(false);
         }
 
         /**
          * Get or create channel.
          */
-        private ClientChannel getOrCreateChannel(boolean ignoreThrottling) {
-            if (ch == null && !close) {
-                synchronized (this) {
-                    if (close) {
-                        return null;
-                    }
+        private CompletableFuture<ClientChannel> 
getOrCreateChannelAsync(boolean ignoreThrottling) {
+            if (close) {
+                return CompletableFuture.completedFuture(null);
+            }
 
-                    if (ch != null) {
-                        return ch;
-                    }
+            var chFut0 = chFut;
 
-                    if (!ignoreThrottling && applyReconnectionThrottling()) {
-                        //noinspection 
NonPrivateFieldAccessedInSynchronizedContext
-                        throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due 
to applied throttling");
-                    }
+            if (isFutureInProgressOrDoneAndChannelOpen(chFut0)) {
+                return chFut0;
+            }
 
-                    ClientChannel ch0 = chFactory.apply(chCfg, connMgr);
+            synchronized (this) {
+                if (close) {
+                    return CompletableFuture.completedFuture(null);
+                }
 
-                    var oldClusterId = clusterId.compareAndExchange(null, 
ch0.protocolContext().clusterId());
+                chFut0 = chFut;
 
-                    if (oldClusterId != null && 
!oldClusterId.equals(ch0.protocolContext().clusterId())) {
+                if (isFutureInProgressOrDoneAndChannelOpen(chFut0)) {
+                    return chFut0;
+                }
+
+                if (!ignoreThrottling && applyReconnectionThrottling()) {
+                    return CompletableFuture.failedFuture(
+                            new 
IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due 
to applied throttling"));
+                }
+
+                chFut0 = chFactory.apply(chCfg, connMgr).thenApply(ch -> {
+                    var oldClusterId = clusterId.compareAndExchange(null, 
ch.protocolContext().clusterId());
+
+                    if (oldClusterId != null && 
!oldClusterId.equals(ch.protocolContext().clusterId())) {
                         try {
-                            ch0.close();
+                            ch.close();
                         } catch (Exception ignored) {
                             // Ignore
                         }
 
-                        throw new 
IgniteClientConnectionException(CLUSTER_ID_MISMATCH_ERR, "Cluster ID mismatch: 
expected=" + oldClusterId
-                                + ", actual=" + 
ch0.protocolContext().clusterId());
+                        throw new IgniteClientConnectionException(
+                                CLUSTER_ID_MISMATCH_ERR,
+                                "Cluster ID mismatch: expected=" + 
oldClusterId + ", actual=" + ch.protocolContext().clusterId());
                     }
 
-                    ch = ch0;
-
                     
ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
 
                     ClusterNode newNode = ch.protocolContext().clusterNode();
@@ -786,22 +752,39 @@ public final class ReliableChannel implements 
AutoCloseable {
                     }
 
                     serverNode = newNode;
-                }
-            }
 
-            return ch;
+                    return ch;
+                });
+
+                chFut0.exceptionally(err -> {
+                    closeChannel();
+                    onChannelFailure(this, null);
+
+                    log.warn("Failed to establish connection to " + 
chCfg.getAddress() + ": " + err.getMessage(), err);
+
+                    return null;
+                });
+
+                chFut = chFut0;
+
+                return chFut0;
+            }
         }
 
         /**
          * Close channel.
          */
         private synchronized void closeChannel() {
-            if (ch != null) {
-                try {
-                    ch.close();
-                } catch (Exception ignored) {
-                    // No op.
-                }
+            CompletableFuture<ClientChannel> ch0 = chFut;
+
+            if (ch0 != null) {
+                ch0.thenAccept(c -> {
+                    try {
+                        c.close();
+                    } catch (Exception ignored) {
+                        // No-op.
+                    }
+                });
 
                 var oldServerNode = serverNode;
 
@@ -810,7 +793,7 @@ public final class ReliableChannel implements AutoCloseable 
{
                     nodeChannelsById.remove(oldServerNode.id(), this);
                 }
 
-                ch = null;
+                chFut = null;
             }
         }
 
@@ -829,5 +812,19 @@ public final class ReliableChannel implements 
AutoCloseable {
 
             closeChannel();
         }
+
+        private boolean isFutureInProgressOrDoneAndChannelOpen(@Nullable 
CompletableFuture<ClientChannel> f) {
+            if (f == null || f.isCompletedExceptionally()) {
+                return false;
+            }
+
+            if (!f.isDone()) {
+                return true;
+            }
+
+            var ch = f.getNow(null);
+
+            return ch != null && !ch.closed();
+        }
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 6f7bde9e2c..1036863386 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -75,11 +75,14 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** Minimum supported heartbeat interval. */
     private static final long MIN_RECOMMENDED_HEARTBEAT_INTERVAL = 500;
 
+    /** Config. */
+    private final ClientChannelConfiguration cfg;
+
     /** Protocol context. */
     private volatile ProtocolContext protocolCtx;
 
     /** Channel. */
-    private final ClientConnection sock;
+    private volatile ClientConnection sock;
 
     /** Request id. */
     private final AtomicLong reqId = new AtomicLong(1);
@@ -103,7 +106,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     private final long heartbeatTimeout;
 
     /** Heartbeat timer. */
-    private final Timer heartbeatTimer;
+    private volatile Timer heartbeatTimer;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -115,10 +118,10 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
      * Constructor.
      *
      * @param cfg     Config.
-     * @param connMgr Connection multiplexer.
      */
-    TcpClientChannel(ClientChannelConfiguration cfg, 
ClientConnectionMultiplexer connMgr) {
+    private TcpClientChannel(ClientChannelConfiguration cfg) {
         validateConfiguration(cfg);
+        this.cfg = cfg;
 
         log = ClientUtils.logger(cfg.clientConfiguration(), 
TcpClientChannel.class);
 
@@ -128,14 +131,40 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
         connectTimeout = cfg.clientConfiguration().connectTimeout();
         heartbeatTimeout = cfg.clientConfiguration().heartbeatTimeout();
+    }
 
-        sock = connMgr.open(cfg.getAddress(), this, this);
-
-        handshake(DEFAULT_VERSION);
+    private CompletableFuture<ClientChannel> 
initAsync(ClientConnectionMultiplexer connMgr) {
+        return connMgr
+                .openAsync(cfg.getAddress(), this, this)
+                .thenCompose(s -> {
+                    sock = s;
+
+                    return handshakeAsync(DEFAULT_VERSION);
+                })
+                .whenComplete((res, err) -> {
+                    if (err != null) {
+                        close();
+                    }
+                })
+                .thenApplyAsync(unused -> {
+                    // Netty has a built-in IdleStateHandler to detect idle 
connections (used on the server side).
+                    // However, to adjust the heartbeat interval dynamically, 
we have to use a timer here.
+                    heartbeatTimer = 
initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
+
+                    return this;
+                }, asyncContinuationExecutor);
+    }
 
-        // Netty has a built-in IdleStateHandler to detect idle connections 
(used on the server side).
-        // However, to adjust the heartbeat interval dynamically, we have to 
use a timer here.
-        heartbeatTimer = 
initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
+    /**
+     * Creates a new channel asynchronously.
+     *
+     * @param cfg Configuration.
+     * @param connMgr Connection manager.
+     * @return Channel.
+     */
+    static CompletableFuture<ClientChannel> 
createAsync(ClientChannelConfiguration cfg, ClientConnectionMultiplexer 
connMgr) {
+        //noinspection resource - returned from method.
+        return new TcpClientChannel(cfg).initAsync(connMgr);
     }
 
     /** {@inheritDoc} */
@@ -178,6 +207,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** {@inheritDoc} */
     @Override
     public void onDisconnected(@Nullable Exception e) {
+        log.debug("Disconnected from server: " + cfg.getAddress());
         close(e);
     }
 
@@ -382,29 +412,31 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     }
 
     /** Client handshake. */
-    private void handshake(ProtocolVersion ver)
+    private CompletableFuture<Void> handshakeAsync(ProtocolVersion ver)
             throws IgniteClientConnectionException {
         ClientRequestFuture fut = new ClientRequestFuture();
         pendingReqs.put(-1L, fut);
 
-        try {
-            handshakeReq(ver);
-
-            // handshakeRes must be called even in case of timeout to release 
the buffer.
-            var resFut = fut.thenAccept(res -> handshakeRes(res, ver));
-
-            if (connectTimeout > 0) {
-                resFut.get(connectTimeout, TimeUnit.MILLISECONDS);
-            } else {
-                resFut.get();
+        handshakeReqAsync(ver).addListener(f -> {
+            if (!f.isSuccess()) {
+                fut.completeExceptionally(
+                        new IgniteClientConnectionException(CONNECTION_ERR, 
"Failed to send handshake request", f.cause()));
             }
-        } catch (Throwable e) {
-            throw IgniteException.wrap(e);
+        });
+
+        if (connectTimeout > 0) {
+            fut.orTimeout(connectTimeout, TimeUnit.MILLISECONDS);
         }
+
+        return fut.thenCompose(res -> handshakeRes(res, ver));
     }
 
-    /** Send handshake request. */
-    private void handshakeReq(ProtocolVersion proposedVer) {
+    /**
+     * Send handshake request.
+     *
+     * @return Channel future.
+     */
+    private ChannelFuture handshakeReqAsync(ProtocolVersion proposedVer) {
         sock.send(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
 
         var req = new ClientMessagePacker(sock.getBuffer());
@@ -417,11 +449,11 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         req.packBinaryHeader(0); // Features.
         req.packMapHeader(0); // Extensions.
 
-        write(req).syncUninterruptibly();
+        return write(req);
     }
 
     /** Receive and handle handshake response. */
-    private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion 
proposedVer) {
+    private CompletableFuture<Void> handshakeRes(ClientMessageUnpacker 
unpacker, ProtocolVersion proposedVer) {
         try (unpacker) {
             ProtocolVersion srvVer = new 
ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(),
                     unpacker.unpackShort());
@@ -429,8 +461,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             if (!unpacker.tryUnpackNil()) {
                 if (!proposedVer.equals(srvVer) && 
supportedVers.contains(srvVer)) {
                     // Retry with server version.
-                    handshake(srvVer);
-                    return;
+                    return handshakeAsync(srvVer);
                 }
 
                 throw readError(unpacker);
@@ -451,6 +482,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
 
             protocolCtx = new ProtocolContext(
                     srvVer, ProtocolBitmaskFeature.allFeaturesAsEnumSet(), 
serverIdleTimeout, clusterNode, clusterId);
+
+            return CompletableFuture.completedFuture(null);
         }
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index efc3227dde..8474f280ec 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -68,7 +68,7 @@ public class TcpIgniteClient implements IgniteClient {
      * @param cfg Config.
      */
     private TcpIgniteClient(IgniteClientConfiguration cfg) {
-        this(TcpClientChannel::new, cfg);
+        this(TcpClientChannel::createAsync, cfg);
     }
 
     /**
@@ -78,7 +78,7 @@ public class TcpIgniteClient implements IgniteClient {
      * @param cfg Config.
      */
     private TcpIgniteClient(
-            BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory,
+            BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory,
             IgniteClientConfiguration cfg
     ) {
         assert chFactory != null;
@@ -98,7 +98,7 @@ public class TcpIgniteClient implements IgniteClient {
      *
      * @return Future representing pending completion of the operation.
      */
-    public CompletableFuture<Void> initAsync() {
+    private CompletableFuture<ClientChannel> initAsync() {
         return ch.channelsInitAsync();
     }
 
@@ -109,6 +109,7 @@ public class TcpIgniteClient implements IgniteClient {
      * @return Future representing pending completion of the operation.
      */
     public static CompletableFuture<IgniteClient> 
startAsync(IgniteClientConfiguration cfg) {
+        //noinspection resource: returned from method
         var client = new TcpIgniteClient(cfg);
 
         return client.initAsync().thenApply(x -> client);
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
index c20cdc5f2c..b457177374 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client.io;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.IgniteClientConfiguration;
 import org.apache.ignite.client.IgniteClientConnectionException;
 
@@ -46,7 +47,7 @@ public interface ClientConnectionMultiplexer {
      * @return Created connection.
      * @throws IgniteClientConnectionException when connection can't be 
established.
      */
-    ClientConnection open(
+    CompletableFuture<ClientConnection> openAsync(
             InetSocketAddress addr,
             ClientMessageHandler msgHnd,
             ClientConnectionStateHandler stateHnd)
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index 9beb0415ef..24b8acc613 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.client.io.netty;
 
-import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -27,6 +25,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.IgniteClientConfiguration;
 import org.apache.ignite.client.IgniteClientConnectionException;
 import org.apache.ignite.internal.client.io.ClientConnection;
@@ -34,6 +33,7 @@ import 
org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
 import org.apache.ignite.internal.client.io.ClientMessageHandler;
 import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
+import org.apache.ignite.lang.ErrorGroups.Client;
 
 /**
  * Netty-based multiplexer.
@@ -83,17 +83,31 @@ public class NettyClientConnectionMultiplexer implements 
ClientConnectionMultipl
 
     /** {@inheritDoc} */
     @Override
-    public ClientConnection open(InetSocketAddress addr,
+    public CompletableFuture<ClientConnection> openAsync(InetSocketAddress 
addr,
             ClientMessageHandler msgHnd,
             ClientConnectionStateHandler stateHnd)
             throws IgniteClientConnectionException {
-        try {
-            // TODO: Async startup IGNITE-15357.
-            ChannelFuture f = bootstrap.connect(addr).syncUninterruptibly();
+        CompletableFuture<ClientConnection> fut = new CompletableFuture<>();
 
-            return new NettyClientConnection(f.channel(), msgHnd, stateHnd);
-        } catch (Throwable t) {
-            throw new IgniteClientConnectionException(UNKNOWN_ERR, 
t.getMessage(), t);
-        }
+        ChannelFuture connectFut = bootstrap.connect(addr);
+
+        connectFut.addListener(f -> {
+            if (f.isSuccess()) {
+                NettyClientConnection conn = new 
NettyClientConnection(((ChannelFuture) f).channel(), msgHnd, stateHnd);
+
+                fut.complete(conn);
+            } else {
+                Throwable cause = f.cause();
+
+                var err = new IgniteClientConnectionException(
+                        Client.CONNECTION_ERR,
+                        "Client failed to connect: " + cause.getMessage(),
+                        cause);
+
+                fut.completeExceptionally(err);
+            }
+        });
+
+        return fut;
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index 3d6b9325f8..b7b49751e5 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -27,8 +27,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Test;
@@ -158,15 +160,20 @@ public class ConfigurationTest extends AbstractClientTest 
{
     public void testCustomAsyncContinuationExecutor() throws Exception {
         ExecutorService executor = Executors.newSingleThreadExecutor();
 
-        IgniteClient.Builder builder = IgniteClient.builder()
+        var builderThreadName = new AtomicReference<String>();
+
+        CompletableFuture<IgniteClient> builder = IgniteClient.builder()
                 .addresses("127.0.0.1:" + serverPort)
-                .asyncContinuationExecutor(executor);
+                .asyncContinuationExecutor(executor)
+                .buildAsync()
+                .whenComplete((res, err) -> 
builderThreadName.set(Thread.currentThread().getName()));
 
-        try (IgniteClient ignite = builder.build()) {
+        try (IgniteClient ignite = builder.join()) {
             String threadName = ignite.tables().tablesAsync().thenApply(unused 
-> Thread.currentThread().getName()).join();
 
             assertEquals(executor, 
ignite.configuration().asyncContinuationExecutor());
             assertThat(threadName, startsWith("pool-"));
+            assertThat(builderThreadName.get(), startsWith("pool-"));
         }
 
         executor.shutdown();
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index 7c6f03aba8..f294e9be1b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -92,6 +92,8 @@ public class ConnectionTest extends AbstractClientTest {
     }
 
     private static void testConnection(String... addrs) throws Exception {
-        AbstractClientTest.startClient(addrs).close();
+        IgniteClient c = AbstractClientTest.startClient(addrs);
+
+        c.close();
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index 8a797702a9..c54d6365bf 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.client;
 
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.UUID;
 import java.util.function.Function;
@@ -35,15 +35,17 @@ public class HeartbeatTest {
     public void testHeartbeatLongerThanIdleTimeoutCausesDisconnect() throws 
Exception {
         try (var srv = new TestServer(10800, 10, 50, new FakeIgnite())) {
             int srvPort = srv.port();
+            var loggerFactory = new TestLoggerFactory("client");
 
             Builder builder = IgniteClient.builder()
                     .addresses("127.0.0.1:" + srvPort)
-                    .retryPolicy(null);
-
-            try (var client = builder.build()) {
-                Thread.sleep(300);
+                    .retryPolicy(null)
+                    .loggerFactory(loggerFactory);
 
-                assertThrows(IgniteClientConnectionException.class, () -> 
client.tables().tables());
+            try (var ignored = builder.build()) {
+                IgniteTestUtils.waitForCondition(
+                        () -> 
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected 
from server")),
+                        1000);
             }
         }
     }
@@ -65,6 +67,7 @@ public class HeartbeatTest {
         }
     }
 
+    @SuppressWarnings("ThrowableNotThrown")
     @Test
     public void testInvalidHeartbeatIntervalThrows() throws Exception {
         try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
@@ -73,9 +76,7 @@ public class HeartbeatTest {
                     .addresses("127.0.0.1:" + srv.port())
                     .heartbeatInterval(-50);
 
-            Throwable ex = assertThrows(IllegalArgumentException.class, 
builder::build);
-
-            assertEquals("Negative delay.", ex.getMessage());
+            assertThrowsWithCause(builder::build, 
IllegalArgumentException.class, "Negative delay.");
         }
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
index 7955e58739..b94c2ae023 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
@@ -27,7 +27,6 @@ import java.util.UUID;
 import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.hamcrest.CoreMatchers;
 import org.jetbrains.annotations.Nullable;
@@ -96,13 +95,7 @@ public class MultiClusterTest {
             IgniteClientConnectionException ex = 
(IgniteClientConnectionException) assertThrowsWithCause(
                     () -> client.tables().tables(), 
IgniteClientConnectionException.class, "Cluster ID mismatch");
 
-            IgniteClientConnectionException cause = 
(IgniteClientConnectionException) ExceptionUtils.getSuppressedList(ex).stream()
-                    .filter(e -> e.getCause().getMessage().contains("Cluster 
ID mismatch"))
-                    .findFirst()
-                    .orElseThrow()
-                    .getCause();
-
-            assertEquals(CLUSTER_ID_MISMATCH_ERR, cause.code());
+            assertEquals(CLUSTER_ID_MISMATCH_ERR, ex.code());
         }
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index caa97fc5bf..163f95c5ff 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -93,6 +93,6 @@ public class ReconnectTest {
 
         server.close();
 
-        assertThrowsWithCause(() -> client.tables().tables(), 
IgniteClientConnectionException.class, "Channel is closed");
+        assertThrowsWithCause(() -> client.tables().tables(), 
IgniteClientConnectionException.class);
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index cbcde03218..5029a38525 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -244,12 +244,9 @@ public class RetryPolicyTest {
 
         try (var client = getClient(plc)) {
             IgniteException ex = assertThrows(IgniteException.class, () -> 
client.tables().tables());
+            var cause = (RuntimeException) ex.getCause();
 
-            var cause = (IgniteClientConnectionException) ex.getCause();
-            Throwable[] suppressed = cause.getSuppressed();
-
-            assertEquals("TestRetryPolicy exception.", 
suppressed[0].getMessage());
-            assertEquals(1, suppressed.length);
+            assertEquals("TestRetryPolicy exception.", cause.getMessage());
         }
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
 
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
new file mode 100644
index 0000000000..d78e02caed
--- /dev/null
+++ 
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Future utils test.
+ */
+public class ClientFutureUtilsTest {
+    @Test
+    public void testGetNowSafe() {
+        
assertNull(ClientFutureUtils.getNowSafe(CompletableFuture.completedFuture(null)));
+        
assertNull(ClientFutureUtils.getNowSafe(CompletableFuture.failedFuture(new 
Exception("fail"))));
+        assertNull(ClientFutureUtils.getNowSafe(new CompletableFuture<>()));
+        assertEquals("test", 
ClientFutureUtils.getNowSafe(CompletableFuture.completedFuture("test")));
+    }
+
+    @Test
+    public void testDoWithRetryAsyncWithCompletedFutureReturnsResult() {
+        var res = ClientFutureUtils.doWithRetryAsync(
+                () -> CompletableFuture.completedFuture("test"),
+                null,
+                ctx -> false
+        ).join();
+
+        assertEquals("test", res);
+    }
+
+    @Test
+    public void 
testDoWithRetryAsyncWithResultValidatorRejectsAllThrowsIllegalState() {
+        var fut = ClientFutureUtils.doWithRetryAsync(
+                () -> CompletableFuture.completedFuture("test"),
+                x -> false,
+                ctx -> false
+        );
+
+        var ex = assertThrows(CompletionException.class, fut::join);
+        assertSame(IllegalStateException.class, ex.getCause().getClass());
+    }
+
+    @Test
+    public void 
testDoWithRetryAsyncWithFailedFutureReturnsExceptionWithSuppressedList() {
+        var counter = new AtomicInteger();
+
+        var fut = ClientFutureUtils.doWithRetryAsync(
+                () -> CompletableFuture.failedFuture(new Exception("fail_" + 
counter.get())),
+                null,
+                ctx -> counter.incrementAndGet() < 3
+        );
+
+        var completionEx = assertThrows(CompletionException.class, fut::join);
+        var ex = (Exception) completionEx.getCause();
+
+        assertEquals(2, ex.getSuppressed().length);
+
+        assertEquals("fail_0", ex.getMessage());
+        assertEquals("fail_1", ex.getSuppressed()[0].getMessage());
+        assertEquals("fail_2", ex.getSuppressed()[1].getMessage());
+    }
+
+    @Test
+    public void testDoWithRetryAsyncSucceedsAfterRetries() {
+        var counter = new AtomicInteger();
+
+        var fut = ClientFutureUtils.doWithRetryAsync(
+                () -> counter.getAndIncrement() < 3
+                        ? CompletableFuture.failedFuture(new Exception("fail"))
+                        : CompletableFuture.completedFuture("test"),
+                null,
+                ctx -> {
+                    assertNotNull(ctx.lastError());
+
+                    //noinspection DataFlowIssue
+                    assertEquals("fail", ctx.lastError().getMessage());
+
+                    return ctx.attempt < 5;
+                }
+        );
+
+        assertEquals("test", fut.join());
+    }
+
+    @Test
+    public void 
testDoWithRetryAsyncWithExceptionInDelegateReturnsFailedFuture() {
+        var counter = new AtomicInteger();
+
+        var fut = ClientFutureUtils.doWithRetryAsync(
+                () -> {
+                    if (counter.incrementAndGet() > 1) {
+                        throw new RuntimeException("fail1");
+                    } else {
+                        return CompletableFuture.failedFuture(new 
Exception("fail2"));
+                    }
+                },
+                null,
+                ctx -> true
+        );
+
+        var ex = assertThrows(CompletionException.class, fut::join);
+        assertEquals("fail1", ex.getCause().getMessage());
+    }
+}

Reply via email to