Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 6c1fe28c7 -> 246478186


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/24647818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/24647818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/24647818

Branch: refs/heads/ignite-zk
Commit: 246478186aaef2f1e06deacb19d5198aeb1157fa
Parents: 6c1fe28
Author: sboikov <[email protected]>
Authored: Mon Nov 13 12:01:06 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 13 12:01:06 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 83 +++++++++++++++-----
 .../zk/ZookeeperDiscoverySpiBasicTest.java      | 19 +++--
 2 files changed, 79 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/24647818/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index c267cf2..41debd7 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -28,6 +28,10 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryForever;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -56,6 +60,7 @@ import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
@@ -72,6 +77,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     private static final String IGNITE_PATH = "/ignite";
 
     /** */
+    private static final String IGNITE_INIT_LOCK_PATH = "/igniteLock";
+
+    /** */
     private static final String CLUSTER_PATH = IGNITE_PATH + "/cluster";
 
     /** */
@@ -263,7 +271,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
         // TODO ZK
-        throw new UnsupportedOperationException();
+        //throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
@@ -315,37 +323,76 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             // ZK generates internal threads' names using current thread name.
             Thread.currentThread().setName("zk-" + igniteInstanceName);
 
+            CuratorFramework c;
+
             try {
-                zk = new ZooKeeper(connectString, sesTimeout, zkWatcher);
+                c = CuratorFrameworkFactory.newClient(connectString, sesTimeout, sesTimeout, new RetryForever(500));
+
+                c.start();
+
+                zk = c.getZookeeperClient().getZooKeeper();
+                // zk = new ZooKeeper(connectString, sesTimeout, zkWatcher);
             }
             finally {
                 Thread.currentThread().setName(threadName);
             }
 
-            // TODO ZK: properly handle first node start and init after full cluster restart.
-            if (zk.exists(IGNITE_PATH, false) == null) {
-                log.info("Initialize Zookeeper nodes.");
+            for (;;) {
+                boolean started = zk.exists(IGNITE_PATH, false) != null &&
+                    zk.exists(ALIVE_NODES_PATH, false) != null &&
+                    !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
+
+                if (!started) {
+                    InterProcessMutex mux = new InterProcessMutex(c, IGNITE_INIT_LOCK_PATH);
+
+                    mux.acquire();
+
+                    try {
+                        started = zk.exists(IGNITE_PATH, false) != null &&
+                            zk.exists(ALIVE_NODES_PATH, false) != null &&
+                            !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty();
+
+                        if (!started) {
+                            log.info("First node starts, reset ZK state");
 
-                List<Op> initOps = new ArrayList<>();
+                            if (zk.exists(IGNITE_PATH, false) != null)
+                                ZKUtil.deleteRecursive(zk, IGNITE_PATH);
 
-                ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis());
+                            // TODO ZK: properly handle first node start and init after full cluster restart.
+                            if (zk.exists(IGNITE_PATH, false) == null) {
+                                log.info("Initialize Zookeeper nodes.");
 
-                initOps.add(Op.create(IGNITE_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-                initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                List<Op> initOps = new ArrayList<>();
 
-                zk.multi(initOps);
+                                ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis());
+
+                                initOps.add(Op.create(IGNITE_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                                initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+                                zk.multi(initOps);
+                            }
+
+                            break;
+                        }
+                    }
+                    finally {
+                        mux.release();
+                    }
+                }
+                else
+                    break;
             }
 
             ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, false, null));
 
             gridStartTime = clusterData.gridStartTime;
 
-            zk.getData(EVENTS_PATH, true, dataUpdateCallback, null);
-            zk.getChildren(ALIVE_NODES_PATH, true, nodesUpdateCallback, null);
-            zk.getChildren(JOIN_HIST_PATH, true, nodesUpdateCallback, null);
+            zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null);
+            zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null);
+            zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null);
 
             List<Op> joinOps = new ArrayList<>();
 
@@ -942,9 +989,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery
             log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']');
 
             if (event.getType() == Event.EventType.NodeChildrenChanged) {
-                zk.getChildren(event.getPath(), true, nodesUpdateCallback, null);
+                zk.getChildren(event.getPath(), this, nodesUpdateCallback, null);
             } else if (event.getType() == Event.EventType.NodeDataChanged) {
-                zk.getData(event.getPath(), true, dataUpdateCallback, null);
+                zk.getData(event.getPath(), this, dataUpdateCallback, null);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/24647818/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index 5078744..e8d13a1 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -39,17 +39,24 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /** */
     private TestingCluster zkCluster;
 
+    /** */
+    private static final boolean USE_TEST_CLUSTER = true;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setConsistentId(igniteInstanceName);
 
-        assert zkCluster != null;
-
         ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
 
-        zkSpi.setConnectString(zkCluster.getConnectString());
+        if (USE_TEST_CLUSTER) {
+            assert zkCluster != null;
+
+            zkSpi.setConnectString(zkCluster.getConnectString());
+        }
+        else
+            zkSpi.setConnectString("localhost:2181");
 
         cfg.setDiscoverySpi(zkSpi);
 
@@ -68,8 +75,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        zkCluster = new TestingCluster(1);
-        zkCluster.start();
+        if (USE_TEST_CLUSTER) {
+            zkCluster = new TestingCluster(1);
+            zkCluster.start();
+        }
 
     }
 

Reply via email to