This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5ec812d4e2 IGNITE-18809 Java client: Reconnect secondary endpoints in
background (#1786)
5ec812d4e2 is described below
commit 5ec812d4e2315cd1e4449370842db6a573c9d198
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Mar 13 22:28:35 2023 +0300
IGNITE-18809 Java client: Reconnect secondary endpoints in background
(#1786)
Add `reconnectInterval` property to client configuration, which controls
periodic background connect/repair of all known endpoints.
---
.../org/apache/ignite/client/IgniteClient.java | 28 +++++++
.../ignite/client/IgniteClientConfiguration.java | 23 +++++-
.../client/IgniteClientConfigurationImpl.java | 12 +++
.../ignite/internal/client/ReliableChannel.java | 44 ++++++-----
.../org/apache/ignite/client/HeartbeatTest.java | 16 ++--
.../org/apache/ignite/client/ReconnectTest.java | 88 +++++++++++++++++++++-
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
7 files changed, 184 insertions(+), 29 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 29cbf2137a..9be68d90be 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -20,6 +20,7 @@ package org.apache.ignite.client;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT;
+import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_INTERVAL;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
import static
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
import static org.apache.ignite.internal.client.ClientUtils.sync;
@@ -82,6 +83,9 @@ public interface IgniteClient extends Ignite {
/** Reconnect throttling retries. */
private int reconnectThrottlingRetries =
DFLT_RECONNECT_THROTTLING_RETRIES;
+ /** Reconnect interval, in milliseconds. */
+ private long reconnectInterval = DFLT_RECONNECT_INTERVAL;
+
/** Async continuation executor. */
private Executor asyncContinuationExecutor;
@@ -211,6 +215,29 @@ public interface IgniteClient extends Ignite {
return this;
}
+ /**
+ * Sets the reconnect interval, in milliseconds. Set to {@code 0} to
disable background reconnect.
+ *
+ * <p>Ignite balances requests across all healthy connections (when
multiple endpoints are configured).
+ * Ignite also repairs connections on demand (when a request is made).
+ * However, "secondary" connections can be lost (due to network
issues, or node restarts). This property controls how often Ignite
+ * client will check all configured endpoints and try to reconnect
them in case of failure.
+ *
+ * @param reconnectInterval Reconnect interval, in milliseconds.
+ * @return This instance.
+ * @throws IllegalArgumentException When value is less than zero.
+ */
+ public Builder reconnectInterval(long reconnectInterval) {
+ if (reconnectInterval < 0) {
+ throw new IllegalArgumentException("reconnectInterval ["
+ + reconnectInterval + "] must be a non-negative
integer value.");
+ }
+
+ this.reconnectInterval = reconnectInterval;
+
+ return this;
+ }
+
/**
* Sets the async continuation executor.
*
@@ -296,6 +323,7 @@ public interface IgniteClient extends Ignite {
connectTimeout,
reconnectThrottlingPeriod,
reconnectThrottlingRetries,
+ reconnectInterval,
asyncContinuationExecutor,
heartbeatInterval,
heartbeatTimeout,
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 646c9a86cd..2511e8841c 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -42,12 +42,15 @@ public interface IgniteClientConfiguration {
/** Default heartbeat interval, in milliseconds. */
int DFLT_HEARTBEAT_INTERVAL = 30_000;
- /** Default reconnect throttling period. */
+ /** Default reconnect throttling period, in milliseconds. */
long DFLT_RECONNECT_THROTTLING_PERIOD = 30_000L;
/** Default reconnect throttling retries. */
int DFLT_RECONNECT_THROTTLING_RETRIES = 3;
+ /** Default reconnect interval, in milliseconds. */
+ long DFLT_RECONNECT_INTERVAL = 30_000L;
+
/**
* Gets the address finder.
*
@@ -60,6 +63,9 @@ public interface IgniteClientConfiguration {
* port is not set then Ignite will generate multiple addresses for
default port range. See {@link IgniteClientConfiguration#DFLT_PORT},
* {@link IgniteClientConfiguration#DFLT_PORT_RANGE}.
*
+ * <p>Providing addresses of multiple nodes in the cluster will improve
performance: Ignite will balance requests across all
+ * connections, and use partition awareness to send key-based requests
directly to the primary node.
+ *
* @return Addresses.
*/
String[] addresses();
@@ -93,10 +99,23 @@ public interface IgniteClientConfiguration {
*/
int reconnectThrottlingRetries();
+ /**
+ * Gets the background reconnect interval, in milliseconds. Set to {@code
0} to disable background reconnect.
+ * Default is {@link #DFLT_RECONNECT_INTERVAL}.
+ *
+ * <p>Ignite balances requests across all healthy connections (when
multiple endpoints are configured).
+ * Ignite also repairs connections on demand (when a request is made).
+ * However, "secondary" connections can be lost (due to network issues, or
node restarts). This property controls how often Ignite
+ * client will check all configured endpoints and try to reconnect them in
case of failure.
+ *
+ * @return Background reconnect interval, in milliseconds.
+ */
+ long reconnectInterval();
+
/**
* Gets the async continuation executor.
*
- * <p>When <code>null</code> (default), {@link ForkJoinPool#commonPool()}
is used.
+ * <p>When {@code null} (default), {@link ForkJoinPool#commonPool()} is
used.
*
* <p>When async client operation completes, corresponding {@link
java.util.concurrent.CompletableFuture} continuations
* (such as {@link
java.util.concurrent.CompletableFuture#thenApply(Function)}) will be invoked
using this executor.
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index 777e6dcd9d..b665722f49 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -44,6 +44,9 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
/** Reconnect throttling retries. */
private final int reconnectThrottlingRetries;
+ /** Reconnect interval, in milliseconds. */
+ private final long reconnectInterval;
+
/** Async continuation executor. */
private final Executor asyncContinuationExecutor;
@@ -68,6 +71,7 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
* @param connectTimeout Socket connect timeout.
* @param reconnectThrottlingPeriod Reconnect throttling period, in
milliseconds.
* @param reconnectThrottlingRetries Reconnect throttling retries.
+ * @param reconnectInterval Reconnect interval, in milliseconds.
* @param asyncContinuationExecutor Async continuation executor.
* @param heartbeatInterval Heartbeat message interval.
* @param heartbeatTimeout Heartbeat message timeout.
@@ -80,6 +84,7 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
long connectTimeout,
long reconnectThrottlingPeriod,
int reconnectThrottlingRetries,
+ long reconnectInterval,
Executor asyncContinuationExecutor,
long heartbeatInterval,
long heartbeatTimeout,
@@ -95,6 +100,7 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
this.connectTimeout = connectTimeout;
this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
this.reconnectThrottlingRetries = reconnectThrottlingRetries;
+ this.reconnectInterval = reconnectInterval;
this.asyncContinuationExecutor = asyncContinuationExecutor;
this.heartbeatInterval = heartbeatInterval;
this.heartbeatTimeout = heartbeatTimeout;
@@ -133,6 +139,12 @@ public final class IgniteClientConfigurationImpl
implements IgniteClientConfigur
return reconnectThrottlingRetries;
}
+ /** {@inheritDoc} */
+ @Override
+ public long reconnectInterval() {
+ return reconnectInterval;
+ }
+
/** {@inheritDoc} */
@Override
public @Nullable Executor asyncContinuationExecutor() {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 65b03d33d3..09fb936c88 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -156,7 +157,7 @@ public final class ReliableChannel implements AutoCloseable
{
if (chFut != null) {
var ch = ClientFutureUtils.getNowSafe(chFut);
- if (ch != null) {
+ if (ch != null && !ch.closed()) {
res.add(ch.protocolContext().clusterNode());
}
}
@@ -484,7 +485,7 @@ public final class ReliableChannel implements AutoCloseable
{
var fut = getDefaultChannelAsync();
// Establish secondary connections in the background.
- fut.thenAccept(unused -> initAllChannelsAsync());
+ fut.thenAccept(unused ->
ForkJoinPool.commonPool().submit(this::initAllChannelsAsync));
return fut;
}
@@ -606,26 +607,33 @@ public final class ReliableChannel implements
AutoCloseable {
}
/**
- * Asynchronously try to establish a connection to all configured servers.
+ * Establish or repair connections to all configured servers.
*/
private void initAllChannelsAsync() {
- ForkJoinPool.commonPool().submit(
- () -> {
- List<ClientChannelHolder> holders = channels;
+ List<ClientChannelHolder> holders = channels;
+ List<CompletableFuture<ClientChannel>> futs = new
ArrayList<>(holders.size());
- for (ClientChannelHolder hld : holders) {
- if (closed) {
- return; // New reinit task scheduled or channel is
closed.
- }
+ for (ClientChannelHolder hld : holders) {
+ if (closed) {
+ return;
+ }
- try {
- hld.getOrCreateChannelAsync(true);
- } catch (Exception e) {
- log.warn("Failed to establish connection to " +
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
- }
- }
- }
- );
+ try {
+ futs.add(hld.getOrCreateChannelAsync(true));
+ } catch (Exception e) {
+ log.warn("Failed to establish connection to " +
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
+ }
+ }
+
+ long interval = clientCfg.reconnectInterval();
+
+ if (interval > 0 && !closed) {
+ // After current round of connection attempts is finished,
schedule the next one with a configured delay.
+ CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new))
+ .whenCompleteAsync(
+ (res, err) -> initAllChannelsAsync(),
+ CompletableFuture.delayedExecutor(interval,
TimeUnit.MILLISECONDS));
+ }
}
private void onTopologyAssignmentChanged(ClientChannel clientChannel) {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index c54d6365bf..c505deaf30 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.UUID;
import java.util.function.Function;
@@ -43,9 +44,10 @@ public class HeartbeatTest {
.loggerFactory(loggerFactory);
try (var ignored = builder.build()) {
- IgniteTestUtils.waitForCondition(
- () ->
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected
from server")),
- 1000);
+ assertTrue(
+ IgniteTestUtils.waitForCondition(
+ () ->
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected
from server")),
+ 1000));
}
}
}
@@ -98,9 +100,11 @@ public class HeartbeatTest {
.loggerFactory(loggerFactory);
try (var ignored = builder.build()) {
- IgniteTestUtils.waitForCondition(
- () ->
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Heartbeat
timeout, closing the channel")),
- 3000);
+ assertTrue(
+ IgniteTestUtils.waitForCondition(
+ () -> loggerFactory.logger.entries().stream()
+ .anyMatch(x -> x.contains("Heartbeat
timeout, closing the channel")),
+ 3000));
}
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index 163f95c5ff..d91faa53d0 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -18,23 +18,30 @@
package org.apache.ignite.client;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests thin client reconnect.
*/
public class ReconnectTest {
/** Test server. */
- TestServer server;
+ private TestServer server;
/** Test server 2. */
- TestServer server2;
+ private TestServer server2;
@AfterEach
void tearDown() throws Exception {
@@ -95,4 +102,81 @@ public class ReconnectTest {
assertThrowsWithCause(() -> client.tables().tables(),
IgniteClientConnectionException.class);
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testClientRepairsBackgroundConnectionsPeriodically(boolean
reconnectEnabled) throws Exception {
+ startTwoServers();
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:10900..10902")
+ .reconnectInterval(reconnectEnabled ? 50 : 0)
+ .heartbeatInterval(50);
+
+ try (var client = builder.build()) {
+ waitForConnections(client, 2);
+
+ server2.close();
+ waitForConnections(client, 1);
+
+ server2 = AbstractClientTest.startServer(
+ 10902,
+ 0,
+ 0,
+ new FakeIgnite(),
+ "node3");
+
+ if (reconnectEnabled) {
+ waitForConnections(client, 2);
+
+ String[] nodeNames =
client.connections().stream().map(ClusterNode::name).sorted().toArray(String[]::new);
+ assertArrayEquals(new String[]{"node1", "node3"}, nodeNames);
+ } else {
+ Thread.sleep(100);
+ assertEquals(1, client.connections().size());
+ }
+ }
+ }
+
+ @Test
+ public void testFullClusterRestart() throws Exception {
+ startTwoServers();
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:10900..10902")
+ .reconnectInterval(50)
+ .heartbeatInterval(50);
+
+ try (var client = builder.build()) {
+ waitForConnections(client, 2);
+
+ IgniteUtils.closeAll(server, server2);
+ waitForConnections(client, 0);
+
+ startTwoServers();
+ waitForConnections(client, 2);
+ }
+ }
+
+ private void startTwoServers() {
+ server = AbstractClientTest.startServer(
+ 10900,
+ 0,
+ 0,
+ new FakeIgnite(),
+ "node1");
+
+ server2 = AbstractClientTest.startServer(
+ 10901,
+ 0,
+ 0,
+ new FakeIgnite(),
+ "node2");
+ }
+
+ private static void waitForConnections(IgniteClient client, int
expectedConnections) throws InterruptedException {
+ assertTrue(IgniteTestUtils.waitForCondition(
+ () -> client.connections().size() ==
expectedConnections, 5000),
+ () -> "Client should have " + expectedConnections + "
connections: " + client.connections().size());
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index d3ba937871..b154503415 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -228,7 +228,7 @@ public class RetryPolicyTest {
@Test
public void testRetryReadPolicyAllOperationsSupported() {
var plc = new RetryReadPolicy();
- var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null,
0, 0, null, null, null);
+ var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, 0,
null, 0, 0, null, null, null);
for (var op : ClientOperationType.values()) {
var ctx = new RetryPolicyContextImpl(cfg, op, 0, null);