Repository: ignite Updated Branches: refs/heads/ignite-zk 69aedc5b5 -> fdd2c530a
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdd2c530 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdd2c530 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdd2c530 Branch: refs/heads/ignite-zk Commit: fdd2c530a370c815226d5eb60fc28e0c28325f65 Parents: 69aedc5 Author: sboikov <[email protected]> Authored: Mon Dec 25 16:36:42 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 25 16:59:10 2017 +0300 ---------------------------------------------------------------------- .../zk/internal/ZookeeperDiscoveryImpl.java | 34 ++-------- .../ZookeeperDiscoverySpiBasicTest.java | 71 ++++++++++++++++++-- 2 files changed, 71 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd2c530/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index d7e0a76..04eb607 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -32,7 +32,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -480,7 +479,7 @@ public class ZookeeperDiscoveryImpl { private void onSegmented(Exception e) { rtState.errForClose = e; - if (rtState.joined) { + if (rtState.joined || joinFut.isDone()) { synchronized (stateMux) { connState = ConnectionState.STOPPED; } @@ -495,12 +494,15 @@ public class ZookeeperDiscoveryImpl { * */ private void notifySegmented() { - assert rtState.evtsData != null; + List<ClusterNode> nodes = rtState.top.topologySnapshot(); + + if (nodes.isEmpty()) + nodes = Collections.singletonList((ClusterNode)locNode); lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED, - rtState.evtsData.topVer, + rtState.evtsData != null ? rtState.evtsData.topVer : 1L, locNode, - rtState.top.topologySnapshot(), + nodes, Collections.<Long, Collection<ClusterNode>>emptyMap(), null); } @@ -992,9 +994,6 @@ public class ZookeeperDiscoveryImpl { catch (IgniteCheckedException | ZookeeperClientFailedException e) { throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e); } - finally { - connStartLatch.countDown(); - } } /** @@ -1196,18 +1195,6 @@ public class ZookeeperDiscoveryImpl { } } - /** TODO ZK */ - private final CountDownLatch connStartLatch = new CountDownLatch(1); - - /** - * For testing only. - * - * @throws Exception If failed. - */ - void waitConnectStart() throws Exception { - connStartLatch.await(); - } - /** * @param aliveNodes Alive nodes. * @throws Exception If failed. @@ -3846,13 +3833,6 @@ public class ZookeeperDiscoveryImpl { /** {@inheritDoc} */ @Override void onPreviousNodeFail() { - // TODO ZK: -// if (locInternalId == crdInternalId + 1) { -// if (log.isInfoEnabled()) -// log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']'); -// -// onBecomeCoordinator(aliveNodes, locInternalId); -// } if (log.isInfoEnabled()) log.info("Previous server node failed, check is node new coordinator [locId=" + locNode.id() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd2c530/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index b31555e..a542a7a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -157,6 +157,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { private long joinTimeout; /** */ + private boolean clientReconnectDisabled; + + /** */ private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); /** */ @@ -200,6 +203,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); + zkSpi.setClientReconnectDisabled(clientReconnectDisabled); + // Set authenticator for basic sanity tests. if (auth != null) { zkSpi.setAuthenticator(auth.apply()); @@ -821,7 +826,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { final CountDownLatch l = new CountDownLatch(1); node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { + @Override public boolean apply(Event evt) { l.countDown(); return false; @@ -857,7 +862,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { final CountDownLatch l = new CountDownLatch(1); node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { + @Override public boolean apply(Event evt) { l.countDown(); return false; @@ -887,7 +892,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { final CountDownLatch l = new CountDownLatch(1); node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { + @Override public boolean apply(Event evt) { l.countDown(); return false; @@ -1085,7 +1090,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @param failCnt Number of nodes to stop after coordinator loose connection. * @throws Exception If failed. */ - private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception { + private void connectionRestore_Coordinator(final int initNodes, int startNodes, int failCnt) throws Exception { sesTimeout = 30_000; testSockNio = true; @@ -1123,11 +1128,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { final List<String> failedZkNodes = new ArrayList<>(failCnt); for (int i = initNodes; i < initNodes + startNodes; i++) { - ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); + final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); - ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl"); + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + long internalOrder = GridTestUtils.getFieldValue(spi, "impl", "rtState", "internalOrder"); - impl.waitConnectStart(); + return internalOrder > 0; + } + }, 10_000)); if (cnt++ < failCnt) { ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); @@ -2581,6 +2590,45 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testServersLeft_FailOnTimeout() throws Exception { + startGrid(0); + + final int CLIENTS = 5; + + joinTimeout = 3000; + + clientMode(true); + + startGridsMultiThreaded(1, CLIENTS); + + waitForTopology(CLIENTS + 1); + + final CountDownLatch l = new CountDownLatch(CLIENTS); + + for (int i = 0; i < CLIENTS; i++) { + Ignite node = ignite(i + 1); + + node.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + info("Segmented!"); + + l.countDown(); + + return false; + } + }, EventType.EVT_NODE_SEGMENTED); + } + + stopGrid(getTestIgniteInstanceName(0), true, false); + + assertTrue(l.await(10, SECONDS)); + + evts.clear(); + } + + /** * */ public void testStartNoServers_FailOnTimeout() { @@ -2679,6 +2727,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testDisconnectOnServersLeft_5() throws Exception { + joinTimeout = 10_000; + + disconnectOnServersLeft(5, 10); + } + + /** * @param srvs Number of servers. * @param clients Number of clients. * @throws Exception If failed.
