Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 5671b68a7 -> d56163c0e


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d56163c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d56163c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d56163c0

Branch: refs/heads/ignite-zk
Commit: d56163c0e5d87062687ec2325a8b11fb9e8682cc
Parents: 5671b68
Author: sboikov <[email protected]>
Authored: Fri Nov 10 11:34:00 2017 +0300
Committer: sboikov <[email protected]>
Committed: Fri Nov 10 11:34:00 2017 +0300

----------------------------------------------------------------------
 .../tcp/ipfinder/zk/ZKClusterNodeNew.java       | 111 ++++++++++++++-----
 .../java/org/apache/zookeeper/ZKSimpleTest.java |  19 +++-
 2 files changed, 97 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d56163c0/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
index 4287bb6..bc2620e 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
 
+import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -25,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -89,7 +89,7 @@ public class ZKClusterNodeNew implements Watcher {
     }
 
     /** */
-    static int nodeIdGen;
+    static int nodeIdGen = 1;
 
     public ZKClusterNodeNew(String nodeName) {
         this.nodeName = nodeName;
@@ -117,27 +117,35 @@ public class ZKClusterNodeNew implements Watcher {
     /**
      *
      */
-    static class NodeData {
+    static class NodeData implements Serializable {
         /** */
+        @GridToStringInclude
         final int order;
 
         /** */
+        @GridToStringInclude
         final String nodeId;
 
         NodeData(int order, String nodeId) {
             this.order = order;
             this.nodeId = nodeId;
         }
+
+        @Override public String toString() {
+            return S.toString(NodeData.class, this);
+        }
     }
 
     /**
      *
      */
-    static class AliveNodes {
+    static class AliveNodes implements Serializable {
         /** */
+        @GridToStringInclude
         final int ver;
 
         /** */
+        @GridToStringInclude
         final TreeMap<Integer, NodeData> nodesByOrder;
 
         /** */
@@ -156,6 +164,10 @@ public class ZKClusterNodeNew implements Watcher {
             for (NodeData nodeData : nodesByOrder.values())
                 nodesById.put(nodeData.nodeId, nodeData);
         }
+
+        @Override public String toString() {
+            return S.toString(AliveNodes.class, this);
+        }
     }
 
     /** */
@@ -173,25 +185,33 @@ public class ZKClusterNodeNew implements Watcher {
     /**
      *
      */
-    static class DiscoveryEvents {
+    static class DiscoveryEvents implements Serializable {
         /** */
+        @GridToStringInclude
+        int ver;
+
+        /** */
+        @GridToStringInclude
         final AliveNodes aliveNodes;
 
         /** */
+        @GridToStringInclude
         final TreeMap<Integer, DiscoveryEvent> evts;
 
-        int ver;
-
         DiscoveryEvents(AliveNodes aliveNodes, TreeMap<Integer, 
DiscoveryEvent> evts) {
             this.aliveNodes = aliveNodes;
             this.evts = evts;
         }
+
+        @Override public String toString() {
+            return S.toString(DiscoveryEvents.class, this);
+        }
     }
 
     /**
      *
      */
-    static class DiscoveryEvent {
+    static class DiscoveryEvent implements Serializable {
         /** */
         @GridToStringInclude
         final DiscoveryEventType evtType;
@@ -261,9 +281,6 @@ public class ZKClusterNodeNew implements Watcher {
                 generateEvents(curAlive, newAlive);
 
                 curAlive = newAlive;
-
-                if (connectLatch.getCount() > 0 && 
newAlive.nodesById.containsKey(nodeId))
-                    connectLatch.countDown();
             }
         }
     }
@@ -271,30 +288,41 @@ public class ZKClusterNodeNew implements Watcher {
     private void generateEvents(AliveNodes oldNodes, AliveNodes newNodes) {
         assert newNodes != null;
 
-        if (oldNodes == null) {
-            NodeData locNode = newNodes.nodesById.get(nodeId);
+        NodeData locNode = newNodes.nodesById.get(nodeId);
 
-            if (locNode == null)
-                return;
+        if (locNode == null)
+            return;
+
+        if (!crd && newNodes.nodesByOrder.firstKey() == locNode.order) {
+            log("Node become coordinator [oldNodes=" + oldNodes + ", curEvts=" 
+ curEvts + ']');
 
-            if (newNodes.nodesByOrder.firstKey() == locNode.order) {
-                log("Coordinator joined");
+            if (curEvts != null) {
+                assert curEvts.aliveNodes != null;
 
-                curCrdEvts = curEvts;
+                oldNodes = curEvts.aliveNodes;
 
-                crd = true;
+                log("Node coordinator use old nodes from last events 
[oldNodes=" + oldNodes + ']');
             }
+            else if (oldNodes == null) {
+                oldNodes = new AliveNodes(0, new TreeMap<Integer, NodeData>());
 
-            return;
+                log("Node coordinator init old nodes [oldNodes=" + oldNodes + 
']');
+            }
+
+            curCrdEvts = curEvts;
+
+            crd = true;
         }
 
         if (!crd)
             return;
 
+        log("Generate discovery events [oldNodes=" + oldNodes + ", newNodes=" 
+ newNodes + ']');
+
         if (oldNodes.ver == newNodes.ver)
             return;
 
-        int nextJoinOrder = oldNodes.nodesByOrder.lastKey() + 1;
+        int nextJoinOrder = oldNodes.nodesByOrder.isEmpty() ? 1 : 
oldNodes.nodesByOrder.lastKey() + 1;
 
         TreeMap<Integer, DiscoveryEvent> evts = new TreeMap<>();
 
@@ -320,12 +348,14 @@ public class ZKClusterNodeNew implements Watcher {
                         failed.add(oldData.order);
 
                         evts.put(v, new 
DiscoveryEvent(DiscoveryEventType.NODE_FAILED, v, oldData.nodeId));
+
+                        break;
                     }
                 }
             }
         }
 
-        log("Generate events on coordinator: " + evts);
+        log("Generated discovery events on coordinator: " + evts);
 
         DiscoveryEvents newEvents;
 
@@ -344,15 +374,17 @@ public class ZKClusterNodeNew implements Watcher {
             newEvents = new DiscoveryEvents(newNodes, evts);
 
             expVer = curCrdEvts.ver;
-
-            newEvents.ver = expVer + 1;
         }
 
+        newEvents.ver = expVer + 1;
+
         try {
             zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
         }
         catch (Exception e) {
-            e.printStackTrace();
+            log("Events update error: " + e);
+
+            e.printStackTrace(System.out);
         }
 
         curCrdEvts = newEvents;
@@ -360,7 +392,7 @@ public class ZKClusterNodeNew implements Watcher {
 
     static NodeData parseNodePath(String path) {
         String nodeId = path.substring(0, ID_LEN);
-        int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1));
+        int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1)) + 1;
 
         return new NodeData(nodeOrder, nodeId);
     }
@@ -371,6 +403,9 @@ public class ZKClusterNodeNew implements Watcher {
     /** */
     private DiscoveryEvents curCrdEvts;
 
+    /** */
+    private DiscoveryEvent lastEvt;
+
     /**
      *
      */
@@ -388,8 +423,28 @@ public class ZKClusterNodeNew implements Watcher {
 
                 newEvts.ver = stat.getVersion();
 
-                for (DiscoveryEvent e : newEvts.evts.values())
-                    log("Event update: " + e)
+                for (DiscoveryEvent e : newEvts.evts.values()) {
+                    boolean fireEvt;
+
+                    if (lastEvt == null)
+                        fireEvt = e.evtType == DiscoveryEventType.NODE_JOINED 
&& e.nodeId.equals(nodeId);
+                    else
+                        fireEvt = e.topVer > lastEvt.topVer;
+
+                    if (fireEvt) {
+                        assert lastEvt == null || lastEvt.topVer + 1 == 
e.topVer : "lastEvt=" + lastEvt + ", nextEvt=" + e;
+
+                        log("Received discovery event: " + e);
+
+                        if (e.evtType == DiscoveryEventType.NODE_JOINED && 
e.nodeId.equals(nodeId)) {
+                            log("Local node joined: " + e);
+
+                            connectLatch.countDown();
+                        }
+
+                        lastEvt = e;
+                    }
+                }
 
                 curEvts = newEvts;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d56163c0/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
index f0ab6e3..c48e6d1 100644
--- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
@@ -30,18 +30,27 @@ public class ZKSimpleTest {
 
         System.out.println("ZK started\n");
 
-        ZKClusterNodeNew node0 = new ZKClusterNodeNew("n0");
-        node0.join(zkCluster.getConnectString());
-
         ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1");
         node1.join(zkCluster.getConnectString());
 
         ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2");
         node2.join(zkCluster.getConnectString());
 
-        System.out.println("Stop node");
+        ZKClusterNodeNew node3 = new ZKClusterNodeNew("n3");
+        node3.join(zkCluster.getConnectString());
+
+//        ZKClusterNodeNew node4 = new ZKClusterNodeNew("n4");
+//        node4.join(zkCluster.getConnectString());
+
+        System.out.println("Stop n2");
+
+        node2.stop();
+
+        //Thread.sleep(5000);
+
+        System.out.println("Stop n3");
 
-        node1.stop();
+        node3.stop();
 
         System.out.println("Done");
 

Reply via email to