This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new add61eb5887 IGNITE-22964 Java thin: fix client init hang on 
unreachable discovered address (#11486)
add61eb5887 is described below

commit add61eb58874a6199b4f23f739efad55155f6bcd
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Aug 21 15:31:41 2024 +0300

    IGNITE-22964 Java thin: fix client init hang on unreachable discovered 
address (#11486)
    
    * Do not perform cluster discovery synchronously while initializing the
      client: establish the default channel with `applyOnDefaultChannel` before
      checking `channelsCnt.get() == 0` in `channelsInit`
    * Do not disconnect active channels when performing discovery, even if
      those addresses are not in the new list, to avoid unnecessary reconnects
---
 .../internal/client/thin/ReliableChannel.java      | 22 +++++++--
 .../org/apache/ignite/client/ReliabilityTest.java  |  6 ++-
 .../internal/client/thin/ComputeTaskTest.java      |  4 +-
 .../ThinClientAbstractPartitionAwarenessTest.java  | 56 ++++++++++++++++++++--
 .../thin/ThinClientEnpointsDiscoveryTest.java      | 45 +++++++++++++++++
 ...ientPartitionAwarenessUnstableTopologyTest.java |  4 +-
 6 files changed, 124 insertions(+), 13 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 6559ea8f739..1d0115630a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -631,6 +631,19 @@ final class ReliableChannel implements AutoCloseable {
             return;
         }
 
+        // Add connected channels to the list to avoid unnecessary reconnects, 
unless address finder is used.
+        if (holders != null && clientCfg.getAddressesFinder() == null) {
+            // Do not modify the original list.
+            newAddrs = new ArrayList<>(newAddrs);
+
+            for (ClientChannelHolder h : holders) {
+                ClientChannel ch = h.ch;
+
+                if (ch != null && !ch.closed())
+                    newAddrs.add(h.getAddresses());
+            }
+        }
+
         Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
 
         Set<InetSocketAddress> newAddrsSet = 
newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet());
@@ -744,13 +757,16 @@ final class ReliableChannel implements AutoCloseable {
         initChannelHolders();
 
         if (failures == null || failures.size() < attemptsLimit) {
+            // Establish default channel connection.
+            applyOnDefaultChannel(channel -> null, null, failures);
+
             if (channelsCnt.get() == 0) {
                 // Establish default channel connection and retrive nodes 
endpoints if applicable.
-                if (applyOnDefaultChannel(discoveryCtx::refresh, null, 
failures))
+                boolean discoveryUpdated = 
applyOnDefaultChannel(discoveryCtx::refresh, null, failures);
+
+                if (discoveryUpdated)
                     initChannelHolders();
             }
-            else // Apply no-op function. Establish default channel connection.
-                applyOnDefaultChannel(channel -> null, null, failures);
         }
 
         if (partitionAwarenessEnabled)
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 81127e8d091..6803f3c2d47 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -205,9 +205,11 @@ public class ReliabilityTest extends 
AbstractThinClientTest {
             // Fail.
             dropAllThinClientConnections(Ignition.allGrids().get(0));
 
-            Throwable ex = GridTestUtils.assertThrowsWithCause(() -> 
cachePut(cache, 0, 0), ClientConnectionException.class);
+            if (!partitionAware) {
+                Throwable ex = GridTestUtils.assertThrowsWithCause(() -> 
cachePut(cache, 0, 0), ClientConnectionException.class);
 
-            GridTestUtils.assertContains(null, ex.getMessage(), 
F.first(cluster.clientAddresses()));
+                GridTestUtils.assertContains(null, ex.getMessage(), 
F.first(cluster.clientAddresses()));
+            }
 
             // Recover after fail.
             cachePut(cache, 0, 0);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 38a3da9f779..6d6b96dc5cd 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -420,13 +420,13 @@ public class ComputeTaskTest extends 
AbstractThinClientTest {
             Future<Object> fut1 = 
compute.executeAsync(TestLatchTask.class.getName(), null);
 
             // Wait for the task to start, then drop connections.
-            TestLatchTask.startLatch.await();
+            assertTrue(TestLatchTask.startLatch.await(TIMEOUT, 
TimeUnit.MILLISECONDS));
             dropAllThinClientConnections();
 
             TestLatchTask.startLatch = new CountDownLatch(1);
             Future<Object> fut2 = 
compute.executeAsync(TestLatchTask.class.getName(), null);
 
-            TestLatchTask.startLatch.await();
+            assertTrue(TestLatchTask.startLatch.await(TIMEOUT, 
TimeUnit.MILLISECONDS));
             dropAllThinClientConnections();
 
             TestLatchTask.latch = new CountDownLatch(1);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 93dc575d8cc..8430fc5ff81 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -54,7 +56,7 @@ import static 
org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_
 @SuppressWarnings("rawtypes")
 public abstract class ThinClientAbstractPartitionAwarenessTest extends 
GridCommonAbstractTest {
     /** Wait timeout. */
-    protected static final long WAIT_TIMEOUT = 5_000L;
+    protected static final long WAIT_TIMEOUT = 15_000L;
 
     /** Replicated cache name. */
     protected static final String REPL_CACHE_NAME = "replicated_cache";
@@ -153,16 +155,31 @@ public abstract class 
ThinClientAbstractPartitionAwarenessTest extends GridCommo
      * Checks that operation goes through specified channel.
      */
     protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh, 
ClientOperation expOp) {
+        assertOpOnChannel(expCh, expOp, null);
+    }
+
+    /**
+     * Checks that operation goes through specified channel.
+     */
+    protected void assertOpOnChannel(
+            @Nullable TestTcpClientChannel expCh,
+            ClientOperation expOp,
+            @Nullable ClientOperation ignoreOp) {
+        while (opsQueue.peek() != null && opsQueue.peek().get2() == ignoreOp) {
+            opsQueue.poll();
+        }
+
         T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
+        T2<TestTcpClientChannel, ClientOperation> queuedOp = opsQueue.peek();
 
         assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", 
expOp=" + expOp + ']', nextChOp);
 
         assertEquals("Unexpected operation on channel [expCh=" + expCh + ", 
expOp=" + expOp +
-                ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
+                ", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']', 
expOp, nextChOp.get2());
 
         if (expCh != null) {
             assertEquals("Unexpected channel for operation [expCh=" + expCh + 
", expOp=" + expOp +
-                ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+                ", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']', 
expCh, nextChOp.get1());
         }
     }
 
@@ -245,10 +262,41 @@ public abstract class 
ThinClientAbstractPartitionAwarenessTest extends GridCommo
         // Wait until all channels initialized.
         for (int ch : chIdxs) {
             assertTrue("Failed to wait for channel[" + ch + "] init",
-                GridTestUtils.waitForCondition(() -> channels[ch] != null, 
WAIT_TIMEOUT));
+                GridTestUtils.waitForCondition(() -> isConnected(ch), 
WAIT_TIMEOUT));
         }
     }
 
+    /**
+     * Gets a value indicating whether the channel is connected at the 
specified index (port offset).
+     *
+     * @param chIdx Channel index (port offset).
+     * @return {@code true} if the channel is connected, {@code false} 
otherwise.
+     */
+    protected boolean isConnected(int chIdx) {
+        List<ReliableChannel.ClientChannelHolder> channelHolders = 
((TcpIgniteClient)client).reliableChannel().getChannelHolders();
+        int chPort = DFLT_PORT + chIdx;
+
+        for (ReliableChannel.ClientChannelHolder holder : channelHolders) {
+            if (holder == null || holder.isClosed()) {
+                continue;
+            }
+
+            ClientChannel ch = GridTestUtils.getFieldValue(holder, "ch");
+
+            if (ch == null || ch.closed()) {
+                continue;
+            }
+
+            for (InetSocketAddress addr : holder.getAddresses()) {
+                if (addr.getPort() == chPort) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     /**
      *
      */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
index 6c91e3014f1..bcb7627f8ec 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
@@ -17,10 +17,24 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.ignite.Ignition;
 import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
+import static 
org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
+
 /**
  * Test endpoints discovery by thin client.
  */
@@ -122,4 +136,35 @@ public class ThinClientEnpointsDiscoveryTest extends 
ThinClientAbstractPartition
 
         awaitChannelsInit(0);
     }
+
+    /** */
+    @Test
+    public void testUnreachableAddressDiscoveredDoesNotPreventClientInit() 
throws Exception {
+        try (ServerSocket sock = new ServerSocket()) {
+            sock.bind(new InetSocketAddress("127.0.0.1", 0));
+
+            ArrayList<String> addrs = new ArrayList<>();
+            addrs.add("127.0.0.1:" + sock.getLocalPort());
+
+            IgniteEx server = startGrid(0);
+            ClusterNode serverNode = server.cluster().localNode();
+
+            // Override node attributes - set local port of the "fake server" 
socket which does not work.
+            Map<String, Object> attrsFiltered = serverNode.attributes();
+            Map<String, Object> attrsSealed = 
GridTestUtils.getFieldValue(attrsFiltered, "map");
+            Map<String, Object> attrs = 
GridTestUtils.getFieldValue(attrsSealed, "m");
+            attrs.put(ClientListenerProcessor.CLIENT_LISTENER_PORT, 
sock.getLocalPort());
+
+            // Config has good server address, client discovery returns 
unreachable address.
+            // We expect the client to connect to the good address and ignore 
the unreachable one.
+            ClientConfiguration ccfg = new ClientConfiguration()
+                    .setTimeout(2000)
+                    .setAddresses("127.0.0.1:" + DFLT_PORT);
+
+            IgniteClient client = Ignition.startClient(ccfg);
+
+            Collection<String> cacheNames = client.cacheNames();
+            assertFalse(cacheNames.isEmpty());
+        }
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index fe49303bbeb..83081ffce83 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -205,12 +205,12 @@ public class 
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
         cache.put(key, 0);
 
         // Request goes to the connected channel, since affinity node is 
disconnected.
-        assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT);
+        assertOpOnChannel(null, ClientOperation.CACHE_PUT);
 
         cache.put(key, 0);
 
         // Connection to disconnected node should be restored after retry.
-        assertOpOnChannel(channels[disconnectNodeIdx], 
ClientOperation.CACHE_PUT);
+        assertOpOnChannel(channels[disconnectNodeIdx], 
ClientOperation.CACHE_PUT, ClientOperation.CACHE_PARTITIONS);
 
         // Test partition awareness.
         testPartitionAwareness(false);

Reply via email to