Repository: ignite Updated Branches: refs/heads/ignite-1758 25609b33a -> 8b5529818
ignite-1758 debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8b552981 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8b552981 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8b552981 Branch: refs/heads/ignite-1758 Commit: 8b5529818d4f69d79fbcc6b0aecd3db324e403a2 Parents: 25609b3 Author: sboikov <[email protected]> Authored: Tue Nov 10 11:43:46 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Nov 10 11:43:46 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 24 ++- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 147 ++++++++++++++++++- 2 files changed, 165 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8b552981/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0a36e1c..9ab80f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2272,6 +2272,8 @@ class ServerImpl extends TcpDiscoveryImpl { checkHeartbeatsReceiving(); checkPendingCustomMessages(); + + checkFailedNodesList(); } /** @@ -4184,7 +4186,7 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to respond to status check message (connection refused) " + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); } - else { + else if (!spi.isNodeStopping0()){ if (pingNode(msg.creatorNode())) // Node exists and accepts incoming connections. U.error(log, "Failed to respond to status check message [recipient=" + @@ -4573,6 +4575,26 @@ class ServerImpl extends TcpDiscoveryImpl { } } + private void checkFailedNodesList() { + List<TcpDiscoveryNodeFailedMessage> msgs = null; + + synchronized (mux) { + for (TcpDiscoveryNode node : failedNodes) { + if (ring.node(node.id()) != null) { + if (msgs == null) + msgs = new ArrayList<>(failedNodes.size()); + + msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + } + } + } + + if (msgs != null) { + for (TcpDiscoveryNodeFailedMessage msg : msgs) + addMessage(msg); + } + } + /** * Checks and flushes custom event messages if no nodes are attempting to join the grid. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8b552981/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 13f40d4..f401448 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -39,6 +40,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -49,6 +52,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.io.GridByteArrayOutputStream; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; @@ -65,6 +69,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.testframework.GridTestUtils; @@ -1393,7 +1398,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); - startGrid(0); + final Ignite ignite0 = startGrid(0); nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); @@ -1401,7 +1406,13 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); - startGrid(2); + Ignite ignite2 = startGrid(2); + + assertEquals(2, ignite2.cluster().nodes().size()); + + waitNodeStop(ignite0.name()); + + tryCreateCache(2); } finally { stopAllGrids(); @@ -1417,7 +1428,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); - startGrid(0); + Ignite ignite0 = startGrid(0); nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); @@ -1440,6 +1451,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { Ignite ignite2 = ignite(2); waitForRemoteNodes(ignite2, 3); + + waitNodeStop(ignite0.name()); + + tryCreateCache(4); } finally { stopAllGrids(); @@ -1453,11 +1468,23 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { try { nodeSpi.set(new TestFailedNodesSpi(-1)); - startGrid(0); + Ignite ignite0 = startGrid(0); nodeSpi.set(new TestFailedNodesSpi(2)); - startGrid(1); + Ignite ignite1 = startGrid(1); + + assertEquals(1, ignite1.cluster().nodes().size()); + + waitNodeStop(ignite0.name()); + + ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1); + + startGrid(2); + + assertEquals(2, ignite1.cluster().nodes().size()); + + tryCreateCache(2); } finally { stopAllGrids(); @@ -1465,6 +1492,87 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testFailedNodes4() throws Exception { + final int FAIL_ORDER = 3; + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + final Ignite ignite0 = startGrid(0); + + nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER)); + + Ignite ignite1 = startGrid(1); + + TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER); + + spi.stopBeforeSndFail = true; + + nodeSpi.set(spi); + + Ignite ignite2 = startGrid(2); + + waitNodeStop(ignite2.name()); + + log.info("Try start new node."); + + Ignite ignite3 = startGrid(3); + + waitNodeStop(ignite0.name()); + + assertEquals(2, ignite1.cluster().nodes().size()); + assertEquals(2, ignite3.cluster().nodes().size()); + + tryCreateCache(2); + } + + /** + * @param nodeName Node name. + * @throws Exception If failed. + */ + private void waitNodeStop(final String nodeName) throws Exception { + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + Ignition.ignite(nodeName); + + return false; + } + catch (IgniteIllegalStateException e) { + return true; + } + } + }, 10_000); + + if (!wait) + U.dumpThreads(log); + + assertTrue("Failed to wait for node stop.", wait); + } + + /** + * @param expNodes Expected nodes number. + */ + private void tryCreateCache(int expNodes) { + List<Ignite> allNodes = G.allGrids(); + + assertEquals(expNodes, allNodes.size()); + + int cntr = 0; + + for (Ignite ignite : allNodes) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("cache-" + cntr++); + + log.info("Try create cache [node=" + ignite.name() + ", cache=" + ccfg.getName() + ']'); + + ignite.getOrCreateCache(ccfg).put(1, 1); + } + } + + /** * Simulate scenario when node detects node failure trying to send message, but node still alive. */ private static class TestFailedNodesSpi extends TcpDiscoverySpi { @@ -1474,6 +1582,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** */ private int failOrder; + /** */ + private boolean stopBeforeSndFail; + + /** */ + private boolean stop; + /** * @param failOrder Spi fails connection if local node order equals to this order. */ @@ -1486,6 +1600,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException { + if (stop) + return; + if (locNode.internalOrder() == failOrder && (msg instanceof TcpDiscoveryNodeAddedMessage) && failMsg.compareAndSet(false, true)) { @@ -1496,6 +1613,26 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { throw new SocketTimeoutException(); } + if (stopBeforeSndFail && + locNode.internalOrder() == failOrder && + (msg instanceof TcpDiscoveryNodeFailedMessage)) { + stop = true; + + log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']'); + + sock.close(); + + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + ignite.close(); + + return null; + } + }, "stop-node"); + + return; + } + super.writeToSocket(sock, msg, bout, timeout); } }
