This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 16d6bfd64c9 IGNITE-23118 Fixed connection check to the previous node in the ring.
16d6bfd64c9 is described below

commit 16d6bfd64c9dae0b0ac9e75f3e754bbb871b232f
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Oct 1 15:41:13 2024 +0300

    IGNITE-23118 Fixed connection check to the previous node in the ring.
---
 .../spi/IgniteSpiOperationTimeoutHelper.java       |  13 ++-
 .../ignite/spi/discovery/tcp/ClientImpl.java       |   2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |  54 ++++++---
 .../tcp/TcpDiscoveryNetworkIssuesTest.java         | 123 ++++++++++++++++++---
 4 files changed, 159 insertions(+), 33 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index d5129317815..b95bf22427d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -74,6 +74,17 @@ public class IgniteSpiOperationTimeoutHelper {
         }
     }
 
+    /**
+     * Creates a timeout helper with an absolute time threshold. Sets {@code timeoutEnabled} to {@code false}.
+     *
+     * @param timeout Timeout in milliseconds, counted from the moment this helper is constructed.
+     */
+    public IgniteSpiOperationTimeoutHelper(long timeout) {
+        timeoutEnabled = false;
+
+        timeoutThreshold = System.nanoTime() + U.millisToNanos(timeout);
+    }
+
     /**
      * Returns a timeout value to use for the next network operation.
      *
@@ -111,7 +122,7 @@ public class IgniteSpiOperationTimeoutHelper {
      * @param e Exception to check.
      * @return {@code True} if given exception is a timeout. {@code False} 
otherwise.
      */
-    public boolean checkFailureTimeoutReached(Exception e) {
+    public static boolean checkFailureTimeoutReached(Exception e) {
         return X.hasCause(e, IgniteSpiOperationTimeoutException.class, 
SocketTimeoutException.class, SocketException.class);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e02d591550f..57aeb328bdd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -846,7 +846,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     break;
                 }
 
-                if (timeoutHelper.checkFailureTimeoutReached(e))
+                if 
(IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == 
spi.getReconnectCount())
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0ba98ae7097..acbccbb09a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -787,7 +787,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         for (InetSocketAddress addr : spi.getEffectiveNodeAddresses(node)) {
             try {
                 // ID returned by the node should be the same as ID of the 
parameter for ping to succeed.
-                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(), 
clientNodeId);
+                IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(), 
clientNodeId, 0);
 
                 if (t == null)
                     // Remote node left topology.
@@ -818,18 +818,25 @@ class ServerImpl extends TcpDiscoveryImpl {
      * @param addr Address of the node.
      * @param nodeId Node ID to ping. In case when client node ID is not null 
this node ID is an ID of the router node.
      * @param clientNodeId Client node ID.
+     * @param timeout Timeout on operation in milliseconds. If 0, a value 
based on {@link TcpDiscoverySpi} is used.
      * @return ID of the remote node and "client exists" flag if node alive or 
{@code null} if the remote node has
      *         left a topology during the ping process.
      * @throws IgniteCheckedException If an error occurs.
      */
-    @Nullable private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress 
addr, @Nullable UUID nodeId,
-        @Nullable UUID clientNodeId) throws IgniteCheckedException {
+    @Nullable private IgniteBiTuple<UUID, Boolean> pingNode(
+        InetSocketAddress addr,
+        @Nullable UUID nodeId,
+        @Nullable UUID clientNodeId,
+        long timeout
+    ) throws IgniteCheckedException {
         assert addr != null;
+        assert timeout >= 0;
 
-        UUID locNodeId = getLocalNodeId();
+        IgniteSpiOperationTimeoutHelper timeoutHelper = timeout == 0
+            ? new IgniteSpiOperationTimeoutHelper(spi, clientNodeId == null)
+            : new IgniteSpiOperationTimeoutHelper(timeout);
 
-        IgniteSpiOperationTimeoutHelper timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(spi,
-            clientNodeId == null);
+        UUID locNodeId = getLocalNodeId();
 
         if (F.contains(spi.locNodeAddrs, addr)) {
             if (clientNodeId == null)
@@ -928,9 +935,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                             break;
                         }
 
-                        if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e)) {
+                        if 
(IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e)
+                            && (spi.failureDetectionTimeoutEnabled() || 
timeout != 0)) {
                             log.warning("Failed to ping node [nodeId=" + 
nodeId + "]. Reached the timeout " +
-                                spi.failureDetectionTimeout() + "ms. Cause: " 
+ e.getMessage());
+                                (timeout == 0 ? spi.failureDetectionTimeout() 
: timeout) + "ms. Cause: " + e.getMessage());
 
                             break;
                         }
@@ -1579,7 +1587,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     break;
                 }
 
-                if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
+                if (spi.failureDetectionTimeoutEnabled() && 
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
                 if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt == 
spi.getReconnectCount())
@@ -2260,7 +2268,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (res == null) {
                             try {
-                                res = pingNode(addr, null, null) != null;
+                                res = pingNode(addr, null, null, 0) != null;
                             }
                             catch (IgniteCheckedException e) {
                                 if (log.isDebugEnabled())
@@ -3630,7 +3638,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (!spi.failureDetectionTimeoutEnabled() && 
++reconCnt == spi.getReconnectCount())
                                     break;
 
-                                if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
+                                if (spi.failureDetectionTimeoutEnabled() && 
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                                     break;
                                 else if (!spi.failureDetectionTimeoutEnabled() 
&& (e instanceof
                                     SocketTimeoutException || X.hasCause(e, 
SocketTimeoutException.class))) {
@@ -3801,7 +3809,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             onException("Failed to send message to next node 
[next=" + next.id() + ", msg=" + msg + ']',
                                 e);
 
-                            if (spi.failureDetectionTimeoutEnabled() && 
timeoutHelper.checkFailureTimeoutReached(e))
+                            if (spi.failureDetectionTimeoutEnabled() && 
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
                                 break;
 
                             if (!spi.failureDetectionTimeoutEnabled()) {
@@ -7246,8 +7254,17 @@ class ServerImpl extends TcpDiscoveryImpl {
             lastRingMsgReceivedTime = System.nanoTime();
         }
 
-        /** @return Alive address if was able to connected to. {@code Null} 
otherwise. */
+        /**
+         * Asynchronously searches for an alive address of a node using a 
maximal timeout.
+         *
+         * @param node Node to ping.
+         * @param timeout Overall operation timeout.
+         * @return An address successfully connected to. {@code Null} if no 
alive address was detected within the timeout.
+         * @see #pingNode(InetSocketAddress, UUID, UUID, long)
+         */
         private InetSocketAddress checkConnection(TcpDiscoveryNode node, int 
timeout) {
+            IgniteSpiOperationTimeoutHelper timeoutHelper = new 
IgniteSpiOperationTimeoutHelper(timeout);
+
             AtomicReference<InetSocketAddress> liveAddrHolder = new 
AtomicReference<>();
 
             List<InetSocketAddress> addrs = new 
ArrayList<>(spi.getEffectiveNodeAddresses(node));
@@ -7277,11 +7294,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                         for (int i = 0; i < addrsToCheck; ++i) {
                             InetSocketAddress addr = 
addrs.get(addrIdx.getAndIncrement());
 
-                            try (Socket sock = new Socket()) {
+                            try {
                                 if (liveAddrHolder.get() == null) {
-                                    sock.connect(addr, perAddrTimeout);
+                                    UUID id = pingNode(addr, node.id(), null, 
timeoutHelper.nextTimeoutChunk(perAddrTimeout)).get1();
+
+                                    assert id == null || id.equals(node.id());
 
-                                    liveAddrHolder.compareAndSet(null, addr);
+                                    if (id != null)
+                                        liveAddrHolder.compareAndSet(null, 
addr);
                                 }
                             }
                             catch (Exception e) {
@@ -7514,7 +7534,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             for (InetSocketAddress addr : spi.getEffectiveNodeAddresses(node)) 
{
                 try {
                     if (!(addr.getAddress().isLoopbackAddress() && 
locNode.socketAddresses().contains(addr))) {
-                        IgniteBiTuple<UUID, Boolean> t = pingNode(addr, 
node.id(), null);
+                        IgniteBiTuple<UUID, Boolean> t = pingNode(addr, 
node.id(), null, 0);
 
                         if (t != null)
                             return true;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index ea4ca2f0353..7162083af9c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -29,14 +29,18 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -124,6 +128,9 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
     /** */
     private String localhost;
 
+    /** */
+    private IgniteLogger gridLog;
+
     /** */
     private final GridConcurrentHashSet<Integer> segmentedNodes = new 
GridConcurrentHashSet<>();
 
@@ -156,6 +163,9 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
 
         cfg.setLocalHost(localhost);
 
+        if (gridLog != null)
+            cfg.setGridLogger(gridLog);
+
         return cfg;
     }
 
@@ -249,6 +259,92 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
         doTestBackwardNodeCheckWithSameLoopback("0.0.0.0");
     }
 
+    /**
+     * Tests the backward connection check when the discovery threads of the malfunctioning node are simulated to hang at GC.
+     * The JVM of that node is still able to accept socket connections.
+     */
+    @Test
+    public void testBackwardConnectionCheckWhenDiscoveryThreadsSuspended() throws Exception {
+        ListeningTestLogger testLog = new ListeningTestLogger(log);
+
+        gridLog = testLog;
+
+        localhost = "127.0.0.1";
+
+        failureDetectionTimeout = 3000;
+
+        specialSpi = new TestDiscoverySpi();
+
+        // This node suspects its next node (node1) to have failed.
+        Ignite doubtNode0 = startGrid(0);
+
+        // Simulates frozen threads on node 1 while its sockets still answer, i.e. Socket#connect() to node 1 works but
+        // reading anything from it with Socket#read() fails with a timeout.
+        specialSpi = new TestDiscoverySpi();
+
+        // Node simulated 'frozen'. Can accept connections (socket accept) but won't write anything to a discovery socket.
+        Ignite frozenNode1 = startGrid(1);
+
+        UUID frozenNodeId = frozenNode1.cluster().localNode().id();
+
+        specialSpi = new TestDiscoverySpi();
+
+        setLoggerDebugLevel();
+
+        // Node which does the backward connection check to its 'frozen' previous node.
+        Ignite pingingNode2 = startGrid(2);
+
+        // Node1 must leave the cluster. Match by node ID: the log line prints the TcpDiscoveryNode, not the Ignite instance.
+        LogListener node1SegmentedLogLsnr = LogListener.matches("Local node SEGMENTED: TcpDiscoveryNode [id=" + frozenNodeId).build();
+        // Node2 must check the connection to its previous node1.
+        LogListener backwardPingLogLsnr = LogListener.matches("Remote node requests topology change. Checking connection to " +
+            "previous [TcpDiscoveryNode [id=" + frozenNodeId).build();
+
+        testLog.registerListener(node1SegmentedLogLsnr);
+        testLog.registerListener(backwardPingLogLsnr);
+
+        // Result of the ping from node2 to node1.
+        AtomicReference<Boolean> backwardPingResult = new AtomicReference<>();
+
+        // Request to establish a new permanent ring connection from the doubting node0 to node2.
+        testSpi(doubtNode0).hsRqLsnr.set((s, hsRq) -> {
+            if (hsRq.changeTopology() && frozenNodeId.equals(hsRq.checkPreviousNodeId())) {
+                // Keep node1 'frozen at GC': it processes no discovery messages.
+                testSpi(frozenNode1).addrsToBlock = Collections.emptyList();
+            }
+        });
+
+        // Response from node2 to node0 with the (expected negative) check result of the frozen node1.
+        testSpi(pingingNode2).hsRespLsnr.set((s, hsResp) -> {
+            backwardPingResult.set(hsResp.previousNodeAlive());
+        });
+
+        // Begin the simulation of node1 frozen at GC, processing no discovery messages, and wait till
+        // the discovery traffic node0->node1 stops.
+        testSpi(doubtNode0).addrsToBlock = spi(frozenNode1).locNodeAddrs;
+        assertTrue(waitForCondition(() -> testSpi(doubtNode0).blocked, getTestTimeout()));
+
+        // Wait till the discovery traffic node1->node2 stops too.
+        assertTrue(waitForCondition(() -> testSpi(frozenNode1).blocked, getTestTimeout()));
+
+        // Wait for the backward connection check and ensure its result is negative (node1 confirmed failed).
+        assertTrue(backwardPingLogLsnr.check(getTestTimeout()));
+        assertTrue(waitForCondition(() -> backwardPingResult.get() != null, getTestTimeout()));
+
+        assertFalse(backwardPingResult.get());
+
+        assertTrue(node1SegmentedLogLsnr.check(getTestTimeout()));
+
+        // Node0 and node2 must survive.
+        assertTrue(waitForCondition(() -> doubtNode0.cluster().nodes().size() == 2
+                && !doubtNode0.cluster().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet()).contains(frozenNodeId),
+            getTestTimeout()));
+
+        assertTrue(waitForCondition(() -> pingingNode2.cluster().nodes().size() == 2
+                && !pingingNode2.cluster().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet()).contains(frozenNodeId),
+            getTestTimeout()));
+    }
+
     /**
      * Performs Tests backward node ping if {@link 
TcpDiscoveryNode#socketAddresses()} contains same loopback address as of local 
node.
      * Assumes several local address are resolved.
@@ -396,18 +492,17 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testBackwardConnectionCheckFailedLogMessage() throws Exception 
{
+        startGrid(0);
+
         ListeningTestLogger testLog = new ListeningTestLogger(log);
 
         LogListener lsnr0 = LogListener.matches("Failed to check connection to 
previous node").times(2).build();
 
         testLog.registerListener(lsnr0);
 
-        startGrid(0);
-
-        IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(1));
-        cfg.setGridLogger(testLog);
+        gridLog = testLog;
 
-        startGrid(cfg);
+        startGrid(1);
 
         startGrid(2);
 
@@ -507,15 +602,6 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
                 impl = new ServerImpl(this, 1);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage 
msg, Socket sock, int res,
-            long timeout) throws IOException {
-            if (dropMsg(sock))
-                return;
-
-            super.writeToSocket(msg, sock, res, timeout);
-        }
-
         /** */
         private boolean dropMsg(Socket sock) {
             Collection<InetSocketAddress> addrsToBlock = this.addrsToBlock;
@@ -531,6 +617,15 @@ public class TcpDiscoveryNetworkIssuesTest extends 
GridCommonAbstractTest {
             return false;
         }
 
+        /** {@inheritDoc} Skips writing the message when {@link #dropMsg(Socket)} returns {@code true}. */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res,
+            long timeout) throws IOException {
+            if (dropMsg(sock))
+                return;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {

Reply via email to