ignite-1524 Fixed processing of ClientReconnectMessage
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04f4f54a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04f4f54a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04f4f54a Branch: refs/heads/ignite-1093-2 Commit: 04f4f54a7ff1d43fa3baf4fa07865a8163796a82 Parents: 1942d75 Author: sboikov <[email protected]> Authored: Wed Sep 23 09:31:59 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 23 09:31:59 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 97 +++++++++++--------- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 33 +++++-- 2 files changed, 81 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/04f4f54a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 4ce46e8..8a205d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2958,70 +2958,81 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Client reconnect message. */ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) { + UUID nodeId = msg.creatorNodeId(); + UUID locNodeId = getLocalNodeId(); boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId()); if (!msg.verified()) { - assert isLocNodeRouter; - - msg.verify(locNodeId); + TcpDiscoveryNode node = ring.node(nodeId); - if (ring.hasRemoteNodes()) { - sendMessageAcrossRing(msg); + assert node == null || node.isClient(); - return; + if (node != null) { + node.clientRouterNodeId(msg.routerNodeId()); + node.aliveCheck(spi.maxMissedClientHbs); } - } - - UUID nodeId = msg.creatorNodeId(); - TcpDiscoveryNode node = ring.node(nodeId); + if (isLocalNodeCoordinator()) { + msg.verify(locNodeId); - assert node == null || node.isClient(); + if (node != null) { + Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node); - if (node != null) { - assert node.isClient(); + if (pending != null) { + msg.pendingMessages(pending); + msg.success(true); - node.clientRouterNodeId(msg.routerNodeId()); - node.aliveCheck(spi.maxMissedClientHbs); + if (log.isDebugEnabled()) + log.debug("Accept client reconnect, restored pending messages " + + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + } + else { + if (log.isDebugEnabled()) + log.debug("Failing reconnecting client node because failed to restore pending " + + "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); - if (isLocalNodeCoordinator()) { - Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node); + processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, + node.id(), node.internalOrder())); + } + } + else if (log.isDebugEnabled()) + log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); - if (pending != null) { - msg.pendingMessages(pending); - msg.success(true); + if (isLocNodeRouter) { + ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); - if (log.isDebugEnabled()) - log.debug("Accept client reconnect, restored pending messages " + - "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); + if (wrk != null) + wrk.addMessage(msg); + else if (log.isDebugEnabled()) + log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + locNodeId + ", clientNodeId=" + nodeId + ']'); } else { - if (log.isDebugEnabled()) - log.debug("Failing reconnecting client node because failed to restore pending " + - "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); - - processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, - node.id(), node.internalOrder())); + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); } } - } - else if (log.isDebugEnabled()) - log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']'); - - if (isLocNodeRouter) { - ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); - - if (wrk != null) - wrk.addMessage(msg); - else if (log.isDebugEnabled()) - log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + - locNodeId + ", clientNodeId=" + nodeId + ']'); + else { + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + } } else { - if (ring.hasRemoteNodes()) - sendMessageAcrossRing(msg); + if (isLocNodeRouter) { + ClientMessageWorker wrk = clientMsgWorkers.get(nodeId); + + if (wrk != null) + wrk.addMessage(msg); + else if (log.isDebugEnabled()) + log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" + + locNodeId + ", clientNodeId=" + nodeId + ']'); + } + else { + if (ring.hasRemoteNodes() && !locNodeId.equals(msg.verifierNodeId())) + sendMessageAcrossRing(msg); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/04f4f54a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java index 14417c1..344efc0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketTimeoutException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -170,11 +172,26 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov } /** + * @throws Exception If failed. + */ + public void testClientReconnectOnCoordinatorRouterFail1() throws Exception { + clientReconnectOnCoordinatorRouterFail(1); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectOnCoordinatorRouterFail2() throws Exception { + clientReconnectOnCoordinatorRouterFail(2); + } + + /** * Test tries to provoke scenario when client sends reconnect message before router failure detected. * + * @param srvNodes Number of additional server nodes. * @throws Exception If failed. */ - public void _testClientReconnectOnCoordinatorRouterFail() throws Exception { + public void clientReconnectOnCoordinatorRouterFail(int srvNodes) throws Exception { startServerNodes(1); Ignite srv = G.ignite("server-0"); @@ -189,24 +206,28 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1))); failureThreshold = 1000L; - netTimeout = 500L; + netTimeout = 1000L; startClientNodes(1); // Client should connect to coordinator. failureThreshold = 10_000L; netTimeout = 5000L; - for (int i = 0; i < 2; i++) { + List<String> nodes = new ArrayList<>(); + + for (int i = 0; i < srvNodes; i++) { Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + nodes.add(g.name()); + srvNodeIds.add(g.cluster().localNode().id()); } - checkNodes(3, 1); + checkNodes(1 + srvNodes, 1); - final CountDownLatch latch = new CountDownLatch(3); + nodes.add("client-0"); - String nodes[] = {"server-1", "server-2", "client-0"}; + final CountDownLatch latch = new CountDownLatch(nodes.size()); final AtomicBoolean err = new AtomicBoolean();
