Repository: ignite Updated Branches: refs/heads/ignite-zk 5671b68a7 -> d56163c0e
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d56163c0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d56163c0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d56163c0 Branch: refs/heads/ignite-zk Commit: d56163c0e5d87062687ec2325a8b11fb9e8682cc Parents: 5671b68 Author: sboikov <[email protected]> Authored: Fri Nov 10 11:34:00 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri Nov 10 11:34:00 2017 +0300 ---------------------------------------------------------------------- .../tcp/ipfinder/zk/ZKClusterNodeNew.java | 111 ++++++++++++++----- .../java/org/apache/zookeeper/ZKSimpleTest.java | 19 +++- 2 files changed, 97 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d56163c0/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java index 4287bb6..bc2620e 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.ipfinder.zk; +import java.io.Serializable; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -25,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -89,7 +89,7 @@ public class ZKClusterNodeNew implements Watcher { } /** */ - static int nodeIdGen; + static int nodeIdGen = 1; public ZKClusterNodeNew(String nodeName) { this.nodeName = nodeName; @@ -117,27 +117,35 @@ public class ZKClusterNodeNew implements Watcher { /** * */ - static class NodeData { + static class NodeData implements Serializable { /** */ + @GridToStringInclude final int order; /** */ + @GridToStringInclude final String nodeId; NodeData(int order, String nodeId) { this.order = order; this.nodeId = nodeId; } + + @Override public String toString() { + return S.toString(NodeData.class, this); + } } /** * */ - static class AliveNodes { + static class AliveNodes implements Serializable { /** */ + @GridToStringInclude final int ver; /** */ + @GridToStringInclude final TreeMap<Integer, NodeData> nodesByOrder; /** */ @@ -156,6 +164,10 @@ public class ZKClusterNodeNew implements Watcher { for (NodeData nodeData : nodesByOrder.values()) nodesById.put(nodeData.nodeId, nodeData); } + + @Override public String toString() { + return S.toString(AliveNodes.class, this); + } } /** */ @@ -173,25 +185,33 @@ public class ZKClusterNodeNew implements Watcher { /** * */ - static class DiscoveryEvents { + static class DiscoveryEvents implements Serializable { /** */ + @GridToStringInclude + int ver; + + /** */ + @GridToStringInclude final AliveNodes aliveNodes; /** */ + @GridToStringInclude final TreeMap<Integer, DiscoveryEvent> evts; - int ver; - DiscoveryEvents(AliveNodes aliveNodes, TreeMap<Integer, DiscoveryEvent> evts) { this.aliveNodes = aliveNodes; this.evts = evts; } + + @Override public String toString() { + return S.toString(DiscoveryEvents.class, this); + } } /** * */ - static class DiscoveryEvent { + static class DiscoveryEvent implements Serializable { /** */ @GridToStringInclude final DiscoveryEventType evtType; @@ -261,9 +281,6 @@ public class ZKClusterNodeNew implements Watcher { generateEvents(curAlive, newAlive); curAlive = newAlive; - - if (connectLatch.getCount() > 0 && newAlive.nodesById.containsKey(nodeId)) - connectLatch.countDown(); } } } @@ -271,30 +288,41 @@ public class ZKClusterNodeNew implements Watcher { private void generateEvents(AliveNodes oldNodes, AliveNodes newNodes) { assert newNodes != null; - if (oldNodes == null) { - NodeData locNode = newNodes.nodesById.get(nodeId); + NodeData locNode = newNodes.nodesById.get(nodeId); - if (locNode == null) - return; + if (locNode == null) + return; + + if (!crd && newNodes.nodesByOrder.firstKey() == locNode.order) { + log("Node become coordinator [oldNodes=" + oldNodes + ", curEvts=" + curEvts + ']'); - if (newNodes.nodesByOrder.firstKey() == locNode.order) { - log("Coordinator joined"); + if (curEvts != null) { + assert curEvts.aliveNodes != null; - curCrdEvts = curEvts; + oldNodes = curEvts.aliveNodes; - crd = true; + log("Node coordinator use old nodes from last events [oldNodes=" + oldNodes + ']'); } + else if (oldNodes == null) { + oldNodes = new AliveNodes(0, new TreeMap<Integer, NodeData>()); - return; + log("Node coordinator init old nodes [oldNodes=" + oldNodes + ']'); + } + + curCrdEvts = curEvts; + + crd = true; } if (!crd) return; + log("Generate discovery events [oldNodes=" + oldNodes + ", newNodes=" + newNodes + ']'); + if (oldNodes.ver == newNodes.ver) return; - int nextJoinOrder = oldNodes.nodesByOrder.lastKey() + 1; + int nextJoinOrder = oldNodes.nodesByOrder.isEmpty() ? 1 : oldNodes.nodesByOrder.lastKey() + 1; TreeMap<Integer, DiscoveryEvent> evts = new TreeMap<>(); @@ -320,12 +348,14 @@ public class ZKClusterNodeNew implements Watcher { failed.add(oldData.order); evts.put(v, new DiscoveryEvent(DiscoveryEventType.NODE_FAILED, v, oldData.nodeId)); + + break; } } } } - log("Generate events on coordinator: " + evts); + log("Generated discovery events on coordinator: " + evts); DiscoveryEvents newEvents; @@ -344,15 +374,17 @@ public class ZKClusterNodeNew implements Watcher { newEvents = new DiscoveryEvents(newNodes, evts); expVer = curCrdEvts.ver; - - newEvents.ver = expVer + 1; } + newEvents.ver = expVer + 1; + try { zk.setData(EVENTS_PATH, marshal(newEvents), expVer); } catch (Exception e) { - e.printStackTrace(); + log("Events update error: " + e); + + e.printStackTrace(System.out); } curCrdEvts = newEvents; @@ -360,7 +392,7 @@ public class ZKClusterNodeNew implements Watcher { static NodeData parseNodePath(String path) { String nodeId = path.substring(0, ID_LEN); - int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)); + int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)) + 1; return new NodeData(nodeOrder, nodeId); } @@ -371,6 +403,9 @@ public class ZKClusterNodeNew implements Watcher { /** */ private DiscoveryEvents curCrdEvts; + /** */ + private DiscoveryEvent lastEvt; + /** * */ @@ -388,8 +423,28 @@ public class ZKClusterNodeNew implements Watcher { newEvts.ver = stat.getVersion(); - for (DiscoveryEvent e : newEvts.evts.values()) - log("Event update: " + e) + for (DiscoveryEvent e : newEvts.evts.values()) { + boolean fireEvt; + + if (lastEvt == null) + fireEvt = e.evtType == DiscoveryEventType.NODE_JOINED && e.nodeId.equals(nodeId); + else + fireEvt = e.topVer > lastEvt.topVer; + + if (fireEvt) { + assert lastEvt == null || lastEvt.topVer + 1 == e.topVer : "lastEvt=" + lastEvt + ", nextEvt=" + e; + + log("Received discovery event: " + e); + + if (e.evtType == DiscoveryEventType.NODE_JOINED && e.nodeId.equals(nodeId)) { + log("Local node joined: " + e); + + connectLatch.countDown(); + } + + lastEvt = e; + } + } curEvts = newEvts; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d56163c0/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java index f0ab6e3..c48e6d1 100644 --- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java +++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java @@ -30,18 +30,27 @@ public class ZKSimpleTest { System.out.println("ZK started\n"); - ZKClusterNodeNew node0 = new ZKClusterNodeNew("n0"); - node0.join(zkCluster.getConnectString()); - ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1"); node1.join(zkCluster.getConnectString()); ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2"); node2.join(zkCluster.getConnectString()); - System.out.println("Stop node"); + ZKClusterNodeNew node3 = new ZKClusterNodeNew("n3"); + node3.join(zkCluster.getConnectString()); + +// ZKClusterNodeNew node4 = new ZKClusterNodeNew("n4"); +// node4.join(zkCluster.getConnectString()); + + System.out.println("Stop n2"); + + node2.stop(); + + //Thread.sleep(5000); + + System.out.println("Stop n3"); - node1.stop(); + node3.stop(); System.out.println("Done");
