Repository: ignite Updated Branches: refs/heads/ignite-zk a3a625699 -> 8735efda6
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8735efda Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8735efda Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8735efda Branch: refs/heads/ignite-zk Commit: 8735efda660e707cb0c50e83d15891650eeccc78 Parents: a3a6256 Author: sboikov <[email protected]> Authored: Wed Dec 20 12:57:40 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 20 13:21:47 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZookeeperDiscoveryImpl.java | 14 ++++ .../ZookeeperDiscoverySpiBasicTest.java | 80 +++++++++++++++++++- 2 files changed, 92 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8735efda/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index c54d7c6..91d8e3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -1209,6 +1209,17 @@ public class ZookeeperDiscoveryImpl { Integer internalId = e.getKey(); if (!rtState.top.nodesByInternalId.containsKey(internalId)) { + UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId(); + + if (rslvFutId != null) { + if (log.isInfoEnabled()) { + log.info("Delay alive nodes change process while communication error resolve " + + "is in progress [reqId=" + rslvFutId + ']'); + } + + break; + } + if (processJoinOnCoordinator(curTop, internalId, e.getValue())) { newEvts++; @@ -2467,6 +2478,9 @@ public class ZookeeperDiscoveryImpl { evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); saveAndProcessNewEvents(); + + // Need re-check alive nodes in case join was delayed. + rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8735efda/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 6d61ac2..829d3a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -94,6 +94,7 @@ import org.apache.zookeeper.ZooKeeper; import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; @@ -1841,10 +1842,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * Tests case when one node fails before sending communication status. + * * @throws Exception If failed. */ public void testNoOpCommunicationErrorResolve_3() throws Exception { - // One node fails before sending communication status. sesTimeout = 2000; commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; @@ -1886,10 +1888,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * Tests case when Coordinator fails while resolve process is in progress. + * * @throws Exception If failed. */ public void testNoOpCommunicationErrorResolve_4() throws Exception { - // Coordinator fails while resolve process is in progress. testCommSpi = true; sesTimeout = 2000; @@ -1927,6 +1930,69 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * Tests that nodes join is delayed while resolve is in progress. + * + * @throws Exception If failed. + */ + public void testNoOpCommunicationErrorResolve_5() throws Exception { + testCommSpi = true; + + sesTimeout = 2000; + commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; + + startGrid(0); + + startGridsMultiThreaded(1, 3); + + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3)); + + commSpi.pingStartLatch = new CountDownLatch(1); + commSpi.pingLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() { + ZookeeperDiscoverySpi spi = spi(ignite(1)); + + spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test")); + + return null; + } + }); + + assertTrue(commSpi.pingStartLatch.await(10, SECONDS)); + + try { + assertFalse(fut.isDone()); + + final AtomicInteger nodeIdx = new AtomicInteger(3); + + IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(nodeIdx.incrementAndGet()); + + return null; + } + }, 3, "start-node"); + + U.sleep(1000); + + assertFalse(startFut.isDone()); + + assertEquals(4, ignite(0).cluster().nodes().size()); + + commSpi.pingLatch.countDown(); + + startFut.get(); + fut.get(); + + waitForTopology(7); + } + finally { + commSpi.pingLatch.countDown(); + } + } + + /** * @throws Exception If failed. */ public void testCommunicationErrorResolve_KillNode_1() throws Exception { @@ -2099,6 +2165,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { spi = spi(node); } + assert spi != null; + try { spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test")); } @@ -3028,6 +3096,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { */ static class ZkTestCommunicationSpi extends TcpCommunicationSpi { /** */ + private volatile CountDownLatch pingStartLatch; + + /** */ private volatile CountDownLatch pingLatch; /** */ @@ -3054,6 +3125,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { + CountDownLatch pingStartLatch = this.pingStartLatch; + + if (pingStartLatch != null) + pingStartLatch.countDown(); + CountDownLatch pingLatch = this.pingLatch; try {
