Repository: ignite
Updated Branches:
  refs/heads/ignite-zk bc766b04a -> 5671b68a7


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5671b68a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5671b68a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5671b68a

Branch: refs/heads/ignite-zk
Commit: 5671b68a71d7f8fad0ea34687295ad627de4952a
Parents: bc766b0
Author: sboikov <[email protected]>
Authored: Thu Nov 9 18:22:20 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Nov 9 18:53:00 2017 +0300

----------------------------------------------------------------------
 .../tcp/ipfinder/zk/ZKClusterNode.java          |   9 +-
 .../tcp/ipfinder/zk/ZKClusterNodeNew.java       | 483 +++++++++++++++++++
 .../org/apache/zookeeper/ZKDisconnectTest.java  |   7 +-
 .../java/org/apache/zookeeper/ZKSimpleTest.java |  50 ++
 4 files changed, 541 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5671b68a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNode.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNode.java
index fe40f07..c21da20 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNode.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNode.java
@@ -66,8 +66,8 @@ public class ZKClusterNode implements Watcher {
     @Override public void process(WatchedEvent event) {
         log("Process event [type=" + event.getType() + ", state=" + 
event.getState() + ", path=" + event.getPath() + ']');
 
-        if (event.getType() == Event.EventType.NodeChildrenChanged && 
event.getPath().equals(CLUSTER_PATH)) {
-            zk.getChildren(CLUSTER_PATH, true, nodesUpdateCallback, null);
+        if (event.getType() == Event.EventType.NodeChildrenChanged) {
+            zk.getChildren(event.getPath(), true, nodesUpdateCallback, null);
         }
     }
 
@@ -76,9 +76,7 @@ public class ZKClusterNode implements Watcher {
      */
     class NodesUpdateCallback implements AsyncCallback.Children2Callback {
         @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
-            log("Nodes changed: " + children);
-
-            curNodes = children;
+            log("Nodes changed [rc=" + rc + ", path=" + path + ", nodes=" + 
children + ", ver=" + (stat != null ? stat.getCversion() : null) + ']');
         }
     }
 
@@ -98,7 +96,6 @@ public class ZKClusterNode implements Watcher {
 
         zk.getChildren(CLUSTER_PATH, true, nodesUpdateCallback, null);
 
-
         zk.create(CLUSTER_PATH + "/node-", new byte[]{}, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
             new AsyncCallback.StringCallback() {
                 @Override public void processResult(int rc, String path, 
Object ctx, String name) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5671b68a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
new file mode 100644
index 0000000..4287bb6
--- /dev/null
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZKClusterNodeNew.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import sun.reflect.generics.tree.Tree;
+
+/**
+ *
+ */
+public class ZKClusterNodeNew implements Watcher {
+    /** */
+    private static final String CLUSTER_PATH = "/cluster";
+
+    /** */
+    private static final String EVENTS_PATH = CLUSTER_PATH + "/events";
+
+    /** */
+    private static final String JOIN_HIST_PATH = CLUSTER_PATH + "/joinHist";
+
+    /** */
+    private static final String ALIVE_NODES_PATH = CLUSTER_PATH + "/alive";
+
+    /** */
+    private ZooKeeper zk;
+
+    /** */
+    private final NodesUpdateCallback nodesUpdateCallback;
+
+    /** */
+    private final DataUpdateCallback dataUpdateCallback;
+
+    /** */
+    private final String nodeName;
+
+    /** */
+    private final CountDownLatch connectLatch = new CountDownLatch(1);
+
+    /** */
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+
+    private final String nodeId;
+
+    /** */
+    private static final int ID_LEN = 1;
+
+    static class Node {
+        final String name;
+
+        Node(String name) {
+            this.name = name;
+        }
+    }
+
+    /** */
+    static int nodeIdGen;
+
+    public ZKClusterNodeNew(String nodeName) {
+        this.nodeName = nodeName;
+
+        nodesUpdateCallback = new NodesUpdateCallback();
+        dataUpdateCallback = new DataUpdateCallback();
+
+        nodeId = String.valueOf(nodeIdGen++);//UUID.randomUUID().toString();
+    }
+
+    private void log(String msg) {
+        System.out.println(nodeName + ": " + msg);
+    }
+
+    @Override public void process(WatchedEvent event) {
+        log("Process event [type=" + event.getType() + ", state=" + 
event.getState() + ", path=" + event.getPath() + ']');
+
+        if (event.getType() == Event.EventType.NodeChildrenChanged) {
+            zk.getChildren(event.getPath(), true, nodesUpdateCallback, null);
+        } else if (event.getType() == Event.EventType.NodeDataChanged) {
+            zk.getData(event.getPath(), true, dataUpdateCallback, null);
+        }
+    }
+
+    /**
+     *
+     */
+    static class NodeData {
+        /** */
+        final int order;
+
+        /** */
+        final String nodeId;
+
+        NodeData(int order, String nodeId) {
+            this.order = order;
+            this.nodeId = nodeId;
+        }
+    }
+
+    /**
+     *
+     */
+    static class AliveNodes {
+        /** */
+        final int ver;
+
+        /** */
+        final TreeMap<Integer, NodeData> nodesByOrder;
+
+        /** */
+        final TreeMap<String, NodeData> nodesById;
+
+        /**
+         * @param ver
+         * @param nodesByOrder
+         */
+        AliveNodes(int ver, TreeMap<Integer, NodeData> nodesByOrder) {
+            this.ver = ver;
+            this.nodesByOrder = nodesByOrder;
+
+            nodesById = new TreeMap<>();
+
+            for (NodeData nodeData : nodesByOrder.values())
+                nodesById.put(nodeData.nodeId, nodeData);
+        }
+    }
+
+    /** */
+    private Map<Integer, NodeData> joinHist = new HashMap<>();
+
+    /** */
+    private boolean crd;
+
+    /** */
+    private final JdkMarshaller jdkMarshaller = new JdkMarshaller();
+
+    /** */
+    private AliveNodes curAlive;
+
+    /**
+     *
+     */
+    static class DiscoveryEvents {
+        /** */
+        final AliveNodes aliveNodes;
+
+        /** */
+        final TreeMap<Integer, DiscoveryEvent> evts;
+
+        int ver;
+
+        DiscoveryEvents(AliveNodes aliveNodes, TreeMap<Integer, 
DiscoveryEvent> evts) {
+            this.aliveNodes = aliveNodes;
+            this.evts = evts;
+        }
+    }
+
+    /**
+     *
+     */
+    static class DiscoveryEvent {
+        /** */
+        @GridToStringInclude
+        final DiscoveryEventType evtType;
+
+        /** */
+        @GridToStringInclude
+        final String nodeId;
+
+        /** */
+        @GridToStringInclude
+        final int topVer;
+
+        DiscoveryEvent(DiscoveryEventType evtType, int topVer, String nodeId) {
+            this.evtType = evtType;
+            this.topVer = topVer;
+            this.nodeId = nodeId;
+        }
+
+        @Override public String toString() {
+            return S.toString(DiscoveryEvent.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    enum DiscoveryEventType {
+        /** */
+        NODE_FAILED,
+
+        /** */
+        NODE_JOINED
+    }
+
+    /**
+     *
+     */
+    class NodesUpdateCallback implements AsyncCallback.Children2Callback {
+        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+            if (children == null || children.isEmpty())
+                return;
+
+            if (path.equals(JOIN_HIST_PATH)) {
+                log("Join nodes changed [rc=" + rc + ", path=" + path + ", 
nodes=" + children + ", ver=" + (stat != null ? stat.getCversion() : null) + 
']');
+
+                for (String child : children) {
+                    NodeData data = parseNodePath(child);
+
+                    joinHist.put(data.order, data);
+                }
+            }
+            else if (path.equals(ALIVE_NODES_PATH)) {
+                log("Alive nodes changed [rc=" + rc + ", path=" + path + ", 
nodes=" + children + ", ver=" + (stat != null ? stat.getCversion() : null) + 
']');
+
+                assert stat != null;
+
+                TreeMap<Integer, NodeData> nodes = new TreeMap<>();
+
+                for (String child : children) {
+                    NodeData data = parseNodePath(child);
+
+                    nodes.put(data.order, data);
+                }
+
+                AliveNodes newAlive = new AliveNodes(stat.getCversion(), 
nodes);
+
+                generateEvents(curAlive, newAlive);
+
+                curAlive = newAlive;
+
+                if (connectLatch.getCount() > 0 && 
newAlive.nodesById.containsKey(nodeId))
+                    connectLatch.countDown();
+            }
+        }
+    }
+
+    private void generateEvents(AliveNodes oldNodes, AliveNodes newNodes) {
+        assert newNodes != null;
+
+        if (oldNodes == null) {
+            NodeData locNode = newNodes.nodesById.get(nodeId);
+
+            if (locNode == null)
+                return;
+
+            if (newNodes.nodesByOrder.firstKey() == locNode.order) {
+                log("Coordinator joined");
+
+                curCrdEvts = curEvts;
+
+                crd = true;
+            }
+
+            return;
+        }
+
+        if (!crd)
+            return;
+
+        if (oldNodes.ver == newNodes.ver)
+            return;
+
+        int nextJoinOrder = oldNodes.nodesByOrder.lastKey() + 1;
+
+        TreeMap<Integer, DiscoveryEvent> evts = new TreeMap<>();
+
+        Set<Integer> failed = new HashSet<>();
+
+        for (int v = oldNodes.ver + 1; v <= newNodes.ver; v++) {
+            NodeData data = joinHist.get(nextJoinOrder);
+
+            if (data != null) {
+                evts.put(v, new DiscoveryEvent(DiscoveryEventType.NODE_JOINED, 
v, data.nodeId));
+
+                if (!newNodes.nodesByOrder.containsKey(data.order)) {
+                    v++;
+
+                    evts.put(v, new 
DiscoveryEvent(DiscoveryEventType.NODE_FAILED, v, data.nodeId));
+                }
+
+                nextJoinOrder++;
+            }
+            else {
+                for (NodeData oldData : oldNodes.nodesByOrder.values()) {
+                    if (!failed.contains(oldData.order) && 
!newNodes.nodesByOrder.containsKey(oldData.order)) {
+                        failed.add(oldData.order);
+
+                        evts.put(v, new 
DiscoveryEvent(DiscoveryEventType.NODE_FAILED, v, oldData.nodeId));
+                    }
+                }
+            }
+        }
+
+        log("Generate events on coordinator: " + evts);
+
+        DiscoveryEvents newEvents;
+
+        int expVer;
+
+        if (curCrdEvts == null) {
+            expVer = 0;
+
+            newEvents = new DiscoveryEvents(newNodes, evts);
+        }
+        else {
+            TreeMap<Integer, DiscoveryEvent> evts0 = new 
TreeMap<>(curCrdEvts.evts);
+
+            evts0.putAll(evts);
+
+            newEvents = new DiscoveryEvents(newNodes, evts);
+
+            expVer = curCrdEvts.ver;
+
+            newEvents.ver = expVer + 1;
+        }
+
+        try {
+            zk.setData(EVENTS_PATH, marshal(newEvents), expVer);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        curCrdEvts = newEvents;
+    }
+
+    static NodeData parseNodePath(String path) {
+        String nodeId = path.substring(0, ID_LEN);
+        int nodeOrder = Integer.parseInt(path.substring(ID_LEN + 1));
+
+        return new NodeData(nodeOrder, nodeId);
+    }
+
+    /** */
+    private DiscoveryEvents curEvts;
+
+    /** */
+    private DiscoveryEvents curCrdEvts;
+
+    /**
+     *
+     */
+    class DataUpdateCallback implements AsyncCallback.DataCallback {
+        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            log("Data changed [path=" + path + ", ver=" + (stat != null ? 
stat.getVersion() : null) + ']');
+
+            if (data.length == 0)
+                return;
+
+            if (path.equals(EVENTS_PATH)) {
+                assert stat != null;
+
+                DiscoveryEvents newEvts = unmarshal(data);
+
+                newEvts.ver = stat.getVersion();
+
+                for (DiscoveryEvent e : newEvts.evts.values())
+                    log("Event update: " + e)
+
+                curEvts = newEvts;
+            }
+        }
+    }
+
+    private <T> T unmarshal(byte[] data) {
+        try {
+            return jdkMarshaller.unmarshal(data, null);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    private byte[] marshal(Object obj) {
+        try {
+            return jdkMarshaller.marshal(obj);
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void join(String connectString) throws Exception {
+        log("Start connect " + connectString);
+
+        try {
+            zk = new ZooKeeper(connectString, 5000, this);
+
+            if (zk.exists(CLUSTER_PATH, false) == null) {
+                List<Op> initOps = new ArrayList<>();
+
+                initOps.add(Op.create(CLUSTER_PATH, new byte[]{}, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                initOps.add(Op.create(JOIN_HIST_PATH, new byte[]{}, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                initOps.add(Op.create(ALIVE_NODES_PATH, new byte[]{}, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+                initOps.add(Op.create(EVENTS_PATH, new byte[]{}, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+                zk.multi(initOps);
+            }
+
+            zk.getData(EVENTS_PATH, true, dataUpdateCallback, null);
+            zk.getChildren(ALIVE_NODES_PATH, true, nodesUpdateCallback, null);
+            zk.getChildren(JOIN_HIST_PATH, true, nodesUpdateCallback, null);
+
+            log("Start join: " + nodeId);
+
+            List<Op> joinOps = new ArrayList<>();
+
+            byte[] nodeData = nodeName.getBytes(UTF8);
+
+            String zkNode = "/" + nodeId + "-";
+
+            joinOps.add(Op.create(JOIN_HIST_PATH + zkNode, nodeData, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL));
+            joinOps.add(Op.create(ALIVE_NODES_PATH + zkNode, nodeData, 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
+
+            List<OpResult> res = zk.multi(joinOps);
+
+            connectLatch.await();
+
+            System.out.println("Node joined: " + nodeId);
+        } catch (Exception e) {
+            log("Connect failed: " + e);
+
+            e.printStackTrace(System.out);
+        }
+    }
+
+    /**
+     *
+     */
+    public void stop() {
+        try {
+            if (zk != null)
+                zk.close();
+        }
+        catch (Exception e) {
+            log("Closed failed: " + e);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        new ZKClusterNodeNew(args[0]).join(args[1]);
+
+        Thread.sleep(Long.MAX_VALUE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5671b68a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
index cf95bb6..fdd9ae9 100644
--- a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKDisconnectTest.java
@@ -55,7 +55,7 @@ public class ZKDisconnectTest {
 
                     blockConnect.await();
 
-                    System.out.println("TestClientCnxnSocketNIO finish bloc");
+                    System.out.println("TestClientCnxnSocketNIO finish block");
                 }
                 catch (Exception e) {
                     e.printStackTrace();
@@ -81,7 +81,7 @@ public class ZKDisconnectTest {
 
     public static void main(String[] args) {
         try {
-            TestingCluster zkCluster = new TestingCluster(3);
+            TestingCluster zkCluster = new TestingCluster(1);
             zkCluster.start();
 
             Thread.sleep(1000);
@@ -114,6 +114,9 @@ public class ZKDisconnectTest {
 
             node3.stop();
 
+            ZKClusterNode node4 = new ZKClusterNode("n4");
+            node4.join(zkCluster.getConnectString());
+
             System.out.println("Node stopped");
 
             TestClientCnxnSocketNIO.instance.blockConnect.countDown();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5671b68a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java 
b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
new file mode 100644
index 0000000..f0ab6e3
--- /dev/null
+++ b/modules/zookeeper/src/test/java/org/apache/zookeeper/ZKSimpleTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.zk.ZKClusterNodeNew;
+
+/**
+ *
+ */
+public class ZKSimpleTest {
+    public static void main(String[] args) throws Exception {
+        TestingCluster zkCluster = new TestingCluster(1);
+        zkCluster.start();
+
+        System.out.println("ZK started\n");
+
+        ZKClusterNodeNew node0 = new ZKClusterNodeNew("n0");
+        node0.join(zkCluster.getConnectString());
+
+        ZKClusterNodeNew node1 = new ZKClusterNodeNew("n1");
+        node1.join(zkCluster.getConnectString());
+
+        ZKClusterNodeNew node2 = new ZKClusterNodeNew("n2");
+        node2.join(zkCluster.getConnectString());
+
+        System.out.println("Stop node");
+
+        node1.stop();
+
+        System.out.println("Done");
+
+        Thread.sleep(60_000);
+    }
+}

Reply via email to