This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new add61eb5887 IGNITE-22964 Java thin: fix client init hang on
unreachable discovered address (#11486)
add61eb5887 is described below
commit add61eb58874a6199b4f23f739efad55155f6bcd
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Aug 21 15:31:41 2024 +0300
IGNITE-22964 Java thin: fix client init hang on unreachable discovered
address (#11486)
* Do not perform cluster discovery synchronously while initializing the
client: call `applyOnDefaultChannel` before checking `channelsCnt.get() == 0` in
`channelsInit`
* Do not disconnect active channels when performing discovery, even if
their addresses are not in the new list, to avoid unnecessary reconnects
---
.../internal/client/thin/ReliableChannel.java | 22 +++++++--
.../org/apache/ignite/client/ReliabilityTest.java | 6 ++-
.../internal/client/thin/ComputeTaskTest.java | 4 +-
.../ThinClientAbstractPartitionAwarenessTest.java | 56 ++++++++++++++++++++--
.../thin/ThinClientEnpointsDiscoveryTest.java | 45 +++++++++++++++++
...ientPartitionAwarenessUnstableTopologyTest.java | 4 +-
6 files changed, 124 insertions(+), 13 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 6559ea8f739..1d0115630a5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -631,6 +631,19 @@ final class ReliableChannel implements AutoCloseable {
return;
}
+ // Add connected channels to the list to avoid unnecessary reconnects,
unless address finder is used.
+ if (holders != null && clientCfg.getAddressesFinder() == null) {
+ // Do not modify the original list.
+ newAddrs = new ArrayList<>(newAddrs);
+
+ for (ClientChannelHolder h : holders) {
+ ClientChannel ch = h.ch;
+
+ if (ch != null && !ch.closed())
+ newAddrs.add(h.getAddresses());
+ }
+ }
+
Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
Set<InetSocketAddress> newAddrsSet =
newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet());
@@ -744,13 +757,16 @@ final class ReliableChannel implements AutoCloseable {
initChannelHolders();
if (failures == null || failures.size() < attemptsLimit) {
+ // Establish default channel connection.
+ applyOnDefaultChannel(channel -> null, null, failures);
+
if (channelsCnt.get() == 0) {
// Establish default channel connection and retrive nodes
endpoints if applicable.
- if (applyOnDefaultChannel(discoveryCtx::refresh, null,
failures))
+ boolean discoveryUpdated =
applyOnDefaultChannel(discoveryCtx::refresh, null, failures);
+
+ if (discoveryUpdated)
initChannelHolders();
}
- else // Apply no-op function. Establish default channel connection.
- applyOnDefaultChannel(channel -> null, null, failures);
}
if (partitionAwarenessEnabled)
diff --git
a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 81127e8d091..6803f3c2d47 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -205,9 +205,11 @@ public class ReliabilityTest extends
AbstractThinClientTest {
// Fail.
dropAllThinClientConnections(Ignition.allGrids().get(0));
- Throwable ex = GridTestUtils.assertThrowsWithCause(() ->
cachePut(cache, 0, 0), ClientConnectionException.class);
+ if (!partitionAware) {
+ Throwable ex = GridTestUtils.assertThrowsWithCause(() ->
cachePut(cache, 0, 0), ClientConnectionException.class);
- GridTestUtils.assertContains(null, ex.getMessage(),
F.first(cluster.clientAddresses()));
+ GridTestUtils.assertContains(null, ex.getMessage(),
F.first(cluster.clientAddresses()));
+ }
// Recover after fail.
cachePut(cache, 0, 0);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index 38a3da9f779..6d6b96dc5cd 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -420,13 +420,13 @@ public class ComputeTaskTest extends
AbstractThinClientTest {
Future<Object> fut1 =
compute.executeAsync(TestLatchTask.class.getName(), null);
// Wait for the task to start, then drop connections.
- TestLatchTask.startLatch.await();
+ assertTrue(TestLatchTask.startLatch.await(TIMEOUT,
TimeUnit.MILLISECONDS));
dropAllThinClientConnections();
TestLatchTask.startLatch = new CountDownLatch(1);
Future<Object> fut2 =
compute.executeAsync(TestLatchTask.class.getName(), null);
- TestLatchTask.startLatch.await();
+ assertTrue(TestLatchTask.startLatch.await(TIMEOUT,
TimeUnit.MILLISECONDS));
dropAllThinClientConnections();
TestLatchTask.latch = new CountDownLatch(1);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index 93dc575d8cc..8430fc5ff81 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.client.thin;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -54,7 +56,7 @@ import static
org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_
@SuppressWarnings("rawtypes")
public abstract class ThinClientAbstractPartitionAwarenessTest extends
GridCommonAbstractTest {
/** Wait timeout. */
- protected static final long WAIT_TIMEOUT = 5_000L;
+ protected static final long WAIT_TIMEOUT = 15_000L;
/** Replicated cache name. */
protected static final String REPL_CACHE_NAME = "replicated_cache";
@@ -153,16 +155,31 @@ public abstract class
ThinClientAbstractPartitionAwarenessTest extends GridCommo
* Checks that operation goes through specified channel.
*/
protected void assertOpOnChannel(@Nullable TestTcpClientChannel expCh,
ClientOperation expOp) {
+ assertOpOnChannel(expCh, expOp, null);
+ }
+
+ /**
+ * Checks that operation goes through specified channel.
+ */
+ protected void assertOpOnChannel(
+ @Nullable TestTcpClientChannel expCh,
+ ClientOperation expOp,
+ @Nullable ClientOperation ignoreOp) {
+ while (opsQueue.peek() != null && opsQueue.peek().get2() == ignoreOp) {
+ opsQueue.poll();
+ }
+
T2<TestTcpClientChannel, ClientOperation> nextChOp = opsQueue.poll();
+ T2<TestTcpClientChannel, ClientOperation> queuedOp = opsQueue.peek();
assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ",
expOp=" + expOp + ']', nextChOp);
assertEquals("Unexpected operation on channel [expCh=" + expCh + ",
expOp=" + expOp +
- ", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
+ ", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']',
expOp, nextChOp.get2());
if (expCh != null) {
assertEquals("Unexpected channel for operation [expCh=" + expCh +
", expOp=" + expOp +
- ", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
+ ", nextOpCh=" + nextChOp + ", queuedOp=" + queuedOp + ']',
expCh, nextChOp.get1());
}
}
@@ -245,10 +262,41 @@ public abstract class
ThinClientAbstractPartitionAwarenessTest extends GridCommo
// Wait until all channels initialized.
for (int ch : chIdxs) {
assertTrue("Failed to wait for channel[" + ch + "] init",
- GridTestUtils.waitForCondition(() -> channels[ch] != null,
WAIT_TIMEOUT));
+ GridTestUtils.waitForCondition(() -> isConnected(ch),
WAIT_TIMEOUT));
}
}
+ /**
+ * Gets a value indicating whether the channel is connected at the
specified index (port offset).
+ *
+ * @param chIdx Channel index (port offset).
+ * @return {@code true} if the channel is connected, {@code false}
otherwise.
+ */
+ protected boolean isConnected(int chIdx) {
+ List<ReliableChannel.ClientChannelHolder> channelHolders =
((TcpIgniteClient)client).reliableChannel().getChannelHolders();
+ int chPort = DFLT_PORT + chIdx;
+
+ for (ReliableChannel.ClientChannelHolder holder : channelHolders) {
+ if (holder == null || holder.isClosed()) {
+ continue;
+ }
+
+ ClientChannel ch = GridTestUtils.getFieldValue(holder, "ch");
+
+ if (ch == null || ch.closed()) {
+ continue;
+ }
+
+ for (InetSocketAddress addr : holder.getAddresses()) {
+ if (addr.getPort() == chPort) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
/**
*
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
index 6c91e3014f1..bcb7627f8ec 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientEnpointsDiscoveryTest.java
@@ -17,10 +17,24 @@
package org.apache.ignite.internal.client.thin;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
+import static
org.apache.ignite.configuration.ClientConnectorConfiguration.DFLT_PORT;
+
/**
* Test endpoints discovery by thin client.
*/
@@ -122,4 +136,35 @@ public class ThinClientEnpointsDiscoveryTest extends
ThinClientAbstractPartition
awaitChannelsInit(0);
}
+
+ /** */
+ @Test
+ public void testUnreachableAddressDiscoveredDoesNotPreventClientInit()
throws Exception {
+ try (ServerSocket sock = new ServerSocket()) {
+ sock.bind(new InetSocketAddress("127.0.0.1", 0));
+
+ ArrayList<String> addrs = new ArrayList<>();
+ addrs.add("127.0.0.1:" + sock.getLocalPort());
+
+ IgniteEx server = startGrid(0);
+ ClusterNode serverNode = server.cluster().localNode();
+
+ // Override node attributes - set local port of the "fake server"
socket which does not work.
+ Map<String, Object> attrsFiltered = serverNode.attributes();
+ Map<String, Object> attrsSealed =
GridTestUtils.getFieldValue(attrsFiltered, "map");
+ Map<String, Object> attrs =
GridTestUtils.getFieldValue(attrsSealed, "m");
+ attrs.put(ClientListenerProcessor.CLIENT_LISTENER_PORT,
sock.getLocalPort());
+
+ // Config has good server address, client discovery returns
unreachable address.
+ // We expect the client to connect to the good address and ignore
the unreachable one.
+ ClientConfiguration ccfg = new ClientConfiguration()
+ .setTimeout(2000)
+ .setAddresses("127.0.0.1:" + DFLT_PORT);
+
+ IgniteClient client = Ignition.startClient(ccfg);
+
+ Collection<String> cacheNames = client.cacheNames();
+ assertFalse(cacheNames.isEmpty());
+ }
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
index fe49303bbeb..83081ffce83 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessUnstableTopologyTest.java
@@ -205,12 +205,12 @@ public class
ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClient
cache.put(key, 0);
// Request goes to the connected channel, since affinity node is
disconnected.
- assertOpOnChannel(channels[1], ClientOperation.CACHE_PUT);
+ assertOpOnChannel(null, ClientOperation.CACHE_PUT);
cache.put(key, 0);
// Connection to disconnected node should be restored after retry.
- assertOpOnChannel(channels[disconnectNodeIdx],
ClientOperation.CACHE_PUT);
+ assertOpOnChannel(channels[disconnectNodeIdx],
ClientOperation.CACHE_PUT, ClientOperation.CACHE_PARTITIONS);
// Test partition awareness.
testPartitionAwareness(false);