This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 99c91ec013 IGNITE-15357 Java client async startup (#1660)
99c91ec013 is described below
commit 99c91ec01319ee5e51bb0212846a48fe363c258d
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 10 17:02:44 2023 +0200
IGNITE-15357 Java client async startup (#1660)
Implement fully non-blocking async startup / connect / reconnect logic for
Java client.
---
.../ignite/internal/client/ClientFutureUtils.java | 106 ++++++
.../apache/ignite/internal/client/ClientUtils.java | 3 +
.../ignite/internal/client/ReliableChannel.java | 369 ++++++++++-----------
.../ignite/internal/client/TcpClientChannel.java | 91 +++--
.../ignite/internal/client/TcpIgniteClient.java | 7 +-
.../client/io/ClientConnectionMultiplexer.java | 3 +-
.../io/netty/NettyClientConnectionMultiplexer.java | 34 +-
.../apache/ignite/client/ConfigurationTest.java | 13 +-
.../org/apache/ignite/client/ConnectionTest.java | 4 +-
.../org/apache/ignite/client/HeartbeatTest.java | 19 +-
.../org/apache/ignite/client/MultiClusterTest.java | 9 +-
.../org/apache/ignite/client/ReconnectTest.java | 2 +-
.../org/apache/ignite/client/RetryPolicyTest.java | 7 +-
.../internal/client/ClientFutureUtilsTest.java | 127 +++++++
14 files changed, 538 insertions(+), 256 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
new file mode 100644
index 0000000000..0656e725dd
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientFutureUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future utils.
+ */
+class ClientFutureUtils {
+ static <T> @Nullable T getNowSafe(CompletableFuture<T> fut) {
+ try {
+ return fut.getNow(null);
+ } catch (Throwable e) {
+ return null;
+ }
+ }
+
+ static <T> CompletableFuture<T> doWithRetryAsync(
+ Supplier<CompletableFuture<T>> func,
+ @Nullable Predicate<T> resultValidator,
+ Predicate<RetryContext> retryPredicate) {
+ CompletableFuture<T> resFut = new CompletableFuture<>();
+ RetryContext ctx = new RetryContext();
+
+ doWithRetryAsync(func, resultValidator, retryPredicate, resFut, ctx);
+
+ return resFut;
+ }
+
+ private static <T> void doWithRetryAsync(
+ Supplier<CompletableFuture<T>> func,
+ @Nullable Predicate<T> validator,
+ Predicate<RetryContext> retryPredicate,
+ CompletableFuture<T> resFut,
+ RetryContext ctx) {
+ func.get().whenComplete((res, err) -> {
+ try {
+ if (err == null && (validator == null || validator.test(res)))
{
+ resFut.complete(res);
+ return;
+ }
+
+ if (err != null) {
+ if (ctx.errors == null) {
+ ctx.errors = new ArrayList<>();
+ }
+
+ ctx.errors.add(err);
+ }
+
+ if (retryPredicate.test(ctx)) {
+ ctx.attempt++;
+
+ doWithRetryAsync(func, validator, retryPredicate, resFut,
ctx);
+ } else {
+ if (ctx.errors == null || ctx.errors.isEmpty()) {
+ // Should not happen.
+ resFut.completeExceptionally(new
IllegalStateException("doWithRetry failed without exception"));
+ } else {
+ var resErr = ctx.errors.get(0);
+
+ for (int i = 1; i < ctx.errors.size(); i++) {
+ resErr.addSuppressed(ctx.errors.get(i));
+ }
+
+ resFut.completeExceptionally(resErr);
+ }
+ }
+ } catch (Throwable t) {
+ resFut.completeExceptionally(t);
+ }
+ });
+ }
+
+ static class RetryContext {
+ int attempt;
+
+ @Nullable ArrayList<Throwable> errors;
+
+ @Nullable Throwable lastError() {
+ return errors == null || errors.isEmpty()
+ ? null
+ : errors.get(errors.size() - 1);
+ }
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index 3e0ba81d86..bb91080436 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.client.ClientOperationType;
import org.apache.ignite.client.IgniteClientConfiguration;
@@ -47,6 +48,8 @@ public class ClientUtils {
throw IgniteException.wrap(e);
} catch (ExecutionException e) {
throw IgniteException.wrap(e);
+ } catch (CompletionException e) {
+ throw IgniteException.wrap(e.getCause());
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 04d80f52a0..d3542a7c98 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.client;
import static
org.apache.ignite.lang.ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONFIGURATION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -63,7 +62,7 @@ import org.jetbrains.annotations.Nullable;
*/
public final class ReliableChannel implements AutoCloseable {
/** Channel factory. */
- private final BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory;
+ private final BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory;
/** Client channel holders for each configured address. */
private volatile List<ClientChannelHolder> channels;
@@ -113,7 +112,7 @@ public final class ReliableChannel implements AutoCloseable
{
* @param chFactory Channel factory.
* @param clientCfg Client config.
*/
- ReliableChannel(BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory,
+ ReliableChannel(BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory,
IgniteClientConfiguration clientCfg) {
this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
@@ -148,10 +147,14 @@ public final class ReliableChannel implements
AutoCloseable {
List<ClusterNode> res = new ArrayList<>(channels.size());
for (var holder : nodeChannelsByName.values()) {
- var ch = holder.ch;
+ var chFut = holder.chFut;
- if (ch != null) {
- res.add(ch.protocolContext().clusterNode());
+ if (chFut != null) {
+ var ch = ClientFutureUtils.getNowSafe(chFut);
+
+ if (ch != null) {
+ res.add(ch.protocolContext().clusterNode());
+ }
}
}
@@ -178,12 +181,11 @@ public final class ReliableChannel implements
AutoCloseable {
@Nullable String preferredNodeName,
@Nullable String preferredNodeId
) {
- CompletableFuture<T> fut = new CompletableFuture<>();
-
- // Use the only one attempt to avoid blocking async method.
- handleServiceAsync(fut, opCode, payloadWriter, payloadReader,
preferredNodeName, preferredNodeId, null, 0);
-
- return fut;
+ return ClientFutureUtils.doWithRetryAsync(
+ () -> getChannelAsync(preferredNodeName, preferredNodeId)
+ .thenCompose(ch -> serviceAsyncInternal(opCode,
payloadWriter, payloadReader, ch)),
+ null,
+ ctx -> shouldRetry(opCode, ctx));
}
/**
@@ -215,15 +217,19 @@ public final class ReliableChannel implements
AutoCloseable {
return serviceAsync(opCode, null, payloadReader, null, null);
}
- private <T> void handleServiceAsync(final CompletableFuture<T> fut,
+ private <T> CompletableFuture<T> serviceAsyncInternal(
int opCode,
PayloadWriter payloadWriter,
PayloadReader<T> payloadReader,
- @Nullable String preferredNodeName,
- @Nullable String preferredNodeId,
- @Nullable IgniteClientConnectionException failure,
- int attempt) {
- ClientChannel ch = null;
+ ClientChannel ch) {
+ return ch.serviceAsync(opCode, payloadWriter,
payloadReader).whenComplete((res, err) -> {
+ if (err != null && unwrapConnectionException(err) != null) {
+ onChannelFailure(ch);
+ }
+ });
+ }
+
+ private CompletableFuture<ClientChannel> getChannelAsync(@Nullable String
preferredNodeName, @Nullable String preferredNodeId) {
ClientChannelHolder holder = null;
if (preferredNodeName != null) {
@@ -233,86 +239,16 @@ public final class ReliableChannel implements
AutoCloseable {
}
if (holder != null) {
- try {
- ch = holder.getOrCreateChannel();
- } catch (Throwable ignored) {
- // Ignore.
- }
- }
-
- if (ch == null) {
- try {
- ch = getDefaultChannel();
- } catch (Throwable ex) {
- if (failure != null) {
- failure.addSuppressed(ex);
-
- fut.completeExceptionally(failure);
-
- return;
+ return holder.getOrCreateChannelAsync().thenCompose(ch -> {
+ if (ch != null) {
+ return CompletableFuture.completedFuture(ch);
+ } else {
+ return getDefaultChannelAsync();
}
-
- fut.completeExceptionally(ex);
-
- return;
- }
+ });
}
- final ClientChannel ch0 = ch;
-
- ch0
- .serviceAsync(opCode, payloadWriter, payloadReader)
- .handle((res, err) -> {
- if (err == null) {
- fut.complete(res);
-
- return null;
- }
-
- while (err instanceof CompletionException &&
err.getCause() != null) {
- err = err.getCause();
- }
-
- IgniteClientConnectionException failure0 = failure;
-
- if (err instanceof IgniteClientConnectionException) {
- var connectionErr = (IgniteClientConnectionException)
err;
-
- try {
- // Will try to reinit channels if topology changed.
- onChannelFailure(ch0);
- } catch (Throwable ex) {
- fut.completeExceptionally(ex);
-
- return null;
- }
-
- if (failure0 == null) {
- failure0 = connectionErr;
- } else {
- failure0.addSuppressed(err);
- }
-
- if (shouldRetry(opCode, attempt, connectionErr,
failure0)) {
- log.debug("Going to retry request because of error
[opCode={}, currentAttempt={}, errMsg={}]",
- failure0, opCode, attempt,
failure0.getMessage());
-
- handleServiceAsync(fut, opCode, payloadWriter,
payloadReader, null, null, failure0, attempt + 1);
-
- return null;
- }
- } else {
- fut.completeExceptionally(err instanceof
IgniteException
- ? new CompletionException(err)
- : new IgniteException(UNKNOWN_ERR,
err.getMessage(), err));
-
- return null;
- }
-
- fut.completeExceptionally(failure0);
-
- return null;
- });
+ return getDefaultChannelAsync();
}
/**
@@ -385,11 +321,7 @@ public final class ReliableChannel implements
AutoCloseable {
/**
* On channel of the specified holder failure.
*/
- private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
- if (ch != null && ch == hld.ch) {
- hld.closeChannel();
- }
-
+ private void onChannelFailure(ClientChannelHolder hld, @Nullable
ClientChannel ch) {
chFailLsnrs.forEach(Runnable::run);
// Roll current channel even if a topology changes. To help find
working channel faster.
@@ -421,7 +353,7 @@ public final class ReliableChannel implements AutoCloseable
{
*
* @return boolean wheter channels was reinited.
*/
- synchronized boolean initChannelHolders() {
+ private synchronized boolean initChannelHolders() {
List<ClientChannelHolder> holders = channels;
// Enable parallel threads to schedule new init of channel holders.
@@ -529,86 +461,93 @@ public final class ReliableChannel implements
AutoCloseable {
/**
* Init channel holders, establish connection to default channel.
*/
- CompletableFuture<Void> channelsInitAsync() {
+ CompletableFuture<ClientChannel> channelsInitAsync() {
// Do not establish connections if interrupted.
if (!initChannelHolders()) {
return CompletableFuture.completedFuture(null);
}
// Establish default channel connection.
- getDefaultChannel();
+ var fut = getDefaultChannelAsync();
// Establish secondary connections in the background.
- initAllChannelsAsync();
+ fut.thenAccept(unused -> initAllChannelsAsync());
- // TODO: Async startup IGNITE-15357.
- return CompletableFuture.completedFuture(null);
+ return fut;
}
/**
* Gets the default channel, reconnecting if necessary.
*/
- private ClientChannel getDefaultChannel() {
- IgniteClientConnectionException failure = null;
+ private CompletableFuture<ClientChannel> getDefaultChannelAsync() {
+ return ClientFutureUtils.doWithRetryAsync(
+ () -> {
+ curChannelsGuard.readLock().lock();
- for (int attempt = 0; ; attempt++) {
- ClientChannelHolder hld = null;
- ClientChannel c = null;
+ ClientChannelHolder hld;
- try {
- if (closed) {
- throw new IgniteClientConnectionException(CONNECTION_ERR,
"Channel is closed");
- }
-
- curChannelsGuard.readLock().lock();
+ try {
+ hld = channels.get(curChIdx);
+ } finally {
+ curChannelsGuard.readLock().unlock();
+ }
- try {
- hld = channels.get(curChIdx);
- } finally {
- curChannelsGuard.readLock().unlock();
- }
+ return hld.getOrCreateChannelAsync();
+ },
+ Objects::nonNull,
+ ctx -> shouldRetry(ClientOperationType.CHANNEL_CONNECT, ctx));
+ }
- c = hld.getOrCreateChannel();
+ private CompletableFuture<ClientChannel> getCurChannelAsync() {
+ if (closed) {
+ return CompletableFuture.failedFuture(new
IgniteClientConnectionException(CONNECTION_ERR, "Channel is closed"));
+ }
- if (c != null) {
- return c;
- }
- } catch (IgniteClientConnectionException e) {
- if (failure == null) {
- failure = e;
- } else {
- failure.addSuppressed(e);
- }
+ curChannelsGuard.readLock().lock();
- onChannelFailure(hld, c);
+ try {
+ var hld = channels.get(curChIdx);
- if (!shouldRetry(ClientOperationType.CHANNEL_CONNECT, attempt,
e, failure)) {
- break;
- }
+ if (hld == null) {
+ return CompletableFuture.completedFuture(null);
}
- }
- throw new IgniteClientConnectionException(CONNECTION_ERR, "Failed to
connect", failure);
+ CompletableFuture<ClientChannel> fut =
hld.getOrCreateChannelAsync();
+ return fut == null ? CompletableFuture.completedFuture(null) : fut;
+ } finally {
+ curChannelsGuard.readLock().unlock();
+ }
}
/** Determines whether specified operation should be retried. */
- private boolean shouldRetry(int opCode, int iteration,
IgniteClientConnectionException exception,
- IgniteClientConnectionException
aggregateException) {
+ private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext
ctx) {
ClientOperationType opType =
ClientUtils.opCodeToClientOperationType(opCode);
- return shouldRetry(opType, iteration, exception, aggregateException);
+ return shouldRetry(opType, ctx);
}
/** Determines whether specified operation should be retried. */
- private boolean shouldRetry(ClientOperationType opType, int iteration,
IgniteClientConnectionException exception,
- IgniteClientConnectionException
aggregateException) {
+ private boolean shouldRetry(@Nullable ClientOperationType opType,
ClientFutureUtils.RetryContext ctx) {
+ var err = ctx.lastError();
+
+ if (err == null) {
+ // Closed channel situation - no error, but connection should be
retried.
+ return opType == ClientOperationType.CHANNEL_CONNECT;
+ }
+
+ IgniteClientConnectionException exception =
unwrapConnectionException(err);
+
+ if (exception == null) {
+ return false;
+ }
+
if (exception.code() == CLUSTER_ID_MISMATCH_ERR) {
return false;
}
if (opType == null) {
// System operation.
- return iteration < RetryLimitPolicy.DFLT_RETRY_LIMIT;
+ return ctx.attempt < RetryLimitPolicy.DFLT_RETRY_LIMIT;
}
RetryPolicy plc = clientCfg.retryPolicy();
@@ -617,14 +556,17 @@ public final class ReliableChannel implements
AutoCloseable {
return false;
}
- RetryPolicyContext ctx = new RetryPolicyContextImpl(clientCfg, opType,
iteration, exception);
+ RetryPolicyContext retryPolicyContext = new
RetryPolicyContextImpl(clientCfg, opType, ctx.attempt, exception);
- try {
- return plc.shouldRetry(ctx);
- } catch (Throwable t) {
- aggregateException.addSuppressed(t);
- return false;
+ // Exception in shouldRetry will be handled by
ClientFutureUtils.doWithRetryAsync
+ boolean shouldRetry = plc.shouldRetry(retryPolicyContext);
+
+ if (shouldRetry) {
+ log.debug("Going to retry operation because of error [op={},
currentAttempt={}, errMsg={}]",
+ exception, opType, ctx.attempt, exception.getMessage());
}
+
+ return shouldRetry;
}
/**
@@ -641,7 +583,7 @@ public final class ReliableChannel implements AutoCloseable
{
}
try {
- hld.getOrCreateChannel(true);
+ hld.getOrCreateChannelAsync(true);
} catch (Exception e) {
log.warn("Failed to establish connection to " +
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
}
@@ -655,7 +597,9 @@ public final class ReliableChannel implements AutoCloseable
{
// This could be solved with a cluster-wide AssignmentVersion, but we
don't have that.
// So we only react to updates from the default channel. When no
user-initiated operations are performed on the default
// channel, heartbeat messages will trigger updates.
- if (clientChannel == channels.get(curChIdx).ch) {
+ CompletableFuture<ClientChannel> ch = channels.get(curChIdx).chFut;
+
+ if (ch != null && clientChannel == ClientFutureUtils.getNowSafe(ch)) {
assignmentVersion.incrementAndGet();
}
}
@@ -669,6 +613,19 @@ public final class ReliableChannel implements
AutoCloseable {
return assignmentVersion.get();
}
+ @Nullable
+ private static IgniteClientConnectionException
unwrapConnectionException(Throwable err) {
+ while (err instanceof CompletionException) {
+ err = err.getCause();
+ }
+
+ if (!(err instanceof IgniteClientConnectionException)) {
+ return null;
+ }
+
+ return (IgniteClientConnectionException) err;
+ }
+
/**
* Channels holder.
*/
@@ -678,7 +635,7 @@ public final class ReliableChannel implements AutoCloseable
{
private final ClientChannelConfiguration chCfg;
/** Channel. */
- private volatile ClientChannel ch;
+ private volatile @Nullable CompletableFuture<ClientChannel> chFut;
/** The last server node that channel is or was connected to. */
private volatile ClusterNode serverNode;
@@ -729,46 +686,55 @@ public final class ReliableChannel implements
AutoCloseable {
/**
* Get or create channel.
*/
- private ClientChannel getOrCreateChannel() {
- return getOrCreateChannel(false);
+ private CompletableFuture<ClientChannel> getOrCreateChannelAsync() {
+ return getOrCreateChannelAsync(false);
}
/**
* Get or create channel.
*/
- private ClientChannel getOrCreateChannel(boolean ignoreThrottling) {
- if (ch == null && !close) {
- synchronized (this) {
- if (close) {
- return null;
- }
+ private CompletableFuture<ClientChannel>
getOrCreateChannelAsync(boolean ignoreThrottling) {
+ if (close) {
+ return CompletableFuture.completedFuture(null);
+ }
- if (ch != null) {
- return ch;
- }
+ var chFut0 = chFut;
- if (!ignoreThrottling && applyReconnectionThrottling()) {
- //noinspection
NonPrivateFieldAccessedInSynchronizedContext
- throw new
IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due
to applied throttling");
- }
+ if (isFutureInProgressOrDoneAndChannelOpen(chFut0)) {
+ return chFut0;
+ }
- ClientChannel ch0 = chFactory.apply(chCfg, connMgr);
+ synchronized (this) {
+ if (close) {
+ return CompletableFuture.completedFuture(null);
+ }
- var oldClusterId = clusterId.compareAndExchange(null,
ch0.protocolContext().clusterId());
+ chFut0 = chFut;
- if (oldClusterId != null &&
!oldClusterId.equals(ch0.protocolContext().clusterId())) {
+ if (isFutureInProgressOrDoneAndChannelOpen(chFut0)) {
+ return chFut0;
+ }
+
+ if (!ignoreThrottling && applyReconnectionThrottling()) {
+ return CompletableFuture.failedFuture(
+ new
IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due
to applied throttling"));
+ }
+
+ chFut0 = chFactory.apply(chCfg, connMgr).thenApply(ch -> {
+ var oldClusterId = clusterId.compareAndExchange(null,
ch.protocolContext().clusterId());
+
+ if (oldClusterId != null &&
!oldClusterId.equals(ch.protocolContext().clusterId())) {
try {
- ch0.close();
+ ch.close();
} catch (Exception ignored) {
// Ignore
}
- throw new
IgniteClientConnectionException(CLUSTER_ID_MISMATCH_ERR, "Cluster ID mismatch:
expected=" + oldClusterId
- + ", actual=" +
ch0.protocolContext().clusterId());
+ throw new IgniteClientConnectionException(
+ CLUSTER_ID_MISMATCH_ERR,
+ "Cluster ID mismatch: expected=" +
oldClusterId + ", actual=" + ch.protocolContext().clusterId());
}
- ch = ch0;
-
ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
ClusterNode newNode = ch.protocolContext().clusterNode();
@@ -786,22 +752,39 @@ public final class ReliableChannel implements
AutoCloseable {
}
serverNode = newNode;
- }
- }
- return ch;
+ return ch;
+ });
+
+ chFut0.exceptionally(err -> {
+ closeChannel();
+ onChannelFailure(this, null);
+
+ log.warn("Failed to establish connection to " +
chCfg.getAddress() + ": " + err.getMessage(), err);
+
+ return null;
+ });
+
+ chFut = chFut0;
+
+ return chFut0;
+ }
}
/**
* Close channel.
*/
private synchronized void closeChannel() {
- if (ch != null) {
- try {
- ch.close();
- } catch (Exception ignored) {
- // No op.
- }
+ CompletableFuture<ClientChannel> ch0 = chFut;
+
+ if (ch0 != null) {
+ ch0.thenAccept(c -> {
+ try {
+ c.close();
+ } catch (Exception ignored) {
+ // No-op.
+ }
+ });
var oldServerNode = serverNode;
@@ -810,7 +793,7 @@ public final class ReliableChannel implements AutoCloseable
{
nodeChannelsById.remove(oldServerNode.id(), this);
}
- ch = null;
+ chFut = null;
}
}
@@ -829,5 +812,19 @@ public final class ReliableChannel implements
AutoCloseable {
closeChannel();
}
+
+ private boolean isFutureInProgressOrDoneAndChannelOpen(@Nullable
CompletableFuture<ClientChannel> f) {
+ if (f == null || f.isCompletedExceptionally()) {
+ return false;
+ }
+
+ if (!f.isDone()) {
+ return true;
+ }
+
+ var ch = f.getNow(null);
+
+ return ch != null && !ch.closed();
+ }
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 6f7bde9e2c..1036863386 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -75,11 +75,14 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Minimum supported heartbeat interval. */
private static final long MIN_RECOMMENDED_HEARTBEAT_INTERVAL = 500;
+ /** Config. */
+ private final ClientChannelConfiguration cfg;
+
/** Protocol context. */
private volatile ProtocolContext protocolCtx;
/** Channel. */
- private final ClientConnection sock;
+ private volatile ClientConnection sock;
/** Request id. */
private final AtomicLong reqId = new AtomicLong(1);
@@ -103,7 +106,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
private final long heartbeatTimeout;
/** Heartbeat timer. */
- private final Timer heartbeatTimer;
+ private volatile Timer heartbeatTimer;
/** Logger. */
private final IgniteLogger log;
@@ -115,10 +118,10 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
* Constructor.
*
* @param cfg Config.
- * @param connMgr Connection multiplexer.
*/
- TcpClientChannel(ClientChannelConfiguration cfg,
ClientConnectionMultiplexer connMgr) {
+ private TcpClientChannel(ClientChannelConfiguration cfg) {
validateConfiguration(cfg);
+ this.cfg = cfg;
log = ClientUtils.logger(cfg.clientConfiguration(),
TcpClientChannel.class);
@@ -128,14 +131,40 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
connectTimeout = cfg.clientConfiguration().connectTimeout();
heartbeatTimeout = cfg.clientConfiguration().heartbeatTimeout();
+ }
- sock = connMgr.open(cfg.getAddress(), this, this);
-
- handshake(DEFAULT_VERSION);
+ private CompletableFuture<ClientChannel>
initAsync(ClientConnectionMultiplexer connMgr) {
+ return connMgr
+ .openAsync(cfg.getAddress(), this, this)
+ .thenCompose(s -> {
+ sock = s;
+
+ return handshakeAsync(DEFAULT_VERSION);
+ })
+ .whenComplete((res, err) -> {
+ if (err != null) {
+ close();
+ }
+ })
+ .thenApplyAsync(unused -> {
+ // Netty has a built-in IdleStateHandler to detect idle
connections (used on the server side).
+ // However, to adjust the heartbeat interval dynamically,
we have to use a timer here.
+ heartbeatTimer =
initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
+
+ return this;
+ }, asyncContinuationExecutor);
+ }
- // Netty has a built-in IdleStateHandler to detect idle connections
(used on the server side).
- // However, to adjust the heartbeat interval dynamically, we have to
use a timer here.
- heartbeatTimer =
initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
+ /**
+ * Creates a new channel asynchronously.
+ *
+ * @param cfg Configuration.
+ * @param connMgr Connection manager.
+ * @return Channel.
+ */
+ static CompletableFuture<ClientChannel>
createAsync(ClientChannelConfiguration cfg, ClientConnectionMultiplexer
connMgr) {
+ //noinspection resource - returned from method.
+ return new TcpClientChannel(cfg).initAsync(connMgr);
}
/** {@inheritDoc} */
@@ -178,6 +207,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** {@inheritDoc} */
@Override
public void onDisconnected(@Nullable Exception e) {
+ log.debug("Disconnected from server: " + cfg.getAddress());
close(e);
}
@@ -382,29 +412,31 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
/** Client handshake. */
- private void handshake(ProtocolVersion ver)
+ private CompletableFuture<Void> handshakeAsync(ProtocolVersion ver)
throws IgniteClientConnectionException {
ClientRequestFuture fut = new ClientRequestFuture();
pendingReqs.put(-1L, fut);
- try {
- handshakeReq(ver);
-
- // handshakeRes must be called even in case of timeout to release
the buffer.
- var resFut = fut.thenAccept(res -> handshakeRes(res, ver));
-
- if (connectTimeout > 0) {
- resFut.get(connectTimeout, TimeUnit.MILLISECONDS);
- } else {
- resFut.get();
+ handshakeReqAsync(ver).addListener(f -> {
+ if (!f.isSuccess()) {
+ fut.completeExceptionally(
+ new IgniteClientConnectionException(CONNECTION_ERR,
"Failed to send handshake request", f.cause()));
}
- } catch (Throwable e) {
- throw IgniteException.wrap(e);
+ });
+
+ if (connectTimeout > 0) {
+ fut.orTimeout(connectTimeout, TimeUnit.MILLISECONDS);
}
+
+ return fut.thenCompose(res -> handshakeRes(res, ver));
}
- /** Send handshake request. */
- private void handshakeReq(ProtocolVersion proposedVer) {
+ /**
+ * Send handshake request.
+ *
+ * @return Channel future.
+ */
+ private ChannelFuture handshakeReqAsync(ProtocolVersion proposedVer) {
sock.send(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
var req = new ClientMessagePacker(sock.getBuffer());
@@ -417,11 +449,11 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
req.packBinaryHeader(0); // Features.
req.packMapHeader(0); // Extensions.
- write(req).syncUninterruptibly();
+ return write(req);
}
/** Receive and handle handshake response. */
- private void handshakeRes(ClientMessageUnpacker unpacker, ProtocolVersion
proposedVer) {
+ private CompletableFuture<Void> handshakeRes(ClientMessageUnpacker
unpacker, ProtocolVersion proposedVer) {
try (unpacker) {
ProtocolVersion srvVer = new
ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(),
unpacker.unpackShort());
@@ -429,8 +461,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (!unpacker.tryUnpackNil()) {
if (!proposedVer.equals(srvVer) &&
supportedVers.contains(srvVer)) {
// Retry with server version.
- handshake(srvVer);
- return;
+ return handshakeAsync(srvVer);
}
throw readError(unpacker);
@@ -451,6 +482,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
protocolCtx = new ProtocolContext(
srvVer, ProtocolBitmaskFeature.allFeaturesAsEnumSet(),
serverIdleTimeout, clusterNode, clusterId);
+
+ return CompletableFuture.completedFuture(null);
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index efc3227dde..8474f280ec 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -68,7 +68,7 @@ public class TcpIgniteClient implements IgniteClient {
* @param cfg Config.
*/
private TcpIgniteClient(IgniteClientConfiguration cfg) {
- this(TcpClientChannel::new, cfg);
+ this(TcpClientChannel::createAsync, cfg);
}
/**
@@ -78,7 +78,7 @@ public class TcpIgniteClient implements IgniteClient {
* @param cfg Config.
*/
private TcpIgniteClient(
- BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, ClientChannel> chFactory,
+ BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory,
IgniteClientConfiguration cfg
) {
assert chFactory != null;
@@ -98,7 +98,7 @@ public class TcpIgniteClient implements IgniteClient {
*
* @return Future representing pending completion of the operation.
*/
- public CompletableFuture<Void> initAsync() {
+ private CompletableFuture<ClientChannel> initAsync() {
return ch.channelsInitAsync();
}
@@ -109,6 +109,7 @@ public class TcpIgniteClient implements IgniteClient {
* @return Future representing pending completion of the operation.
*/
public static CompletableFuture<IgniteClient>
startAsync(IgniteClientConfiguration cfg) {
+ //noinspection resource: returned from method
var client = new TcpIgniteClient(cfg);
return client.initAsync().thenApply(x -> client);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
index c20cdc5f2c..b457177374 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/ClientConnectionMultiplexer.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client.io;
import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientConnectionException;
@@ -46,7 +47,7 @@ public interface ClientConnectionMultiplexer {
* @return Created connection.
* @throws IgniteClientConnectionException when connection can't be
established.
*/
- ClientConnection open(
+ CompletableFuture<ClientConnection> openAsync(
InetSocketAddress addr,
ClientMessageHandler msgHnd,
ClientConnectionStateHandler stateHnd)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index 9beb0415ef..24b8acc613 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.client.io.netty;
-import static org.apache.ignite.lang.ErrorGroups.Common.UNKNOWN_ERR;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -27,6 +25,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.internal.client.io.ClientConnection;
@@ -34,6 +33,7 @@ import
org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
import org.apache.ignite.internal.client.io.ClientMessageHandler;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
+import org.apache.ignite.lang.ErrorGroups.Client;
/**
* Netty-based multiplexer.
@@ -83,17 +83,31 @@ public class NettyClientConnectionMultiplexer implements
ClientConnectionMultipl
/** {@inheritDoc} */
@Override
- public ClientConnection open(InetSocketAddress addr,
+ public CompletableFuture<ClientConnection> openAsync(InetSocketAddress
addr,
ClientMessageHandler msgHnd,
ClientConnectionStateHandler stateHnd)
throws IgniteClientConnectionException {
- try {
- // TODO: Async startup IGNITE-15357.
- ChannelFuture f = bootstrap.connect(addr).syncUninterruptibly();
+ CompletableFuture<ClientConnection> fut = new CompletableFuture<>();
- return new NettyClientConnection(f.channel(), msgHnd, stateHnd);
- } catch (Throwable t) {
- throw new IgniteClientConnectionException(UNKNOWN_ERR,
t.getMessage(), t);
- }
+ ChannelFuture connectFut = bootstrap.connect(addr);
+
+ connectFut.addListener(f -> {
+ if (f.isSuccess()) {
+ NettyClientConnection conn = new
NettyClientConnection(((ChannelFuture) f).channel(), msgHnd, stateHnd);
+
+ fut.complete(conn);
+ } else {
+ Throwable cause = f.cause();
+
+ var err = new IgniteClientConnectionException(
+ Client.CONNECTION_ERR,
+ "Client failed to connect: " + cause.getMessage(),
+ cause);
+
+ fut.completeExceptionally(err);
+ }
+ });
+
+ return fut;
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index 3d6b9325f8..b7b49751e5 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -27,8 +27,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Test;
@@ -158,15 +160,20 @@ public class ConfigurationTest extends AbstractClientTest
{
public void testCustomAsyncContinuationExecutor() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
- IgniteClient.Builder builder = IgniteClient.builder()
+ var builderThreadName = new AtomicReference<String>();
+
+ CompletableFuture<IgniteClient> builder = IgniteClient.builder()
.addresses("127.0.0.1:" + serverPort)
- .asyncContinuationExecutor(executor);
+ .asyncContinuationExecutor(executor)
+ .buildAsync()
+ .whenComplete((res, err) ->
builderThreadName.set(Thread.currentThread().getName()));
- try (IgniteClient ignite = builder.build()) {
+ try (IgniteClient ignite = builder.join()) {
String threadName = ignite.tables().tablesAsync().thenApply(unused
-> Thread.currentThread().getName()).join();
assertEquals(executor,
ignite.configuration().asyncContinuationExecutor());
assertThat(threadName, startsWith("pool-"));
+ assertThat(builderThreadName.get(), startsWith("pool-"));
}
executor.shutdown();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index 7c6f03aba8..f294e9be1b 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -92,6 +92,8 @@ public class ConnectionTest extends AbstractClientTest {
}
private static void testConnection(String... addrs) throws Exception {
- AbstractClientTest.startClient(addrs).close();
+ IgniteClient c = AbstractClientTest.startClient(addrs);
+
+ c.close();
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index 8a797702a9..c54d6365bf 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -17,8 +17,8 @@
package org.apache.ignite.client;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.UUID;
import java.util.function.Function;
@@ -35,15 +35,17 @@ public class HeartbeatTest {
public void testHeartbeatLongerThanIdleTimeoutCausesDisconnect() throws
Exception {
try (var srv = new TestServer(10800, 10, 50, new FakeIgnite())) {
int srvPort = srv.port();
+ var loggerFactory = new TestLoggerFactory("client");
Builder builder = IgniteClient.builder()
.addresses("127.0.0.1:" + srvPort)
- .retryPolicy(null);
-
- try (var client = builder.build()) {
- Thread.sleep(300);
+ .retryPolicy(null)
+ .loggerFactory(loggerFactory);
- assertThrows(IgniteClientConnectionException.class, () ->
client.tables().tables());
+ try (var ignored = builder.build()) {
+ IgniteTestUtils.waitForCondition(
+ () ->
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected
from server")),
+ 1000);
}
}
}
@@ -65,6 +67,7 @@ public class HeartbeatTest {
}
}
+ @SuppressWarnings("ThrowableNotThrown")
@Test
public void testInvalidHeartbeatIntervalThrows() throws Exception {
try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
@@ -73,9 +76,7 @@ public class HeartbeatTest {
.addresses("127.0.0.1:" + srv.port())
.heartbeatInterval(-50);
- Throwable ex = assertThrows(IllegalArgumentException.class,
builder::build);
-
- assertEquals("Negative delay.", ex.getMessage());
+ assertThrowsWithCause(builder::build,
IllegalArgumentException.class, "Negative delay.");
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
index 7955e58739..b94c2ae023 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
@@ -27,7 +27,6 @@ import java.util.UUID;
import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.hamcrest.CoreMatchers;
import org.jetbrains.annotations.Nullable;
@@ -96,13 +95,7 @@ public class MultiClusterTest {
IgniteClientConnectionException ex =
(IgniteClientConnectionException) assertThrowsWithCause(
() -> client.tables().tables(),
IgniteClientConnectionException.class, "Cluster ID mismatch");
- IgniteClientConnectionException cause =
(IgniteClientConnectionException) ExceptionUtils.getSuppressedList(ex).stream()
- .filter(e -> e.getCause().getMessage().contains("Cluster
ID mismatch"))
- .findFirst()
- .orElseThrow()
- .getCause();
-
- assertEquals(CLUSTER_ID_MISMATCH_ERR, cause.code());
+ assertEquals(CLUSTER_ID_MISMATCH_ERR, ex.code());
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index caa97fc5bf..163f95c5ff 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -93,6 +93,6 @@ public class ReconnectTest {
server.close();
- assertThrowsWithCause(() -> client.tables().tables(),
IgniteClientConnectionException.class, "Channel is closed");
+ assertThrowsWithCause(() -> client.tables().tables(),
IgniteClientConnectionException.class);
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index cbcde03218..5029a38525 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -244,12 +244,9 @@ public class RetryPolicyTest {
try (var client = getClient(plc)) {
IgniteException ex = assertThrows(IgniteException.class, () ->
client.tables().tables());
+ var cause = (RuntimeException) ex.getCause();
- var cause = (IgniteClientConnectionException) ex.getCause();
- Throwable[] suppressed = cause.getSuppressed();
-
- assertEquals("TestRetryPolicy exception.",
suppressed[0].getMessage());
- assertEquals(1, suppressed.length);
+ assertEquals("TestRetryPolicy exception.", cause.getMessage());
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
new file mode 100644
index 0000000000..d78e02caed
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/ClientFutureUtilsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Future utils test.
+ */
+public class ClientFutureUtilsTest {
+ @Test
+ public void testGetNowSafe() {
+
assertNull(ClientFutureUtils.getNowSafe(CompletableFuture.completedFuture(null)));
+
assertNull(ClientFutureUtils.getNowSafe(CompletableFuture.failedFuture(new
Exception("fail"))));
+ assertNull(ClientFutureUtils.getNowSafe(new CompletableFuture<>()));
+ assertEquals("test",
ClientFutureUtils.getNowSafe(CompletableFuture.completedFuture("test")));
+ }
+
+ @Test
+ public void testDoWithRetryAsyncWithCompletedFutureReturnsResult() {
+ var res = ClientFutureUtils.doWithRetryAsync(
+ () -> CompletableFuture.completedFuture("test"),
+ null,
+ ctx -> false
+ ).join();
+
+ assertEquals("test", res);
+ }
+
+ @Test
+ public void
testDoWithRetryAsyncWithResultValidatorRejectsAllThrowsIllegalState() {
+ var fut = ClientFutureUtils.doWithRetryAsync(
+ () -> CompletableFuture.completedFuture("test"),
+ x -> false,
+ ctx -> false
+ );
+
+ var ex = assertThrows(CompletionException.class, fut::join);
+ assertSame(IllegalStateException.class, ex.getCause().getClass());
+ }
+
+ @Test
+ public void
testDoWithRetryAsyncWithFailedFutureReturnsExceptionWithSuppressedList() {
+ var counter = new AtomicInteger();
+
+ var fut = ClientFutureUtils.doWithRetryAsync(
+ () -> CompletableFuture.failedFuture(new Exception("fail_" +
counter.get())),
+ null,
+ ctx -> counter.incrementAndGet() < 3
+ );
+
+ var completionEx = assertThrows(CompletionException.class, fut::join);
+ var ex = (Exception) completionEx.getCause();
+
+ assertEquals(2, ex.getSuppressed().length);
+
+ assertEquals("fail_0", ex.getMessage());
+ assertEquals("fail_1", ex.getSuppressed()[0].getMessage());
+ assertEquals("fail_2", ex.getSuppressed()[1].getMessage());
+ }
+
+ @Test
+ public void testDoWithRetryAsyncSucceedsAfterRetries() {
+ var counter = new AtomicInteger();
+
+ var fut = ClientFutureUtils.doWithRetryAsync(
+ () -> counter.getAndIncrement() < 3
+ ? CompletableFuture.failedFuture(new Exception("fail"))
+ : CompletableFuture.completedFuture("test"),
+ null,
+ ctx -> {
+ assertNotNull(ctx.lastError());
+
+ //noinspection DataFlowIssue
+ assertEquals("fail", ctx.lastError().getMessage());
+
+ return ctx.attempt < 5;
+ }
+ );
+
+ assertEquals("test", fut.join());
+ }
+
+ @Test
+ public void
testDoWithRetryAsyncWithExceptionInDelegateReturnsFailedFuture() {
+ var counter = new AtomicInteger();
+
+ var fut = ClientFutureUtils.doWithRetryAsync(
+ () -> {
+ if (counter.incrementAndGet() > 1) {
+ throw new RuntimeException("fail1");
+ } else {
+ return CompletableFuture.failedFuture(new
Exception("fail2"));
+ }
+ },
+ null,
+ ctx -> true
+ );
+
+ var ex = assertThrows(CompletionException.class, fut::join);
+ assertEquals("fail1", ex.getCause().getMessage());
+ }
+}