Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 54211bfae -> bedc4e99e
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bedc4e99 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bedc4e99 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bedc4e99 Branch: refs/heads/ignite-zk Commit: bedc4e99e14bd597616b134d99ea75cb4d22ea08 Parents: 54211bf Author: sboikov <[email protected]> Authored: Tue Nov 14 22:37:24 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Nov 14 23:25:47 2017 +0300 ---------------------------------------------------------------------- .../spi/discovery/zk/ZookeeperDiscoverySpi.java | 136 ++++++++++++------- .../zk/ZookeeperDiscoverySpiBasicTest.java | 17 ++- 2 files changed, 103 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bedc4e99/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index cc0e6a4..a924b49 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -370,6 +370,30 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery !zk.getChildren(ALIVE_NODES_PATH, false).isEmpty(); } + private void startConnect(DiscoveryDataBag discoDataBag) throws Exception { + ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, false, null)); + + gridStartTime = clusterData.gridStartTime; + + zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null); + zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, null); 
+ zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, null); + + ZKJoiningNodeData joinData = new ZKJoiningNodeData(locNode, discoDataBag.joiningNodeData()); + + byte[] nodeData = marshal(joinData); + + String zkNode = "/" + locNode.id().toString() + "-"; + + zkCurator.inTransaction(). + create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH + zkNode, nodeData). + and(). + create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH + zkNode). + and().commit(); + + connectStart.countDown(); + } + /** {@inheritDoc} */ @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException { try { @@ -413,6 +437,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery Thread.currentThread().setName(threadName); } + boolean startedConnect = false; + for (;;) { boolean started = igniteClusterStarted(); @@ -434,19 +460,27 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (zkCurator.checkExists().forPath(IGNITE_PATH) == null) { log.info("Initialize Zookeeper nodes."); - List<Op> initOps = new ArrayList<>(); - ZKClusterData clusterData = new ZKClusterData(U.currentTimeMillis()); - initOps.add(Op.create(IGNITE_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(CLUSTER_PATH, marshal(clusterData), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(JOIN_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(ALIVE_NODES_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(EVENTS_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(DISCO_EVTS_HIST_PATH, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - initOps.add(Op.create(CUSTOM_EVTS_PATH, 
EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - - zk.multi(initOps); + zkCurator.inTransaction(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(IGNITE_PATH, EMPTY_BYTES). + and(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(CLUSTER_PATH, marshal(clusterData)). + and(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH, EMPTY_BYTES). + and(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH, EMPTY_BYTES). + and(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(EVENTS_PATH, EMPTY_BYTES). + and(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(DISCO_EVTS_HIST_PATH, EMPTY_BYTES). + and(). + create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(CUSTOM_EVTS_PATH, EMPTY_BYTES). 
+ and().commit(); + + startConnect(discoDataBag); + + startedConnect = true; } break; @@ -460,33 +494,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery break; } - ZKClusterData clusterData = unmarshal(zk.getData(CLUSTER_PATH, false, null)); - - gridStartTime = clusterData.gridStartTime; - - zk.getData(EVENTS_PATH, zkWatcher, dataUpdateCallback, null); - zk.getChildren(JOIN_HIST_PATH, zkWatcher, zkChildrenUpdateCallback, null); - zk.getChildren(ALIVE_NODES_PATH, zkWatcher, zkChildrenUpdateCallback, null); - - List<Op> joinOps = new ArrayList<>(); - - ZKJoiningNodeData joinData = new ZKJoiningNodeData(locNode, discoDataBag.joiningNodeData()); - - byte[] nodeData = marshal(joinData); - - String zkNode = "/" + locNode.id().toString() + "-"; - -// joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)); -// joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, EMPTY_BYTES, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)); -// List<OpResult> res = zk.multi(joinOps); - - zkCurator.inTransaction(). - create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(JOIN_HIST_PATH + zkNode, nodeData). - and(). - create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(ALIVE_NODES_PATH + zkNode). 
- and().commit(); - - connectStart.countDown(); + if (!startedConnect) + startConnect(discoDataBag); log.info("Waiting for local join event [nodeId=" + locNode.id() + ", name=" + igniteInstanceName + ']'); @@ -663,11 +672,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery data.joinData = joinData; } - private void processJoinedNodesHistory(List<String> children) { + private void processJoinedNodesHistory(List<String> children, long joinOrder) { for (String child : children) { ZKNodeData data = parseNodePath(child); - if (!joinHist.hist.containsKey(data.order)) { + if (data.order >= joinOrder && !joinHist.hist.containsKey(data.order)) { try { Object old = joinHist.hist.put(data.order, data); @@ -712,11 +721,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery ", path=" + path + ", nodes=" + children + ", ver=" + (stat != null ? stat.getCversion() : null) + ']'); - - if (stat != null) - joinHist.stat = stat; - - processJoinedNodesHistory(children); +// +// if (stat != null) +// joinHist.stat = stat; +// +// processJoinedNodesHistory(children); } else if (path.equals(ALIVE_NODES_PATH)) { log.info("Alive nodes changed [rc=" + rc + @@ -820,7 +829,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery // TODO ZK: check version. 
List<String> children = zkCurator.getChildren().forPath(JOIN_HIST_PATH); - processJoinedNodesHistory(children); + processJoinedNodesHistory(children, nextJoinOrder); joined = joinHist.hist.get(nextJoinOrder); } @@ -877,6 +886,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery v, failedNode, new ArrayList<>(curTop.values()))); + + joinHist.hist.remove(joined.order); } nextJoinOrder++; @@ -898,6 +909,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery v, failedNode, new ArrayList<>(curTop.values()))); + + joinHist.hist.remove(oldData.order); } break; @@ -969,6 +982,7 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery /** */ private ZKDiscoveryEvent lastEvt; + /** */ private int lastProcessed = -1; /** @@ -1028,6 +1042,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery if (fireEvt) { assert lastEvt == null || lastEvt.topVer + 1 == e.topVer : "lastEvt=" + lastEvt + ", nextEvt=" + e; + ZookeeperClusterNode evtNode = e.node; + if (!crd) { synchronized (curTop) { if (locJoin) { @@ -1039,6 +1055,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery assert old == null : node; } + evtNode = locNode; + DiscoveryDataBag dataBag = new DiscoveryDataBag(e.node.id()); dataBag.joiningNodeData(e.joiningNodeData); @@ -1060,6 +1078,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery Object old = curTop.put(node.internalOrder(), node); + evtNode = node; + assert old == null : node; break; @@ -1068,9 +1088,9 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery case EventType.EVT_NODE_FAILED: { ZookeeperClusterNode node = e.node; - Object failedNode = curTop.remove(node.internalOrder()); + evtNode = curTop.remove(node.internalOrder()); - assert failedNode != null : node; + assert evtNode != null : node; break; } @@ -1081,17 +1101,30 @@ public class 
ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery } } } + else + evtNode = curTop.containsKey(e.node.internalOrder()) ? curTop.get(e.node.internalOrder()) : e.node; log.info("Received discovery event, notify listener: " + e); List<ClusterNode> allNodes = allNodesForEvent(e.allNodes); - lsnr.onDiscovery(e.evtType, e.topVer, e.node, allNodes, null, null); + evtNode.local(locNode.id().equals(evtNode.id())); + + lsnr.onDiscovery(e.evtType, e.topVer, evtNode, allNodes, null, null); if (locJoin) { log.info("Local node joined: " + e); joinLatch.countDown(); + + try { + String zkNode = JOIN_HIST_PATH + "/" + locNode.id().toString() + "-" + String.format("%010d", locNode.internalOrder() - 1); + + zkCurator.delete().forPath(zkNode); + } + catch (Exception err) { + U.error(log, "Failed to delete join history data"); + } } lastEvt = e; @@ -1109,6 +1142,11 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements Discovery for (int i = 0; i < allNodes.size(); i++) { ZookeeperClusterNode node = allNodes.get(i); + ZookeeperClusterNode node0 = curTop.get(node.internalOrder()); + + if (node0 != null) + node = node0; + node.local(locNode.id().equals(node.id())); res.add(node); http://git-wip-us.apache.org/repos/asf/ignite/blob/bedc4e99/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java index b4db065..4cd86db 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java @@ -486,7 +486,7 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest { /** * @param nodeId Node ID. * @param expEvts Expected events. - * @throws Exception If fialed. + * @throws Exception If failed. */ private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -521,6 +521,21 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testClusterRestart() throws Exception { + startGridsMultiThreaded(3, false); + + stopAllGrids(); + + evts.clear(); + + startGridsMultiThreaded(3, false); + + waitForTopology(3); + } + + /** + * @throws Exception If failed. + */ public void testConnectionRestore4() throws Exception { testSockNio = true;
