Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 7611371b9 -> adebbf075


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa78f5c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa78f5c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa78f5c4

Branch: refs/heads/ignite-zk
Commit: aa78f5c43639526c1f914ca7e3b2d455e012a358
Parents: 9ffd603
Author: sboikov <[email protected]>
Authored: Thu Nov 23 13:52:06 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Nov 23 14:36:04 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    |   5 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  15 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 204 +++++++++++++++----
 4 files changed, 174 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index cc7e266..59012bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -155,7 +155,7 @@ import static 
org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_J
  */
 public class IgnitionEx {
     /** */
-    public static final boolean TEST_ZK = true;
+    public static volatile boolean TEST_ZK = true;
 
     /** */
     public static TestingCluster zkCluster;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
index 9f1b859..2936876 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -232,7 +232,10 @@ class ZkIgnitePaths {
      * @return Event node ID.
      */
     static UUID customEventSendNodeId(String path) {
-        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
+        // <uuid prefix>:<node id>|<seq>
+        int startIdx = ZkIgnitePaths.UUID_LEN + 1;
+
+        String idStr = path.substring(startIdx, startIdx + 
ZkIgnitePaths.UUID_LEN);
 
         return UUID.fromString(idStr);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index a04314d..5cbb474 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -249,8 +249,13 @@ public class ZookeeperDiscoveryImpl {
         }
 
         try {
-            // TODO ZK: handle retries.
-            zkClient.createIfNeeded(zkPaths.customEvtsDir + "/" + locNode.id() 
+ '|', msgBytes, CreateMode.PERSISTENT_SEQUENTIAL);
+            String prefix = UUID.randomUUID().toString();
+
+            zkClient.createSequential(prefix,
+                zkPaths.customEvtsDir,
+                prefix + ":" + locNode.id() + '|',
+                msgBytes,
+                CreateMode.PERSISTENT_SEQUENTIAL);
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteException(e);
@@ -485,7 +490,8 @@ public class ZookeeperDiscoveryImpl {
                 if (log.isInfoEnabled())
                     log.info("Previous node watch event: " + evt);
 
-                zkClient.existsAsync(evt.getPath(), this, this);
+                if (evt.getType() != Event.EventType.None)
+                    zkClient.existsAsync(evt.getPath(), this, this);
             }
         }
 
@@ -585,9 +591,8 @@ public class ZookeeperDiscoveryImpl {
      */
     private class AliveNodeDataWatcher implements Watcher {
         @Override public void process(WatchedEvent evt) {
-            if (evt.getType() == Event.EventType.NodeDataChanged) {
+            if (evt.getType() == Event.EventType.NodeDataChanged)
                 zkClient.getDataAsync(evt.getPath(), this, 
aliveNodeDataUpdateCallback);
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/aa78f5c4/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 39f9fbf..2c998d0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
@@ -50,11 +51,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClient;
-import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
@@ -71,6 +69,9 @@ import static 
org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
  */
 public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
+    private static final int ZK_SRVS = 3;
+
+    /** */
     private static TestingCluster zkCluster;
 
     /** */
@@ -181,8 +182,11 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        IgnitionEx.TEST_ZK = false;
+
         if (USE_TEST_CLUSTER) {
-            zkCluster = new TestingCluster(3);
+            zkCluster = new TestingCluster(ZK_SRVS);
+
             zkCluster.start();
         }
     }
@@ -860,6 +864,32 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testRandomTopologyChanges() throws Exception {
+        randomTopologyChanges(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomTopologyChangesRestartZk() throws Exception {
+        randomTopologyChanges(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomTopologyChangesCloseClients() throws Exception {
+        randomTopologyChanges(false, true);
+    }
+
+    /**
+     * @param restartZk If {@code true} in background restarts one of ZK 
servers.
+     * @param closeClientSock If {@code true} in background closes zk clients' 
sockets.
+     * @throws Exception If failed.
+     */
+    private void randomTopologyChanges(boolean restartZk, boolean 
closeClientSock) throws Exception {
+        if (closeClientSock)
+            testSockNio = true;
+
         List<Integer> startedNodes = new ArrayList<>();
         List<String> startedCaches = new ArrayList<>();
 
@@ -871,72 +901,90 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         int MAX_NODES = 20;
         int MAX_CACHES = 10;
 
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-        while (System.currentTimeMillis() < stopTime) {
-            if (startedNodes.size() > 0 && rnd.nextInt(10) == 0) {
-                boolean startCache = startedCaches.size() < 2 ||
-                    (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) != 0);
+        AtomicBoolean stop = new AtomicBoolean();
 
-                int nodeIdx = 
startedNodes.get(rnd.nextInt(startedNodes.size()));
+        IgniteInternalFuture<?> fut1 = restartZk ? 
startRestartZkServers(stopTime, stop) : null;
+        IgniteInternalFuture<?> fut2 = closeClientSock ? 
startCloseZkClientSocket(stopTime, stop) : null;
 
-                if (startCache) {
-                    String cacheName = "cache-" + nextCacheIdx++;
+        try {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                    log.info("Next, start new cache [cacheName=" + cacheName +
-                        ", node=" + nodeIdx +
-                        ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
-                        ", curCaches=" + startedCaches.size() + ']');
+            while (System.currentTimeMillis() < stopTime) {
+                if (startedNodes.size() > 0 && rnd.nextInt(10) == 0) {
+                    boolean startCache = startedCaches.size() < 2 ||
+                        (startedCaches.size() < MAX_CACHES && rnd.nextInt(5) 
!= 0);
 
-                    ignite(nodeIdx).createCache(new 
CacheConfiguration<>(cacheName));
+                    int nodeIdx = 
startedNodes.get(rnd.nextInt(startedNodes.size()));
 
-                    startedCaches.add(cacheName);
-                }
-                else {
-                    if (startedCaches.size() > 1) {
-                        String cacheName = 
startedCaches.get(rnd.nextInt(startedCaches.size()));
+                    if (startCache) {
+                        String cacheName = "cache-" + nextCacheIdx++;
 
-                        log.info("Next, stop cache [nodeIdx=" + nodeIdx +
+                        log.info("Next, start new cache [cacheName=" + 
cacheName +
                             ", node=" + nodeIdx +
                             ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
-                            ", cacheName=" + startedCaches.size() + ']');
+                            ", curCaches=" + startedCaches.size() + ']');
 
-                        ignite(nodeIdx).destroyCache(cacheName);
+                        ignite(nodeIdx).createCache(new 
CacheConfiguration<>(cacheName));
 
-                        assertTrue(startedCaches.remove(cacheName));
+                        startedCaches.add(cacheName);
                     }
-                }
-            }
-            else {
-                boolean startNode = startedNodes.size() < 2 ||
-                    (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 0);
-
-                if (startNode) {
-                    int nodeIdx = nextNodeIdx++;
+                    else {
+                        if (startedCaches.size() > 1) {
+                            String cacheName = 
startedCaches.get(rnd.nextInt(startedCaches.size()));
 
-                    log.info("Next, start new node [nodeIdx=" + nodeIdx +
-                        ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
-                        ", curNodes=" + startedNodes.size() + ']');
+                            log.info("Next, stop cache [nodeIdx=" + nodeIdx +
+                                ", node=" + nodeIdx +
+                                ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
+                                ", cacheName=" + startedCaches.size() + ']');
 
-                    startGrid(nodeIdx);
+                            ignite(nodeIdx).destroyCache(cacheName);
 
-                    assertTrue(startedNodes.add(nodeIdx));
+                            assertTrue(startedCaches.remove(cacheName));
+                        }
+                    }
                 }
                 else {
-                    if (startedNodes.size() > 1) {
-                        int nodeIdx = 
startedNodes.get(rnd.nextInt(startedNodes.size()));
+                    boolean startNode = startedNodes.size() < 2 ||
+                        (startedNodes.size() < MAX_NODES && rnd.nextInt(5) != 
0);
+
+                    if (startNode) {
+                        int nodeIdx = nextNodeIdx++;
 
-                        log.info("Next, stop [nodeIdx=" + nodeIdx +
+                        log.info("Next, start new node [nodeIdx=" + nodeIdx +
                             ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
                             ", curNodes=" + startedNodes.size() + ']');
 
-                        stopGrid(nodeIdx);
+                        startGrid(nodeIdx);
 
-                        assertTrue(startedNodes.remove((Integer)nodeIdx));
+                        assertTrue(startedNodes.add(nodeIdx));
+                    }
+                    else {
+                        if (startedNodes.size() > 1) {
+                            int nodeIdx = 
startedNodes.get(rnd.nextInt(startedNodes.size()));
+
+                            log.info("Next, stop [nodeIdx=" + nodeIdx +
+                                ", crd=" + (startedNodes.isEmpty() ? null : 
Collections.min(startedNodes)) +
+                                ", curNodes=" + startedNodes.size() + ']');
+
+                            stopGrid(nodeIdx);
+
+                            assertTrue(startedNodes.remove((Integer)nodeIdx));
+                        }
                     }
                 }
+
+                U.sleep(rnd.nextInt(100) + 1);
             }
         }
+        finally {
+            stop.set(true);
+        }
+
+        if (fut1 != null)
+            fut1.get();
+
+        if (fut2 != null)
+            fut2.get();
     }
 
     /**
@@ -955,6 +1003,72 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @param stopTime Stop time.
+     * @param stop Stop flag.
+     * @return Future.
+     */
+    private IgniteInternalFuture<?> startRestartZkServers(final long stopTime, 
final AtomicBoolean stop) {
+        return GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (!stop.get() && System.currentTimeMillis() < stopTime) {
+                    U.sleep(rnd.nextLong(500) + 500);
+
+                    int idx = rnd.nextInt(ZK_SRVS);
+
+                    log.info("Restart ZK server: " + idx);
+
+                    zkCluster.getServers().get(idx).restart();
+
+                }
+
+                return null;
+            }
+        }, "zk-restart-thread");
+    }
+
+    /**
+     * @param stopTime Stop time.
+     * @param stop Stop flag.
+     * @return Future.
+     */
+    private IgniteInternalFuture<?> startCloseZkClientSocket(final long 
stopTime, final AtomicBoolean stop) {
+        assert testSockNio;
+
+        return GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (!stop.get() && System.currentTimeMillis() < stopTime) {
+                    U.sleep(rnd.nextLong(100) + 50);
+
+                    List<Ignite> nodes = G.allGrids();
+
+                    if (nodes.size() > 0) {
+                        Ignite node = nodes.get(rnd.nextInt(nodes.size()));
+
+                        ZkTestClientCnxnSocketNIO nio = 
ZkTestClientCnxnSocketNIO.forNode(node);
+
+                        if (nio != null) {
+                            info("Close zk client socket for node: " + 
node.name());
+
+                            try {
+                                nio.closeSocket(false);
+                            }
+                            catch (Exception e) {
+                                info("Failed to close zk client socket for 
node: " + node.name());
+                            }
+                        }
+                    }
+                }
+
+                return null;
+            }
+        }, "zk-restart-thread");
+    }
+
+    /**
      * @param node Node.
      * @throws Exception If failed.
      */

Reply via email to