Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 1f82a5311 -> 18527db9b


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18527db9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18527db9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18527db9

Branch: refs/heads/ignite-zk
Commit: 18527db9ba5d13b0964ec9c87c8b155295921c9a
Parents: 1f82a53
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 21 16:54:38 2017 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 21 18:34:44 2017 +0300

----------------------------------------------------------------------
 .../discovery/zk/internal/ZkAliveNodeData.java  |  37 ++
 .../zk/internal/ZkDiscoveryEventData.java       |  38 ++
 .../zk/internal/ZkDiscoveryEventsData.java      |   5 +-
 .../discovery/zk/internal/ZkEventAckFuture.java |   7 +-
 .../discovery/zk/internal/ZkIgnitePaths.java    | 128 +++++
 .../spi/discovery/zk/internal/ZkPaths.java      | 116 ----
 .../discovery/zk/internal/ZookeeperClient.java  |  43 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 534 +++++++++++++------
 .../zk/ZookeeperDiscoverySpiBasicTest.java      |  49 ++
 .../zk/internal/ZookeeperClientTest.java        |  23 +
 10 files changed, 694 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
new file mode 100644
index 0000000..45f453f
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class ZkAliveNodeData implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    long lastProcEvt = -1;
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkAliveNodeData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
index 3982c90..9f18f4f 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventData.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -39,6 +42,9 @@ abstract class ZkDiscoveryEventData implements Serializable {
     /** */
     private final long topVer;
 
+    /** */
+    private transient Set<Integer> remainingAcks;
+
     /**
      * @param evtType Event type.
      * @param topVer Topology version.
@@ -51,6 +57,38 @@ abstract class ZkDiscoveryEventData implements Serializable {
         this.topVer = topVer;
     }
 
+    void remainingAcks(Collection<ZookeeperClusterNode> nodes) {
+        assert remainingAcks == null : this;
+
+        remainingAcks = U.newHashSet(nodes.size());
+
+        for (ZookeeperClusterNode node : nodes) {
+            if (!node.isLocal() && node.order() <= topVer)
+                remainingAcks.add(node.internalId());
+        }
+    }
+
+    boolean allAcksReceived() {
+        return remainingAcks.isEmpty();
+    }
+
+    boolean onAckReceived(Integer nodeInternalId, long ackEvtId) {
+        assert remainingAcks != null;
+
+        if (ackEvtId >= evtId)
+            remainingAcks.remove(nodeInternalId);
+
+        return remainingAcks.isEmpty();
+    }
+
+    boolean onNodeFail(ZookeeperClusterNode node) {
+        assert remainingAcks != null : this;
+
+        remainingAcks.remove(node.internalId());
+
+        return remainingAcks.isEmpty();
+    }
+
     long eventId() {
         return evtId;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
index d3f07ae..ce21a06 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryEventsData.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.zk.internal;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.TreeMap;
 
 /**
@@ -56,9 +57,11 @@ class ZkDiscoveryEventsData implements Serializable {
     /**
      * @param evt Event.
      */
-    void addEvent(ZkDiscoveryEventData evt) {
+    void addEvent(Collection<ZookeeperClusterNode> nodes, ZkDiscoveryEventData 
evt) {
         Object old = evts.put(evt.eventId(), evt);
 
         assert old == null : old;
+
+        evt.remainingAcks(nodes);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
index fa0da99..c89b586 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkEventAckFuture.java
@@ -51,6 +51,11 @@ public class ZkEventAckFuture extends 
GridFutureAdapter<Void> implements Watcher
     /** */
     private final Set<Integer> remaininAcks;
 
+    /**
+     * @param impl
+     * @param evtPath
+     * @param evtId
+     */
     ZkEventAckFuture(ZookeeperDiscoveryImpl impl, String evtPath, Long evtId) {
         this.impl = impl;
         this.log = impl.log();
@@ -94,8 +99,6 @@ public class ZkEventAckFuture extends GridFutureAdapter<Void> 
implements Watcher
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable Void res, @Nullable Throwable 
err) {
         if (super.onDone(res, err)) {
-            impl.removeAckFuture(this);
-
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
new file mode 100644
index 0000000..ad35c05
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkIgnitePaths.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+class ZkIgnitePaths {
+    /** */
+    private static final int UUID_LEN = 36;
+
+    /** */
+    private static final String JOIN_DATA_DIR = "joinData";
+
+    /** */
+    private static final String CUSTOM_EVTS_DIR = "customEvts";
+
+    /** */
+    private static final String ALIVE_NODES_DIR = "alive";
+
+    /** */
+    private static final String DISCO_EVENTS_PATH = "events";
+
+    /** */
+    final String basePath;
+
+    /** */
+    private final String clusterName;
+
+    /** */
+    final String clusterDir;
+
+    /** */
+    final String aliveNodesDir;
+
+    /** */
+    final String joinDataDir;
+
+    /** */
+    final String evtsPath;
+
+    /** */
+    final String customEvtsDir;
+
+    /**
+     * @param basePath Base directory.
+     * @param clusterName Cluster name.
+     */
+    ZkIgnitePaths(String basePath, String clusterName) {
+        this.basePath = basePath;
+        this.clusterName = clusterName;
+
+        clusterDir = basePath + "/" + clusterName;
+
+        aliveNodesDir = zkPath(ALIVE_NODES_DIR);
+        joinDataDir = zkPath(JOIN_DATA_DIR);
+        evtsPath = zkPath(DISCO_EVENTS_PATH);
+        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
+    }
+
+    /**
+     * @param path Relative path.
+     * @return Full path.
+     */
+    String zkPath(String path) {
+        return basePath + "/" + clusterName + "/" + path;
+    }
+
+    static int aliveInternalId(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    static UUID aliveNodeId(String path) {
+        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    static int aliveJoinSequence(String path) {
+        int idx1 = path.indexOf('|');
+        int idx2 = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx1 + 1, idx2));
+    }
+
+    static int customEventSequence(String path) {
+        int idx = path.lastIndexOf('|');
+
+        return Integer.parseInt(path.substring(idx + 1));
+    }
+
+    static UUID customEventSendNodeId(String path) {
+        String idStr = path.substring(0, ZkIgnitePaths.UUID_LEN);
+
+        return UUID.fromString(idStr);
+    }
+
+    String joinEventDataPath(long evtId) {
+        return evtsPath + "/" + evtId;
+    }
+
+    String joinEventDataPathForJoined(long evtId) {
+        return evtsPath + "/joined-" + evtId;
+    }
+
+    String customEventDataPath(String child) {
+        return customEvtsDir + "/" + child;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
deleted file mode 100644
index 643e10d..0000000
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkPaths.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.zk.internal;
-
-import java.util.UUID;
-
-/**
- *
- */
-class ZkPaths {
-    /** */
-    private static final int UUID_LEN = 36;
-
-    /** */
-    private static final String JOIN_DATA_DIR = "joinData";
-
-    /** */
-    private static final String CUSTOM_EVTS_DIR = "customEvts";
-
-    /** */
-    private static final String ALIVE_NODES_DIR = "alive";
-
-    /** */
-    private static final String DISCO_EVENTS_PATH = "events";
-
-    /** */
-    final String basePath;
-
-    /** */
-    private final String clusterName;
-
-    /** */
-    final String clusterDir;
-
-    /** */
-    final String aliveNodesDir;
-
-    /** */
-    final String joinDataDir;
-
-    /** */
-    final String evtsPath;
-
-    /** */
-    final String customEvtsDir;
-
-    /**
-     * @param basePath Base directory.
-     * @param clusterName Cluster name.
-     */
-    ZkPaths(String basePath, String clusterName) {
-        this.basePath = basePath;
-        this.clusterName = clusterName;
-
-        clusterDir = basePath + "/" + clusterName;
-
-        aliveNodesDir = zkPath(ALIVE_NODES_DIR);
-        joinDataDir = zkPath(JOIN_DATA_DIR);
-        evtsPath = zkPath(DISCO_EVENTS_PATH);
-        customEvtsDir = zkPath(CUSTOM_EVTS_DIR);
-    }
-
-    /**
-     * @param path Relative path.
-     * @return Full path.
-     */
-    String zkPath(String path) {
-        return basePath + "/" + clusterName + "/" + path;
-    }
-
-    static int aliveInternalId(String path) {
-        int idx = path.lastIndexOf('|');
-
-        return Integer.parseInt(path.substring(idx + 1));
-    }
-
-    static UUID aliveNodeId(String path) {
-        String idStr = path.substring(0, ZkPaths.UUID_LEN);
-
-        return UUID.fromString(idStr);
-    }
-
-    static int aliveJoinSequence(String path) {
-        int idx1 = path.indexOf('|');
-        int idx2 = path.lastIndexOf('|');
-
-        return Integer.parseInt(path.substring(idx1 + 1, idx2));
-    }
-
-    static int customEventSequence(String path) {
-        int idx = path.lastIndexOf('|');
-
-        return Integer.parseInt(path.substring(idx + 1));
-    }
-
-    static UUID customEventSendNodeId(String path) {
-        String idStr = path.substring(0, ZkPaths.UUID_LEN);
-
-        return UUID.fromString(idStr);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 4fdc9fc..626b235 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -29,12 +29,14 @@ import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -217,8 +219,33 @@ public class ZookeeperClient implements Watcher {
         }
     }
 
+    void createAllIfNeeded(List<String> paths, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        // TODO ZK: need check for max size?
+        List<Op> ops = new ArrayList<>(paths.size());
+
+        for (String path : paths)
+            ops.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode));
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.multi(ops);
+
+                return;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+
+    }
+
     String createIfNeeded(String path, byte[] data, CreateMode createMode)
-        throws ZookeeperClientFailedException, InterruptedException {
+        throws ZookeeperClientFailedException, InterruptedException
+    {
         if (data == null)
             data = EMPTY_BYTES;
 
@@ -266,14 +293,20 @@ public class ZookeeperClient implements Watcher {
     }
 
 
-    void deleteAllIfExists(String parent, List<String> paths, int ver)
-        throws ZookeeperClientFailedException, InterruptedException
+    void deleteAll(@Nullable String parent, List<String> paths, int ver)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
     {
+        if (paths.isEmpty())
+            return;
+
         // TODO ZK: need check for max size?
         List<Op> ops = new ArrayList<>(paths.size());
 
-        for (String path : paths)
-            ops.add(Op.delete(parent + "/" + path, ver));
+        for (String path : paths) {
+            String path0 = parent != null ? parent + "/" + path : path;
+
+            ops.add(Op.delete(path0, ver));
+        }
 
         for (;;) {
             long connStartTime = this.connStartTime;

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 6f8c07f..8246e19 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -22,17 +22,18 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import org.apache.curator.utils.PathUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -63,10 +64,13 @@ import static org.apache.zookeeper.CreateMode.PERSISTENT;
  */
 public class ZookeeperDiscoveryImpl {
     /** */
+    public static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
+
+    /** */
     private final JdkMarshaller marsh = new JdkMarshaller();
 
     /** */
-    private final ZkPaths zkPaths;
+    private final ZkIgnitePaths zkPaths;
 
     /** */
     private final IgniteLogger log;
@@ -102,19 +106,22 @@ public class ZookeeperDiscoveryImpl {
     private long gridStartTime;
 
     /** */
-    private long lastProcEvt = -1;
-
-    /** */
     private boolean joined;
 
     /** */
-    private ZkDiscoveryEventsData evts;
+    private ZkDiscoveryEventsData evtsData;
 
     /** */
     private boolean crd;
 
     /** */
-    private Map<Long, ZkEventAckFuture> ackFuts = new ConcurrentHashMap<>();
+    private String locNodeZkPath;
+
+    /** */
+    private ZkAliveNodeData locNodeInfo = new ZkAliveNodeData();
+
+    /** */
+    private final int evtsAckThreshold;
 
     /**
      * @param log
@@ -137,7 +144,7 @@ public class ZookeeperDiscoveryImpl {
 
         PathUtils.validatePath(basePath);
 
-        zkPaths = new ZkPaths(basePath, clusterName);
+        zkPaths = new ZkIgnitePaths(basePath, clusterName);
 
         this.log = log.getLogger(getClass());
         this.locNode = locNode;
@@ -147,6 +154,13 @@ public class ZookeeperDiscoveryImpl {
         watcher = new ZkWatcher();
         childrenCallback = new ZKChildrenCallback();
         dataCallback = new ZkDataCallback();
+
+        int evtsAckThreshold = 
IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, 
5);
+
+        if (evtsAckThreshold <= 0)
+            evtsAckThreshold = 1;
+
+        this.evtsAckThreshold = evtsAckThreshold;
     }
 
     IgniteLogger log() {
@@ -184,7 +198,7 @@ public class ZookeeperDiscoveryImpl {
             List<String> children = 
zkClient.getChildren(zkPaths.aliveNodesDir);
 
             for (int i = 0; i < children.size(); i++) {
-                UUID id = ZkPaths.aliveNodeId(children.get(i));
+                UUID id = ZkIgnitePaths.aliveNodeId(children.get(i));
 
                 if (nodeId.equals(id))
                     return true;
@@ -300,17 +314,27 @@ public class ZookeeperDiscoveryImpl {
             if (zkClient.exists(zkPaths.aliveNodesDir))
                 return; // This path is created last, assume all others dirs 
are created.
 
-            zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT);
-
-            zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT);
-
-            zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT);
-
-            zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT);
-
-            zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT);
-
-            zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT);
+            List<String> dirs = new ArrayList<>();
+
+            dirs.add(zkPaths.basePath);
+            dirs.add(zkPaths.clusterDir);
+            dirs.add(zkPaths.evtsPath);
+            dirs.add(zkPaths.joinDataDir);
+            dirs.add(zkPaths.customEvtsDir);
+            dirs.add(zkPaths.aliveNodesDir);
+
+            zkClient.createAllIfNeeded(dirs, PERSISTENT);
+//            zkClient.createIfNeeded(zkPaths.basePath, null, PERSISTENT);
+//
+//            zkClient.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT);
+//
+//            zkClient.createIfNeeded(zkPaths.evtsPath, null, PERSISTENT);
+//
+//            zkClient.createIfNeeded(zkPaths.joinDataDir, null, PERSISTENT);
+//
+//            zkClient.createIfNeeded(zkPaths.customEvtsDir, null, PERSISTENT);
+//
+//            zkClient.createIfNeeded(zkPaths.aliveNodesDir, null, PERSISTENT);
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper 
nodes", e);
@@ -322,14 +346,16 @@ public class ZookeeperDiscoveryImpl {
      */
     private void startJoin(byte[] joinDataBytes) throws InterruptedException {
         try {
-            zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
-
             // TODO ZK: handle max size.
-            String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + 
locNode.id() + "|", joinDataBytes, EPHEMERAL_SEQUENTIAL);
+            String path = zkClient.createIfNeeded(zkPaths.joinDataDir + "/" + 
locNode.id() + "|",
+                joinDataBytes,
+                EPHEMERAL_SEQUENTIAL);
 
             int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') 
+ 1));
 
-            zkClient.createIfNeeded(zkPaths.aliveNodesDir + "/" + locNode.id() 
+ "|" + seqNum + "|", null, EPHEMERAL_SEQUENTIAL);
+            locNodeZkPath = zkClient.createIfNeeded(zkPaths.aliveNodesDir + 
"/" + locNode.id() + "|" + seqNum + "|",
+                null,
+                EPHEMERAL_SEQUENTIAL);
 
             zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
                 @Override public void processResult(int rc, String path, 
Object ctx, List<String> children, Stat stat) {
@@ -337,6 +363,8 @@ public class ZookeeperDiscoveryImpl {
                 }
             });
 
+            zkClient.getDataAsync(zkPaths.evtsPath, watcher, dataCallback);
+
             connStartLatch.countDown();
         }
         catch (ZookeeperClientFailedException e) {
@@ -366,7 +394,7 @@ public class ZookeeperDiscoveryImpl {
         checkIsCoordinator(rc, aliveNodes);
     }
 
-    private void checkIsCoordinator(int rc, List<String> aliveNodes) {
+    private void checkIsCoordinator(int rc, final List<String> aliveNodes) {
         try {
             assert rc == 0 : rc;
 
@@ -375,12 +403,12 @@ public class ZookeeperDiscoveryImpl {
             Integer locInternalId = null;
 
             for (String aliveNodePath : aliveNodes) {
-                Integer internalId = ZkPaths.aliveInternalId(aliveNodePath);
+                Integer internalId = 
ZkIgnitePaths.aliveInternalId(aliveNodePath);
 
                 alives.put(internalId, aliveNodePath);
 
                 if (locInternalId == null) {
-                    UUID nodeId = ZkPaths.aliveNodeId(aliveNodePath);
+                    UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
 
                     if (locNode.id().equals(nodeId))
                         locInternalId = internalId;
@@ -393,7 +421,7 @@ public class ZookeeperDiscoveryImpl {
             Map.Entry<Integer, String> crdE = alives.firstEntry();
 
             if (locInternalId.equals(crdE.getKey()))
-                onBecomeCoordinator(locInternalId);
+                onBecomeCoordinator(aliveNodes, locInternalId);
             else {
                 assert alives.size() > 1;
 
@@ -412,7 +440,7 @@ public class ZookeeperDiscoveryImpl {
                     @Override public void process(WatchedEvent evt) {
                         if (evt.getType() == Event.EventType.NodeDeleted) {
                             try {
-                                onPreviousNodeFail(crdInternalId, 
locInternalId0);
+                                onPreviousNodeFail(aliveNodes, crdInternalId, 
locInternalId0);
                             }
                             catch (Throwable e) {
                                 onFatalError(e);
@@ -425,7 +453,7 @@ public class ZookeeperDiscoveryImpl {
 
                         if (stat == null) {
                             try {
-                                onPreviousNodeFail(crdInternalId, 
locInternalId0);
+                                onPreviousNodeFail(aliveNodes, crdInternalId, 
locInternalId0);
                             }
                             catch (Throwable e) {
                                 onFatalError(e);
@@ -440,30 +468,29 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    private void onPreviousNodeFail(int crdInternalId, int locInternalId) 
throws Exception {
-        if (locInternalId == crdInternalId + 1) {
-            if (log.isInfoEnabled())
-                log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
-
-            onBecomeCoordinator(locInternalId);
-        }
-        else {
-            if (log.isInfoEnabled())
-                log.info("Previous node failed, check is node new coordinator 
[locId=" + locNode.id() + ']');
+    private void onPreviousNodeFail(List<String> aliveNodes, int 
crdInternalId, int locInternalId) throws Exception {
+        // TODO ZK:
+//        if (locInternalId == crdInternalId + 1) {
+//            if (log.isInfoEnabled())
+//                log.info("Previous discovery coordinator failed [locId=" + 
locNode.id() + ']');
+//
+//            onBecomeCoordinator(aliveNodes, locInternalId);
+//        }
+        if (log.isInfoEnabled())
+            log.info("Previous node failed, check is node new coordinator 
[locId=" + locNode.id() + ']');
 
-            zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
-                @Override public void processResult(int rc, String path, 
Object ctx, List<String> children, Stat stat) {
-                    checkIsCoordinator(rc, children);
-                }
-            });
-        }
+        zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
AsyncCallback.Children2Callback() {
+            @Override public void processResult(int rc, String path, Object 
ctx, List<String> children, Stat stat) {
+                checkIsCoordinator(rc, children);
+            }
+        });
     }
 
     /**
      * @param locInternalId Local node's internal ID.
      * @throws Exception If failed.
      */
-    private void onBecomeCoordinator(int locInternalId) throws Exception {
+    private void onBecomeCoordinator(List<String> aliveNodes, int 
locInternalId) throws Exception {
         byte[] evtsData = zkClient.getData(zkPaths.evtsPath);
 
         if (evtsData.length > 0)
@@ -476,7 +503,21 @@ public class ZookeeperDiscoveryImpl {
                 log.info("Node is new discovery coordinator [locId=" + 
locNode.id() + ']');
 
             assert locNode.order() > 0 : locNode;
-            assert evts != null;
+            assert this.evtsData != null;
+
+            Iterator<ZkDiscoveryEventData> it = 
this.evtsData.evts.values().iterator();
+
+            while (it.hasNext()) {
+                ZkDiscoveryEventData evtData = it.next();
+
+                evtData.remainingAcks(top.nodesByOrder.values());
+
+                if (evtData.allAcksReceived()) {
+                    processNodesAckEvent(evtData);
+
+                    it.remove();
+                }
+            }
         }
         else {
             if (log.isInfoEnabled())
@@ -487,6 +528,79 @@ public class ZookeeperDiscoveryImpl {
 
         zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, 
childrenCallback);
         zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, 
childrenCallback);
+
+        for (String alivePath : aliveNodes)
+            watchAliveNodeData(alivePath);
+    }
+
+    /**
+     * @param alivePath
+     */
+    private void watchAliveNodeData(String alivePath) {
+        assert locNodeZkPath != null;
+
+        String path = zkPaths.aliveNodesDir + "/" + alivePath;
+
+        if (!path.equals(locNodeZkPath))
+            zkClient.getDataAsync(path, aliveNodeDataWatcher, 
aliveNodeDataUpdateCallback);
+    }
+
+    /** */
+    private final AliveNodeDataWatcher aliveNodeDataWatcher = new 
AliveNodeDataWatcher();
+
+    /** */
+    private AliveNodeDataUpdateCallback aliveNodeDataUpdateCallback = new 
AliveNodeDataUpdateCallback();
+
+    /**
+     *
+     */
+    private class AliveNodeDataWatcher implements Watcher {
+        @Override public void process(WatchedEvent evt) {
+            if (evt.getType() == Event.EventType.NodeDataChanged) {
+                zkClient.getDataAsync(evt.getPath(), this, 
aliveNodeDataUpdateCallback);
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private class AliveNodeDataUpdateCallback implements 
AsyncCallback.DataCallback {
+        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            assert crd;
+
+            if (rc == KeeperException.Code.NONODE.intValue()) {
+                if (log.isDebugEnabled())
+                    log.debug("Alive node callaback, no node: " + path);
+
+                return;
+            }
+
+            assert rc == 0 : rc;
+
+            try {
+                if (data.length > 0) {
+                    ZkAliveNodeData nodeData = unmarshal(data);
+
+                    Integer nodeInternalId = 
ZkIgnitePaths.aliveInternalId(path);
+
+                    Iterator<ZkDiscoveryEventData> it = 
evtsData.evts.values().iterator();
+
+                    while (it.hasNext()) {
+                        ZkDiscoveryEventData evtData = it.next();
+
+                        if (evtData.onAckReceived(nodeInternalId, 
nodeData.lastProcEvt)) {
+                            processNodesAckEvent(evtData);
+
+                            it.remove();
+                        }
+                    }
+                }
+            }
+            catch (Throwable e) {
+                onFatalError(e);
+            }
+        }
     }
 
     /**
@@ -503,35 +617,47 @@ public class ZookeeperDiscoveryImpl {
 
         TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(top.nodesByOrder);
 
-        int evtCnt = evts.evts.size();
+        boolean newEvts = false;
 
         for (String child : aliveNodes) {
-            Integer inernalId = ZkPaths.aliveInternalId(child);
+            Integer inernalId = ZkIgnitePaths.aliveInternalId(child);
 
             Object old = alives.put(inernalId, child);
 
             assert old == null;
 
-            if (!top.nodesByInternalId.containsKey(inernalId))
+            if (!top.nodesByInternalId.containsKey(inernalId)) {
                 generateNodeJoin(curTop, inernalId, child);
+
+                watchAliveNodeData(child);
+
+                newEvts = true;
+            }
         }
 
         for (Map.Entry<Integer, ZookeeperClusterNode> e : 
top.nodesByInternalId.entrySet()) {
-            if (!alives.containsKey(e.getKey()))
-                generateNodeFail(curTop, e.getValue());
+            if (!alives.containsKey(e.getKey())) {
+                ZookeeperClusterNode failedNode = e.getValue();
+
+                processEventAcksOnNodeFail(failedNode);
+
+                generateNodeFail(curTop, failedNode);
+
+                newEvts = true;
+            }
         }
 
-        if (evts.evts.size() > evtCnt) {
+        if (newEvts) {
             long start = System.currentTimeMillis();
 
-            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1);
+            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
 
             long time = System.currentTimeMillis() - start;
 
             if (log.isInfoEnabled())
-                log.info("Discovery coordinator saved new topology events 
[topVer=" + evts.topVer + ", saveTime=" + time + ']');
+                log.info("Discovery coordinator saved new topology events 
[topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
 
-            onEventsUpdate(evts);
+            onEventsUpdate(evtsData);
         }
     }
 
@@ -544,14 +670,15 @@ public class ZookeeperDiscoveryImpl {
 
         assert rmvd != null;
 
-        evts.topVer++;
-        evts.evtIdGen++;
+        evtsData.topVer++;
+        evtsData.evtIdGen++;
 
-        ZkDiscoveryEventData evtData = new 
ZkDiscoveryNodeFailEventData(evts.evtIdGen,
-            evts.topVer,
+        ZkDiscoveryNodeFailEventData evtData = new 
ZkDiscoveryNodeFailEventData(
+            evtsData.evtIdGen,
+            evtsData.topVer,
             failedNode.internalId());
 
-        evts.addEvent(evtData);
+        evtsData.addEvent(curTop.values(), evtData);
 
         if (log.isInfoEnabled()) {
             log.info("Generated NODE_FAILED event [topVer=" + 
evtData.topologyVersion() +
@@ -564,8 +691,8 @@ public class ZookeeperDiscoveryImpl {
         String aliveNodePath)
         throws Exception
     {
-        UUID nodeId = ZkPaths.aliveNodeId(aliveNodePath);
-        int joinSeq = ZkPaths.aliveJoinSequence(aliveNodePath);
+        UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+        int joinSeq = ZkIgnitePaths.aliveJoinSequence(aliveNodePath);
 
         String joinDataPath = zkPaths.joinDataDir + '/' + nodeId.toString() + 
"|" + String.format("%010d", joinSeq);
 
@@ -587,10 +714,10 @@ public class ZookeeperDiscoveryImpl {
 
         assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
 
-        evts.topVer++;
-        evts.evtIdGen++;
+        evtsData.topVer++;
+        evtsData.evtIdGen++;
 
-        joinedNode.order(evts.topVer);
+        joinedNode.order(evtsData.topVer);
         joinedNode.internalId(internalId);
 
         DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
@@ -614,21 +741,19 @@ public class ZookeeperDiscoveryImpl {
         assert old == null;
 
         ZkDiscoveryNodeJoinEventData evtData = new 
ZkDiscoveryNodeJoinEventData(
-            evts.evtIdGen,
-            evts.topVer,
+            evtsData.evtIdGen,
+            evtsData.topVer,
             joinedNode.id(),
             joinedNode.internalId());
 
         evtData.joiningNodeData = joiningNodeData;
 
-        evts.addEvent(evtData);
-
-        String evtDataPath = zkPaths.evtsPath + "/" + evtData.eventId();
+        evtsData.addEvent(dataForJoined.topology(), evtData);
 
         long start = System.currentTimeMillis();
 
-        zkClient.createIfNeeded(evtDataPath, joinData, PERSISTENT);
-        zkClient.createIfNeeded(evtDataPath + "/joined", 
marshal(dataForJoined), PERSISTENT);
+        zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), 
joinData, PERSISTENT);
+        
zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()), 
marshal(dataForJoined), PERSISTENT);
 
         long time = System.currentTimeMillis() - start;
 
@@ -651,7 +776,7 @@ public class ZookeeperDiscoveryImpl {
 
         gridStartTime = U.currentTimeMillis();
 
-        evts = new ZkDiscoveryEventsData(gridStartTime, 1L, new TreeMap<Long, 
ZkDiscoveryEventData>());
+        evtsData = new ZkDiscoveryEventsData(gridStartTime, 1L, new 
TreeMap<Long, ZkDiscoveryEventData>());
 
         locNode.internalId(locInternalId);
         locNode.order(1);
@@ -666,10 +791,15 @@ public class ZookeeperDiscoveryImpl {
             null);
 
         joinFut.onDone();
+
+        // TODO ZK: remove join zk nodes
     }
 
+    /**
+     * @throws Exception If failed.
+     */
     private void cleanupPreviousClusterData() throws Exception {
-        // TODO ZK: use multi.
+        // TODO ZK: use multi, better batching.
         zkClient.setData(zkPaths.evtsPath, null, -1);
 
         List<String> evtChildren = zkClient.getChildren(zkPaths.evtsPath);
@@ -680,10 +810,11 @@ public class ZookeeperDiscoveryImpl {
             removeChildren(evtDir);
         }
 
-        zkClient.deleteAllIfExists(zkPaths.evtsPath, evtChildren, -1);
+        zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1);
 
-        for (String evtPath : zkClient.getChildren(zkPaths.customEvtsDir))
-            zkClient.delete(zkPaths.customEvtsDir + "/" + evtPath, -1);
+        zkClient.deleteAll(zkPaths.customEvtsDir,
+            zkClient.getChildren(zkPaths.customEvtsDir),
+            -1);
     }
 
     /**
@@ -691,11 +822,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void removeChildren(String path) throws Exception {
-        zkClient.deleteAllIfExists(path, zkClient.getChildren(path), -1);
-    }
-
-    void removeAckFuture(ZkEventAckFuture fut) {
-        ackFuts.remove(fut.eventId());
+        zkClient.deleteAll(path, zkClient.getChildren(path), -1);
     }
 
     ZkClusterNodes nodes() {
@@ -718,9 +845,9 @@ public class ZookeeperDiscoveryImpl {
         for (int i = 0; i < customEvtNodes.size(); i++) {
             String evtPath = customEvtNodes.get(i);
 
-            int evtSeq = ZkPaths.customEventSequence(evtPath);
+            int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
 
-            if (evtSeq > evts.procCustEvt) {
+            if (evtSeq > evtsData.procCustEvt) {
                 if (newEvts == null)
                     newEvts = new TreeMap<>();
 
@@ -730,7 +857,7 @@ public class ZookeeperDiscoveryImpl {
 
         if (newEvts != null) {
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
-                UUID sndNodeId = 
ZkPaths.customEventSendNodeId(evtE.getValue());
+                UUID sndNodeId = 
ZkIgnitePaths.customEventSendNodeId(evtE.getValue());
 
                 ZookeeperClusterNode sndNode = top.nodesById.get(sndNodeId);
 
@@ -744,17 +871,17 @@ public class ZookeeperDiscoveryImpl {
                     try {
                         msg = unmarshal(evtBytes);
 
-                        evts.evtIdGen++;
+                        evtsData.evtIdGen++;
 
                         ZkDiscoveryCustomEventData evtData = new 
ZkDiscoveryCustomEventData(
-                            evts.evtIdGen,
-                            evts.topVer,
+                            evtsData.evtIdGen,
+                            evtsData.topVer,
                             sndNodeId,
                             evtE.getValue());
 
                         evtData.msg = msg;
 
-                        evts.addEvent(evtData);
+                        evtsData.addEvent(top.nodesByOrder.values(), evtData);
 
                         if (log.isInfoEnabled())
                             log.info("Generated CUSTOM event [topVer=" + 
evtData.topologyVersion() + ", evt=" + msg + ']');
@@ -769,19 +896,19 @@ public class ZookeeperDiscoveryImpl {
                     zkClient.deleteIfExists(evtDataPath, -1);
                 }
 
-                evts.procCustEvt = evtE.getKey();
+                evtsData.procCustEvt = evtE.getKey();
             }
 
             long start = System.currentTimeMillis();
 
-            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evts), -1);
+            zkClient.setData(zkPaths.evtsPath, marsh.marshal(evtsData), -1);
 
             long time = System.currentTimeMillis() - start;
 
             if (log.isInfoEnabled())
-                log.info("Discovery coordinator saved new topology events 
[topVer=" + evts.topVer + ", saveTime=" + time + ']');
+                log.info("Discovery coordinator saved new topology events 
[topVer=" + evtsData.topVer + ", saveTime=" + time + ']');
 
-            onEventsUpdate(evts);
+            onEventsUpdate(evtsData);
         }
     }
 
@@ -799,18 +926,26 @@ public class ZookeeperDiscoveryImpl {
 
         onEventsUpdate(evtsData);
 
-        evts = evtsData;
+        this.evtsData = evtsData;
     }
 
+    /** */
+    private int procEvtCnt;
+
     /**
      * @param evtsData Events.
      * @throws Exception If failed.
      */
+    @SuppressWarnings("unchecked")
     private void onEventsUpdate(ZkDiscoveryEventsData evtsData) throws 
Exception {
         TreeMap<Long, ZkDiscoveryEventData> evts = evtsData.evts;
 
-        for (Map.Entry<Long, ZkDiscoveryEventData> e : 
evts.tailMap(lastProcEvt, false).entrySet()) {
-            ZkDiscoveryEventData evtData = e.getValue();
+        boolean updateNodeInfo = false;
+
+        Iterator<ZkDiscoveryEventData> it = 
evts.tailMap(locNodeInfo.lastProcEvt, false).values().iterator();
+
+        while (it.hasNext()) {
+            ZkDiscoveryEventData evtData = it.next();
 
             if (!joined) {
                 if (evtData.eventType() != EventType.EVT_NODE_JOINED)
@@ -818,50 +953,13 @@ public class ZookeeperDiscoveryImpl {
 
                 ZkDiscoveryNodeJoinEventData evtData0 = 
(ZkDiscoveryNodeJoinEventData)evtData;
 
-                UUID joinedId = ((ZkDiscoveryNodeJoinEventData)evtData).nodeId;
+                UUID joinedId = evtData0.nodeId;
 
                 boolean locJoin = evtData.eventType() == 
EventType.EVT_NODE_JOINED &&
                     locNode.id().equals(joinedId);
 
-                if (locJoin) {
-                    if (log.isInfoEnabled())
-                        log.info("Local join event data: " + evtData + ']');
-
-                    String path = zkPaths.evtsPath + "/" + evtData.eventId() + 
"/joined";
-
-                    ZkJoinEventDataForJoined dataForJoined = 
unmarshal(zkClient.getData(path));
-
-                    gridStartTime = evtsData.gridStartTime;
-
-                    locNode.internalId(evtData0.joinedInternalId);
-                    locNode.order(evtData.topologyVersion());
-
-                    DiscoveryDataBag dataBag = new 
DiscoveryDataBag(locNode.id());
-
-                    dataBag.commonData(dataForJoined.discoveryData());
-
-                    exchange.onExchange(dataBag);
-
-                    List<ZookeeperClusterNode> allNodes = 
dataForJoined.topology();
-
-                    for (ZookeeperClusterNode node : allNodes)
-                        top.addNode(node);
-
-                    top.addNode(locNode);
-
-                    List<ClusterNode> topSnapshot = new 
ArrayList<>((Collection)top.nodesByOrder.values());
-
-                    lsnr.onDiscovery(evtData.eventType(),
-                        evtData.topologyVersion(),
-                        locNode,
-                        topSnapshot,
-                        Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                        null);
-
-                    joinFut.onDone();
-
-                    joined = true;
-                }
+                if (locJoin)
+                    processLocalJoin(evtsData, evtData0);
             }
             else {
                 if (log.isInfoEnabled())
@@ -879,7 +977,7 @@ public class ZookeeperDiscoveryImpl {
                             joiningData = evtData0.joiningNodeData;
                         }
                         else {
-                            String path = zkPaths.evtsPath + "/" + 
evtData.eventId();
+                            String path = 
zkPaths.joinEventDataPath(evtData.eventId());
 
                             joiningData = unmarshal(zkClient.getData(path));
 
@@ -896,7 +994,7 @@ public class ZookeeperDiscoveryImpl {
                     }
 
                     case EventType.EVT_NODE_FAILED: {
-                        
notifyNodeFail((ZkDiscoveryNodeFailEventData)e.getValue());
+                        notifyNodeFail((ZkDiscoveryNodeFailEventData)evtData);
 
                         break;
                     }
@@ -912,7 +1010,7 @@ public class ZookeeperDiscoveryImpl {
                             msg = evtData0.msg;
                         }
                         else {
-                            String path = zkPaths.customEvtsDir + "/" + 
evtData0.evtPath;
+                            String path = 
zkPaths.customEventDataPath(evtData0.evtPath);
 
                             msg = unmarshal(zkClient.getData(path));
                         }
@@ -925,13 +1023,78 @@ public class ZookeeperDiscoveryImpl {
                     default:
                         assert false : "Invalid event: " + evtData;
                 }
+
+                if (crd) {
+                    if (evtData.allAcksReceived()) {
+                        processNodesAckEvent(evtData);
+
+                        it.remove();
+                    }
+                }
+            }
+
+            if (joined) {
+                locNodeInfo.lastProcEvt = evtData.eventId();
+
+                procEvtCnt++;
+
+                if (procEvtCnt % evtsAckThreshold == 0)
+                    updateNodeInfo = true;
             }
+        }
 
-            if (joined)
-                lastProcEvt = e.getKey();
+        if (!crd && updateNodeInfo) {
+            assert locNodeZkPath != null;
+
+            zkClient.setData(locNodeZkPath, marshal(locNodeInfo), -1);
         }
     }
 
+    private void processLocalJoin(ZkDiscoveryEventsData evtsData, 
ZkDiscoveryNodeJoinEventData evtData)
+        throws Exception
+    {
+        if (log.isInfoEnabled())
+            log.info("Local join event data: " + evtData + ']');
+
+        String path = zkPaths.joinEventDataPathForJoined(evtData.eventId());
+
+        ZkJoinEventDataForJoined dataForJoined = 
unmarshal(zkClient.getData(path));
+
+        gridStartTime = evtsData.gridStartTime;
+
+        locNode.internalId(evtData.joinedInternalId);
+        locNode.order(evtData.topologyVersion());
+
+        DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id());
+
+        dataBag.commonData(dataForJoined.discoveryData());
+
+        exchange.onExchange(dataBag);
+
+        List<ZookeeperClusterNode> allNodes = dataForJoined.topology();
+
+        for (ZookeeperClusterNode node : allNodes)
+            top.addNode(node);
+
+        top.addNode(locNode);
+
+        List<ClusterNode> topSnapshot = new 
ArrayList<>((Collection)top.nodesByOrder.values());
+
+        lsnr.onDiscovery(evtData.eventType(),
+            evtData.topologyVersion(),
+            locNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+
+        joinFut.onDone();
+
+        joined = true;
+
+        // TODO ZK: async
+        zkClient.deleteIfExists(path, -1);
+    }
+
     /**
      * @param evtData Event data.
      * @param msg Custom message.
@@ -953,19 +1116,6 @@ public class ZookeeperDiscoveryImpl {
             topSnapshot,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             msg);
-
-        if (crd) {
-            ZkEventAckFuture fut = new ZkEventAckFuture(this,
-                zkPaths.customEvtsDir + "/" + evtData.evtPath,
-                evtData.eventId());
-
-            ackFuts.put(evtData.eventId(), fut);
-        }
-        else {
-            String ackPath = zkPaths.customEvtsDir + "/" + evtData.evtPath + 
"/" + locNode.internalId();
-
-            zkClient.createAsync(ackPath, null, CreateMode.PERSISTENT, null);
-        }
     }
 
     /**
@@ -995,7 +1145,7 @@ public class ZookeeperDiscoveryImpl {
      * @param evtData Event data.
      */
     @SuppressWarnings("unchecked")
-    private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) {
+    private void notifyNodeFail(ZkDiscoveryNodeFailEventData evtData) throws 
Exception {
         ZookeeperClusterNode failedNode = 
top.removeNode(evtData.failedNodeInternalId());
 
         assert failedNode != null;
@@ -1008,14 +1158,74 @@ public class ZookeeperDiscoveryImpl {
             topSnapshot,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
+    }
+
+    /**
+     * @param evtData
+     * @throws Exception
+     */
+    private void processNodesAckEvent(ZkDiscoveryEventData evtData) throws 
Exception {
+        switch (evtData.eventType()) {
+            case EventType.EVT_NODE_JOINED: {
+                
processNodesAckJoinEvent((ZkDiscoveryNodeJoinEventData)evtData);
+
+                break;
+            }
+
+            case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: {
+                
processNodesAckCustomEvent((ZkDiscoveryCustomEventData)evtData);
+
+                break;
+            }
+
+            case EventType.EVT_NODE_FAILED: {
+                log.info("All nodes processed node fail [evtId=" + 
evtData.eventId() + ']');
+
+                // Do not need cleanup.
+                break;
+            }
+        }
+    }
 
-        if (crd) {
-            for (ZkEventAckFuture ackFut : ackFuts.values())
-                ackFut.onNodeFail(failedNode);
+    /**
+     * @param failedNode Failed node.
+     */
+    private void processEventAcksOnNodeFail(ZookeeperClusterNode failedNode) 
throws Exception {
+        for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = 
evtsData.evts.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<Long, ZkDiscoveryEventData> e = it.next();
+
+            ZkDiscoveryEventData evtData = e.getValue();
+
+            if (evtData.onNodeFail(failedNode)) {
+                processNodesAckEvent(evtData);
+
+                it.remove();
+            }
         }
     }
 
     /**
+     * @param evtData Event data.
+     * @throws Exception If failed.
+     */
+    private void processNodesAckJoinEvent(ZkDiscoveryNodeJoinEventData 
evtData) throws Exception {
+        log.info("All nodes processed node join [evtId=" + evtData.eventId() + 
']');
+
+        zkClient.deleteIfExists(zkPaths.joinEventDataPath(evtData.eventId()), 
-1);
+        
zkClient.deleteIfExists(zkPaths.joinEventDataPathForJoined(evtData.eventId()), 
-1);
+    }
+
+    /**
+     * @param evtData Event data.
+     * @throws Exception If failed.
+     */
+    private void processNodesAckCustomEvent(ZkDiscoveryCustomEventData 
evtData) throws Exception {
+        log.info("All nodes processed custom event [evtId=" + 
evtData.eventId() + ']');
+
+        zkClient.deleteIfExists(zkPaths.customEventDataPath(evtData.evtPath), 
-1);
+    }
+
+    /**
      *
      */
     public void stop() {
@@ -1070,10 +1280,10 @@ public class ZookeeperDiscoveryImpl {
             U.warn(log, "Zookeeper connection loss, local node is SEGMENTED");
 
             if (joined) {
-                assert evts != null;
+                assert evtsData != null;
 
                 lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-                    evts.topVer,
+                    evtsData.topVer,
                     locNode,
                     Collections.<ClusterNode>emptyList(),
                     Collections.<Long, Collection<ClusterNode>>emptyMap(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
index b0df770..21f88c8 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiBasicTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -58,6 +59,7 @@ import org.apache.zookeeper.ZooKeeper;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD;
 import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
 
 /**
@@ -172,6 +174,20 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
@@ -234,6 +250,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         Ignite srv0 = startGrid(0);
 
         srv0.createCache(new CacheConfiguration<>("c1"));
+
+        waitForEventsAcks(srv0);
     }
 
     /**
@@ -245,6 +263,8 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         srv0.createCache(new CacheConfiguration<>("c1"));
 
         awaitPartitionMapExchange();
+
+        waitForEventsAcks(srv0);
     }
 
     /**
@@ -594,6 +614,10 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
 
         waitForTopology(5);
 
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+
         stopGrid(0);
 
         waitForTopology(4);
@@ -604,6 +628,31 @@ public class ZookeeperDiscoverySpiBasicTest extends 
GridCommonAbstractTest {
         startGrid(0);
 
         waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes())));
+    }
+
+    /**
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    private void waitForEventsAcks(final Ignite node) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Map<Object, Object> evts = 
GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
+                    "impl", "evtsData", "evts");
+
+                if (!evts.isEmpty()) {
+                    info("Unacked events: " + evts);
+
+                    return false;
+                }
+
+                return true;
+            }
+        }, 10_000));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/18527db9/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
index f85cf5a..8aac456 100644
--- 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -51,6 +52,28 @@ public class ZookeeperClientTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDeleteAll() throws Exception {
+        startZK(1);
+
+        ZookeeperClient client = new ZookeeperClient(log, 
zkCluster.getConnectString(), 3000, null);
+
+        client.createIfNeeded("/apacheIgnite", null, CreateMode.PERSISTENT);
+        client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT);
+        client.createIfNeeded("/apacheIgnite/2", null, CreateMode.PERSISTENT);
+
+        client.deleteAll("/apacheIgnite", Arrays.asList("1", "2"), -1);
+
+        assertTrue(client.getChildren("/apacheIgnite").isEmpty());
+
+        client.createIfNeeded("/apacheIgnite/1", null, CreateMode.PERSISTENT);
+        client.deleteAll("/apacheIgnite", Arrays.asList("1"), -1);
+
+        assertTrue(client.getChildren("/apacheIgnite").isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionLoss1() throws Exception {
         ZookeeperClient client = new ZookeeperClient(log, "localhost:2200", 
3000, null);
 

Reply via email to