Repository: ignite Updated Branches: refs/heads/ignite-zk 6c1fe28c7 -> 246478186
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/24647818 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/24647818 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/24647818 Branch: refs/heads/ignite-zk Commit: 246478186aaef2f1e06deacb19d5198aeb1157fa Parents: 6c1fe28 Author: sboikov <[email protected]> Authored: Mon Nov 13 12:01:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 13 12:01:06 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 83 +++++++++++++++----- .../zk/ZookeeperDiscoverySpiBasicTest.java | 19 +++-- 2 files changed, 79 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/24647818/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index c267cf2..41debd7 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -28,6 +28,10 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.RetryForever; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -56,6 +60,7 @@ import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKUtil; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; @@ -72,6 +77,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery private static final String IGNITE_PATH = "/ignite"; /** */ + private static final String IGNITE_INIT_LOCK_PATH = "/igniteLock"; + + /** */ private static final String CLUSTER_PATH = IGNITE_PATH + "/cluster"; /** */ @@ -263,7 +271,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { // TODO ZK - throw new UnsupportedOperationException(); + //throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @@ -315,37 +323,76 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery // ZK generates internal threads' names using current thread name. Thread.currentThread().setName("zk-" + igniteInstanceName); + CuratorFramework c; + try { - zk = new ZooKeeper(connectString, sesTimeout, zkWatcher); + c = CuratorFrameworkFactory.newClient(connectString, sesTimeout, sesTimeout, new RetryForever(500)); + + c.start(); + + zk = c.getZookeeperClient().getZooKeeper(); + // zk = new ZooKeeper(connectString, sesTimeout, zkWatcher); } finally { Thread.currentThread().setName(threadName); } - // TODO ZK: properly handle first node start and init after full cluster restart. - if (zk.exists(IGNITE_PATH, false) == null) { - log.info("Initialize Zookeeper nodes."); + for (;;) { + boolean started = zk.exists(IGNITE_PATH, false) != null && + zk.exists(ALIVE_NODES_PATH, false) != null && + !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty(); + + if (!started) { + InterProcessMutex mux = new InterProcessMutex(c, IGNITE_INIT_LOCK_PATH); + + mux.acquire(); + + try { + started = zk.exists(IGNITE_PATH, false) != null && + zk.exists(ALIVE_NODES_PATH, false) != null && + !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty(); + + if (!started) { + log.info("First node starts, reset ZK state"); - List<Op> initOps = new ArrayList<>(); + if (zk.exists(IGNITE_PATH, false) != null) + ZKUtil.deleteRecursive(zk, IGNITE_PATH); - ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis()); + // TODO ZK: properly handle first node start and init after full cluster restart. + if (zk.exists(IGNITE_PATH, false) == null) { + log.info("Initialize Zookeeper nodes."); - initOps.add(Op.create(IGNITE_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + List<Op> initOps = new ArrayList<>(); - zk.multi(initOps); + ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis()); + + initOps.add(Op.create(IGNITE_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + + zk.multi(initOps); + } + + break; + } + } + finally { + mux.release(); + } + } + else + break; } ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, false, null)); gridStartTime = clusterData.gridStartTime; - zk.getData(EVENTS_PATH, true, dataUpdateCallback, null); - zk.getChildren(ALIVE_NODES_PATH, true, nodesUpdateCallback, null); - zk.getChildren(JOIN_HIST_PATH, true, nodesUpdateCallback, null); + zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null); + zk.getChildren(ALIVE_NODES_PATH, zkWatcher, nodesUpdateCallback, null); + zk.getChildren(JOIN_HIST_PATH, zkWatcher, nodesUpdateCallback, null); List<Op> joinOps = new ArrayList<>(); @@ -942,9 +989,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery log.info("Process event [type=" + event.getType() + ", state=" + event.getState() + ", path=" + event.getPath() + ']'); if (event.getType() == Event.EventType.NodeChildrenChanged) { - zk.getChildren(event.getPath(), true, nodesUpdateCallback, null); + zk.getChildren(event.getPath(), this, nodesUpdateCallback, null); } else if (event.getType() == Event.EventType.NodeDataChanged) { - zk.getData(event.getPath(), true, dataUpdateCallback, null); + zk.getData(event.getPath(), this, dataUpdateCallback, null); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/24647818/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index 5078744..e8d13a1 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -39,17 +39,24 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private TestingCluster zkCluster; + /** */ + private static final boolean USE_TEST_CLUSTER = true; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setConsistentId(igniteInstanceName); - assert zkCluster != null; - ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); - zkSpi.setConnectString(zkCluster.getConnectString()); + if (USE_TEST_CLUSTER) { + assert zkCluster != null; + + zkSpi.setConnectString(zkCluster.getConnectString()); + } + else + zkSpi.setConnectString("localhost:2181"); cfg.setDiscoverySpi(zkSpi); @@ -68,8 +75,10 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @Override protected void beforeTest() throws Exception { super.beforeTest(); - zkCluster = new TestingCluster(1); - zkCluster.start(); + if (USE_TEST_CLUSTER) { + zkCluster = new TestingCluster(1); + zkCluster.start(); + } }
