zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fb6bd0ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fb6bd0ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fb6bd0ac

Branch: refs/heads/ignite-zk
Commit: fb6bd0ac39f97db0d7e347aff6fa26edda10f940
Parents: fcee8c8
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 21 13:43:15 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 21 15:09:20 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZkEventAckFuture.java | 139 +++++++++++++++++++
 .../discovery/zk/internal/ZookeeperClient.java  | 121 ++++++++++++++--
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  55 +++++++-
 3 files changed, 296 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fb6bd0ac/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
new file mode 100644
index 0000000..fa0da99
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ZkEventAckFuture extends GridFutureAdapter<Void> implements 
Watcher, AsyncCallback.Children2Callback {
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** */
+    private final Long evtId;
+
+    /** */
+    private final String evtPath;
+
+    /** */
+    private final int expAcks;
+
+    /** */
+    private final Set<Integer> remaininAcks;
+
+    ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) {
+        this.impl = impl;
+        this.log = impl.log();
+        this.evtPath = evtPath;
+        this.evtId = evtId;
+
+        ZkClusterNodes top = impl.nodes();
+
+        remaininAcks = U.newHashSet(top.nodesById.size());
+
+        for (ZookeeperClusterNode node : top.nodesByInternalId.values()) {
+            if (!node.isLocal())
+                remaininAcks.add(node.internalId());
+        }
+
+        expAcks = remaininAcks.size();
+
+        if (expAcks == 0)
+            onDone();
+        else
+            impl.zkClient().getChildrenAsync(evtPath, this, this);
+    }
+
+    /**
+     * @return Event ID.
+     */
+    Long eventId() {
+        return evtId;
+    }
+
+    /**
+     * @param node Failed node.
+     */
+    void onNodeFail(ZookeeperClusterNode node) {
+        assert !remaininAcks.isEmpty();
+
+        if (remaininAcks.remove(node.internalId()) && remaininAcks.isEmpty())
+            onDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
+        if (super.onDone(res, err)) {
+            impl.removeAckFuture(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void process(WatchedEvent evt) {
+        if (isDone())
+            return;
+
+        if (evt.getType() == Event.EventType.NodeChildrenChanged) {
+            if (evtPath.equals(evt.getPath()))
+                impl.zkClient().getChildrenAsync(evtPath, this, this);
+            else
+                U.warn(log, "Received event for unknown path: " + 
evt.getPath());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+        assert rc == 0 : rc;
+
+        if (isDone())
+            return;
+
+        if (expAcks == stat.getCversion()) {
+            log.info("Received expected number of acks [expCnt=" + expAcks + 
", cVer=" + stat.getCversion() + ']');
+
+            onDone();
+        }
+        else {
+            for (int i = 0; i < children.size(); i++) {
+                Integer nodeInternalId = Integer.parseInt(children.get(i));
+
+                if (remaininAcks.remove(nodeInternalId) && remaininAcks.size() 
== 0)
+                    onDone();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb6bd0ac/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 6393b90..4fdc9fc 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -27,6 +28,7 @@ import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
@@ -264,6 +266,29 @@ public class ZookeeperClient implements Watcher {
     }
 
 
+    void deleteAllIfExists(String parent, List<String> paths, int ver)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        // TODO ZK: need check for max size?
+        List<Op> ops = new ArrayList<>(paths.size());
+
+        for (String path : paths)
+            ops.add(Op.delete(parent + "/" + path, ver));
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.multi(ops);
+
+                return;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
     void delete(String path, int ver)
         throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
     {
@@ -331,7 +356,7 @@ public class ZookeeperClient implements Watcher {
     void getChildrenAsync(String path, Watcher watcher, 
AsyncCallback.Children2Callback cb) {
         GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb);
 
-        zk.getChildren(path, watcher, new ChildreCallbackWrapper(op), null);
+        zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null);
     }
 
     void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback 
cb) {
@@ -340,6 +365,15 @@ public class ZookeeperClient implements Watcher {
         zk.getData(path, watcher, new DataCallbackWrapper(op), null);
     }
 
+    void createAsync(String path, byte[] data, CreateMode createMode, 
AsyncCallback.StringCallback cb) {
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        CreateOperation op = new CreateOperation(path, data, createMode, cb);
+
+        zk.create(path, data, ZK_ACL, createMode, new 
CreateCallbackWrapper(op), null);
+    }
+
     /**
      *
      */
@@ -437,6 +471,29 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
+    private void closeClient() {
+        try {
+            zk.close();
+        }
+        catch (Exception closeErr) {
+            U.warn(log, "Failed to close zookeeper client: " + closeErr, 
closeErr);
+        }
+
+        connTimer.cancel();
+    }
+
+    /**
+     *
+     */
+    private void scheduleConnectionCheck() {
+        assert state == ConnectionState.Disconnected : state;
+
+        connTimer.schedule(new ConnectionTimeoutTask(connStartTime), 
connLossTimeout);
+    }
+
+    /**
+     *
+     */
     interface ZkAsyncOperation {
         /**
          *
@@ -532,37 +589,75 @@ public class ZookeeperClient implements Watcher {
     /**
      *
      */
-    private void closeClient() {
-        try {
-            zk.close();
-        }
-        catch (Exception closeErr) {
-            U.warn(log, "Failed to close zookeeper client: " + closeErr, 
closeErr);
+    class CreateOperation implements ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /** */
+        private final byte[] data;
+
+        /** */
+        private final CreateMode createMode;
+
+        /** */
+        private final AsyncCallback.StringCallback cb;
+
+        CreateOperation(String path, byte[] data, CreateMode createMode, 
AsyncCallback.StringCallback cb) {
+            this.path = path;
+            this.data = data;
+            this.createMode = createMode;
+            this.cb = cb;
         }
 
-        connTimer.cancel();
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            createAsync(path, data, createMode, cb);
+        }
     }
 
     /**
      *
      */
-    private void scheduleConnectionCheck() {
-        assert state == ConnectionState.Disconnected : state;
+    class CreateCallbackWrapper implements AsyncCallback.StringCallback {
+        /** */
+        final CreateOperation op;
 
-        connTimer.schedule(new ConnectionTimeoutTask(connStartTime), 
connLossTimeout);
+        /**
+         * @param op Operation.
+         */
+        CreateCallbackWrapper(CreateOperation op) {
+            this.op = op;
+        }
+
+        @Override public void processResult(int rc, String path, Object ctx, 
String name) {
+            if (rc == KeeperException.Code.NODEEXISTS.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else {
+                if (op.cb != null)
+                    op.cb.processResult(rc, path, ctx, name);
+            }
+        }
     }
 
     /**
      *
      */
-    class ChildreCallbackWrapper implements AsyncCallback.Children2Callback {
+    class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback {
         /** */
         private final GetChildrenOperation op;
 
         /**
          * @param op Operation.
          */
-        private ChildreCallbackWrapper(GetChildrenOperation op) {
+        private ChildrenCallbackWrapper(GetChildrenOperation op) {
             this.op = op;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fb6bd0ac/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index ece71f9..6f8c07f 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import org.apache.curator.utils.PathUtils;
 import org.apache.ignite.IgniteCheckedException;
@@ -112,6 +113,9 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private boolean crd;
 
+    /** */
+    private Map<Long, ZkEventAckFuture> ackFuts = new ConcurrentHashMap<>();
+
     /**
      * @param log
      * @param basePath
@@ -145,6 +149,10 @@ public class ZookeeperDiscoveryImpl {
         dataCallback = new ZkDataCallback();
     }
 
+    IgniteLogger log() {
+        return log;
+    }
+
     public ClusterNode localNode() {
         return locNode;
     }
@@ -664,21 +672,38 @@ public class ZookeeperDiscoveryImpl {
         // TODO ZK: use multi.
         zkClient.setData(zkPaths.evtsPath, null, -1);
 
-        for (String evtPath : zkClient.getChildren(zkPaths.evtsPath)) {
+        List<String> evtChildren = zkClient.getChildren(zkPaths.evtsPath);
+
+        for (String evtPath : evtChildren) {
             String evtDir = zkPaths.evtsPath + "/" + evtPath;
 
             removeChildren(evtDir);
-
-            zkClient.delete(evtDir, -1);
         }
 
+        zkClient.deleteAllIfExists(zkPaths.evtsPath, evtChildren, -1);
+
         for (String evtPath : zkClient.getChildren(zkPaths.customEvtsDir))
             zkClient.delete(zkPaths.customEvtsDir + "/" + evtPath, -1);
     }
 
+    /**
+     * @param path Path.
+     * @throws Exception If failed.
+     */
     private void removeChildren(String path) throws Exception {
-        for (String childPath : zkClient.getChildren(path))
-            zkClient.delete(path + "/" + childPath, -1);
+        zkClient.deleteAllIfExists(path, zkClient.getChildren(path), -1);
+    }
+
+    void removeAckFuture(ZkEventAckFuture fut) {
+        ackFuts.remove(fut.eventId());
+    }
+
+    ZkClusterNodes nodes() {
+        return top;
+    }
+
+    ZookeeperClient zkClient() {
+        return zkClient;
     }
 
     /**
@@ -914,7 +939,7 @@ public class ZookeeperDiscoveryImpl {
     @SuppressWarnings("unchecked")
     private void notifyCustomEvent(ZkDiscoveryCustomEventData evtData, 
DiscoverySpiCustomMessage msg) {
         if (log.isInfoEnabled())
-            log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + 
msg.getClass().getSimpleName() + ']');
+            log.info(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg 
+ ']');
 
         ZookeeperClusterNode sndNode = top.nodesById.get(evtData.sndNodeId);
 
@@ -928,6 +953,19 @@ public class ZookeeperDiscoveryImpl {
             topSnapshot,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             msg);
+
+        if (crd) {
+            ZkEventAckFuture fut = new ZkEventAckFuture(this,
+                zkPaths.customEvtsDir + "/" + evtData.evtPath,
+                evtData.eventId());
+
+            ackFuts.put(evtData.eventId(), fut);
+        }
+        else {
+            String ackPath = zkPaths.customEvtsDir + "/" + evtData.evtPath + 
"/" + locNode.internalId();
+
+            zkClient.createAsync(ackPath, null, CreateMode.PERSISTENT, null);
+        }
     }
 
     /**
@@ -970,6 +1008,11 @@ public class ZookeeperDiscoveryImpl {
             topSnapshot,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
+
+        if (crd) {
+            for (ZkEventAckFuture ackFut : ackFuts.values())
+                ackFut.onNodeFail(failedNode);
+        }
     }
 
     /**

Reply via email to