Repository: ignite Updated Branches: refs/heads/ignite-1758 f96781d8b -> 7661e3ee1
ignite-1758 debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7661e3ee Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7661e3ee Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7661e3ee Branch: refs/heads/ignite-1758 Commit: 7661e3ee1d6651cef8399cb05fedae0663b7d327 Parents: f96781d Author: sboikov <[email protected]> Authored: Mon Nov 9 16:53:39 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 9 16:53:39 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 67 +++++---- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 142 +++++++++++++++++-- 2 files changed, 173 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7661e3ee/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 5db1e34..af385e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1087,9 +1087,20 @@ class ServerImpl extends TcpDiscoveryImpl { openSock = true; + TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId); + + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + synchronized (mux) { + for (TcpDiscoveryNode node : failedNodes) { + debugLog(null, "Add failed node [node=" + node + ", msg=" + req + ']'); + + req.addFailedNode(node.id()); + } + } + } + // Handshake. 
- spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk( - spi.getSocketTimeout())); + spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk( ackTimeout0)); @@ -1760,6 +1771,27 @@ class ServerImpl extends TcpDiscoveryImpl { } } + private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { + if (msg.failedNodes() != null) { + for (UUID nodeId : msg.failedNodes()) { + TcpDiscoveryNode failedNode = ring.node(nodeId); + + if (failedNode != null) { + boolean add; + + synchronized (mux) { + add = failedNodes.add(failedNode); + } + + if (add) + debugLog(null, "New failed node [node=" + failedNode + ", msg=" + msg + ']'); + } + else + debugLog(null, "Unknown failed node [nodeId=" + nodeId + ", msg=" + msg + ']'); + } + } + } + /** * Discovery messages history used for client reconnect. */ @@ -2141,27 +2173,6 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Connection check frequency is calculated: " + connCheckFreq); } - private void addMessageFailedNodes(TcpDiscoveryAbstractMessage msg) { - if (msg.failedNodes() != null) { - for (UUID nodeId : msg.failedNodes()) { - TcpDiscoveryNode failedNode = ring.node(nodeId); - - if (failedNode != null) { - boolean add; - - synchronized (mux) { - add = failedNodes.add(failedNode); - } - - if (add) - debugLog(null, "New failed node [node=" + failedNode + ", msg=" + msg + ']'); - } - else - debugLog(null, "Unknown failed node [nodeId=" + nodeId + ", msg=" + msg + ']'); - } - } - } - /** * @param msg Message to process.
*/ @@ -2175,7 +2186,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageProcessingStarted(msg); - addMessageFailedNodes(msg); + processMessageFailedNodes(msg); if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); @@ -3589,7 +3600,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader()); } - addMessageFailedNodes(msg); + processMessageFailedNodes(msg); } if (sendMessageToRemotes(msg)) @@ -4916,6 +4927,12 @@ class ServerImpl extends TcpDiscoveryImpl { // Handshake. TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg; + if (req.failedNodes() != null && req.failedNodes().contains(getLocalNodeId())) { + debugLog(msg, "Ignore handshake request: " + msg); + + return; + } + UUID nodeId = req.creatorNodeId(); this.nodeId = nodeId; http://git-wip-us.apache.org/repos/asf/ignite/blob/7661e3ee/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 0280e9c..29cc169 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -94,7 +95,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private UUID nodeId; /** */ - private TcpDiscoverySpi nodeSpi; + private static final ThreadLocal<TcpDiscoverySpi>
nodeSpi = new ThreadLocal<>(); /** * @throws Exception If fails. @@ -108,11 +109,14 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi spi = nodeSpi; + TcpDiscoverySpi spi = nodeSpi.get(); - if (spi == null) + if (spi == null) { spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ? new TestTcpDiscoverySpi() : new TcpDiscoverySpi(); + } + else + nodeSpi.set(null); discoMap.put(gridName, spi); @@ -1219,11 +1223,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception { TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi(); - nodeSpi = spi0; + nodeSpi.set(spi0); final Ignite ignite0 = startGrid(0); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); final Ignite ignite1 = startGrid(1); @@ -1238,7 +1242,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { @Override public Void call() throws Exception { log.info("Start 2"); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); Ignite ignite2 = startGrid(2); @@ -1288,7 +1292,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { assertEquals(1, cache.get(1)); - nodeSpi = new TestCustomEventRaceSpi(); + nodeSpi.set(new TestCustomEventRaceSpi()); Ignite ignite = startGrid(3); @@ -1331,15 +1335,15 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { private void customEventCoordinatorFailure(boolean twoNodes) throws Exception { TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi(); - nodeSpi = spi0; + nodeSpi.set(spi0); Ignite ignite0 = startGrid(0); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new
TestCustomEventCoordinatorFailureSpi()); Ignite ignite1 = startGrid(1); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite2 = twoNodes ? null : startGrid(2); @@ -1383,7 +1387,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { log.info("Try start one more node."); - nodeSpi = new TestCustomEventCoordinatorFailureSpi(); + nodeSpi.set(new TestCustomEventCoordinatorFailureSpi()); Ignite ignite = startGrid(twoNodes ? 2 : 3); @@ -1398,6 +1402,122 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testFailedNodes1() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(1); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(2); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFailedNodes2() throws Exception { + try { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(1); + + final AtomicInteger nodeIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = nodeIdx.incrementAndGet(); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + startGrid(idx); + + return null; + } + }, 3, "start-node"); + + Ignite ignite2 = ignite(2); + + waitForRemoteNodes(ignite2, 3); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed.
+ */ + public void testFailedNodes3() throws Exception { + try { + nodeSpi.set(new TestFailedNodesSpi(-1)); + + startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(2)); + + startGrid(1); + } + finally { + stopAllGrids(); + } + } + + /** + * Simulates a scenario where a node detects another node's failure while sending a message, although that node is still alive. + */ + private static class TestFailedNodesSpi extends TcpDiscoverySpi { + /** */ + private final AtomicBoolean failMsg = new AtomicBoolean(); + + /** */ + private final int failOrder; + + /** + * @param failOrder SPI fails the first node added message send once if the local node internal order equals this value. + */ + TestFailedNodesSpi(int failOrder) { + this.failOrder = failOrder; + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, + TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout, + long timeout) throws IOException, IgniteCheckedException { + if (locNode.internalOrder() == failOrder && + (msg instanceof TcpDiscoveryNodeAddedMessage) && + failMsg.compareAndSet(false, true)) { + log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + throw new SocketTimeoutException(); + } + + super.writeToSocket(sock, msg, bout, timeout); + } + } + + /** * */ private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {
