This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3db1761 IGNITE-13465 Connection recovery timeout is fixed when
recovery protocol is executed. - Fixes #8262.
3db1761 is described below
commit 3db17612d7c433ddfbb404f92eebca6dd2f4fefe
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Oct 26 09:40:19 2020 +0300
IGNITE-13465 Connection recovery timeout is fixed when recovery protocol is
executed. - Fixes #8262.
Signed-off-by: Sergey Chugunov <[email protected]>
---
.../spi/IgniteSpiOperationTimeoutHelper.java | 86 ++++++++---------
.../ignite/spi/discovery/tcp/ServerImpl.java | 103 ++++++++++++++++-----
.../tcp/internal/TcpDiscoveryNodesRing.java | 11 +++
...cheContinuousQueryFailoverAbstractSelfTest.java | 2 +-
.../tcp/TcpDiscoveryNetworkIssuesTest.java | 91 +++++++++++++++++-
5 files changed, 216 insertions(+), 77 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index 48161ba..96fea9e 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -30,20 +30,11 @@ import org.apache.ignite.internal.util.typedef.internal.U;
*
*/
public class IgniteSpiOperationTimeoutHelper {
- // https://issues.apache.org/jira/browse/IGNITE-11221
- // We need to reuse new logic ExponentialBackoffTimeout logic in
TcpDiscovery instead of this class.
+ /** Flag whether to use timeout. */
+ private final boolean timeoutEnabled;
- /** */
- private long lastOperStartNanos;
-
- /** */
- private long timeout;
-
- /** */
- private final boolean failureDetectionTimeoutEnabled;
-
- /** */
- private final long failureDetectionTimeout;
+ /** Absolute time in nanos which must not be reached by the current operation. */
+ private final long timeoutThreshold;
/**
* Constructor.
@@ -52,9 +43,7 @@ public class IgniteSpiOperationTimeoutHelper {
* @param srvOp {@code True} if communicates with server node.
*/
public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean
srvOp) {
- failureDetectionTimeoutEnabled =
adapter.failureDetectionTimeoutEnabled();
- failureDetectionTimeout = srvOp ? adapter.failureDetectionTimeout() :
- adapter.clientFailureDetectionTimeout();
+ this(adapter, srvOp, -1, -1);
}
/**
@@ -62,15 +51,26 @@ public class IgniteSpiOperationTimeoutHelper {
*
* @param adapter SPI adapter.
* @param srvOp {@code True} if communicates with server node.
- * @param lastOperStartNanos Time of last related operation in nanos.
+ * @param lastRelatedOperationTime Time of last related operation in
nanos. Ignored if negative, 0 or
+ * {@code adapter.failureDetectionTimeoutEnabled()} is false.
+ * @param absoluteThreshold Absolute time threshold (nanos) which must not
be reached. Ignored if negative or 0.
*/
- public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean
srvOp, long lastOperStartNanos) {
- this(adapter, srvOp);
+ public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean
srvOp, long lastRelatedOperationTime,
+ long absoluteThreshold) {
+ timeoutEnabled = adapter.failureDetectionTimeoutEnabled();
- this.lastOperStartNanos = lastOperStartNanos;
+ if (timeoutEnabled) {
+ long timeout = (lastRelatedOperationTime > 0 ?
lastRelatedOperationTime : System.nanoTime()) +
+ U.millisToNanos(srvOp ? adapter.failureDetectionTimeout() :
adapter.clientFailureDetectionTimeout());
- if (lastOperStartNanos > 0)
- timeout = failureDetectionTimeout;
+ if (absoluteThreshold > 0 && timeout > absoluteThreshold)
+ timeout = absoluteThreshold;
+
+ timeoutThreshold = timeout;
+ } else {
+ // Save absolute threshold if it is set.
+ timeoutThreshold = absoluteThreshold > 0 ? absoluteThreshold : 0;
+ }
}
/**
@@ -85,42 +85,32 @@ public class IgniteSpiOperationTimeoutHelper {
* this {@code IgniteSpiOperationTimeoutController}.
*/
public long nextTimeoutChunk(long dfltTimeout) throws
IgniteSpiOperationTimeoutException {
- if (!failureDetectionTimeoutEnabled)
- return dfltTimeout;
+ long now = System.nanoTime();
- if (lastOperStartNanos == 0) {
- timeout = failureDetectionTimeout;
- lastOperStartNanos = System.nanoTime();
- }
+ long left;
+
+ if (timeoutEnabled)
+ left = timeoutThreshold - now;
else {
- long curNanos = System.nanoTime();
+ left = U.millisToNanos(dfltTimeout);
- timeout -= U.nanosToMillis(curNanos - lastOperStartNanos);
+ if (timeoutThreshold > 0 && now + left >= timeoutThreshold)
+ left = timeoutThreshold - now;
+ }
- lastOperStartNanos = curNanos;
+ if (left <= 0)
+ throw new IgniteSpiOperationTimeoutException("Network operation
timed out.");
- if (timeout <= 0)
- throw new IgniteSpiOperationTimeoutException("Network
operation timed out. Increase " +
- "'failureDetectionTimeout' configuration property
[failureDetectionTimeout="
- + failureDetectionTimeout + ']');
- }
-
- return timeout;
+ return U.nanosToMillis(left);
}
/**
- * Checks whether the given {@link Exception} is generated because failure
detection timeout has been reached.
+ * Checks whether the given {@link Exception} is a timeout.
*
- * @param e Exception.
- * @return {@code true} if failure detection timeout is reached, {@code
false} otherwise.
+ * @param e Exception to check.
+ * @return {@code True} if given exception is a timeout. {@code False}
otherwise.
*/
public boolean checkFailureTimeoutReached(Exception e) {
- if (!failureDetectionTimeoutEnabled)
- return false;
-
- if (X.hasCause(e, IgniteSpiOperationTimeoutException.class,
SocketTimeoutException.class, SocketException.class))
- return true;
-
- return (timeout - U.millisSinceNanos(lastOperStartNanos) <= 0);
+ return X.hasCause(e, IgniteSpiOperationTimeoutException.class,
SocketTimeoutException.class, SocketException.class);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9d5e3ca..d0c8e8e 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -215,6 +215,9 @@ class ServerImpl extends TcpDiscoveryImpl {
/** Maximal interval of connection check to next node in the ring. */
private static final long MAX_CON_CHECK_INTERVAL = 500;
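+ /** Minimal timeout to find a connection to some next node in the ring during
connection recovery. NOTE(review): compared with a nanosecond interval in serverOperationTimeoutHelper() without conversion - confirm the intended unit. */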
+ private static final long MIN_RECOVERY_TIMEOUT = 100;
+
/** Interval of checking connection to next node in the ring. */
private long connCheckInterval;
@@ -922,7 +925,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!openedSock && reconCnt == 2)
break;
- if (timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled() &&
reconCnt == spi.getReconnectCount())
break;
@@ -1576,7 +1579,7 @@ class ServerImpl extends TcpDiscoveryImpl {
break;
}
- if (timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled() && ++reconCnt ==
spi.getReconnectCount())
@@ -3477,8 +3480,12 @@ class ServerImpl extends TcpDiscoveryImpl {
while (true) {
if (sock == null) {
- if (timeoutHelper == null)
- timeoutHelper = new
IgniteSpiOperationTimeoutHelper(spi, true);
+ // We re-create the helper here because it could
be created earlier with wrong timeout on
+ // message sending like
IgniteConfiguration.failureDetectionTimeout. Here we are in the
+ // state of connection recovery and have to work
with
+ //
TcpDiscoverySpi.getEffectiveConnectionRecoveryTimeout()
+ if (timeoutHelper == null || sndState != null)
+ timeoutHelper =
serverOperationTimeoutHelper(sndState, -1);
boolean success = false;
@@ -3486,8 +3493,6 @@ class ServerImpl extends TcpDiscoveryImpl {
// Restore ring.
try {
- long tsNanos = System.nanoTime();
-
sock = spi.openSocket(addr, timeoutHelper);
out = spi.socketStream(sock);
@@ -3517,6 +3522,8 @@ class ServerImpl extends TcpDiscoveryImpl {
// We should take previousNodeAlive flag into
account only if we received the response from the correct node.
if (res.creatorNodeId().equals(next.id()) &&
res.previousNodeAlive() && sndState != null) {
+ sndState.checkTimeout();
+
// Remote node checked connection to it's
previous and got success.
boolean previousNode =
sndState.markLastFailedNodeAlive();
@@ -3624,13 +3631,20 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to connect to next node
[msg=" + msg + ", err=" + e + ']', e);
+ // Speeds up failure detection.
+ if (sndState != null &&
sndState.checkTimeout()) {
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return; // Nothing to do here.
+ }
+
if (!openSock)
break; // Don't retry if we can not
establish connection.
if (!spi.failureDetectionTimeoutEnabled() &&
++reconCnt == spi.getReconnectCount())
break;
- if
(timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
break;
else if (!spi.failureDetectionTimeoutEnabled()
&& (e instanceof
SocketTimeoutException || X.hasCause(e,
SocketTimeoutException.class))) {
@@ -3689,10 +3703,8 @@ class ServerImpl extends TcpDiscoveryImpl {
addFailedNodes(pendingMsg, failedNodes);
- if (timeoutHelper == null) {
- timeoutHelper = new
IgniteSpiOperationTimeoutHelper(spi, true,
- lastRingMsgSentTime);
- }
+ if (timeoutHelper == null)
+ timeoutHelper =
serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
try {
spi.writeToSocket(sock, out,
pendingMsg, timeoutHelper.nextTimeoutChunk(
@@ -3736,7 +3748,7 @@ class ServerImpl extends TcpDiscoveryImpl {
long tsNanos = System.nanoTime();
if (timeoutHelper == null)
- timeoutHelper = new
IgniteSpiOperationTimeoutHelper(spi, true, lastRingMsgSentTime);
+ timeoutHelper =
serverOperationTimeoutHelper(sndState, lastRingMsgSentTime);
addFailedNodes(msg, failedNodes);
@@ -3803,7 +3815,7 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to send message to next node
[next=" + next.id() + ", msg=" + msg + ']',
e);
- if (timeoutHelper.checkFailureTimeoutReached(e))
+ if (spi.failureDetectionTimeoutEnabled() &&
timeoutHelper.checkFailureTimeoutReached(e))
break;
if (!spi.failureDetectionTimeoutEnabled()) {
@@ -3840,6 +3852,11 @@ class ServerImpl extends TcpDiscoveryImpl {
if (!sent) {
if (sndState == null &&
spi.getEffectiveConnectionRecoveryTimeout() > 0)
sndState = new CrossRingMessageSendState();
+ else if (sndState != null && sndState.checkTimeout()) {
+ segmentLocalNodeOnSendFail(failedNodes);
+
+ return; // Nothing to do here.
+ }
boolean failedNextNode = sndState == null ||
sndState.markNextNodeFailed();
@@ -3873,12 +3890,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
- if (sndState != null && sndState.isFailed()) {
- segmentLocalNodeOnSendFail(failedNodes);
-
- return; // Nothing to do here.
- }
-
next = null;
errs = null;
@@ -6501,6 +6512,38 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ /**
+ * Creates a proper timeout helper taking into account the current send state and
ring state.
+ *
+ * @param sndState Current connection recovering state. Ignored if {@code
null}.
+ * @param lastOperationNanos Time of last related operation. Ignored if
negative or 0.
+ * @return Timeout helper.
+ */
+ private IgniteSpiOperationTimeoutHelper
serverOperationTimeoutHelper(@Nullable CrossRingMessageSendState sndState,
+ long lastOperationNanos) {
+ long absoluteThreshold = -1;
+
+ // Active send-state means we lost connection to next node and have to
find another.
+ // We don't know how many nodes failed. Maybe several failed in a
row. But we have only one
+ // connectionRecoveryTimeout to establish a new connection to the ring.
We can't spend this timeout wholly on one
+ // or two next nodes. We should slice it and try to traverse as many as
we can.
+ if (sndState != null) {
+ int nodesLeft = ring.serverNodes().size() - 1 -
sndState.failedNodes;
+
+ assert nodesLeft > 0;
+
+ long now = System.nanoTime();
+
+ // In case of large cluster and small connectionRecoveryTimeout we
have to provide reasonable minimal
+ // timeout per one of the next nodes. It should not appear too
small like 1, 5 or 10ms.
+ long perNodeTimeout = Math.max((sndState.failTimeNanos - now) /
nodesLeft, MIN_RECOVERY_TIMEOUT);
+
+ absoluteThreshold = Math.min(sndState.failTimeNanos, now +
perNodeTimeout);
+ }
+
+ return new IgniteSpiOperationTimeoutHelper(spi, true,
lastOperationNanos, absoluteThreshold);
+ }
+
/** Fixates time of last sent message. */
private void updateLastSentMessageTime() {
lastRingMsgSentTime = System.nanoTime();
@@ -8197,6 +8240,22 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
+ * Checks if message sending has completely failed due to the timeout.
Sets {@code RingMessageSendState#FAILED}
+ * if the timeout is reached.
+ *
+ * @return {@code True} if passed timeout is reached. {@code False}
otherwise.
+ */
+ boolean checkTimeout() {
+ if (System.nanoTime() >= failTimeNanos) {
+ state = RingMessageSendState.FAILED;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* Marks last failed node as alive.
*
* @return {@code False} if all failed nodes marked as alive or
incorrect state.
@@ -8208,12 +8267,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (--failedNodes <= 0) {
failedNodes = 0;
- if (System.nanoTime() - failTimeNanos >= 0) {
- state = RingMessageSendState.FAILED;
-
- return false;
- }
-
state = RingMessageSendState.STARTING_POINT;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index fdb9997..aaa3165 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -173,6 +173,17 @@ public class TcpDiscoveryNodesRing {
}
/**
+ * @return Server nodes.
+ */
+ public Collection<TcpDiscoveryNode> serverNodes() {
+ return nodes(new PN() {
+ @Override public boolean apply(ClusterNode node) {
+ return ((TcpDiscoveryNode)node).clientRouterNodeId() == null;
+ }
+ });
+ }
+
+ /**
* Checks whether the topology has remote nodes in.
*
* @return {@code true} if the topology has remote nodes in.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 3d6a99e..5b872ca 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -745,7 +745,7 @@ public abstract class
CacheContinuousQueryFailoverAbstractSelfTest extends GridC
return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /*
client node */)
- 1 /* Primary node */ - backups;
}
- }, 5000L);
+ }, getConfiguration("").getFailureDetectionTimeout() * 2);
awaitPartitionMapExchange();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
index dfc15e0..a751ac4 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNetworkIssuesTest.java
@@ -22,20 +22,31 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+
/**
*
*/
@@ -88,6 +99,12 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
/** */
private int connectionRecoveryTimeout = -1;
+ /** */
+ private int failureDetectionTimeout = 2_000;
+
+ /** */
+ private final GridConcurrentHashSet<Integer> segmentedNodes = new
GridConcurrentHashSet<>();
+
/** {@inheritDoc} */
@Override protected void afterTest() {
stopAllGrids();
@@ -107,10 +124,14 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
if (connectionRecoveryTimeout >= 0)
spi.setConnectionRecoveryTimeout(connectionRecoveryTimeout);
- cfg.setFailureDetectionTimeout(2_000);
+ cfg.setFailureDetectionTimeout(failureDetectionTimeout);
cfg.setDiscoverySpi(spi);
+ cfg.setIncludeEventTypes(EVT_NODE_SEGMENTED);
+
+ cfg.setSystemWorkerBlockedTimeout(10_000);
+
return cfg;
}
@@ -162,7 +183,7 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
illNodeSegmented.set(true);
return false;
- }, EventType.EVT_NODE_SEGMENTED);
+ }, EVT_NODE_SEGMENTED);
specialSpi = null;
@@ -187,6 +208,51 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
}
/**
+ * Ensures that failure of two nodes does not cause segmentation of the remaining nodes.
+ */
+ @Test
+ public void testFailTwoNodes() throws Exception {
+ failureDetectionTimeout = 1000;
+
+ startGrids(5);
+
+ awaitPartitionMapExchange();
+
+ final CountDownLatch failLatch = new CountDownLatch(2);
+
+ for (int i = 0; i < 5; i++) {
+ ignite(i).events().localListen(evt -> {
+ failLatch.countDown();
+
+ return true;
+ }, EVT_NODE_FAILED);
+
+ int nodeIdx = i;
+
+ ignite(i).events().localListen(evt -> {
+ segmentedNodes.add(nodeIdx);
+
+ return true;
+ }, EVT_NODE_SEGMENTED);
+ }
+
+ processNetworkThreads(ignite(2), t -> t.suspend());
+ processNetworkThreads(ignite(3), t -> t.suspend());
+
+ try {
+ failLatch.await(10, TimeUnit.SECONDS);
+ }
+ finally {
+ processNetworkThreads(ignite(2), t -> t.resume());
+ processNetworkThreads(ignite(3), t -> t.resume());
+ }
+
+ assertFalse(segmentedNodes.contains(0));
+ assertFalse(segmentedNodes.contains(1));
+ assertFalse(segmentedNodes.contains(4));
+ }
+
+ /**
* @param ig Ignite instance to get failedNodes collection from.
*/
private Map getFailedNodesCollection(IgniteEx ig) {
@@ -211,4 +277,23 @@ public class TcpDiscoveryNetworkIssuesTest extends
GridCommonAbstractTest {
out.close();
}
+
+ /**
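+ * Applies {@code proc} to all discovery threads and communication NIO worker threads of the given node,
e.g. to simulate a network failure on it.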
+ */
+ private void processNetworkThreads(Ignite ignite, Consumer<Thread> proc) {
+ DiscoverySpi disco = ignite.configuration().getDiscoverySpi();
+
+ ServerImpl serverImpl = U.field(disco, "impl");
+
+ for (Thread thread : serverImpl.threads())
+ proc.accept(thread);
+
+ CommunicationSpi<?> comm =
ignite.configuration().getCommunicationSpi();
+
+ GridNioServerWrapper nioServerWrapper = U.field(comm, "nioSrvWrapper");
+
+ for (GridWorker worker : nioServerWrapper.nio().workers())
+ proc.accept(worker.runner());
+ }
}