Repository: ignite
Updated Branches:
  refs/heads/ignite-zk be7ae489b -> e909027fa


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e909027f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e909027f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e909027f

Branch: refs/heads/ignite-zk
Commit: e909027fa4268c1e72f4d756b7b2da24bbf89ae3
Parents: be7ae48
Author: sboikov <[email protected]>
Authored: Tue Dec 5 10:47:13 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Dec 5 14:00:55 2017 +0300

----------------------------------------------------------------------
 .../spi/discovery/zk/ZookeeperDiscoverySpi.java |   8 +-
 .../ZkInternalForceNodeFailMessage.java         |   2 +-
 .../zk/internal/ZkInternalJoinErrorMessage.java |   8 +-
 .../zk/internal/ZkInternalMessage.java          |   4 +-
 .../discovery/zk/internal/ZkRuntimeState.java   |  10 +-
 .../discovery/zk/internal/ZookeeperClient.java  |   5 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 570 ++++++++++++-------
 .../ZookeeperDiscoverySpiBasicTest.java         |  37 +-
 8 files changed, 425 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index f62706e..bd7f427 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -193,6 +193,8 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter 
implements Discovery
             consistentId = ignite.configuration().getConsistentId();
 
             if (consistentId == null) {
+                initAddresses();
+
                 final List<String> sortedAddrs = new ArrayList<>(addrs.get1());
 
                 Collections.sort(sortedAddrs);
@@ -344,15 +346,13 @@ public class ZookeeperDiscoverySpi extends 
IgniteSpiAdapter implements Discovery
             ", rootPath=" + zkRootPath + ']');
 
         impl = new ZookeeperDiscoveryImpl(
+            this,
             igniteInstanceName,
-            zkConnectionString,
-            sesTimeout,
             log,
             zkRootPath,
             locNode,
             lsnr,
-            exchange,
-            locNode.isClient() && !clientReconnectDisabled);
+            exchange);
 
         try {
             impl.joinTopology();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
index fafcafc..f2fb183 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalForceNodeFailMessage.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public class ZkInternalForceNodeFailMessage implements ZkInternalMessage {
+public class ZkInternalForceNodeFailMessage  implements 
DiscoverySpiCustomMessage, ZkInternalMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
index 7e06858..e724673 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java
@@ -17,20 +17,18 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
-import java.io.Serializable;
-
 /**
  *
  */
-class ZkInternalJoinErrorMessage implements Serializable {
+class ZkInternalJoinErrorMessage implements ZkInternalMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
-    private final int nodeInternalId;
+    final int nodeInternalId;
 
     /** */
-    private final String err;
+    final String err;
 
     /**
      * @param nodeInternalId Joining node internal ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
index e56bab0..c1d56f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import java.io.Serializable;
 
 /**
  *
  */
-interface ZkInternalMessage extends DiscoverySpiCustomMessage {
+interface ZkInternalMessage extends Serializable {
     // No-op.
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
index d2d0372..660dc42 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.spi.discovery.zk.internal;
 
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+
 /**
  *
  */
@@ -28,10 +30,16 @@ class ZkRuntimeState {
     ZookeeperClient zkClient;
 
     /** */
+    int internalOrder;
+
+    /** */
+    IgniteSpiTimeoutObject joinTimeoutObj;
+
+    /** */
     long gridStartTime;
 
     /** */
-    boolean joined;
+    volatile boolean joined;
 
     /** */
     ZkDiscoveryEventsData evtsData;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
index 229e5c4..bc024f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -478,7 +478,7 @@ public class ZookeeperClient implements Watcher {
      * @throws InterruptedException If interrupted.
      */
     void setData(String path, byte[] data, int ver)
-        throws ZookeeperClientFailedException, InterruptedException
+        throws ZookeeperClientFailedException, InterruptedException, 
KeeperException.NoNodeException
     {
         if (data == null)
             data = EMPTY_BYTES;
@@ -491,6 +491,9 @@ public class ZookeeperClient implements Watcher {
 
                 return;
             }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
             catch (Exception e) {
                 onZookeeperError(connStartTime, e);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index ef67ec4..6c9d53a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -44,17 +45,21 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.IgniteSpiThread;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -77,6 +82,9 @@ public class ZookeeperDiscoveryImpl {
     static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
 
     /** */
+    private final ZookeeperDiscoverySpi spi;
+
+    /** */
     private final String igniteInstanceName;
 
     /** */
@@ -128,7 +136,10 @@ public class ZookeeperDiscoveryImpl {
     private final int evtsAckThreshold;
 
     /** */
-    private ZkRuntimeState state;
+    private IgniteThreadPoolExecutor utilityPool;
+
+    /** */
+    private ZkRuntimeState rtState;
 
     /** */
     private volatile ConnectionState connState = ConnectionState.STARTED;
@@ -150,15 +161,13 @@ public class ZookeeperDiscoveryImpl {
      * @param exchange Discovery data exchange.
      */
     public ZookeeperDiscoveryImpl(
+        ZookeeperDiscoverySpi spi,
         String igniteInstanceName,
-        String connectString,
-        int sesTimeout,
         IgniteLogger log,
         String zkRootPath,
         ZookeeperClusterNode locNode,
         DiscoverySpiListener lsnr,
-        DiscoverySpiDataExchange exchange,
-        boolean clientReconnectEnabled) {
+        DiscoverySpiDataExchange exchange) {
         assert locNode.id() != null && locNode.isLocal() : locNode;
 
         MarshallerUtils.setNodeName(marsh, igniteInstanceName);
@@ -167,14 +176,15 @@ public class ZookeeperDiscoveryImpl {
 
         zkPaths = new ZkIgnitePaths(zkRootPath);
 
+        this.spi = spi;
         this.igniteInstanceName = igniteInstanceName;
-        this.connectString = connectString;
-        this.sesTimeout = sesTimeout;
+        this.connectString = spi.getZkConnectionString();
+        this.sesTimeout = spi.getSessionTimeout();
         this.log = log.getLogger(getClass());
         this.locNode = locNode;
         this.lsnr = lsnr;
         this.exchange = exchange;
-        this.clientReconnectEnabled = clientReconnectEnabled;
+        this.clientReconnectEnabled = locNode.isClient() && 
!spi.isClientReconnectDisabled();
 
         watcher = new ZkWatcher();
         childrenCallback = new ZKChildrenCallback();
@@ -209,7 +219,7 @@ public class ZookeeperDiscoveryImpl {
     @Nullable public ClusterNode node(UUID nodeId) {
         assert nodeId != null;
 
-        return state.top.nodesById.get(nodeId);
+        return rtState.top.nodesById.get(nodeId);
     }
 
     /**
@@ -228,7 +238,7 @@ public class ZookeeperDiscoveryImpl {
      * @param warning Warning.
      */
     public void failNode(UUID nodeId, @Nullable String warning) {
-        ZookeeperClusterNode node = state.top.nodesById.get(nodeId);
+        ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
 
         if (node == null) {
             if (log.isDebugEnabled())
@@ -259,13 +269,13 @@ public class ZookeeperDiscoveryImpl {
                 return;
         }
 
-        state.zkClient.onCloseStart();
+        rtState.zkClient.onCloseStart();
 
         busyLock.block();
 
         busyLock.unblock();
 
-        state.zkClient.close();
+        rtState.zkClient.close();
 
         UUID newId = UUID.randomUUID();
 
@@ -274,20 +284,20 @@ public class ZookeeperDiscoveryImpl {
             ", prevId=" + locNode.id() +
             ", locNode=" + locNode + ']');
 
-        new ReconnectorThread(newId).start();
+        runInWorkerThread(new ReconnectClosure(newId));
     }
 
     /**
      * @param newId New ID.
      */
     private void doReconnect(UUID newId) {
-        if (state.joined) {
-            assert state.evtsData != null;
+        if (rtState.joined) {
+            assert rtState.evtsData != null;
 
             lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
-                state.evtsData.topVer,
+                rtState.evtsData.topVer,
                 locNode,
-                state.top.topologySnapshot(),
+                rtState.top.topologySnapshot(),
                 Collections.<Long, Collection<ClusterNode>>emptyMap(),
                 null);
         }
@@ -295,7 +305,7 @@ public class ZookeeperDiscoveryImpl {
         try {
             locNode.onClientDisconnected(newId);
 
-            joinTopology0(state.joined);
+            joinTopology0(rtState.joined);
         }
         catch (Exception e) {
             U.error(log, "Failed to reconnect: " + e, e);
@@ -309,7 +319,7 @@ public class ZookeeperDiscoveryImpl {
      * @param e Error.
      */
     private void onSegmented(Exception e) {
-        if (state.joined) {
+        if (rtState.joined) {
             synchronized (stateMux) {
                 connState = ConnectionState.STOPPED;
             }
@@ -329,13 +339,16 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
+    /**
+     *
+     */
     private void notifySegmented() {
-        assert state.evtsData != null;
+        assert rtState.evtsData != null;
 
         lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-            state.evtsData.topVer,
+            rtState.evtsData.topVer,
             locNode,
-            state.top.topologySnapshot(),
+            rtState.top.topologySnapshot(),
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
     }
@@ -346,7 +359,7 @@ public class ZookeeperDiscoveryImpl {
     public Collection<ClusterNode> remoteNodes() {
         checkState();
 
-        return state.top.remoteNodes();
+        return rtState.top.remoteNodes();
     }
 
     /**
@@ -374,7 +387,7 @@ public class ZookeeperDiscoveryImpl {
             checkState();
 
         try {
-            List<String> children = 
state.zkClient.getChildren(zkPaths.aliveNodesDir);
+            List<String> children = 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
 
             for (int i = 0; i < children.size(); i++) {
                 UUID id = ZkIgnitePaths.aliveNodeId(children.get(i));
@@ -410,7 +423,7 @@ public class ZookeeperDiscoveryImpl {
         byte[] msgBytes;
 
         try {
-            msgBytes = U.zip(marshal(msg));
+            msgBytes = marshalZip(msg);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom message: " 
+ msg, e);
@@ -422,7 +435,7 @@ public class ZookeeperDiscoveryImpl {
         try {
             String prefix = UUID.randomUUID().toString();
 
-            state.zkClient.createSequential(prefix,
+            rtState.zkClient.createSequential(prefix,
                 zkPaths.customEvtsDir,
                 prefix + ":" + locNode.id() + '|',
                 msgBytes,
@@ -448,7 +461,7 @@ public class ZookeeperDiscoveryImpl {
      * @return Cluster start time.
      */
     public long gridStartTime() {
-        return state.gridStartTime;
+        return rtState.gridStartTime;
     }
 
     /**
@@ -473,6 +486,8 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * @param prevJoined {@code True} if this is a reconnect after the node had already joined the topology
+     *    (in this case an EVT_CLIENT_NODE_RECONNECTED event needs to be produced).
      * @throws InterruptedException If interrupted.
      */
     private void joinTopology0(boolean prevJoined) throws InterruptedException 
{
@@ -481,7 +496,7 @@ public class ZookeeperDiscoveryImpl {
         if (internalLsnr != null)
             internalLsnr.beforeJoin(log);
 
-        state = new ZkRuntimeState(prevJoined);
+        rtState = new ZkRuntimeState(prevJoined);
 
         DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id());
 
@@ -492,14 +507,14 @@ public class ZookeeperDiscoveryImpl {
         byte[] joinDataBytes;
 
         try {
-            joinDataBytes = U.zip(marshal(joinData));
+            joinDataBytes = marshalZip(joinData);
         }
         catch (Exception e) {
             throw new IgniteSpiException("Failed to marshal joining node 
data", e);
         }
 
         try {
-            state.zkClient = new ZookeeperClient(igniteInstanceName,
+            rtState.zkClient = new ZookeeperClient(igniteInstanceName,
                 log,
                 connectString,
                 sesTimeout,
@@ -517,7 +532,7 @@ public class ZookeeperDiscoveryImpl {
      */
     private void initZkNodes() throws InterruptedException {
         try {
-            if (state.zkClient.exists(zkPaths.aliveNodesDir))
+            if (rtState.zkClient.exists(zkPaths.aliveNodesDir))
                 return; // This path is created last, assume all others dirs 
are created.
 
             List<String> dirs = new ArrayList<>();
@@ -532,14 +547,14 @@ public class ZookeeperDiscoveryImpl {
             dirs.add(zkPaths.aliveNodesDir);
 
             try {
-                state.zkClient.createAll(dirs, PERSISTENT);
+                rtState.zkClient.createAll(dirs, PERSISTENT);
             }
             catch (KeeperException.NodeExistsException e) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to create nodes using bulk operation: " 
+ e);
 
                 for (String dir : dirs)
-                    state.zkClient.createIfNeeded(dir, null, PERSISTENT);
+                    rtState.zkClient.createIfNeeded(dir, null, PERSISTENT);
             }
         }
         catch (ZookeeperClientFailedException e) {
@@ -562,15 +577,18 @@ public class ZookeeperDiscoveryImpl {
 
             // TODO ZK: handle max size.
 
-            String path = state.zkClient.createSequential(prefix,
+            final ZkRuntimeState rtState = this.rtState;
+
+            String joinDataPath = rtState.zkClient.createSequential(prefix,
                 zkPaths.joinDataDir,
                 prefix + ":" + locNode.id() + "|",
                 joinDataBytes,
                 EPHEMERAL_SEQUENTIAL);
 
-            int seqNum = Integer.parseInt(path.substring(path.lastIndexOf('|') 
+ 1));
+            // TODO ZK: no need to use sequential
+            int seqNum = 
Integer.parseInt(joinDataPath.substring(joinDataPath.lastIndexOf('|') + 1));
 
-            state.locNodeZkPath = state.zkClient.createSequential(
+            rtState.locNodeZkPath = rtState.zkClient.createSequential(
                 prefix,
                 zkPaths.aliveNodesDir,
                 prefix + ":" + locNode.id() + "|" + seqNum + "|",
@@ -580,11 +598,24 @@ public class ZookeeperDiscoveryImpl {
             log.info("Node started join [nodeId=" + locNode.id() +
                 ", instanceName=" + 
locNode.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME) +
                 ", joinDataSize=" + joinDataBytes.length +
-                ", nodePath=" + state.locNodeZkPath + ']');
+                ", nodePath=" + rtState.locNodeZkPath + ']');
 
-            state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback());
+            rtState.internalOrder = 
ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
 
-            state.zkClient.getDataAsync(zkPaths.evtsPath, watcher, 
dataCallback);
+            /*
+            If the node cannot join due to a validation error, the error is reported in its join data.
+            As a minor optimization, do not start watching the join data immediately, but only if
+            the join event is not received before the timeout.
+             */
+            rtState.joinTimeoutObj = new CheckJoinStateTimeoutObject(
+                joinDataPath,
+                rtState);
+
+            spi.getSpiContext().addTimeoutObject(rtState.joinTimeoutObj);
+
+            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback());
+
+            rtState.zkClient.getDataAsync(zkPaths.evtsPath, watcher, 
dataCallback);
         }
         catch (ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper 
nodes", e);
@@ -596,6 +627,95 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
+    /**
+     * Timeout object that, if the node has not joined by the deadline, reads and watches its join data for a join error.
+     */
+    private class CheckJoinStateTimeoutObject implements 
IgniteSpiTimeoutObject, Watcher, AsyncCallback.DataCallback {
+        /** Identifier returned by {@link #id()}. */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** Deadline returned by {@link #endTime()}: creation time plus 5000 ms. */
+        private final long endTime = System.currentTimeMillis() + 5000;
+
+        /** Path whose data is requested in {@link #onTimeout()}. */
+        private final String joinDataPath;
+
+        /** Runtime state captured at construction; supplies the joined flag and the Zookeeper client. */
+        private final ZkRuntimeState rtState;
+
+        /**
+         * @param joinDataPath Node joined data path.
+         * @param rtState State.
+         */
+        CheckJoinStateTimeoutObject(String joinDataPath, ZkRuntimeState 
rtState) {
+            this.joinDataPath = joinDataPath;
+            this.rtState = rtState;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} Does nothing if already joined or the connection state is not STARTED; otherwise requests the join data. */
+        @Override public void onTimeout() {
+            if (rtState.joined)
+                return;
+
+            synchronized (stateMux) {
+                if (connState != ConnectionState.STARTED)
+                    return;
+            }
+
+            rtState.zkClient.getDataAsync(joinDataPath, this, this);
+        }
+
+        /** {@inheritDoc} Ignores non-zero result codes; if the data is a join error message, passes it to onSegmented. */
+        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            if (rc != 0)
+                return;
+
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                Object obj = unmarshalZip(data);
+
+                if (obj instanceof ZkInternalJoinErrorMessage) {
+                    ZkInternalJoinErrorMessage joinErr = 
(ZkInternalJoinErrorMessage)obj;
+
+                    onSegmented(new IgniteSpiException(joinErr.err));
+                }
+
+                busyLock.leaveBusy();
+            }
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
+            }
+        }
+
+        /** {@inheritDoc} On a NodeDataChanged event requests the data again, passing this object as watcher and callback. */
+        @Override public void process(WatchedEvent evt) {
+            if (!busyLock.enterBusy())
+                return;
+
+            try {
+                if (evt.getType() == Event.EventType.NodeDataChanged)
+                    rtState.zkClient.getDataAsync(evt.getPath(), this, this);
+
+                busyLock.leaveBusy();
+            }
+            catch (Throwable e) {
+                onFatalError(busyLock, e);
+            }
+        }
+    }
+
     /** TODO ZK */
     private final CountDownLatch connStartLatch = new CountDownLatch(1);
 
@@ -618,7 +738,7 @@ public class ZookeeperDiscoveryImpl {
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 
-        int locInternalId = ZkIgnitePaths.aliveInternalId(state.locNodeZkPath);
+        int locInternalId = 
ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
 
         for (String aliveNodePath : aliveNodes) {
             Integer internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
@@ -645,7 +765,7 @@ public class ZookeeperDiscoveryImpl {
 
             PreviousNodeWatcher watcher = new PreviousNodeWatcher();
 
-            state.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), watcher, watcher);
+            rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), watcher, watcher);
         }
     }
 
@@ -660,7 +780,7 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Previous node failed, check is node new coordinator 
[locId=" + locNode.id() + ']');
 
-        state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback());
+        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback());
     }
 
     /**
@@ -669,22 +789,22 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void onBecomeCoordinator(List<String> aliveNodes, int 
locInternalId) throws Exception {
-        byte[] evtsDataBytes = state.zkClient.getData(zkPaths.evtsPath);
+        byte[] evtsDataBytes = rtState.zkClient.getData(zkPaths.evtsPath);
 
         if (evtsDataBytes.length > 0)
             processNewEvents(evtsDataBytes);
 
-        state.crd = true;
+        rtState.crd = true;
 
-        if (state.joined) {
+        if (rtState.joined) {
             if (log.isInfoEnabled())
                 log.info("Node is new discovery coordinator [locId=" + 
locNode.id() + ']');
 
             assert locNode.order() > 0 : locNode;
-            assert state.evtsData != null;
+            assert rtState.evtsData != null;
 
-            for (ZkDiscoveryEventData evtData : state.evtsData.evts.values())
-                evtData.initRemainingAcks(state.top.nodesByOrder.values());
+            for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values())
+                evtData.initRemainingAcks(rtState.top.nodesByOrder.values());
 
             handleProcessedEvents("crd");
         }
@@ -695,8 +815,8 @@ public class ZookeeperDiscoveryImpl {
             newClusterStarted(locInternalId);
         }
 
-        state.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, 
childrenCallback);
-        state.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, 
childrenCallback);
+        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, watcher, 
childrenCallback);
+        rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, watcher, 
childrenCallback);
 
         for (String alivePath : aliveNodes)
             watchAliveNodeData(alivePath);
@@ -706,12 +826,12 @@ public class ZookeeperDiscoveryImpl {
      * @param alivePath
      */
     private void watchAliveNodeData(String alivePath) {
-        assert state.locNodeZkPath != null;
+        assert rtState.locNodeZkPath != null;
 
         String path = zkPaths.aliveNodesDir + "/" + alivePath;
 
-        if (!path.equals(state.locNodeZkPath))
-            state.zkClient.getDataAsync(path, aliveNodeDataWatcher, 
aliveNodeDataWatcher);
+        if (!path.equals(rtState.locNodeZkPath))
+            rtState.zkClient.getDataAsync(path, aliveNodeDataWatcher, 
aliveNodeDataWatcher);
     }
 
     /**
@@ -719,14 +839,14 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void generateTopologyEvents(List<String> aliveNodes) throws 
Exception {
-        assert state.crd;
+        assert rtState.crd;
 
         if (log.isInfoEnabled())
             log.info("Process alive nodes change: " + aliveNodes.size());
 
         TreeMap<Integer, String> alives = new TreeMap<>();
 
-        TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(state.top.nodesByOrder);
+        TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(rtState.top.nodesByOrder);
 
         boolean newEvts = false;
 
@@ -737,16 +857,15 @@ public class ZookeeperDiscoveryImpl {
 
             assert old == null;
 
-            if (!state.top.nodesByInternalId.containsKey(internalId)) {
-                processJoinOnCoordinator(curTop, internalId, child);
-
-                newEvts = true;
+            if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
+                if (processJoinOnCoordinator(curTop, internalId, child))
+                    newEvts = true;
             }
         }
 
         List<ZookeeperClusterNode> failedNodes = null;
 
-        for (Map.Entry<Integer, ZookeeperClusterNode> e : 
state.top.nodesByInternalId.entrySet()) {
+        for (Map.Entry<Integer, ZookeeperClusterNode> e : 
rtState.top.nodesByInternalId.entrySet()) {
             if (!alives.containsKey(e.getKey())) {
                 ZookeeperClusterNode failedNode = e.getValue();
 
@@ -774,7 +893,7 @@ public class ZookeeperDiscoveryImpl {
      * @param aliveNodePath Joined node path.
      * @throws Exception If failed.
      */
-    private void processJoinOnCoordinator(TreeMap<Long, ZookeeperClusterNode> 
curTop,
+    private boolean processJoinOnCoordinator(TreeMap<Long, 
ZookeeperClusterNode> curTop,
         int internalId,
         String aliveNodePath) throws Exception {
         UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
@@ -783,20 +902,29 @@ public class ZookeeperDiscoveryImpl {
         byte[] joinData;
 
         try {
-            joinData = state.zkClient.getData(joinDataPath);
+            joinData = rtState.zkClient.getData(joinDataPath);
         }
         catch (KeeperException.NoNodeException e) {
             U.warn(log, "Failed to read joining node data, node left before 
join process finished: " + nodeId);
 
-            return;
+            return false;
         }
 
         String err = null;
 
-        ZkJoiningNodeData joiningNodeData = null;
+        Object dataObj = null;
 
         try {
-            joiningNodeData = unmarshalZip(joinData);
+            dataObj = unmarshalZip(joinData);
+
+            if (dataObj instanceof ZkInternalJoinErrorMessage) {
+                if (log.isInfoEnabled())
+                    log.info("Ignore join data, node was failed by previous 
coordinator: " + aliveNodePath);
+
+                zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
+
+                return false;
+            }
         }
         catch (Exception e) {
             U.error(log, "Failed to unmarshal joining node data [nodePath=" + 
aliveNodePath + "']", e);
@@ -804,11 +932,12 @@ public class ZookeeperDiscoveryImpl {
             err = "Failed to unmarshal join data: " + e;
         }
 
-        if (err == null) {
-            assert joiningNodeData != null;
+        assert dataObj instanceof ZkJoiningNodeData : dataObj;
 
+        ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
+
+        if (err == null)
             err = validateJoiningNode(joiningNodeData.node());
-        }
 
         if (err == null) {
             ZookeeperClusterNode joinedNode = joiningNodeData.node();
@@ -818,20 +947,31 @@ public class ZookeeperDiscoveryImpl {
             generateNodeJoin(curTop, joinData, joiningNodeData, internalId);
 
             watchAliveNodeData(aliveNodePath);
+
+            return true;
         }
         else {
             ZkInternalJoinErrorMessage msg = new 
ZkInternalJoinErrorMessage(internalId, err);
 
-            // IgniteNodeValidationResult err = 
spi.getSpiContext().validateNode(node);
+            try {
+                zkClient().setData(joinDataPath, marshalZip(msg), -1);
+            }
+            catch (KeeperException.NoNodeException e) {
+                // Ignore, node already failed.
+            }
+
+            zkClient().deleteIfExists(zkPaths.aliveNodesDir + "/" + 
aliveNodePath, -1);
+
+            return false;
         }
     }
 
     /**
      * @param node Joining node.
-     * @return
+     * @return Non null error message if validation failed.
      */
     @Nullable private String validateJoiningNode(ZookeeperClusterNode node) {
-        ZookeeperClusterNode node0 = state.top.nodesById.get(node.id());
+        ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id());
 
         if (node0 != null) {
             U.error(log, "Failed to include node in cluster, node with the 
same ID already exists [joiningNode=" + node +
@@ -849,21 +989,21 @@ public class ZookeeperDiscoveryImpl {
     private void saveAndProcessNewEvents() throws Exception {
         long start = System.currentTimeMillis();
 
-        byte[] evtsBytes = U.zip(marshal(state.evtsData));
+        byte[] evtsBytes = marshalZip(rtState.evtsData);
 
-        state.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1);
+        rtState.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1);
 
         long time = System.currentTimeMillis() - start;
 
         if (log.isInfoEnabled()) {
-            log.info("Discovery coordinator saved new topology events 
[topVer=" + state.evtsData.topVer +
+            log.info("Discovery coordinator saved new topology events 
[topVer=" + rtState.evtsData.topVer +
                 ", size=" + evtsBytes.length +
-                ", evts=" + state.evtsData.evts.size() +
-                ", lastEvt=" + state.evtsData.evtIdGen +
+                ", evts=" + rtState.evtsData.evts.size() +
+                ", lastEvt=" + rtState.evtsData.evtIdGen +
                 ", saveTime=" + time + ']');
         }
 
-        processNewEvents(state.evtsData);
+        processNewEvents(rtState.evtsData);
     }
 
     /**
@@ -875,15 +1015,15 @@ public class ZookeeperDiscoveryImpl {
 
         assert rmvd != null;
 
-        state.evtsData.topVer++;
-        state.evtsData.evtIdGen++;
+        rtState.evtsData.topVer++;
+        rtState.evtsData.evtIdGen++;
 
         ZkDiscoveryNodeFailEventData evtData = new 
ZkDiscoveryNodeFailEventData(
-            state.evtsData.evtIdGen,
-            state.evtsData.topVer,
+            rtState.evtsData.evtIdGen,
+            rtState.evtsData.topVer,
             failedNode.internalId());
 
-        state.evtsData.addEvent(curTop.values(), evtData);
+        rtState.evtsData.addEvent(curTop.values(), evtData);
 
         if (log.isInfoEnabled())
             log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
@@ -905,10 +1045,10 @@ public class ZookeeperDiscoveryImpl {
 
         UUID nodeId = joinedNode.id();
 
-        state.evtsData.topVer++;
-        state.evtsData.evtIdGen++;
+        rtState.evtsData.topVer++;
+        rtState.evtsData.evtIdGen++;
 
-        joinedNode.order(state.evtsData.topVer);
+        joinedNode.order(rtState.evtsData.topVer);
         joinedNode.internalId(internalId);
 
         DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId);
@@ -932,23 +1072,23 @@ public class ZookeeperDiscoveryImpl {
         assert old == null;
 
         ZkDiscoveryNodeJoinEventData evtData = new 
ZkDiscoveryNodeJoinEventData(
-            state.evtsData.evtIdGen,
-            state.evtsData.topVer,
+            rtState.evtsData.evtIdGen,
+            rtState.evtsData.topVer,
             joinedNode.id(),
             joinedNode.internalId());
 
         evtData.joiningNodeData = joiningNodeData;
 
-        state.evtsData.addEvent(dataForJoined.topology(), evtData);
+        rtState.evtsData.addEvent(dataForJoined.topology(), evtData);
 
         evtData.addRemainingAck(joinedNode); // Topology for joined node does 
not contain joined node.
 
-        byte[] dataForJoinedBytes = U.zip(marshal(dataForJoined));
+        byte[] dataForJoinedBytes = marshalZip(dataForJoined);
 
         long start = System.currentTimeMillis();
 
-        
state.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), 
joinData, PERSISTENT);
-        
state.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()),
 dataForJoinedBytes, PERSISTENT);
+        
rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPath(evtData.eventId()), 
joinData, PERSISTENT);
+        
rtState.zkClient.createIfNeeded(zkPaths.joinEventDataPathForJoined(evtData.eventId()),
 dataForJoinedBytes, PERSISTENT);
 
         long time = System.currentTimeMillis() - start;
 
@@ -966,27 +1106,29 @@ public class ZookeeperDiscoveryImpl {
      */
     @SuppressWarnings("unchecked")
     private void newClusterStarted(int locInternalId) throws Exception {
+        spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj);
+
         cleanupPreviousClusterData();
 
-        state.joined = true;
+        rtState.joined = true;
 
-        state.gridStartTime = U.currentTimeMillis();
+        rtState.gridStartTime = U.currentTimeMillis();
 
-        state.evtsData = new ZkDiscoveryEventsData(state.gridStartTime, 1L, 
new TreeMap<Long, ZkDiscoveryEventData>());
+        rtState.evtsData = new ZkDiscoveryEventsData(rtState.gridStartTime, 
1L, new TreeMap<Long, ZkDiscoveryEventData>());
 
         locNode.internalId(locInternalId);
         locNode.order(1);
 
-        state.top.addNode(locNode);
+        rtState.top.addNode(locNode);
 
-        String path = 
state.locNodeZkPath.substring(state.locNodeZkPath.lastIndexOf('/') + 1);
+        String path = 
rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
 
         String joinDataPath = zkPaths.joiningNodeDataPath(locNode.id(), path);
 
         if (log.isDebugEnabled())
             log.debug("Delete join data: " + joinDataPath);
 
-        state.zkClient.deleteIfExistsAsync(joinDataPath);
+        rtState.zkClient.deleteIfExistsAsync(joinDataPath);
 
         final List<ClusterNode> topSnapshot = 
Collections.singletonList((ClusterNode)locNode);
 
@@ -1000,7 +1142,7 @@ public class ZookeeperDiscoveryImpl {
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
 
-        if (state.prevJoined) {
+        if (rtState.prevJoined) {
             lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
                 1L,
                 locNode,
@@ -1021,9 +1163,9 @@ public class ZookeeperDiscoveryImpl {
         long start = System.currentTimeMillis();
 
         // TODO ZK: use multi, better batching.
-        state.zkClient.setData(zkPaths.evtsPath, null, -1);
+        rtState.zkClient.setData(zkPaths.evtsPath, null, -1);
 
-        List<String> evtChildren = 
state.zkClient.getChildren(zkPaths.evtsPath);
+        List<String> evtChildren = 
rtState.zkClient.getChildren(zkPaths.evtsPath);
 
         for (String evtPath : evtChildren) {
             String evtDir = zkPaths.evtsPath + "/" + evtPath;
@@ -1031,14 +1173,14 @@ public class ZookeeperDiscoveryImpl {
             removeChildren(evtDir);
         }
 
-        state.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1);
+        rtState.zkClient.deleteAll(zkPaths.evtsPath, evtChildren, -1);
 
-        state.zkClient.deleteAll(zkPaths.customEvtsDir,
-            state.zkClient.getChildren(zkPaths.customEvtsDir),
+        rtState.zkClient.deleteAll(zkPaths.customEvtsDir,
+            rtState.zkClient.getChildren(zkPaths.customEvtsDir),
             -1);
 
-        state.zkClient.deleteAll(zkPaths.customEvtsAcksDir,
-            state.zkClient.getChildren(zkPaths.customEvtsAcksDir),
+        rtState.zkClient.deleteAll(zkPaths.customEvtsAcksDir,
+            rtState.zkClient.getChildren(zkPaths.customEvtsAcksDir),
             -1);
 
         long time = System.currentTimeMillis() - start;
@@ -1054,15 +1196,15 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void removeChildren(String path) throws Exception {
-        state.zkClient.deleteAll(path, state.zkClient.getChildren(path), -1);
+        rtState.zkClient.deleteAll(path, rtState.zkClient.getChildren(path), 
-1);
     }
 
     ZkClusterNodes nodes() {
-        return state.top;
+        return rtState.top;
     }
 
     ZookeeperClient zkClient() {
-        return state.zkClient;
+        return rtState.zkClient;
     }
 
     /**
@@ -1070,7 +1212,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void generateCustomEvents(List<String> customEvtNodes) throws 
Exception {
-        assert state.crd;
+        assert rtState.crd;
 
         TreeMap<Integer, String> newEvts = null;
 
@@ -1079,7 +1221,7 @@ public class ZookeeperDiscoveryImpl {
 
             int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
 
-            if (evtSeq > state.evtsData.procCustEvt) {
+            if (evtSeq > rtState.evtsData.procCustEvt) {
                 if (newEvts == null)
                     newEvts = new TreeMap<>();
 
@@ -1093,7 +1235,7 @@ public class ZookeeperDiscoveryImpl {
             for (Map.Entry<Integer, String> evtE : newEvts.entrySet()) {
                 UUID sndNodeId = 
ZkIgnitePaths.customEventSendNodeId(evtE.getValue());
 
-                ZookeeperClusterNode sndNode = 
state.top.nodesById.get(sndNodeId);
+                ZookeeperClusterNode sndNode = 
rtState.top.nodesById.get(sndNodeId);
 
                 if (alives != null && !alives.contains(sndNode.id()))
                     sndNode = null;
@@ -1101,27 +1243,27 @@ public class ZookeeperDiscoveryImpl {
                 String evtDataPath = zkPaths.customEvtsDir + "/" + 
evtE.getValue();
 
                 if (sndNode != null) {
-                    byte[] evtBytes = 
state.zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue());
+                    byte[] evtBytes = 
rtState.zkClient.getData(zkPaths.customEvtsDir + "/" + evtE.getValue());
 
                     DiscoverySpiCustomMessage msg;
 
                     try {
                         msg = unmarshalZip(evtBytes);
 
-                        state.evtsData.evtIdGen++;
+                        rtState.evtsData.evtIdGen++;
 
                         if (msg instanceof ZkInternalForceNodeFailMessage) {
                             ZkInternalForceNodeFailMessage msg0 = 
(ZkInternalForceNodeFailMessage)msg;
 
                             if (alives == null)
-                                alives = new 
HashSet<>(state.top.nodesById.keySet());
+                                alives = new 
HashSet<>(rtState.top.nodesById.keySet());
 
                             if (alives.contains(msg0.nodeId)) {
-                                state.evtsData.topVer++;
+                                rtState.evtsData.topVer++;
 
                                 alives.remove(msg0.nodeId);
 
-                                ZookeeperClusterNode node = 
state.top.nodesById.get(msg0.nodeId);
+                                ZookeeperClusterNode node = 
rtState.top.nodesById.get(msg0.nodeId);
 
                                 assert node != null :  msg0.nodeId;
 
@@ -1140,15 +1282,15 @@ public class ZookeeperDiscoveryImpl {
                         }
 
                         ZkDiscoveryCustomEventData evtData = new 
ZkDiscoveryCustomEventData(
-                            state.evtsData.evtIdGen,
-                            state.evtsData.topVer,
+                            rtState.evtsData.evtIdGen,
+                            rtState.evtsData.topVer,
                             sndNodeId,
                             evtE.getValue(),
                             false);
 
                         evtData.msg = msg;
 
-                        
state.evtsData.addEvent(state.top.nodesByOrder.values(), evtData);
+                        
rtState.evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
 
                         if (log.isDebugEnabled())
                             log.debug("Generated CUSTOM event [evt=" + evtData 
+ ", msg=" + msg + ']');
@@ -1160,10 +1302,10 @@ public class ZookeeperDiscoveryImpl {
                 else {
                     U.warn(log, "Ignore custom event from unknown node: " + 
sndNodeId);
 
-                    state.zkClient.deleteIfExistsAsync(evtDataPath);
+                    rtState.zkClient.deleteIfExistsAsync(evtDataPath);
                 }
 
-                state.evtsData.procCustEvt = evtE.getKey();
+                rtState.evtsData.procCustEvt = evtE.getKey();
             }
 
             saveAndProcessNewEvents();
@@ -1178,13 +1320,13 @@ public class ZookeeperDiscoveryImpl {
         if (data.length == 0)
             return;
 
-        assert !state.crd;
+        assert !rtState.crd;
 
         ZkDiscoveryEventsData newEvts = unmarshalZip(data);
 
         // Need keep processed custom events since they contains message 
object.
-        if (state.evtsData != null) {
-            for (Map.Entry<Long, ZkDiscoveryEventData> e : 
state.evtsData.evts.entrySet()) {
+        if (rtState.evtsData != null) {
+            for (Map.Entry<Long, ZkDiscoveryEventData> e : 
rtState.evtsData.evts.entrySet()) {
                 ZkDiscoveryEventData evtData = e.getValue();
 
                 if (evtData.eventType() == 
DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
@@ -1199,7 +1341,7 @@ public class ZookeeperDiscoveryImpl {
 
         processNewEvents(newEvts);
 
-        state.evtsData = newEvts;
+        rtState.evtsData = newEvts;
     }
 
     /**
@@ -1212,8 +1354,8 @@ public class ZookeeperDiscoveryImpl {
 
         boolean updateNodeInfo = false;
 
-        for (ZkDiscoveryEventData evtData : 
evts.tailMap(state.locNodeInfo.lastProcEvt, false).values()) {
-            if (!state.joined) {
+        for (ZkDiscoveryEventData evtData : 
evts.tailMap(rtState.locNodeInfo.lastProcEvt, false).values()) {
+            if (!rtState.joined) {
                 if (evtData.eventType() != EventType.EVT_NODE_JOINED)
                     continue;
 
@@ -1222,10 +1364,13 @@ public class ZookeeperDiscoveryImpl {
                 UUID joinedId = evtData0.nodeId;
 
                 boolean locJoin = evtData.eventType() == 
EventType.EVT_NODE_JOINED &&
-                    locNode.id().equals(joinedId);
+                    evtData0.joinedInternalId == rtState.internalOrder;
+
+                if (locJoin) {
+                    assert locNode.id().equals(joinedId);
 
-                if (locJoin)
                     processLocalJoin(evtsData, evtData0);
+                }
             }
             else {
                 if (log.isDebugEnabled())
@@ -1237,7 +1382,7 @@ public class ZookeeperDiscoveryImpl {
 
                         ZkJoiningNodeData joiningData;
 
-                        if (state.crd) {
+                        if (rtState.crd) {
                             assert evtData0.joiningNodeData != null;
 
                             joiningData = evtData0.joiningNodeData;
@@ -1245,7 +1390,7 @@ public class ZookeeperDiscoveryImpl {
                         else {
                             String path = 
zkPaths.joinEventDataPath(evtData.eventId());
 
-                            joiningData = 
unmarshalZip(state.zkClient.getData(path));
+                            joiningData = 
unmarshalZip(rtState.zkClient.getData(path));
 
                             DiscoveryDataBag dataBag = new 
DiscoveryDataBag(evtData0.nodeId);
 
@@ -1273,7 +1418,7 @@ public class ZookeeperDiscoveryImpl {
 
                         DiscoverySpiCustomMessage msg;
 
-                        if (state.crd) {
+                        if (rtState.crd) {
                             assert evtData0.msg != null : evtData0;
 
                             msg = evtData0.msg;
@@ -1286,7 +1431,7 @@ public class ZookeeperDiscoveryImpl {
                             else
                                 path = zkPaths.customEventDataPath(false, 
evtData0.evtPath);
 
-                            msg = unmarshalZip(state.zkClient.getData(path));
+                            msg = unmarshalZip(rtState.zkClient.getData(path));
 
                             evtData0.msg = msg;
                         }
@@ -1308,25 +1453,25 @@ public class ZookeeperDiscoveryImpl {
                 }
             }
 
-            if (state.joined) {
-                state.locNodeInfo.lastProcEvt = evtData.eventId();
+            if (rtState.joined) {
+                rtState.locNodeInfo.lastProcEvt = evtData.eventId();
 
-                state.procEvtCnt++;
+                rtState.procEvtCnt++;
 
-                if (state.procEvtCnt % evtsAckThreshold == 0)
+                if (rtState.procEvtCnt % evtsAckThreshold == 0)
                     updateNodeInfo = true;
             }
         }
 
-        if (state.crd)
+        if (rtState.crd)
             handleProcessedEvents("procEvt");
         else if (updateNodeInfo) {
-            assert state.locNodeZkPath != null;
+            assert rtState.locNodeZkPath != null;
 
             if (log.isDebugEnabled())
-                log.debug("Update processed events: " + 
state.locNodeInfo.lastProcEvt);
+                log.debug("Update processed events: " + 
rtState.locNodeInfo.lastProcEvt);
 
-            state.zkClient.setData(state.locNodeZkPath, 
marshal(state.locNodeInfo), -1);
+            rtState.zkClient.setData(rtState.locNodeZkPath, 
marshalZip(rtState.locNodeInfo), -1);
         }
     }
 
@@ -1342,11 +1487,13 @@ public class ZookeeperDiscoveryImpl {
         if (log.isInfoEnabled())
             log.info("Local join event data: " + evtData + ']');
 
+        spi.getSpiContext().removeTimeoutObject(rtState.joinTimeoutObj);
+
         String path = zkPaths.joinEventDataPathForJoined(evtData.eventId());
 
-        ZkJoinEventDataForJoined dataForJoined = 
unmarshalZip(state.zkClient.getData(path));
+        ZkJoinEventDataForJoined dataForJoined = 
unmarshalZip(rtState.zkClient.getData(path));
 
-        state.gridStartTime = evtsData.gridStartTime;
+        rtState.gridStartTime = evtsData.gridStartTime;
 
         locNode.internalId(evtData.joinedInternalId);
         locNode.order(evtData.topologyVersion());
@@ -1364,12 +1511,12 @@ public class ZookeeperDiscoveryImpl {
 
             node.setMetrics(new ClusterMetricsSnapshot());
 
-            state.top.addNode(node);
+            rtState.top.addNode(node);
         }
 
-        state.top.addNode(locNode);
+        rtState.top.addNode(locNode);
 
-        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
+        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
         if (connState == ConnectionState.DISCONNECTED)
             connState = ConnectionState.STARTED;
@@ -1381,7 +1528,7 @@ public class ZookeeperDiscoveryImpl {
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
 
-        if (state.prevJoined) {
+        if (rtState.prevJoined) {
             lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
                 evtData.topologyVersion(),
                 locNode,
@@ -1394,12 +1541,12 @@ public class ZookeeperDiscoveryImpl {
 
         joinFut.onDone();
 
-        state.joined = true;
+        rtState.joined = true;
 
         if (log.isDebugEnabled())
             log.debug("Delete data for joined: " + path);
 
-        state.zkClient.deleteIfExistsAsync(path);
+        rtState.zkClient.deleteIfExistsAsync(path);
     }
 
     /**
@@ -1410,7 +1557,7 @@ public class ZookeeperDiscoveryImpl {
         if (msg instanceof ZkInternalForceNodeFailMessage) {
             ZkInternalForceNodeFailMessage msg0 = 
(ZkInternalForceNodeFailMessage)msg;
 
-            ClusterNode creatorNode = 
state.top.nodesById.get(evtData.sndNodeId);
+            ClusterNode creatorNode = 
rtState.top.nodesById.get(evtData.sndNodeId);
 
             if (msg0.warning != null) {
                 U.warn(log, "Received EVT_NODE_FAILED event with warning [" +
@@ -1424,7 +1571,7 @@ public class ZookeeperDiscoveryImpl {
                     ", nodeId=" + msg0.nodeId + ']');
             }
 
-            ZookeeperClusterNode node = state.top.nodesById.get(msg0.nodeId);
+            ZookeeperClusterNode node = rtState.top.nodesById.get(msg0.nodeId);
 
             assert node != null : msg0.nodeId;
 
@@ -1452,11 +1599,11 @@ public class ZookeeperDiscoveryImpl {
         if (log.isDebugEnabled())
             log.debug(" [topVer=" + evtData.topologyVersion() + ", msg=" + msg 
+ ']');
 
-        final ZookeeperClusterNode sndNode = 
state.top.nodesById.get(evtData.sndNodeId);
+        final ZookeeperClusterNode sndNode = 
rtState.top.nodesById.get(evtData.sndNodeId);
 
         assert sndNode != null : evtData;
 
-        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
+        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
         lsnr.onDiscovery(evtData.eventType(),
             evtData.topologyVersion(),
@@ -1479,9 +1626,9 @@ public class ZookeeperDiscoveryImpl {
 
         joinedNode.setMetrics(new ClusterMetricsSnapshot());
 
-        state.top.addNode(joinedNode);
+        rtState.top.addNode(joinedNode);
 
-        final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
+        final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
         lsnr.onDiscovery(evtData.eventType(),
             evtData.topologyVersion(),
@@ -1493,18 +1640,19 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param evtData Event data.
+     * @throws Exception If failed.
      */
     private void notifyNodeFail(final ZkDiscoveryNodeFailEventData evtData) 
throws Exception {
         processNodeFail(evtData.failedNodeInternalId(), 
evtData.topologyVersion());
     }
 
     /**
-     * @param nodeInternalId
-     * @param topVer
-     * @throws Exception
+     * @param nodeInternalId Failed node internal ID.
+     * @param topVer Topology version.
+     * @throws Exception If failed.
      */
     private void processNodeFail(int nodeInternalId, long topVer) throws 
Exception {
-        final ZookeeperClusterNode failedNode = 
state.top.removeNode(nodeInternalId);
+        final ZookeeperClusterNode failedNode = 
rtState.top.removeNode(nodeInternalId);
 
         assert failedNode != null;
 
@@ -1532,7 +1680,7 @@ public class ZookeeperDiscoveryImpl {
                         ", prevId=" + locNode.id() +
                         ", locNode=" + locNode + ']');
 
-                    new ReconnectorThread(newId).start();
+                    runInWorkerThread(new ReconnectClosure(newId));
                 }
             }
             else
@@ -1542,7 +1690,7 @@ public class ZookeeperDiscoveryImpl {
             throw new ZookeeperClientFailedException("Received node failed 
event for local node.");
         }
         else {
-            final List<ClusterNode> topSnapshot = state.top.topologySnapshot();
+            final List<ClusterNode> topSnapshot = 
rtState.top.topologySnapshot();
 
             lsnr.onDiscovery(EVT_NODE_FAILED,
                 topVer,
@@ -1558,7 +1706,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws Exception If failed.
      */
     private void handleProcessedEvents(String ctx) throws Exception {
-        Iterator<ZkDiscoveryEventData> it = 
state.evtsData.evts.values().iterator();
+        Iterator<ZkDiscoveryEventData> it = 
rtState.evtsData.evts.values().iterator();
 
         List<ZkDiscoveryCustomEventData> newEvts = null;
 
@@ -1590,11 +1738,11 @@ public class ZookeeperDiscoveryImpl {
                         DiscoverySpiCustomMessage ack = 
handleProcessedCustomEvent(ctx, (ZkDiscoveryCustomEventData)evtData);
 
                         if (ack != null) {
-                            state.evtsData.evtIdGen++;
+                            rtState.evtsData.evtIdGen++;
 
-                            long evtId = state.evtsData.evtIdGen;
+                            long evtId = rtState.evtsData.evtIdGen;
 
-                            byte[] ackBytes = U.zip(marshal(ack));
+                            byte[] ackBytes = marshalZip(ack);
 
                             String path = zkPaths.ackEventDataPath(evtId);
 
@@ -1602,7 +1750,7 @@ public class ZookeeperDiscoveryImpl {
                                 log.debug("Create ack event: " + path);
 
                             // TODO ZK: delete is previous exists?
-                            state.zkClient.createIfNeeded(
+                            rtState.zkClient.createIfNeeded(
                                 path,
                                 ackBytes,
                                 CreateMode.PERSISTENT);
@@ -1647,10 +1795,10 @@ public class ZookeeperDiscoveryImpl {
         }
 
         if (newEvts != null) {
-            Collection<ZookeeperClusterNode> nodes = 
state.top.nodesByOrder.values();
+            Collection<ZookeeperClusterNode> nodes = 
rtState.top.nodesByOrder.values();
 
             for (int i = 0; i < newEvts.size(); i++)
-                state.evtsData.addEvent(nodes, newEvts.get(i));
+                rtState.evtsData.addEvent(nodes, newEvts.get(i));
 
             saveAndProcessNewEvents();
         }
@@ -1663,7 +1811,7 @@ public class ZookeeperDiscoveryImpl {
     private void handleProcessedEventsOnNodesFail(List<ZookeeperClusterNode> 
failedNodes) throws Exception {
         boolean processed = false;
 
-        for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = 
state.evtsData.evts.entrySet().iterator(); it.hasNext();) {
+        for (Iterator<Map.Entry<Long, ZkDiscoveryEventData>> it = 
rtState.evtsData.evts.entrySet().iterator(); it.hasNext();) {
             Map.Entry<Long, ZkDiscoveryEventData> e = it.next();
 
             ZkDiscoveryEventData evtData = e.getValue();
@@ -1694,8 +1842,8 @@ public class ZookeeperDiscoveryImpl {
         if (log.isDebugEnabled())
             log.debug("Delete processed event data [path1=" + evtDataPath + ", 
path2=" + dataForJoinedPath + ']');
 
-        state.zkClient.deleteIfExistsAsync(evtDataPath);
-        state.zkClient.deleteIfExistsAsync(dataForJoinedPath);
+        rtState.zkClient.deleteIfExistsAsync(evtDataPath);
+        rtState.zkClient.deleteIfExistsAsync(dataForJoinedPath);
     }
 
     /**
@@ -1715,7 +1863,7 @@ public class ZookeeperDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Delete path: " + path);
 
-            state.zkClient.deleteIfExistsAsync(path);
+            rtState.zkClient.deleteIfExistsAsync(path);
 
             assert evtData.msg != null || locNode.order() > 
evtData.topologyVersion() : evtData;
 
@@ -1728,13 +1876,38 @@ public class ZookeeperDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Delete path: " + path);
 
-            state.zkClient.deleteIfExistsAsync(path);
+            rtState.zkClient.deleteIfExistsAsync(path);
         }
 
         return null;
     }
 
     /**
+     * @param c Closure to run.
+     */
+    private void runInWorkerThread(Runnable c) {
+        IgniteThreadPoolExecutor pool;
+
+        synchronized (stateMux) {
+            if (connState == ConnectionState.STOPPED)
+                return;
+
+            if (utilityPool == null) {
+                utilityPool = new IgniteThreadPoolExecutor("zk-discovery-pool",
+                    igniteInstanceName,
+                    0,
+                    1,
+                    2000,
+                    new LinkedBlockingQueue<Runnable>());
+            }
+
+            pool = utilityPool;
+        }
+
+        pool.submit(c);
+    }
+
+    /**
      *
      */
     public void onStop() {
@@ -1745,7 +1918,7 @@ public class ZookeeperDiscoveryImpl {
             connState = ConnectionState.STOPPED;
         }
 
-        ZookeeperClient zkClient = state.zkClient;
+        ZookeeperClient zkClient = rtState.zkClient;
 
         if (zkClient != null)
             zkClient.onCloseStart();
@@ -1769,7 +1942,9 @@ public class ZookeeperDiscoveryImpl {
 
         joinFut.onDone(e);
 
-        ZookeeperClient zkClient = state.zkClient;
+        IgniteUtils.shutdownNow(ZookeeperDiscoveryImpl.class, utilityPool, 
log);
+
+        ZookeeperClient zkClient = rtState.zkClient;
 
         if (zkClient != null)
             zkClient.close();
@@ -1811,12 +1986,6 @@ public class ZookeeperDiscoveryImpl {
      * @return Unmarshalled object.
      * @throws IgniteCheckedException If failed.
      */
-    private <T> T unmarshal(byte[] bytes) throws IgniteCheckedException {
-        assert bytes != null && bytes.length > 0;
-
-        return marsh.unmarshal(bytes, null);
-    }
-
     private <T> T unmarshalZip(byte[] bytes) throws IgniteCheckedException {
         assert bytes != null && bytes.length > 0;
 
@@ -1828,29 +1997,30 @@ public class ZookeeperDiscoveryImpl {
      * @return Bytes.
      * @throws IgniteCheckedException If failed.
      */
-    private byte[] marshal(Object obj) throws IgniteCheckedException {
+    private byte[] marshalZip(Object obj) throws IgniteCheckedException {
         assert obj != null;
 
-        return marsh.marshal(obj);
+        return U.zip(marsh.marshal(obj));
     }
 
     /**
      *
      */
-    private class ReconnectorThread extends IgniteSpiThread {
+    private class ReconnectClosure implements Runnable {
         /** */
         private final UUID newId;
 
         /**
-         *
+         * @param newId New ID.
          */
-        ReconnectorThread(UUID newId) {
-            super(ZookeeperDiscoveryImpl.this.igniteInstanceName, 
"zk-reconnector", log);
+        ReconnectClosure(UUID newId) {
+            assert newId != null;
 
             this.newId = newId;
         }
 
-        @Override protected void body() throws InterruptedException {
+        /** {@inheritDoc} */
+        @Override public void run() {
             busyLock.block();
 
             busyLock.unblock();
@@ -1883,7 +2053,7 @@ public class ZookeeperDiscoveryImpl {
                     ", prevId=" + locNode.id() +
                     ", locNode=" + locNode + ']');
 
-                new ReconnectorThread(newId).start();
+                runInWorkerThread(new ReconnectClosure(newId));
             }
             else {
                 U.warn(log, "Connection to Zookeeper server is lost, local 
node SEGMENTED.");
@@ -1905,17 +2075,17 @@ public class ZookeeperDiscoveryImpl {
             try {
                 if (evt.getType() == Event.EventType.NodeDataChanged) {
                     if (evt.getPath().equals(zkPaths.evtsPath)) {
-                        if (!state.crd)
-                            state.zkClient.getDataAsync(evt.getPath(), this, dataCallback);
+                        if (!rtState.crd)
+                            rtState.zkClient.getDataAsync(evt.getPath(), this, dataCallback);
                     }
                     else
                         U.warn(log, "Received NodeDataChanged for unexpected path: " + evt.getPath());
                 }
                 else if (evt.getType() == Event.EventType.NodeChildrenChanged) {
                     if (evt.getPath().equals(zkPaths.aliveNodesDir))
-                        state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
+                        rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
                     else if (evt.getPath().equals(zkPaths.customEvtsDir))
-                        state.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
+                        rtState.zkClient.getChildrenAsync(evt.getPath(), this, childrenCallback);
                     else
                         U.warn(log, "Received NodeChildrenChanged for unexpected path: " + evt.getPath());
                 }
@@ -1968,7 +2138,7 @@ public class ZookeeperDiscoveryImpl {
                 assert rc == 0 : KeeperException.Code.get(rc);
 
                 if (path.equals(zkPaths.evtsPath)) {
-                    if (!state.crd)
+                    if (!rtState.crd)
                         processNewEvents(data);
                 }
                 else
@@ -1993,7 +2163,7 @@ public class ZookeeperDiscoveryImpl {
 
             try {
                 if (evt.getType() == Event.EventType.NodeDataChanged)
-                    state.zkClient.getDataAsync(evt.getPath(), this, this);
+                    rtState.zkClient.getDataAsync(evt.getPath(), this, this);
 
                 busyLock.leaveBusy();
             }
@@ -2008,7 +2178,7 @@ public class ZookeeperDiscoveryImpl {
                 return;
 
             try {
-                assert state.crd;
+                assert rtState.crd;
 
                 processResult0(rc, path, data);
 
@@ -2036,11 +2206,11 @@ public class ZookeeperDiscoveryImpl {
             assert rc == 0 : KeeperException.Code.get(rc);
 
             if (data.length > 0) {
-                ZkAliveNodeData nodeData = unmarshal(data);
+                ZkAliveNodeData nodeData = unmarshalZip(data);
 
                 Integer nodeInternalId = ZkIgnitePaths.aliveInternalId(path);
 
-                Iterator<ZkDiscoveryEventData> it = state.evtsData.evts.values().iterator();
+                Iterator<ZkDiscoveryEventData> it = rtState.evtsData.evts.values().iterator();
 
                 boolean processed = false;
 
@@ -2075,7 +2245,7 @@ public class ZookeeperDiscoveryImpl {
                         log.info("Previous node watch event: " + evt);
 
                     if (evt.getType() != Event.EventType.None)
-                        state.zkClient.existsAsync(evt.getPath(), this, this);
+                        rtState.zkClient.existsAsync(evt.getPath(), this, this);
                 }
 
                 busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e909027f/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 2c6890f..6c32a4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -38,6 +38,7 @@ import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingCluster;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -760,7 +761,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     private static String aliveZkNodePath(DiscoverySpi spi) {
-        String path = GridTestUtils.getFieldValue(spi, "impl", "state", "locNodeZkPath");
+        String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath");
 
         return path.substring(path.lastIndexOf('/') + 1);
     }
@@ -1406,11 +1407,37 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testDuplicatedNodeId() throws Exception {
-        nodeId = UUID.randomUUID();
+        UUID nodeId0 = nodeId = UUID.randomUUID();
 
         startGrid(0);
 
-        startGrid(1);
+        int failingNodeIdx = 100;
+
+        for (int i = 0; i < 5; i++) {
+            final int idx = failingNodeIdx++;
+
+            nodeId = nodeId0;
+
+            info("Start node with duplicated ID [iter=" + i + ", nodeId=" + nodeId + ']');
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startGrid(idx);
+
+                    return null;
+                }
+            }, IgniteCheckedException.class, null);
+
+            nodeId = null;
+
+            info("Start node with unique ID [iter=" + i + ']');
+
+            Ignite ignite = startGrid(idx);
+
+            nodeId0 = ignite.cluster().localNode().id();
+
+            waitForTopology(i + 2);
+        }
     }
 
     /**
@@ -1650,7 +1677,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 Map<Object, Object> evts = GridTestUtils.getFieldValue(node.configuration().getDiscoverySpi(),
-                    "impl", "state", "evtsData", "evts");
+                    "impl", "rtState", "evtsData", "evts");
 
                 if (!evts.isEmpty()) {
                     info("Unacked events: " + evts);
@@ -1809,7 +1836,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @param spi Spi instance.
      */
     private static ZooKeeper zkClient(ZookeeperDiscoverySpi spi) {
-        return GridTestUtils.getFieldValue(spi, "impl", "state", "zkClient", "zk");
+        return GridTestUtils.getFieldValue(spi, "impl", "rtState", "zkClient", "zk");
     }
 
     /**

Reply via email to