IGNITE-6700 TCP discovery: ignore the failedNodes list of messages sent by nodes that have already failed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f69d262 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f69d262 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f69d262 Branch: refs/heads/ignite-5937 Commit: 5f69d262331f60ed62822c3cbad5c643b8f9a025 Parents: 62cb4fb Author: sboikov <[email protected]> Authored: Tue Oct 24 11:24:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 24 11:24:06 2017 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 45 ++++++-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 18 +++ .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 110 +++++++++++++++++++ 3 files changed, 163 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5f69d262/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 58e1ba4..4c2a0ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1959,8 +1959,36 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message. 
*/ private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { - if (msg.failedNodes() != null) { - for (UUID nodeId : msg.failedNodes()) { + Collection<UUID> msgFailedNodes = msg.failedNodes(); + + if (msgFailedNodes != null) { + UUID sndId = msg.senderNodeId(); + + if (sndId != null) { + if (ring.node(sndId) == null) { + if (log.isDebugEnabled()) { + log.debug("Ignore message failed nodes, sender node is not alive [nodeId=" + sndId + + ", failedNodes=" + msgFailedNodes + ']'); + } + + return; + } + + synchronized (mux) { + for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { + if (failedNode.id().equals(sndId)) { + if (log.isDebugEnabled()) { + log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId + + ", failedNodes=" + msgFailedNodes + ']'); + } + + return; + } + } + } + } + + for (UUID nodeId : msgFailedNodes) { TcpDiscoveryNode failedNode = ring.node(nodeId); if (failedNode != null) { @@ -2817,9 +2845,6 @@ class ServerImpl extends TcpDiscoveryImpl { log.trace("Next node remains the same [nextId=" + next.id() + ", nextOrder=" + next.internalOrder() + ']'); - // Flag that shows whether next node exists and accepts incoming connections. - boolean nextNodeExists = sock != null; - final boolean sameHost = U.sameMacs(locNode, next); List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses()); @@ -2844,8 +2869,6 @@ class ServerImpl extends TcpDiscoveryImpl { if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); - nextNodeExists = false; - boolean success = false; boolean openSock = false; @@ -2974,8 +2997,6 @@ class ServerImpl extends TcpDiscoveryImpl { sock = null; } else { - // Next node exists and accepts incoming messages. - nextNodeExists = true; // Resetting timeout control object to let the code below to use a new one // for the next bunch of operations. 
timeoutHelper = null; @@ -3067,7 +3088,11 @@ class ServerImpl extends TcpDiscoveryImpl { if (latencyCheck && log.isInfoEnabled()) log.info("Latency check message has been written to socket: " + msg.id()); - spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); + spi.writeToSocket(newNextNode ? newNext : next, + sock, + out, + msg, + timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); long tstamp0 = U.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/ignite/blob/5f69d262/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index eb8ee30..922e787 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1478,6 +1478,24 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** + * @param node Target node. + * @param sock Socket. + * @param out Stream to write to. + * @param msg Message. + * @param timeout Timeout. + * @throws IOException If IO failed or write timed out. + * @throws IgniteCheckedException If marshalling failed. + */ + protected void writeToSocket( + ClusterNode node, + Socket sock, + OutputStream out, + TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + writeToSocket(sock, out, msg, timeout); + } + + /** * Writes message to the socket. * * @param sock Socket. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f69d262/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index bf48fcc..d6d484c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteIllegalStateException; import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -52,6 +53,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; @@ -77,6 +79,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; @@ -2069,6 +2072,42 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testFailedNodeRestoreConnection() throws Exception { + try { + TestRestoreConnectedSpi.startTest = false; + + for (int i = 1; i < 5; i++) { + nodeSpi.set(new TestRestoreConnectedSpi(3)); + + startGrid(i); + } + + awaitPartitionMapExchange(); + + info("Start fail test"); + + TestRestoreConnectedSpi.startTest = true; + + waitNodeStop(getTestIgniteInstanceName(3)); + + U.sleep(5000); + + for (int i = 1; i < 5; i++) { + if (i != 3) { + Ignite node = ignite(i); + + assertEquals(3, node.cluster().nodes().size()); + } + } + } + finally { + stopAllGrids(); + } + } + + /** * @param nodeName Node name. * @throws Exception If failed. 
*/ @@ -2171,6 +2210,77 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** * */ + private static class TestRestoreConnectedSpi extends TcpDiscoverySpi { + /** */ + static volatile boolean startTest; + + /** */ + private long sleepEndTime; + + /** */ + private long errNodeOrder; + + /** */ + private ClusterNode errNext; + + /** + * @param errNodeOrder + */ + TestRestoreConnectedSpi(long errNodeOrder) { + this.errNodeOrder = errNodeOrder; + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(ClusterNode node, + Socket sock, + OutputStream out, + TcpDiscoveryAbstractMessage msg, + long timeout) throws IOException, IgniteCheckedException { + if (startTest && !(msg instanceof TcpDiscoveryConnectionCheckMessage)) { + if (node.order() == errNodeOrder) { + log.info("Fail write on message send [node=" + node.id() + ", msg=" + msg + ']'); + + throw new SocketTimeoutException(); + } + else if (locNode.order() == errNodeOrder) { + if (sleepEndTime == 0) { + errNext = node; + + sleepEndTime = System.currentTimeMillis() + 3000; + } + + long sleepTime = sleepEndTime - System.currentTimeMillis(); + + if (sleepTime > 0) { + log.info("Start sleep on message send: " + msg); + + try { + U.sleep(sleepTime); + } + catch (IgniteInterruptedCheckedException e) { + log.error("Interrupted on socket write: " + e, e); + + throw new IOException(e); + } + + log.info("Stop sleep on message send: " + msg); + + if (node.equals(errNext)) { + log.info("Fail write after sleep [node=" + node.id() + ", msg=" + msg + ']'); + + throw new SocketTimeoutException(); + } + } + } + } + + super.writeToSocket(node, sock, out, msg, timeout); + } + } + + /** + * + */ private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi { /** */ static volatile boolean fail;
