Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33fe30da Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33fe30da Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33fe30da Branch: refs/heads/ignite-1093-2 Commit: 33fe30da620e4f08cee959104805f3527b597700 Parents: e51fb42 Author: sboikov <[email protected]> Authored: Tue Sep 22 12:55:18 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Sep 22 12:55:18 2015 +0300 ---------------------------------------------------------------------- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++- .../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +- 2 files changed, 119 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java index 66275b3..14417c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -21,12 +21,25 @@ import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketTimeoutException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + /** * Client-based discovery SPI test with failure detection timeout enabled. */ @@ -60,7 +73,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov /** {@inheritDoc} */ @Override protected TcpDiscoverySpi getDiscoverySpi() { - return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi(); + return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi(); } /** @@ -117,16 +130,16 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov private void checkFailureThresholdWorkability() throws Exception { useTestSpi = true; - TestTcpDiscoverySpi firstSpi = null; - TestTcpDiscoverySpi secondSpi = null; + TestTcpDiscoverySpi2 firstSpi = null; + TestTcpDiscoverySpi2 secondSpi = null; try { startServerNodes(2); checkNodes(2, 0); - firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi()); - secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi()); + firstSpi = (TestTcpDiscoverySpi2)(G.ignite("server-0").configuration().getDiscoverySpi()); + secondSpi = (TestTcpDiscoverySpi2)(G.ignite("server-1").configuration().getDiscoverySpi()); assert firstSpi.err == null; @@ -157,9 +170,102 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov } /** + * Test tries to provoke scenario when client sends reconnect message before router failure detected. + * + * @throws Exception If failed. + */ + public void _testClientReconnectOnCoordinatorRouterFail() throws Exception { + startServerNodes(1); + + Ignite srv = G.ignite("server-0"); + + final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode(); + + final UUID srvNodeId = srvNode.id(); + + clientIpFinder = new TcpDiscoveryVmIpFinder(); + + clientIpFinder.setAddresses( + Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1))); + + failureThreshold = 1000L; + netTimeout = 500L; + + startClientNodes(1); // Client should connect to coordinator. + + failureThreshold = 10_000L; + netTimeout = 5000L; + + for (int i = 0; i < 2; i++) { + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + } + + checkNodes(3, 1); + + final CountDownLatch latch = new CountDownLatch(3); + + String nodes[] = {"server-1", "server-2", "client-0"}; + + final AtomicBoolean err = new AtomicBoolean(); + + for (String node : nodes) { + G.ignite(node).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + DiscoveryEvent disoEvt = (DiscoveryEvent)evt; + + if (disoEvt.eventNode().id().equals(srvNodeId)) { + info("Expected node failed event: " + ((DiscoveryEvent) evt).eventNode()); + + latch.countDown(); + } + else { + log.info("Unexpected node failed event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_NODE_FAILED); + } + + Thread.sleep(5000); + + Ignite client = G.ignite("client-0"); + + UUID nodeId = client.cluster().localNode().id(); + + log.info("Fail coordinator: " + srvNodeId); + + TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi(); + + srvSpi.pauseAll(false); + + try { + Thread.sleep(2000); + } + finally { + srvSpi.simulateNodeFailure(); + srvSpi.resumeAll(); + } + + try { + assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS)); + + assertFalse("Unexpected event, see log for details.", err.get()); + assertEquals(nodeId, client.cluster().localNode().id()); + } + finally { + srvSpi.resumeAll(); + } + } + + /** * */ - private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi { /** */ private long readDelay; http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index c86f06a..9fbf5b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -89,13 +89,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ - private static final AtomicInteger srvIdx = new AtomicInteger(); + protected static final AtomicInteger srvIdx = new AtomicInteger(); /** */ private static final AtomicInteger clientIdx = new AtomicInteger(); /** */ - private static Collection<UUID> srvNodeIds; + protected static Collection<UUID> srvNodeIds; /** */ private static Collection<UUID> clientNodeIds; @@ -128,13 +128,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private UUID nodeId; /** */ - private TcpDiscoveryVmIpFinder clientIpFinder; + protected TcpDiscoveryVmIpFinder clientIpFinder; /** */ private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT; /** */ - private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; + protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; /** */ private boolean longSockTimeouts; @@ -466,7 +466,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public void apply(Socket sock) { try { latch.await(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new RuntimeException(e); } } @@ -2056,7 +2057,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * */ - private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** */ private final Object mux = new Object();
