This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ec812d4e2 IGNITE-18809 Java client: Reconnect secondary endpoints in 
background (#1786)
5ec812d4e2 is described below

commit 5ec812d4e2315cd1e4449370842db6a573c9d198
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Mar 13 22:28:35 2023 +0300

    IGNITE-18809 Java client: Reconnect secondary endpoints in background 
(#1786)
    
    Add `reconnectInterval` property to client configuration, which controls 
periodic background connect/repair of all known endpoints.
---
 .../org/apache/ignite/client/IgniteClient.java     | 28 +++++++
 .../ignite/client/IgniteClientConfiguration.java   | 23 +++++-
 .../client/IgniteClientConfigurationImpl.java      | 12 +++
 .../ignite/internal/client/ReliableChannel.java    | 44 ++++++-----
 .../org/apache/ignite/client/HeartbeatTest.java    | 16 ++--
 .../org/apache/ignite/client/ReconnectTest.java    | 88 +++++++++++++++++++++-
 .../org/apache/ignite/client/RetryPolicyTest.java  |  2 +-
 7 files changed, 184 insertions(+), 29 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java 
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 29cbf2137a..9be68d90be 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -20,6 +20,7 @@ package org.apache.ignite.client;
 import static 
org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
 import static 
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
 import static 
org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT;
+import static 
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_INTERVAL;
 import static 
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
 import static 
org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
 import static org.apache.ignite.internal.client.ClientUtils.sync;
@@ -82,6 +83,9 @@ public interface IgniteClient extends Ignite {
         /** Reconnect throttling retries. */
         private int reconnectThrottlingRetries = 
DFLT_RECONNECT_THROTTLING_RETRIES;
 
+        /** Reconnect interval, in milliseconds. */
+        private long reconnectInterval = DFLT_RECONNECT_INTERVAL;
+
         /** Async continuation executor. */
         private Executor asyncContinuationExecutor;
 
@@ -211,6 +215,29 @@ public interface IgniteClient extends Ignite {
             return this;
         }
 
+        /**
+         * Sets the reconnect interval, in milliseconds. Set to {@code 0} to 
disable background reconnect.
+         *
+         * <p>Ignite balances requests across all healthy connections (when 
multiple endpoints are configured).
+         * Ignite also repairs connections on demand (when a request is made).
+         * However, "secondary" connections can be lost (due to network 
issues, or node restarts). This property controls how often Ignite
+         * client will check all configured endpoints and try to reconnect 
them in case of failure.
+         *
+         * @param reconnectInterval Reconnect interval, in milliseconds.
+         * @return This instance.
+         * @throws IllegalArgumentException When value is less than zero.
+         */
+        public Builder reconnectInterval(long reconnectInterval) {
+            if (reconnectInterval < 0) {
+                throw new IllegalArgumentException("reconnectInterval ["
+                        + reconnectInterval + "] must be a non-negative 
integer value.");
+            }
+
+            this.reconnectInterval = reconnectInterval;
+
+            return this;
+        }
+
         /**
          * Sets the async continuation executor.
          *
@@ -296,6 +323,7 @@ public interface IgniteClient extends Ignite {
                     connectTimeout,
                     reconnectThrottlingPeriod,
                     reconnectThrottlingRetries,
+                    reconnectInterval,
                     asyncContinuationExecutor,
                     heartbeatInterval,
                     heartbeatTimeout,
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
 
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 646c9a86cd..2511e8841c 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -42,12 +42,15 @@ public interface IgniteClientConfiguration {
     /** Default heartbeat interval, in milliseconds. */
     int DFLT_HEARTBEAT_INTERVAL = 30_000;
 
-    /** Default reconnect throttling period. */
+    /** Default reconnect throttling period, in milliseconds. */
     long DFLT_RECONNECT_THROTTLING_PERIOD = 30_000L;
 
     /** Default reconnect throttling retries. */
     int DFLT_RECONNECT_THROTTLING_RETRIES = 3;
 
+    /** Default reconnect interval, in milliseconds. */
+    long DFLT_RECONNECT_INTERVAL = 30_000L;
+
     /**
      * Gets the address finder.
      *
@@ -60,6 +63,9 @@ public interface IgniteClientConfiguration {
      * port is not set then Ignite will generate multiple addresses for 
default port range. See {@link IgniteClientConfiguration#DFLT_PORT},
      * {@link IgniteClientConfiguration#DFLT_PORT_RANGE}.
      *
+     * <p>Providing addresses of multiple nodes in the cluster will improve 
performance: Ignite will balance requests across all
+     * connections, and use partition awareness to send key-based requests 
directly to the primary node.
+     *
      * @return Addresses.
      */
     String[] addresses();
@@ -93,10 +99,23 @@ public interface IgniteClientConfiguration {
      */
     int reconnectThrottlingRetries();
 
+    /**
+     * Gets the background reconnect interval, in milliseconds. A value of {@code 
0} disables background reconnect.
+     * Default is {@link #DFLT_RECONNECT_INTERVAL}.
+     *
+     * <p>Ignite balances requests across all healthy connections (when 
multiple endpoints are configured).
+     * Ignite also repairs connections on demand (when a request is made).
+     * However, "secondary" connections can be lost (due to network issues, or 
node restarts). This property controls how often Ignite
+     * client will check all configured endpoints and try to reconnect them in 
case of failure.
+     *
+     * @return Background reconnect interval, in milliseconds.
+     */
+    long reconnectInterval();
+
     /**
      * Gets the async continuation executor.
      *
-     * <p>When <code>null</code> (default), {@link ForkJoinPool#commonPool()} 
is used.
+     * <p>When {@code null} (default), {@link ForkJoinPool#commonPool()} is 
used.
      *
      * <p>When async client operation completes, corresponding {@link 
java.util.concurrent.CompletableFuture} continuations
      * (such as {@link 
java.util.concurrent.CompletableFuture#thenApply(Function)}) will be invoked 
using this executor.
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index 777e6dcd9d..b665722f49 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -44,6 +44,9 @@ public final class IgniteClientConfigurationImpl implements 
IgniteClientConfigur
     /** Reconnect throttling retries. */
     private final int reconnectThrottlingRetries;
 
+    /** Reconnect interval, in milliseconds. */
+    private final long reconnectInterval;
+
     /** Async continuation executor. */
     private final Executor asyncContinuationExecutor;
 
@@ -68,6 +71,7 @@ public final class IgniteClientConfigurationImpl implements 
IgniteClientConfigur
      * @param connectTimeout Socket connect timeout.
      * @param reconnectThrottlingPeriod Reconnect throttling period, in 
milliseconds.
      * @param reconnectThrottlingRetries Reconnect throttling retries.
+     * @param reconnectInterval Reconnect interval, in milliseconds.
      * @param asyncContinuationExecutor Async continuation executor.
      * @param heartbeatInterval Heartbeat message interval.
      * @param heartbeatTimeout Heartbeat message timeout.
@@ -80,6 +84,7 @@ public final class IgniteClientConfigurationImpl implements 
IgniteClientConfigur
             long connectTimeout,
             long reconnectThrottlingPeriod,
             int reconnectThrottlingRetries,
+            long reconnectInterval,
             Executor asyncContinuationExecutor,
             long heartbeatInterval,
             long heartbeatTimeout,
@@ -95,6 +100,7 @@ public final class IgniteClientConfigurationImpl implements 
IgniteClientConfigur
         this.connectTimeout = connectTimeout;
         this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
         this.reconnectThrottlingRetries = reconnectThrottlingRetries;
+        this.reconnectInterval = reconnectInterval;
         this.asyncContinuationExecutor = asyncContinuationExecutor;
         this.heartbeatInterval = heartbeatInterval;
         this.heartbeatTimeout = heartbeatTimeout;
@@ -133,6 +139,12 @@ public final class IgniteClientConfigurationImpl 
implements IgniteClientConfigur
         return reconnectThrottlingRetries;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public long reconnectInterval() {
+        return reconnectInterval;
+    }
+
     /** {@inheritDoc} */
     @Override
     public @Nullable Executor asyncContinuationExecutor() {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 65b03d33d3..09fb936c88 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -156,7 +157,7 @@ public final class ReliableChannel implements AutoCloseable 
{
             if (chFut != null) {
                 var ch = ClientFutureUtils.getNowSafe(chFut);
 
-                if (ch != null) {
+                if (ch != null && !ch.closed()) {
                     res.add(ch.protocolContext().clusterNode());
                 }
             }
@@ -484,7 +485,7 @@ public final class ReliableChannel implements AutoCloseable 
{
         var fut = getDefaultChannelAsync();
 
         // Establish secondary connections in the background.
-        fut.thenAccept(unused -> initAllChannelsAsync());
+        fut.thenAccept(unused -> 
ForkJoinPool.commonPool().submit(this::initAllChannelsAsync));
 
         return fut;
     }
@@ -606,26 +607,33 @@ public final class ReliableChannel implements 
AutoCloseable {
     }
 
     /**
-     * Asynchronously try to establish a connection to all configured servers.
+     * Establish or repair connections to all configured servers.
      */
     private void initAllChannelsAsync() {
-        ForkJoinPool.commonPool().submit(
-                () -> {
-                    List<ClientChannelHolder> holders = channels;
+        List<ClientChannelHolder> holders = channels;
+        List<CompletableFuture<ClientChannel>> futs = new 
ArrayList<>(holders.size());
 
-                    for (ClientChannelHolder hld : holders) {
-                        if (closed) {
-                            return; // New reinit task scheduled or channel is 
closed.
-                        }
+        for (ClientChannelHolder hld : holders) {
+            if (closed) {
+                return;
+            }
 
-                        try {
-                            hld.getOrCreateChannelAsync(true);
-                        } catch (Exception e) {
-                            log.warn("Failed to establish connection to " + 
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
-                        }
-                    }
-                }
-        );
+            try {
+                futs.add(hld.getOrCreateChannelAsync(true));
+            } catch (Exception e) {
+                log.warn("Failed to establish connection to " + 
hld.chCfg.getAddress() + ": " + e.getMessage(), e);
+            }
+        }
+
+        long interval = clientCfg.reconnectInterval();
+
+        if (interval > 0 && !closed) {
+            // After current round of connection attempts is finished, 
schedule the next one with a configured delay.
+            CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new))
+                    .whenCompleteAsync(
+                            (res, err) -> initAllChannelsAsync(),
+                            CompletableFuture.delayedExecutor(interval, 
TimeUnit.MILLISECONDS));
+        }
     }
 
     private void onTopologyAssignmentChanged(ClientChannel clientChannel) {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index c54d6365bf..c505deaf30 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.UUID;
 import java.util.function.Function;
@@ -43,9 +44,10 @@ public class HeartbeatTest {
                     .loggerFactory(loggerFactory);
 
             try (var ignored = builder.build()) {
-                IgniteTestUtils.waitForCondition(
-                        () -> 
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected 
from server")),
-                        1000);
+                assertTrue(
+                        IgniteTestUtils.waitForCondition(
+                                () -> 
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected 
from server")),
+                                1000));
             }
         }
     }
@@ -98,9 +100,11 @@ public class HeartbeatTest {
                     .loggerFactory(loggerFactory);
 
             try (var ignored = builder.build()) {
-                IgniteTestUtils.waitForCondition(
-                        () -> 
loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Heartbeat 
timeout, closing the channel")),
-                        3000);
+                assertTrue(
+                        IgniteTestUtils.waitForCondition(
+                                () -> loggerFactory.logger.entries().stream()
+                                        .anyMatch(x -> x.contains("Heartbeat 
timeout, closing the channel")),
+                                3000));
             }
         }
     }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index 163f95c5ff..d91faa53d0 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -18,23 +18,30 @@
 package org.apache.ignite.client;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests thin client reconnect.
  */
 public class ReconnectTest {
     /** Test server. */
-    TestServer server;
+    private TestServer server;
 
     /** Test server 2. */
-    TestServer server2;
+    private TestServer server2;
 
     @AfterEach
     void tearDown() throws Exception {
@@ -95,4 +102,81 @@ public class ReconnectTest {
 
         assertThrowsWithCause(() -> client.tables().tables(), 
IgniteClientConnectionException.class);
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testClientRepairsBackgroundConnectionsPeriodically(boolean 
reconnectEnabled) throws Exception {
+        startTwoServers();
+
+        Builder builder = IgniteClient.builder()
+                .addresses("127.0.0.1:10900..10902")
+                .reconnectInterval(reconnectEnabled ? 50 : 0)
+                .heartbeatInterval(50);
+
+        try (var client = builder.build()) {
+            waitForConnections(client, 2);
+
+            server2.close();
+            waitForConnections(client, 1);
+
+            server2 = AbstractClientTest.startServer(
+                    10902,
+                    0,
+                    0,
+                    new FakeIgnite(),
+                    "node3");
+
+            if (reconnectEnabled) {
+                waitForConnections(client, 2);
+
+                String[] nodeNames = 
client.connections().stream().map(ClusterNode::name).sorted().toArray(String[]::new);
+                assertArrayEquals(new String[]{"node1", "node3"}, nodeNames);
+            } else {
+                Thread.sleep(100);
+                assertEquals(1, client.connections().size());
+            }
+        }
+    }
+
+    @Test
+    public void testFullClusterRestart() throws Exception {
+        startTwoServers();
+
+        Builder builder = IgniteClient.builder()
+                .addresses("127.0.0.1:10900..10902")
+                .reconnectInterval(50)
+                .heartbeatInterval(50);
+
+        try (var client = builder.build()) {
+            waitForConnections(client, 2);
+
+            IgniteUtils.closeAll(server, server2);
+            waitForConnections(client, 0);
+
+            startTwoServers();
+            waitForConnections(client, 2);
+        }
+    }
+
+    private void startTwoServers() {
+        server = AbstractClientTest.startServer(
+                10900,
+                0,
+                0,
+                new FakeIgnite(),
+                "node1");
+
+        server2 = AbstractClientTest.startServer(
+                10901,
+                0,
+                0,
+                new FakeIgnite(),
+                "node2");
+    }
+
+    private static void waitForConnections(IgniteClient client, int 
expectedConnections) throws InterruptedException {
+        assertTrue(IgniteTestUtils.waitForCondition(
+                        () -> client.connections().size() == 
expectedConnections, 5000),
+                () -> "Client should have " + expectedConnections + " 
connections: " + client.connections().size());
+    }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index d3ba937871..b154503415 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -228,7 +228,7 @@ public class RetryPolicyTest {
     @Test
     public void testRetryReadPolicyAllOperationsSupported() {
         var plc = new RetryReadPolicy();
-        var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 
0, 0, null, null, null);
+        var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, 0, 
null, 0, 0, null, null, null);
 
         for (var op : ClientOperationType.values()) {
             var ctx = new RetryPolicyContextImpl(cfg, op, 0, null);

Reply via email to