Repository: ignite Updated Branches: refs/heads/ignite-zk 7611371b9 -> adebbf075
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa78f5c4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa78f5c4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa78f5c4 Branch: refs/heads/ignite-zk Commit: aa78f5c43639526c1f914ca7e3b2d455e012a358 Parents: 9ffd603 Author: sboikov <[email protected]> Authored: Thu Nov 23 13:52:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Nov 23 14:36:04 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 2 +- .../discovery/zk/internal/ZkIgnitePaths.java | 5 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 15 +- .../ZookeeperDiscoverySpiBasicTest.java | 204 +++++++++++++++---- 4 files changed, 174 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index cc7e266..59012bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -155,7 +155,7 @@ import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_J */ public class IgnitionEx { /** */ - public static final boolean TEST_ZK = true; + public static volatile boolean TEST_ZK = true; /** */ public static TestingCluster zkCluster; http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java index 9f1b859..2936876 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java @@ -232,7 +232,10 @@ class ZkIgnitePaths { * @return Event node ID. */ static UUID customEventSendNodeId(String path) { - String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN); + // <uuid prefix>:<node id>|<seq> + int startIdx = ZkIgnitePaths.UUID_LEN + 1; + + String idStr = path.substring(startIdx, startIdx + ZkIgnitePaths.UUID_LEN); return UUID.fromString(idStr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index a04314d..5cbb474 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -249,8 +249,13 @@ public class ZookeeperDiscoveryImpl { } try { - // TODO ZK: handle retries. - zkClient.createIfNeeded(zkPaths.customEvtsDir + "/" + locNode.id() + '|', msgBytes, CreateMode.PERSISTENT_SEQUENTIAL); + String prefix = UUID.randomUUID().toString(); + + zkClient.createSequential(prefix, + zkPaths.customEvtsDir, + prefix + ":" + locNode.id() + '|', + msgBytes, + CreateMode.PERSISTENT_SEQUENTIAL); } catch (ZookeeperClientFailedException e) { throw new IgniteException(e); @@ -485,7 +490,8 @@ public class ZookeeperDiscoveryImpl { if (log.isInfoEnabled()) log.info("Previous node watch event: " + evt); - zkClient.existsAsync(evt.getPath(), this, this); + if (evt.getType() != Event.EventType.None) + zkClient.existsAsync(evt.getPath(), this, this); } } @@ -585,9 +591,8 @@ public class ZookeeperDiscoveryImpl { */ private class AliveNodeDataWatcher implements Watcher { @Override public void process(WatchedEvent evt) { - if (evt.getType() == Event.EventType.NodeDataChanged) { + if (evt.getType() == Event.EventType.NodeDataChanged) zkClient.getDataAsync(evt.getPath(), this, aliveNodeDataUpdateCallback); - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 39f9fbf..2c998d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.curator.test.TestingCluster; import org.apache.ignite.Ignite; @@ -50,11 +51,8 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; -import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient; -import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; @@ -71,6 +69,9 @@ import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; */ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ + private static final int ZK_SRVS = 3; + + /** */ private static TestingCluster zkCluster; /** */ @@ -181,8 +182,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); + IgnitionEx.TEST_ZK = false; + if (USE_TEST_CLUSTER) { - zkCluster = new TestingCluster(3); + zkCluster = new TestingCluster(ZK_SRVS); + zkCluster.start(); } } @@ -860,6 +864,32 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testRandomTopologyChanges() throws Exception { + randomTopologyChanges(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testRandomTopologyChangesRestartZk() throws Exception { + randomTopologyChanges(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testRandomTopologyChangesCloseClients() throws Exception { + randomTopologyChanges(false, true); + } + + /** + * @param restartZk If {@code true} in background restarts on of ZK servers. + * @param closeClientSock If {@code true} in background closes zk clients' sockets. + * @throws Exception If failed. + */ + private void randomTopologyChanges(boolean restartZk, boolean closeClientSock) throws Exception { + if (closeClientSock) + testSockNio = true; + List<Integer> startedNodes = new ArrayList<>(); List<String> startedCaches = new ArrayList<>(); @@ -871,72 +901,90 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { int MAX_NODES = 20; int MAX_CACHES = 10; - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - while (System.currentTimeMillis() < stopTime) { - if (startedNodes.size() > 0 && rnd.nextInt(10) == 0) { - boolean startCache = startedCaches.size() < 2 || - (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0); + AtomicBoolean stop = new AtomicBoolean(); - int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size())); + IgniteInternalFuture<?> fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null; + IgniteInternalFuture<?> fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null; - if (startCache) { - String cacheName = "cache-" + nextCacheIdx++; + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - log.info("Next, start new cache [cacheName=" + cacheName + - ", node=" + nodeIdx + - ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + - ", curCaches=" + startedCaches.size() + ']'); + while (System.currentTimeMillis() < stopTime) { + if (startedNodes.size() > 0 && rnd.nextInt(10) == 0) { + boolean startCache = startedCaches.size() < 2 || + (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0); - ignite(nodeIdx).createCache(new CacheConfiguration<>(cacheName)); + int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size())); - startedCaches.add(cacheName); - } - else { - if (startedCaches.size() > 1) { - String cacheName = startedCaches.get(rnd.nextInt(startedCaches.size())); + if (startCache) { + String cacheName = "cache-" + nextCacheIdx++; - log.info("Next, stop cache [nodeIdx=" + nodeIdx + + log.info("Next, start new cache [cacheName=" + cacheName + ", node=" + nodeIdx + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + - ", cacheName=" + startedCaches.size() + ']'); + ", curCaches=" + startedCaches.size() + ']'); - ignite(nodeIdx).destroyCache(cacheName); + ignite(nodeIdx).createCache(new CacheConfiguration<>(cacheName)); - assertTrue(startedCaches.remove(cacheName)); + startedCaches.add(cacheName); } - } - } - else { - boolean startNode = startedNodes.size() < 2 || - (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0); - - if (startNode) { - int nodeIdx = nextNodeIdx++; + else { + if (startedCaches.size() > 1) { + String cacheName = startedCaches.get(rnd.nextInt(startedCaches.size())); - log.info("Next, start new node [nodeIdx=" + nodeIdx + - ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + - ", curNodes=" + startedNodes.size() + ']'); + log.info("Next, stop cache [nodeIdx=" + nodeIdx + + ", node=" + nodeIdx + + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + + ", cacheName=" + startedCaches.size() + ']'); - startGrid(nodeIdx); + ignite(nodeIdx).destroyCache(cacheName); - assertTrue(startedNodes.add(nodeIdx)); + assertTrue(startedCaches.remove(cacheName)); + } + } } else { - if (startedNodes.size() > 1) { - int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size())); + boolean startNode = startedNodes.size() < 2 || + (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0); + + if (startNode) { + int nodeIdx = nextNodeIdx++; - log.info("Next, stop [nodeIdx=" + nodeIdx + + log.info("Next, start new node [nodeIdx=" + nodeIdx + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + ", curNodes=" + startedNodes.size() + ']'); - stopGrid(nodeIdx); + startGrid(nodeIdx); - assertTrue(startedNodes.remove((Integer)nodeIdx)); + assertTrue(startedNodes.add(nodeIdx)); + } + else { + if (startedNodes.size() > 1) { + int nodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size())); + + log.info("Next, stop [nodeIdx=" + nodeIdx + + ", crd=" + (startedNodes.isEmpty() ? null : Collections.min(startedNodes)) + + ", curNodes=" + startedNodes.size() + ']'); + + stopGrid(nodeIdx); + + assertTrue(startedNodes.remove((Integer)nodeIdx)); + } } } + + U.sleep(rnd.nextInt(100) + 1); } } + finally { + stop.set(true); + } + + if (fut1 != null) + fut1.get(); + + if (fut2 != null) + fut2.get(); } /** @@ -955,6 +1003,72 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** + * @param stopTime Stop time. + * @param stop Stop flag. + * @return Future. + */ + private IgniteInternalFuture<?> startRestartZkServers(final long stopTime, final AtomicBoolean stop) { + return GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get() && System.currentTimeMillis() < stopTime) { + U.sleep(rnd.nextLong(500) + 500); + + int idx = rnd.nextInt(ZK_SRVS); + + log.info("Restart ZK server: " + idx); + + zkCluster.getServers().get(idx).restart(); + + } + + return null; + } + }, "zk-restart-thread"); + } + + /** + * @param stopTime Stop time. + * @param stop Stop flag. + * @return Future. + */ + private IgniteInternalFuture<?> startCloseZkClientSocket(final long stopTime, final AtomicBoolean stop) { + assert testSockNio; + + return GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get() && System.currentTimeMillis() < stopTime) { + U.sleep(rnd.nextLong(100) + 50); + + List<Ignite> nodes = G.allGrids(); + + if (nodes.size() > 0) { + Ignite node = nodes.get(rnd.nextInt(nodes.size())); + + ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(node); + + if (nio != null) { + info("Close zk client socket for node: " + node.name()); + + try { + nio.closeSocket(false); + } + catch (Exception e) { + info("Failed to close zk client socket for node: " + node.name()); + } + } + } + } + + return null; + } + }, "zk-restart-thread"); + } + + /** * @param node Node. * @throws Exception If failed. */
