This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 16d6bfd64c9 IGNITE-23118 Fixed connection check to the previous node
in the ring.
16d6bfd64c9 is described below
commit 16d6bfd64c9dae0b0ac9e75f3e754bbb871b232f
Author: Vladimir Steshin <[email protected]>
AuthorDate: Tue Oct 1 15:41:13 2024 +0300
IGNITE-23118 Fixed connection check to the previous node in the ring.
---
.../spi/IgniteSpiOperationTimeoutHelper.java | 13 ++-
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 54 ++++++---
.../tcp/TcpDiscoveryNetworkIssuesTest.java | 123 ++++++++++++++++++---
4 files changed, 159 insertions(+), 33 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index d5129317815..b95bf22427d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -74,6 +74,17 @@ public class IgniteSpiOperationTimeoutHelper {
}
}
+ /**
+ * Creates timeout helper with an absolute time threshold. Sets {@code
timeoutEnabled} to {@code false}.
+ *
+ * @param timeout Timeout in milliseconds.
+ */
+ public IgniteSpiOperationTimeoutHelper(long timeout) {
+ timeoutEnabled = false;
+
+ timeoutThreshold = System.nanoTime() + U.millisToNanos(timeout);
+ }
+
/**
* Returns a timeout value to use for the next network operation.
*
@@ -111,7 +122,7 @@ public class IgniteSpiOperationTimeoutHelper {
* @param e Exception to check.
* @return {@code True} if given exception is a timeout. {@code False}
otherwise.
*/
- public boolean checkFailureTimeoutReached(Exception e) {
+ public static boolean checkFailureTimeoutReached(Exception e) {
return X.hasCause(e, IgniteSpiOperationTimeoutException.class,
SocketTimeoutException.class, SocketException.class);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e02d591550f..57aeb328bdd 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -846,7 +846,7 @@ class ClientImpl extends TcpDiscoveryImpl {
break;
}
- if (timeoutHelper.checkFailureTimeoutReached(e))
+ if
(IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt ==
spi.getReconnectCount())
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0ba98ae7097..acbccbb09a6 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -787,7 +787,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (InetSocketAddress addr : spi.getEffectiveNodeAddresses(node)) {
try {
// ID returned by the node should be the same as ID of the
parameter for ping to succeed.
- IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(),
clientNodeId);
+ IgniteBiTuple<UUID, Boolean> t = pingNode(addr, node.id(),
clientNodeId, 0);
if (t == null)
// Remote node left topology.
@@ -818,18 +818,25 @@ class ServerImpl extends TcpDiscoveryImpl {
* @param addr Address of the node.
* @param nodeId Node ID to ping. In case when client node ID is not null
this node ID is an ID of the router node.
* @param clientNodeId Client node ID.
+ * @param timeout Timeout on operation in milliseconds. If 0, a value
based on {@link TcpDiscoverySpi} is used.
* @return ID of the remote node and "client exists" flag if node alive or
{@code null} if the remote node has
* left a topology during the ping process.
* @throws IgniteCheckedException If an error occurs.
*/
- @Nullable private IgniteBiTuple<UUID, Boolean> pingNode(InetSocketAddress
addr, @Nullable UUID nodeId,
- @Nullable UUID clientNodeId) throws IgniteCheckedException {
+ @Nullable private IgniteBiTuple<UUID, Boolean> pingNode(
+ InetSocketAddress addr,
+ @Nullable UUID nodeId,
+ @Nullable UUID clientNodeId,
+ long timeout
+ ) throws IgniteCheckedException {
assert addr != null;
+ assert timeout >= 0;
- UUID locNodeId = getLocalNodeId();
+ IgniteSpiOperationTimeoutHelper timeoutHelper = timeout == 0
+ ? new IgniteSpiOperationTimeoutHelper(spi, clientNodeId == null)
+ : new IgniteSpiOperationTimeoutHelper(timeout);
- IgniteSpiOperationTimeoutHelper timeoutHelper = new
IgniteSpiOperationTimeoutHelper(spi,
- clientNodeId == null);
+ UUID locNodeId = getLocalNodeId();
if (F.contains(spi.locNodeAddrs, addr)) {
if (clientNodeId == null)
@@ -928,9 +935,10 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e)) {
+ if
(IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e)
+ && (spi.failureDetectionTimeoutEnabled() ||
timeout != 0)) {
log.warning("Failed to ping node [nodeId=" +
nodeId + "]. Reached the timeout " +
- spi.failureDetectionTimeout() + "ms. Cause: "
+ e.getMessage());
+ (timeout == 0 ? spi.failureDetectionTimeout()
: timeout) + "ms. Cause: " + e.getMessage());
break;
}
@@ -1579,7 +1587,7 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt ==
spi.getReconnectCount())
@@ -2260,7 +2268,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (res == null) {
try {
- res = pingNode(addr, null, null) != null;
+ res = pingNode(addr, null, null, 0) != null;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -3630,7 +3638,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!spi.failureDetectionTimeoutEnabled() &&
++reconCnt == spi.getReconnectCount())
break;
- if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled()
&& (e instanceof
SocketTimeoutException || X.hasCause(e,
SocketTimeoutException.class))) {
@@ -3801,7 +3809,7 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node
[next=" + next.id() + ", msg=" + msg + ']',
e);
- if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
IgniteSpiOperationTimeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled()) {
@@ -7246,8 +7254,17 @@ class ServerImpl extends TcpDiscoveryImpl {
lastRingMsgReceivedTime = System.nanoTime();
}
- /** @return Alive address if was able to connected to. {@code Null}
otherwise. */
+ /**
+ * Asynchronously searches for an alive address of a node using a
maximal timeout.
+ *
+ * @param node Node to ping.
+ * @param timeout Overall operation timeout.
+ * @return An address successfully connected to. {@code Null} if no
alive address was detected within the timeout.
+ * @see #pingNode(InetSocketAddress, UUID, UUID, long)
+ */
private InetSocketAddress checkConnection(TcpDiscoveryNode node, int
timeout) {
+ IgniteSpiOperationTimeoutHelper timeoutHelper = new
IgniteSpiOperationTimeoutHelper(timeout);
+
AtomicReference<InetSocketAddress> liveAddrHolder = new
AtomicReference<>();
List<InetSocketAddress> addrs = new
ArrayList<>(spi.getEffectiveNodeAddresses(node));
@@ -7277,11 +7294,14 @@ class ServerImpl extends TcpDiscoveryImpl {
for (int i = 0; i < addrsToCheck; ++i) {
InetSocketAddress addr =
addrs.get(addrIdx.getAndIncrement());
- try (Socket sock = new Socket()) {
+ try {
if (liveAddrHolder.get() == null) {
- sock.connect(addr, perAddrTimeout);
+ UUID id = pingNode(addr, node.id(), null,
timeoutHelper.nextTimeoutChunk(perAddrTimeout)).get1();
+
+ assert id == null || id.equals(node.id());
- liveAddrHolder.compareAndSet(null, addr);
+ if (id != null)
+ liveAddrHolder.compareAndSet(null,
addr);
}
}
catch (Exception e) {
@@ -7514,7 +7534,7 @@ class ServerImpl extends TcpDiscoveryImpl {
for (InetSocketAddress addr : spi.getEffectiveNodeAddresses(node))
{
try {
if (!(addr.getAddress().isLoopbackAddress() &&
locNode.socketAddresses().contains(addr))) {
- IgniteBiTuple<UUID, Boolean> t = pingNode(addr,
node.id(), null);
+ IgniteBiTuple<UUID, Boolean> t = pingNode(addr,
node.id(), null, 0);
if (t != null)
return true;
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index ea4ca2f0353..7162083af9c 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -29,14 +29,18 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -124,6 +128,9 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
/** */
private String localhost;
+ /** */
+ private IgniteLogger gridLog;
+
/** */
private final GridConcurrentHashSet<Integer> segmentedNodes = new
GridConcurrentHashSet<>();
@@ -156,6 +163,9 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
cfg.setLocalHost(localhost);
+ if (gridLog != null)
+ cfg.setGridLogger(gridLog);
+
return cfg;
}
@@ -249,6 +259,92 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
doTestBackwardNodeCheckWithSameLoopback("0.0.0.0");
}
+ /**
+ * Tests backward ping when the discovery threads of the malfunctioning node
are simulated to hang at GC.
+ * But the JVM is able to accept socket connections.
+ */
+ @Test
+ public void testBackwardConnectionCheckWhenDiscoveryThreadsSuspended()
throws Exception {
+ ListeningTestLogger testLog = new ListeningTestLogger(log);
+
+ gridLog = testLog;
+
+ localhost = "127.0.0.1";
+
+ failureDetectionTimeout = 3000;
+
+ specialSpi = new TestDiscoverySpi();
+
+ // This node suspects that its next node has failed.
+ Ignite doubtNode0 = startGrid(0);
+
+ // Simulates frozen threads on node 1 but answering sockets. I.e.
Socket#connect() works to node 1 but
+ // reading anything with Socket#read() from it would fail with the
timeout.
+ specialSpi = new TestDiscoverySpi();
+
+ // Node simulated 'frozen'. Can accept connections (socket accept) but
won't write anything to a discovery socket.
+ Ignite frozenNode1 = startGrid(1);
+
+ UUID frozenNodeId = frozenNode1.cluster().localNode().id();
+
+ specialSpi = new TestDiscoverySpi();
+
+ setLoggerDebugLevel();
+
+ // Node which does the backward connection check to its previous
'frozen' node.
+ Ignite pingingNode2 = startGrid(2);
+
+ LogListener node1SegmentedLogLsnr = LogListener.matches("Local node
SEGMENTED: TcpDiscoveryNode [id=" + frozenNode1).build();
+
+ // Node1 must leave the cluster.
+ LogListener backwardPingLogLsnr = LogListener.matches("Remote node
requests topology change. Checking connection to " +
+ "previous [TcpDiscoveryNode [id=" + frozenNodeId).build();
+
+ testLog.registerListener(node1SegmentedLogLsnr);
+ testLog.registerListener(backwardPingLogLsnr);
+
+ // Result of the ping from node2 to node1.
+ AtomicReference<Boolean> backwardPingResult = new AtomicReference<>();
+
+ // Request to establish new permanent cluster connection from doubting
node0 to node2.
+ testSpi(doubtNode0).hsRqLsnr.set((s, hsRq) -> {
+ if (hsRq.changeTopology() &&
frozenNodeId.equals(hsRq.checkPreviousNodeId())) {
+ // Continue simulation of node1 freeze at GC and processes no
discovery messages.
+ testSpi(frozenNode1).addrsToBlock = Collections.emptyList();
+ }
+ });
+
+ // Response from node2 to node0 with negative check of frozen node1.
+ testSpi(pingingNode2).hsRespLsnr.set((s, hsResp) -> {
+ backwardPingResult.set(hsResp.previousNodeAlive());
+ });
+
+ // Begin simulating node1 frozen at GC and processing no discovery
messages, then wait till
+ // the discovery traffic node0->node1 stops.
+ testSpi(doubtNode0).addrsToBlock = spi(frozenNode1).locNodeAddrs;
+ assertTrue(waitForCondition(() -> testSpi(doubtNode0).blocked,
getTestTimeout()));
+
+ // Wait till the discovery traffic node1->node2 stops too.
+ assertTrue(waitForCondition(() -> testSpi(frozenNode1).blocked,
getTestTimeout()));
+
+ // Wait till the backward connection check and ensure the result is
negative (node1 confirmed failed).
+ assertTrue(backwardPingLogLsnr.check(getTestTimeout()));
+ assertTrue(waitForCondition(() -> backwardPingResult.get() != null,
getTestTimeout()));
+
+ assertFalse(backwardPingResult.get());
+
+ assertTrue(backwardPingLogLsnr.check(getTestTimeout()));
+
+ // Node0 and node2 must survive.
+ assertTrue(waitForCondition(() -> doubtNode0.cluster().nodes().size()
== 2
+ &&
!doubtNode0.cluster().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet()).contains(frozenNodeId),
+ getTestTimeout()));
+
+ assertTrue(waitForCondition(() ->
pingingNode2.cluster().nodes().size() == 2
+ &&
!pingingNode2.cluster().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet()).contains(frozenNodeId),
+ getTestTimeout()));
+ }
+
/**
* Performs Tests backward node ping if {@link
TcpDiscoveryNode#socketAddresses()} contains same loopback address as of local
node.
* Assumes several local address are resolved.
@@ -396,18 +492,17 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
*/
@Test
public void testBackwardConnectionCheckFailedLogMessage() throws Exception
{
+ startGrid(0);
+
ListeningTestLogger testLog = new ListeningTestLogger(log);
LogListener lsnr0 = LogListener.matches("Failed to check connection to
previous node").times(2).build();
testLog.registerListener(lsnr0);
- startGrid(0);
-
- IgniteConfiguration cfg =
getConfiguration(getTestIgniteInstanceName(1));
- cfg.setGridLogger(testLog);
+ gridLog = testLog;
- startGrid(cfg);
+ startGrid(1);
startGrid(2);
@@ -507,15 +602,6 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
impl = new ServerImpl(this, 1);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(TcpDiscoveryAbstractMessage
msg, Socket sock, int res,
- long timeout) throws IOException {
- if (dropMsg(sock))
- return;
-
- super.writeToSocket(msg, sock, res, timeout);
- }
-
/** */
private boolean dropMsg(Socket sock) {
Collection<InetSocketAddress> addrsToBlock = this.addrsToBlock;
@@ -531,6 +617,15 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
return false;
}
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage
msg, Socket sock, int res,
+ long timeout) throws IOException {
+ if (dropMsg(sock))
+ return;
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {