http://git-wip-us.apache.org/repos/asf/ignite/blob/8d3755b9/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
new file mode 100644
index 0000000..7708358
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -0,0 +1,4464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CommunicationFailureResolver;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.ClusterMetricsSnapshot;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.MarshallerUtils;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.security.SecurityCredentials;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2;
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+import static org.apache.zookeeper.CreateMode.PERSISTENT;
+
+/**
+ * Zookeeper Discovery Impl.
+ */
+public class ZookeeperDiscoveryImpl {
+    /** */
+    static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD";
+
+    /** */
+    static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT";
+
+    /** */
+    static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS";
+
+    /** */
+    private static final String IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE = 
"IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE";
+
+    /** */
+    final ZookeeperDiscoverySpi spi;
+
+    /** */
+    private final String igniteInstanceName;
+
+    /** */
+    private final String connectString;
+
+    /** */
+    private final int sesTimeout;
+
+    /** */
+    private final JdkMarshaller marsh = new JdkMarshaller();
+
+    /** */
+    private final ZkIgnitePaths zkPaths;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** */
+    private final ZookeeperClusterNode locNode;
+
+    /** */
+    private final DiscoverySpiListener lsnr;
+
+    /** */
+    private final DiscoverySpiDataExchange exchange;
+
+    /** */
+    private final boolean clientReconnectEnabled;
+
+    /** */
+    private final GridFutureAdapter<Void> joinFut = new GridFutureAdapter<>();
+
+    /** */
+    private final int evtsAckThreshold;
+
+    /** */
+    private IgniteThreadPoolExecutor utilityPool;
+
+    /** */
+    private ZkRuntimeState rtState;
+
+    /** */
+    private volatile ConnectionState connState = ConnectionState.STARTED;
+
+    /** */
+    private final AtomicBoolean stop = new AtomicBoolean();
+
+    /** */
+    private final Object stateMux = new Object();
+
+    /** */
+    public volatile IgniteDiscoverySpiInternalListener internalLsnr;
+
+    /** */
+    private final ConcurrentHashMap<Long, PingFuture> pingFuts = new 
ConcurrentHashMap<>();
+
+    /** */
+    private final AtomicReference<ZkCommunicationErrorProcessFuture> 
commErrProcFut = new AtomicReference<>();
+
+    /** */
+    private long prevSavedEvtsTopVer;
+
+    /**
+     * @param spi Discovery SPI.
+     * @param igniteInstanceName Instance name.
+     * @param log Logger.
+     * @param zkRootPath Zookeeper base path node all nodes.
+     * @param locNode Local node instance.
+     * @param lsnr Discovery events listener.
+     * @param exchange Discovery data exchange.
+     * @param internalLsnr Internal listener (used for testing only).
+     */
+    public ZookeeperDiscoveryImpl(
+        ZookeeperDiscoverySpi spi,
+        String igniteInstanceName,
+        IgniteLogger log,
+        String zkRootPath,
+        ZookeeperClusterNode locNode,
+        DiscoverySpiListener lsnr,
+        DiscoverySpiDataExchange exchange,
+        IgniteDiscoverySpiInternalListener internalLsnr) {
+        assert locNode.id() != null && locNode.isLocal() : locNode;
+
+        MarshallerUtils.setNodeName(marsh, igniteInstanceName);
+
+        zkPaths = new ZkIgnitePaths(zkRootPath);
+
+        this.spi = spi;
+        this.igniteInstanceName = igniteInstanceName;
+        this.connectString = spi.getZkConnectionString();
+        this.sesTimeout = (int)spi.getSessionTimeout();
+        this.log = log.getLogger(getClass());
+        this.locNode = locNode;
+        this.lsnr = lsnr;
+        this.exchange = exchange;
+        this.clientReconnectEnabled = locNode.isClient() && 
!spi.isClientReconnectDisabled();
+
+        int evtsAckThreshold = 
IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, 
5);
+
+        if (evtsAckThreshold <= 0)
+            evtsAckThreshold = 1;
+
+        this.evtsAckThreshold = evtsAckThreshold;
+
+        if (internalLsnr != null)
+            this.internalLsnr = internalLsnr;
+    }
+
+    /**
+     * @return Exception.
+     */
+    private static IgniteClientDisconnectedCheckedException disconnectError() {
+        return new IgniteClientDisconnectedCheckedException(null, "Client node 
disconnected.");
+    }
+
+    /**
+     * @return Logger.
+     */
+    IgniteLogger log() {
+        return log;
+    }
+
+    /**
+     * @return Local node instance.
+     */
+    public ClusterNode localNode() {
+        return locNode;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return Node instance.
+     */
+    @Nullable public ZookeeperClusterNode node(UUID nodeId) {
+        assert nodeId != null;
+
+        return rtState.top.nodesById.get(nodeId);
+    }
+
+    /**
+     * @param nodeOrder Node order.
+     * @return Node instance.
+     */
+    @Nullable public ZookeeperClusterNode node(long nodeOrder) {
+        assert nodeOrder > 0 : nodeOrder;
+
+        return rtState.top.nodesByOrder.get(nodeOrder);
+    }
+
+    /**
+     * @param fut Future to remove.
+     */
+    void 
clearCommunicationErrorProcessFuture(ZkCommunicationErrorProcessFuture fut) {
+        assert fut.isDone() : fut;
+
+        commErrProcFut.compareAndSet(fut, null);
+    }
+
+    /**
+     * @param node0 Problem node ID
+     * @param err Connect error.
+     */
+    public void resolveCommunicationError(ClusterNode node0, Exception err) {
+        ZookeeperClusterNode node = node(node0.id());
+
+        if (node == null)
+            throw new IgniteSpiException(new 
ClusterTopologyCheckedException("Node failed: " + node0.id()));
+
+        IgniteInternalFuture<Boolean> nodeStatusFut;
+
+        for (;;) {
+            checkState();
+
+            ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
+
+            if (fut == null || fut.isDone()) {
+                ZkCommunicationErrorProcessFuture newFut = 
ZkCommunicationErrorProcessFuture.createOnCommunicationError(
+                    this,
+                    node.sessionTimeout() + 1000);
+
+                if (commErrProcFut.compareAndSet(fut, newFut)) {
+                    fut = newFut;
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Created new communication error process 
future [errNode=" + node0.id() +
+                            ", err=" + err + ']');
+                    }
+
+                    try {
+                        checkState();
+                    }
+                    catch (Exception e) {
+                        fut.onError(e);
+
+                        throw e;
+                    }
+
+                    fut.scheduleCheckOnTimeout();
+                }
+                else {
+                    fut = commErrProcFut.get();
+
+                    if (fut == null)
+                        continue;
+                }
+            }
+
+            nodeStatusFut = fut.nodeStatusFuture(node);
+
+            if (nodeStatusFut != null)
+                break;
+            else {
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Previous communication error process future 
failed: " + e);
+                }
+            }
+        }
+
+        try {
+            if (!nodeStatusFut.get())
+                throw new IgniteSpiException(new 
ClusterTopologyCheckedException("Node failed: " + node0.id()));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return Ping result.
+     */
+    public boolean pingNode(UUID nodeId) {
+        checkState();
+
+        ZkRuntimeState rtState = this.rtState;
+
+        ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
+
+        if (node == null)
+            return false;
+
+        if (node.isLocal())
+            return true;
+
+        PingFuture fut = pingFuts.get(node.order());
+
+        if (fut == null) {
+            fut = new PingFuture(rtState, node);
+
+            PingFuture old = pingFuts.putIfAbsent(node.order(), fut);
+
+            if (old == null) {
+                if (fut.checkNodeAndState())
+                    spi.getSpiContext().addTimeoutObject(fut);
+                else
+                    assert fut.isDone();
+            }
+            else
+                fut = old;
+        }
+
+        try {
+            return fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param warning Warning.
+     */
+    public void failNode(UUID nodeId, @Nullable String warning) {
+        ZookeeperClusterNode node = rtState.top.nodesById.get(nodeId);
+
+        if (node == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignore forcible node fail request, node does not 
exist: " + nodeId);
+
+            return;
+        }
+
+        if (!node.isClient()) {
+            U.warn(log, "Ignore forcible node fail request for non-client 
node: " + node);
+
+            return;
+        }
+
+        sendCustomMessage(new ZkForceNodeFailMessage(node.internalId(), 
warning));
+    }
+
+    /**
+     *
+     */
+    public void reconnect() {
+        assert clientReconnectEnabled;
+
+        synchronized (stateMux) {
+            if (connState == ConnectionState.STARTED) {
+                connState = ConnectionState.DISCONNECTED;
+
+                rtState.onCloseStart(disconnectError());
+            }
+            else
+                return;
+        }
+
+        busyLock.block();
+
+        busyLock.unblock();
+
+        rtState.zkClient.close();
+
+        UUID newId = UUID.randomUUID();
+
+        U.quietAndWarn(log, "Local node will try to reconnect to cluster with 
new id due to network problems [" +
+            "newId=" + newId +
+            ", prevId=" + locNode.id() +
+            ", locNode=" + locNode + ']');
+
+        runInWorkerThread(new ReconnectClosure(newId));
+    }
+
+    /**
+     * @param newId New ID.
+     */
+    private void doReconnect(UUID newId) {
+        if (rtState.joined) {
+            assert rtState.evtsData != null;
+
+            lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+                rtState.evtsData.topVer,
+                locNode,
+                rtState.top.topologySnapshot(),
+                Collections.<Long, Collection<ClusterNode>>emptyMap(),
+                null);
+        }
+
+        try {
+            locNode.onClientDisconnected(newId);
+
+            joinTopology(rtState);
+        }
+        catch (Exception e) {
+            if (stopping()) {
+                if (log.isDebugEnabled())
+                    log.debug("Reconnect failed, node is stopping [err=" + e + 
']');
+
+                return;
+            }
+
+            U.error(log, "Failed to reconnect: " + e, e);
+
+            onSegmented(e);
+        }
+    }
+
+    /**
+     * @return {@code True} if started to stop.
+     */
+    private boolean stopping() {
+        if (stop.get())
+            return true;
+
+        synchronized (stateMux) {
+            if (connState == ConnectionState.STOPPED)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param e Error.
+     */
+    private void onSegmented(Exception e) {
+        rtState.errForClose = e;
+
+        if (rtState.joined || joinFut.isDone()) {
+            synchronized (stateMux) {
+                connState = ConnectionState.STOPPED;
+            }
+
+            notifySegmented();
+        }
+        else
+            joinFut.onDone(e);
+    }
+
+    /**
+     *
+     */
+    private void notifySegmented() {
+        List<ClusterNode> nodes = rtState.top.topologySnapshot();
+
+        if (nodes.isEmpty())
+            nodes = Collections.singletonList((ClusterNode)locNode);
+
+        lsnr.onDiscovery(EVT_NODE_SEGMENTED,
+            rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
+            locNode,
+            nodes,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+    }
+
+    /**
+     * @return Remote nodes.
+     */
+    public Collection<ClusterNode> remoteNodes() {
+        checkState();
+
+        return rtState.top.remoteNodes();
+    }
+
+    /**
+     *
+     */
+    private void checkState() {
+        switch (connState) {
+            case STARTED:
+                break;
+
+            case STOPPED:
+                throw new IgniteSpiException("Node stopped.");
+
+            case DISCONNECTED:
+                throw new IgniteClientDisconnectedException(null, "Client is 
disconnected.");
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if node joined or joining topology.
+     */
+    public boolean knownNode(UUID nodeId) {
+        while (!busyLock.enterBusy())
+            checkState();
+
+        try {
+            List<String> children = 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
+
+            for (int i = 0; i < children.size(); i++) {
+                UUID id = ZkIgnitePaths.aliveNodeId(children.get(i));
+
+                if (nodeId.equals(id))
+                    return true;
+            }
+
+            return false;
+        }
+        catch (ZookeeperClientFailedException e) {
+            if (clientReconnectEnabled)
+                throw new IgniteClientDisconnectedException(null, "Client is 
disconnected.");
+
+            throw new IgniteException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param msg Message.
+     */
+    public void sendCustomMessage(DiscoverySpiCustomMessage msg) {
+        assert msg != null;
+
+        byte[] msgBytes;
+
+        try {
+            msgBytes = marshalZip(msg);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal custom message: " 
+ msg, e);
+        }
+
+        while (!busyLock.enterBusy())
+            checkState();
+
+        try {
+            ZookeeperClient zkClient = rtState.zkClient;
+
+            saveCustomMessage(zkClient, msgBytes);
+        }
+        catch (ZookeeperClientFailedException e) {
+            if (clientReconnectEnabled)
+                throw new IgniteClientDisconnectedException(null, "Client is 
disconnected.");
+
+            throw new IgniteException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param zkClient Client.
+     * @param msgBytes Marshalled message.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void saveCustomMessage(ZookeeperClient zkClient, byte[] msgBytes)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        String prefix = UUID.randomUUID().toString();
+
+        int partCnt = 1;
+
+        int overhead = 10;
+
+        UUID locId = locNode.id();
+
+        String path = zkPaths.createCustomEventPath(prefix, locId, partCnt);
+
+        if (zkClient.needSplitNodeData(path, msgBytes, overhead)) {
+            List<byte[]> parts = zkClient.splitNodeData(path, msgBytes, 
overhead);
+
+            String partsBasePath = zkPaths.customEventPartsBasePath(prefix, 
locId);
+
+            saveMultipleParts(zkClient, partsBasePath, parts);
+
+            msgBytes = null;
+
+            partCnt = parts.size();
+        }
+
+        zkClient.createSequential(prefix,
+            zkPaths.customEvtsDir,
+            zkPaths.createCustomEventPath(prefix, locId, partCnt),
+            msgBytes,
+            CreateMode.PERSISTENT_SEQUENTIAL);
+    }
+
+    /**
+     * @return Cluster start time.
+     */
+    public long gridStartTime() {
+        return rtState.gridStartTime;
+    }
+
+    /**
+     * Starts join procedure and waits for {@link EventType#EVT_NODE_JOINED} 
event for local node.
+     *
+     * @throws InterruptedException If interrupted.
+     */
+    public void startJoinAndWait() throws InterruptedException {
+        joinTopology(null);
+
+        for (;;) {
+            try {
+                joinFut.get(10_000);
+
+                break;
+            }
+            catch (IgniteFutureTimeoutCheckedException e) {
+                U.warn(log, "Waiting for local join event [nodeId=" + 
locNode.id() + ", name=" + igniteInstanceName + ']');
+            }
+            catch (Exception e) {
+                IgniteSpiException spiErr = X.cause(e, 
IgniteSpiException.class);
+
+                if (spiErr != null)
+                    throw spiErr;
+
+                throw new IgniteSpiException("Failed to join cluster", e);
+            }
+        }
+    }
+
+    /**
+     * @param prevState Previous state in case of connect retry.
+     * @throws InterruptedException If interrupted.
+     */
+    private void joinTopology(@Nullable ZkRuntimeState prevState) throws 
InterruptedException {
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            boolean reconnect = prevState != null;
+
+            // Need fire EVT_CLIENT_NODE_RECONNECTED event if reconnect after 
already joined.
+            boolean prevJoined = prevState != null && prevState.joined;
+
+            IgniteDiscoverySpiInternalListener internalLsnr = 
this.internalLsnr;
+
+            if (internalLsnr != null)
+                internalLsnr.beforeJoin(locNode, log);
+
+            if (locNode.isClient() && reconnect)
+                locNode.setAttributes(spi.getSpiContext().nodeAttributes());
+
+            marshalCredentialsOnJoin(locNode);
+
+            synchronized (stateMux) {
+                if (connState == ConnectionState.STOPPED)
+                    return;
+
+                connState = ConnectionState.STARTED;
+            }
+
+            ZkRuntimeState rtState = this.rtState = new 
ZkRuntimeState(prevJoined);
+
+            DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id(), 
locNode.isClient());
+
+            exchange.collect(discoDataBag);
+
+            ZkJoiningNodeData joinData = new ZkJoiningNodeData(locNode, 
discoDataBag.joiningNodeData());
+
+            byte[] joinDataBytes;
+
+            try {
+                joinDataBytes = marshalZip(joinData);
+            }
+            catch (Exception e) {
+                throw new IgniteSpiException("Failed to marshal joining node 
data", e);
+            }
+
+            try {
+                rtState.zkClient = new ZookeeperClient(
+                    igniteInstanceName,
+                    log,
+                    connectString,
+                    sesTimeout,
+                    new ConnectionLossListener());
+            }
+            catch (Exception e) {
+                throw new IgniteSpiException("Failed to create Zookeeper 
client", e);
+            }
+
+            startJoin(rtState, prevState, joinDataBytes);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @throws InterruptedException If interrupted.
+     */
+    private void initZkNodes() throws InterruptedException {
+        try {
+            ZookeeperClient client = rtState.zkClient;
+
+            if (!client.exists(zkPaths.clusterDir)) {
+                createRootPathParents(zkPaths.clusterDir, client);
+
+                client.createIfNeeded(zkPaths.clusterDir, null, PERSISTENT);
+            }
+
+            List<String> createdDirs = client.getChildren(zkPaths.clusterDir);
+
+            String[] requiredDirs = {
+                zkPaths.evtsPath,
+                zkPaths.joinDataDir,
+                zkPaths.customEvtsDir,
+                zkPaths.customEvtsPartsDir,
+                zkPaths.customEvtsAcksDir,
+                zkPaths.aliveNodesDir};
+
+            List<String> dirs = new ArrayList<>();
+
+            for (String dir : requiredDirs) {
+                String dir0 = dir.substring(zkPaths.clusterDir.length() + 1);
+
+                if (!createdDirs.contains(dir0))
+                    dirs.add(dir);
+            }
+
+            try {
+                if (!dirs.isEmpty())
+                    client.createAll(dirs, PERSISTENT);
+            }
+            catch (KeeperException.NodeExistsException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to create nodes using bulk operation: " 
+ e);
+
+                for (String dir : dirs)
+                    client.createIfNeeded(dir, null, PERSISTENT);
+            }
+        }
+        catch (ZookeeperClientFailedException e) {
+            throw new IgniteSpiException("Failed to initialize Zookeeper 
nodes", e);
+        }
+    }
+
+    /**
+     * @param rootDir Root directory.
+     * @param client Client.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void createRootPathParents(String rootDir, ZookeeperClient client)
+        throws ZookeeperClientFailedException, InterruptedException {
+        int startIdx = 0;
+
+        for (;;) {
+            int separatorIdx = rootDir.indexOf(ZkIgnitePaths.PATH_SEPARATOR, 
startIdx);
+
+            if (separatorIdx == -1)
+                break;
+
+            if (separatorIdx > 0) {
+                String path = rootDir.substring(0, separatorIdx);
+
+                client.createIfNeeded(path, null, CreateMode.PERSISTENT);
+            }
+
+            startIdx = separatorIdx + 1;
+        }
+    }
+
+    /**
+     * @param zkClient Client.
+     * @param basePath Base path.
+     * @param partCnt Parts count.
+     */
+    private void deleteMultiplePartsAsync(ZookeeperClient zkClient, String 
basePath, int partCnt) {
+        for (int i = 0; i < partCnt; i++) {
+            String path = multipartPathName(basePath, i);
+
+            zkClient.deleteIfExistsAsync(path);
+        }
+    }
+
+    /**
+     * @param zkClient Client.
+     * @param basePath Base path.
+     * @param partCnt Parts count.
+     * @return Read parts.
+     * @throws Exception If failed.
+     */
+    private byte[] readMultipleParts(ZookeeperClient zkClient, String 
basePath, int partCnt)
+        throws Exception {
+        assert partCnt >= 1;
+
+        if (partCnt > 1) {
+            List<byte[]> parts = new ArrayList<>(partCnt);
+
+            int totSize = 0;
+
+            for (int i = 0; i < partCnt; i++) {
+                byte[] part = zkClient.getData(multipartPathName(basePath, i));
+
+                parts.add(part);
+
+                totSize += part.length;
+            }
+
+            byte[] res = new byte[totSize];
+
+            int pos = 0;
+
+            for (int i = 0; i < partCnt; i++) {
+                byte[] part = parts.get(i);
+
+                System.arraycopy(part, 0, res, pos, part.length);
+
+                pos += part.length;
+            }
+
+            return res;
+        }
+        else
+            return zkClient.getData(multipartPathName(basePath, 0));
+    }
+
+    /**
+     * @param zkClient Client.
+     * @param basePath Base path.
+     * @param parts Data parts.
+     * @return Number of parts.
+     * @throws ZookeeperClientFailedException If client failed.
+     * @throws InterruptedException If interrupted.
+     */
+    private int saveMultipleParts(ZookeeperClient zkClient, String basePath, 
List<byte[]> parts)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert parts.size() > 1;
+
+        for (int i = 0; i < parts.size(); i++) {
+            byte[] part = parts.get(i);
+
+            String path = multipartPathName(basePath, i);
+
+            zkClient.createIfNeeded(path, part, PERSISTENT);
+        }
+
+        return parts.size();
+    }
+
+    /**
+     * @param basePath Base path.
+     * @param part Part number.
+     * @return Path.
+     */
+    private static String multipartPathName(String basePath, int part) {
+        return basePath + String.format("%04d", part);
+    }
+
+    /**
+     * @param rtState Runtime state.
+     * @param joinDataBytes Joining node data.
+     * @param prevState Previous state in case of connect retry.
+     * @throws InterruptedException If interrupted.
+     */
+    private void startJoin(ZkRuntimeState rtState, @Nullable ZkRuntimeState 
prevState, final byte[] joinDataBytes)
+        throws InterruptedException
+    {
+        try {
+            long startTime = System.currentTimeMillis();
+
+            initZkNodes();
+
+            String prefix = UUID.randomUUID().toString();
+
+            rtState.init(new ZkWatcher(rtState), new 
AliveNodeDataWatcher(rtState));
+
+            ZookeeperClient zkClient = rtState.zkClient;
+
+            final int OVERHEAD = 5;
+
+            // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193
+            String joinDataPath = zkPaths.joinDataDir + "/" + prefix + ":" + 
locNode.id();
+
+            if (zkClient.needSplitNodeData(joinDataPath, joinDataBytes, 
OVERHEAD)) {
+                List<byte[]> parts = zkClient.splitNodeData(joinDataPath, 
joinDataBytes, OVERHEAD);
+
+                rtState.joinDataPartCnt = parts.size();
+
+                saveMultipleParts(zkClient, joinDataPath + ":", parts);
+
+                joinDataPath = zkClient.createIfNeeded(
+                    joinDataPath,
+                    marshalZip(new ZkJoiningNodeData(parts.size())),
+                    PERSISTENT);
+            }
+            else {
+                joinDataPath = zkClient.createIfNeeded(
+                    joinDataPath,
+                    joinDataBytes,
+                    PERSISTENT);
+            }
+
+            rtState.locNodeZkPath = zkClient.createSequential(
+                prefix,
+                zkPaths.aliveNodesDir,
+                zkPaths.aliveNodePathForCreate(prefix, locNode),
+                null,
+                EPHEMERAL_SEQUENTIAL);
+
+            rtState.internalOrder = 
ZkIgnitePaths.aliveInternalId(rtState.locNodeZkPath);
+
+            if (log.isInfoEnabled()) {
+                log.info("Node started join [nodeId=" + locNode.id() +
+                    ", instanceName=" + 
locNode.attribute(ATTR_IGNITE_INSTANCE_NAME) +
+                    ", zkSessionId=0x" + 
Long.toHexString(rtState.zkClient.zk().getSessionId()) +
+                    ", joinDataSize=" + joinDataBytes.length +
+                    (rtState.joinDataPartCnt > 1 ? (", joinDataPartCnt=" + 
rtState.joinDataPartCnt) : "") +
+                    ", consistentId=" + locNode.consistentId() +
+                    ", initTime=" + (System.currentTimeMillis() - startTime) +
+                    ", nodePath=" + rtState.locNodeZkPath + ']');
+            }
+
+            /*
+            If node can not join due to validation error this error is 
reported in join data,
+            As a minor optimization do not start watch join data immediately, 
but only if do not receive
+            join event after some timeout.
+             */
+            CheckJoinErrorWatcher joinErrorWatcher = new 
CheckJoinErrorWatcher(5000, joinDataPath, rtState);
+
+            rtState.joinErrTo = joinErrorWatcher.timeoutObj;
+
+            if (locNode.isClient() && spi.getJoinTimeout() > 0) {
+                ZkTimeoutObject joinTimeoutObj = prevState != null ? 
prevState.joinTo : null;
+
+                if (joinTimeoutObj == null) {
+                    joinTimeoutObj = new 
JoinTimeoutObject(spi.getJoinTimeout());
+
+                    spi.getSpiContext().addTimeoutObject(joinTimeoutObj);
+                }
+
+                rtState.joinTo = joinTimeoutObj;
+            }
+
+            if (!locNode.isClient())
+                zkClient.getChildrenAsync(zkPaths.aliveNodesDir, null, new 
CheckCoordinatorCallback(rtState));
+
+            zkClient.getDataAsync(zkPaths.evtsPath, rtState.watcher, 
rtState.watcher);
+
+            spi.getSpiContext().addTimeoutObject(rtState.joinErrTo);
+        }
+        catch (IgniteCheckedException | ZookeeperClientFailedException e) {
+            throw new IgniteSpiException("Failed to initialize Zookeeper 
nodes", e);
+        }
+    }
+
+    /**
+     * Authenticate local node.
+     *
+     * @param nodeAuth Authenticator.
+     * @param locCred Local security credentials for authentication.
+     * @throws IgniteSpiException If any error occurs.
+     */
+    private void localAuthentication(DiscoverySpiNodeAuthenticator nodeAuth, 
SecurityCredentials locCred){
+        assert nodeAuth != null;
+        assert locCred != null;
+
+        try {
+            SecurityContext subj = nodeAuth.authenticateNode(locNode, locCred);
+
+            // Note: exception message is checked in tests.
+            if (subj == null)
+                throw new IgniteSpiException("Authentication failed for local 
node.");
+
+            if (!(subj instanceof Serializable))
+                throw new IgniteSpiException("Authentication subject is not 
Serializable.");
+
+            Map<String, Object> attrs = new HashMap<>(locNode.attributes());
+
+            attrs.put(ATTR_SECURITY_SUBJECT_V2, U.marshal(marsh, subj));
+
+            locNode.setAttributes(attrs);
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException("Failed to authenticate local node 
(will shutdown local node).", e);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @param zipBytes Zip-compressed marshalled security subject.
+     * @throws Exception If failed.
+     */
+    private void setNodeSecuritySubject(ZookeeperClusterNode node, byte[] 
zipBytes) throws Exception {
+        assert zipBytes != null;
+
+        Map<String, Object> attrs = new HashMap<>(node.getAttributes());
+
+        attrs.put(ATTR_SECURITY_SUBJECT_V2, unzip(zipBytes));
+
+        node.setAttributes(attrs);
+    }
+
+    /**
+     * @param node Node.
+     * @return Credentials.
+     * @throws IgniteCheckedException If failed to unmarshal.
+     */
+    private SecurityCredentials unmarshalCredentials(ZookeeperClusterNode 
node) throws Exception {
+        byte[] credBytes = 
(byte[])node.getAttributes().get(ATTR_SECURITY_CREDENTIALS);
+
+        if (credBytes == null)
+            return null;
+
+        return unmarshalZip(credBytes);
+    }
+
+    /**
+     * Marshalls credentials with discovery SPI marshaller (will replace 
attribute value).
+     *
+     * @param node Node to marshall credentials for.
+     * @throws IgniteSpiException If marshalling failed.
+     */
+    private void marshalCredentialsOnJoin(ZookeeperClusterNode node) throws 
IgniteSpiException {
+        try {
+            // Use security-unsafe getter.
+            Map<String, Object> attrs0 = node.getAttributes();
+
+            Object creds = attrs0.get(ATTR_SECURITY_CREDENTIALS);
+
+            if (creds != null) {
+                Map<String, Object> attrs = new HashMap<>(attrs0);
+
+                assert !(creds instanceof byte[]);
+
+                attrs.put(ATTR_SECURITY_CREDENTIALS, marshalZip(creds));
+
+                node.setAttributes(attrs);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSpiException("Failed to marshal node security 
credentials: " + node.id(), e);
+        }
+    }
+
+    /**
+     *
+     */
+    private class UpdateProcessedEventsTimeoutObject extends ZkTimeoutObject {
+        /** */
+        private final ZkRuntimeState rtState;
+
+        /**
+         * @param rtState Runtime state.
+         * @param timeout Timeout.
+         */
+        UpdateProcessedEventsTimeoutObject(ZkRuntimeState rtState, long 
timeout) {
+            super(timeout);
+
+            this.rtState = rtState;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            runInWorkerThread(new ZkRunnable(rtState, 
ZookeeperDiscoveryImpl.this) {
+                @Override protected void run0() throws Exception {
+                    updateProcessedEventsOnTimeout(rtState, 
UpdateProcessedEventsTimeoutObject.this);
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
+    private class JoinTimeoutObject extends ZkTimeoutObject {
+        /**
+         * @param timeout Timeout.
+         */
+        JoinTimeoutObject(long timeout) {
+            super(timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            if (cancelled || rtState.joined)
+                return;
+
+            runInWorkerThread(new Runnable() {
+                @Override public void run() {
+                    synchronized (stateMux) {
+                        if (cancelled || rtState.joined)
+                            return;
+
+                        if (connState == ConnectionState.STOPPED)
+                            return;
+
+                        connState = ConnectionState.STOPPED;
+                    }
+
+                    U.warn(log, "Failed to connect to cluster, either 
connection to ZooKeeper can not be established or there " +
+                        "are no alive server nodes (consider increasing 
'joinTimeout' configuration  property) [" +
+                        "joinTimeout=" + spi.getJoinTimeout() + ']');
+
+                    // Note: exception message is checked in tests.
+                    onSegmented(new IgniteSpiException("Failed to connect to 
cluster within configured timeout"));
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
+    private class CheckJoinErrorWatcher extends ZkAbstractWatcher implements 
AsyncCallback.DataCallback {
+        /** */
+        private final String joinDataPath;
+
+        /** */
+        private ZkTimeoutObject timeoutObj;
+
+        /**
+         * @param timeout Timeout.
+         * @param joinDataPath0 Node joined data path.
+         * @param rtState0 State.
+         */
+        CheckJoinErrorWatcher(long timeout, String joinDataPath0, 
ZkRuntimeState rtState0) {
+            super(rtState0, ZookeeperDiscoveryImpl.this);
+
+            this.joinDataPath = joinDataPath0;
+
+            timeoutObj = new ZkTimeoutObject(timeout) {
+                @Override public void onTimeout() {
+                    if (rtState.errForClose != null || rtState.joined)
+                        return;
+
+                    synchronized (stateMux) {
+                        if (connState != ConnectionState.STARTED)
+                            return;
+                    }
+
+                    rtState.zkClient.getDataAsync(joinDataPath,
+                        CheckJoinErrorWatcher.this,
+                        CheckJoinErrorWatcher.this);
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            if (rc != 0)
+                return;
+
+            if (!onProcessStart())
+                return;
+
+            try {
+                Object obj = unmarshalZip(data);
+
+                if (obj instanceof ZkInternalJoinErrorMessage) {
+                    ZkInternalJoinErrorMessage joinErr = 
(ZkInternalJoinErrorMessage)obj;
+
+                    onSegmented(new IgniteSpiException(joinErr.err));
+                }
+
+                onProcessEnd();
+            }
+            catch (Throwable e) {
+                onProcessError(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void process0(WatchedEvent evt) {
+            if (rtState.errForClose != null || rtState.joined)
+                return;
+
+            if (evt.getType() == Event.EventType.NodeDataChanged)
+                rtState.zkClient.getDataAsync(evt.getPath(), this, this);
+        }
+    }
+
+    /**
+     * @param aliveNodes Alive nodes.
+     * @throws Exception If failed.
+     */
+    private void checkIsCoordinator(final List<String> aliveNodes) throws 
Exception {
+        assert !locNode.isClient();
+
+        TreeMap<Long, String> aliveSrvs = new TreeMap<>();
+
+        long locInternalOrder = rtState.internalOrder;
+
+        for (String aliveNodePath : aliveNodes) {
+            if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
+                continue;
+
+            Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
+
+            aliveSrvs.put(internalId, aliveNodePath);
+        }
+
+        assert !aliveSrvs.isEmpty();
+
+        Map.Entry<Long, String> crdE = aliveSrvs.firstEntry();
+
+        if (locInternalOrder == crdE.getKey())
+            onBecomeCoordinator(aliveNodes);
+        else {
+            assert aliveSrvs.size() > 1 : aliveSrvs;
+
+            Map.Entry<Long, String> prevE = 
aliveSrvs.floorEntry(locInternalOrder - 1);
+
+            assert prevE != null;
+
+            if (log.isInfoEnabled()) {
+                log.info("Discovery coordinator already exists, watch for 
previous server node [" +
+                    "locId=" + locNode.id() +
+                    ", watchPath=" + prevE.getValue() + ']');
+             }
+
+            PreviousNodeWatcher watcher = new 
ServerPreviousNodeWatcher(rtState);
+
+            rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
prevE.getValue(), watcher, watcher);
+        }
+    }
+
+    /**
+     * @param aliveNodes Alive nodes.
+     * @throws Exception If failed.
+     */
+    private void checkClientsStatus(final List<String> aliveNodes) throws 
Exception {
+        assert locNode.isClient() : locNode;
+        assert rtState.joined;
+        assert rtState.evtsData != null;
+
+        TreeMap<Long, String> aliveClients = new TreeMap<>();
+
+        String srvPath = null;
+        Long srvInternalOrder = null;
+
+        long locInternalOrder = rtState.internalOrder;
+
+        for (String aliveNodePath : aliveNodes) {
+            Long internalId = ZkIgnitePaths.aliveInternalId(aliveNodePath);
+
+            if (ZkIgnitePaths.aliveNodeClientFlag(aliveNodePath))
+                aliveClients.put(internalId, aliveNodePath);
+            else {
+                if (srvInternalOrder == null || internalId < srvInternalOrder) 
{
+                    srvPath = aliveNodePath;
+                    srvInternalOrder = internalId;
+                }
+            }
+        }
+
+        assert !aliveClients.isEmpty();
+
+        Map.Entry<Long, String> oldest = aliveClients.firstEntry();
+
+        boolean oldestClient = locInternalOrder == oldest.getKey();
+
+        if (srvPath == null) {
+            if (oldestClient) {
+                Stat stat = new Stat();
+
+                ZkDiscoveryEventsData prevEvts = rtState.evtsData;
+
+                byte[] evtsBytes = rtState.zkClient.getData(zkPaths.evtsPath, 
stat);
+
+                assert evtsBytes.length > 0;
+
+                ZkDiscoveryEventsData newEvts = unmarshalZip(evtsBytes);
+
+                if (prevEvts.clusterId.equals(newEvts.clusterId)) {
+                    U.warn(log, "All server nodes failed, notify all clients 
[locId=" + locNode.id() + ']');
+
+                    generateNoServersEvent(newEvts, stat);
+                }
+                else
+                    U.warn(log, "All server nodes failed (received events from 
new cluster).");
+            }
+        }
+        else {
+            String watchPath;
+
+            if (oldestClient) {
+                watchPath = srvPath;
+
+                if (log.isInfoEnabled()) {
+                    log.info("Servers exists, watch for server node [locId=" + 
locNode.id() +
+                        ", watchPath=" + watchPath + ']');
+                }
+            }
+            else {
+                assert aliveClients.size() > 1 : aliveClients;
+
+                Map.Entry<Long, String> prevE = 
aliveClients.floorEntry(locInternalOrder - 1);
+
+                assert prevE != null;
+
+                watchPath = prevE.getValue();
+
+                if (log.isInfoEnabled()) {
+                    log.info("Servers exists, watch for previous node [locId=" 
+ locNode.id() +
+                        ", watchPath=" + watchPath + ']');
+                }
+            }
+
+            PreviousNodeWatcher watcher = new 
ClientPreviousNodeWatcher(rtState);
+
+            rtState.zkClient.existsAsync(zkPaths.aliveNodesDir + "/" + 
watchPath, watcher, watcher);
+        }
+    }
+
+    /**
+     * @param evtsData Events data.
+     * @param evtsStat Events zookeeper state.
+     * @throws Exception If failed.
+     */
+    private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat 
evtsStat) throws Exception {
+        evtsData.evtIdGen++;
+
+        ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+            evtsData.evtIdGen,
+            0L,
+            evtsData.topVer,
+            locNode.id(),
+            new ZkNoServersMessage(),
+            null);
+
+        Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();
+
+        evtsData.addEvent(nodesToAck, evtData);
+
+        byte[] newEvtsBytes = marshalZip(evtsData);
+
+        try {
+            rtState.zkClient.setData(zkPaths.evtsPath, newEvtsBytes, 
evtsStat.getVersion());
+        }
+        catch (KeeperException.BadVersionException e) {
+            // Version can change if new cluster started and saved new events.
+            if (log.isDebugEnabled())
+                log.debug("Failed to save no servers message");
+        }
+    }
+
+    /**
+     * @param lastEvts Last events from previous coordinator.
+     * @throws Exception If failed.
+     */
+    private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) 
throws Exception {
+        for (ZkDiscoveryEventData evtData : lastEvts.evts.values()) {
+            if (evtData instanceof ZkDiscoveryCustomEventData) {
+                ZkDiscoveryCustomEventData evtData0 = 
(ZkDiscoveryCustomEventData)evtData;
+
+                // It is possible previous coordinator failed before finished 
cleanup.
+                if (evtData0.msg instanceof 
ZkCommunicationErrorResolveFinishMessage) {
+                    try {
+                        ZkCommunicationErrorResolveFinishMessage msg =
+                            
(ZkCommunicationErrorResolveFinishMessage)evtData0.msg;
+
+                        ZkCommunicationErrorResolveResult res = unmarshalZip(
+                            
ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, 
msg.futId));
+
+                        deleteAliveNodes(res.killedNodes);
+                    }
+                    catch (KeeperException.NoNodeException ignore) {
+                        // No-op.
+                    }
+                }
+                else if (evtData0.resolvedMsg instanceof 
ZkForceNodeFailMessage)
+                    
deleteAliveNode(((ZkForceNodeFailMessage)evtData0.resolvedMsg).nodeInternalId);
+            }
+        }
+    }
+
+    /**
+     * @param aliveNodes Alive nodes paths.
+     * @throws Exception If failed.
+     */
+    private void onBecomeCoordinator(List<String> aliveNodes) throws Exception 
{
+        ZkDiscoveryEventsData prevEvts = 
processNewEvents(rtState.zkClient.getData(zkPaths.evtsPath));
+
+        rtState.crd = true;
+
+        if (rtState.joined) {
+            if (log.isInfoEnabled())
+                log.info("Node is new discovery coordinator [locId=" + 
locNode.id() + ']');
+
+            assert locNode.order() > 0 : locNode;
+            assert rtState.evtsData != null;
+
+            previousCoordinatorCleanup(rtState.evtsData);
+
+            UUID futId = rtState.evtsData.communicationErrorResolveFutureId();
+
+            if (futId != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("New discovery coordinator will handle already 
started cluster-wide communication " +
+                        "error resolve [reqId=" + futId + ']');
+                }
+
+                ZkCommunicationErrorProcessFuture fut = commErrProcFut.get();
+
+                ZkDistributedCollectDataFuture collectResFut = 
collectCommunicationStatusFuture(futId);
+
+                if (fut != null)
+                    fut.nodeResultCollectFuture(collectResFut);
+            }
+
+            for (ZkDiscoveryEventData evtData : rtState.evtsData.evts.values())
+                evtData.initRemainingAcks(rtState.top.nodesByOrder.values());
+
+            handleProcessedEvents("crd");
+        }
+        else {
+            String locAlivePath = 
rtState.locNodeZkPath.substring(rtState.locNodeZkPath.lastIndexOf('/') + 1);
+
+            deleteJoiningNodeData(locNode.id(),
+                ZkIgnitePaths.aliveNodePrefixId(locAlivePath),
+                rtState.joinDataPartCnt);
+
+            DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
+
+            if (nodeAuth != null) {
+                try {
+                    if (log.isInfoEnabled()) {
+                        log.info("Node is first server node in cluster, try 
authenticate local node " +
+                            "[locId=" + locNode.id() + ']');
+                    }
+
+                    localAuthentication(nodeAuth, 
unmarshalCredentials(locNode));
+                }
+                catch (Exception e) {
+                    U.warn(log, "Local node authentication failed: " + e, e);
+
+                    onSegmented(e);
+
+                    // Stop any further processing.
+                    throw new ZookeeperClientFailedException("Local node 
authentication failed: " + e);
+                }
+            }
+
+            newClusterStarted(prevEvts);
+        }
+
+        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+        rtState.zkClient.getChildrenAsync(zkPaths.customEvtsDir, 
rtState.watcher, rtState.watcher);
+
+        for (String alivePath : aliveNodes)
+            watchAliveNodeData(alivePath);
+    }
+
+    /**
+     * @param alivePath Node path.
+     */
+    private void watchAliveNodeData(String alivePath) {
+        assert rtState.locNodeZkPath != null;
+
+        String path = zkPaths.aliveNodesDir + "/" + alivePath;
+
+        if (!path.equals(rtState.locNodeZkPath))
+            rtState.zkClient.getDataAsync(path, rtState.aliveNodeDataWatcher, 
rtState.aliveNodeDataWatcher);
+    }
+
+    /**
+     * @param aliveNodes ZK nodes representing alive cluster nodes.
+     * @throws Exception If failed.
+     */
+    private void generateTopologyEvents(List<String> aliveNodes) throws 
Exception {
+        assert rtState.crd;
+
+        if (log.isInfoEnabled())
+            log.info("Process alive nodes change [alives=" + aliveNodes.size() 
+ "]");
+
+        if (rtState.updateAlives) {
+            aliveNodes = rtState.zkClient.getChildren(zkPaths.aliveNodesDir);
+
+            rtState.updateAlives = false;
+        }
+
+        TreeMap<Long, String> alives = new TreeMap<>();
+
+        for (String child : aliveNodes) {
+            Long internalId = ZkIgnitePaths.aliveInternalId(child);
+
+            Object old = alives.put(internalId, child);
+
+            assert old == null;
+        }
+
+        TreeMap<Long, ZookeeperClusterNode> curTop = new 
TreeMap<>(rtState.top.nodesByOrder);
+
+        int newEvts = 0;
+
+        final int MAX_NEW_EVTS = 
IgniteSystemProperties.getInteger(IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS, 100);
+
+        List<ZookeeperClusterNode> failedNodes = null;
+
+        for (Map.Entry<Long, ZookeeperClusterNode> e : 
rtState.top.nodesByInternalId.entrySet()) {
+            if (!alives.containsKey(e.getKey())) {
+                ZookeeperClusterNode failedNode = e.getValue();
+
+                if (failedNodes == null)
+                    failedNodes = new ArrayList<>();
+
+                failedNodes.add(failedNode);
+
+                generateNodeFail(curTop, failedNode);
+
+                newEvts++;
+
+                if (newEvts == MAX_NEW_EVTS) {
+                    saveAndProcessNewEvents();
+
+                    if (log.isInfoEnabled()) {
+                        log.info("Delay alive nodes change process, max event 
threshold reached [newEvts=" + newEvts +
+                            ", totalEvts=" + rtState.evtsData.evts.size() + 
']');
+                    }
+
+                    handleProcessedEventsOnNodesFail(failedNodes);
+
+                    throttleNewEventsGeneration();
+
+                    rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+
+                    return;
+                }
+            }
+        }
+
+        // Process failures before processing join, otherwise conflicts are 
possible in case of fast node stop/re-start.
+        if (newEvts > 0) {
+            saveAndProcessNewEvents();
+
+            handleProcessedEventsOnNodesFail(failedNodes);
+
+            rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+
+            return;
+        }
+
+        generateJoinEvents(curTop, alives, MAX_NEW_EVTS);
+
+        if (failedNodes != null)
+            handleProcessedEventsOnNodesFail(failedNodes);
+    }
+
+    /**
+     * @param curTop Current topology.
+     * @param alives Alive znodes.
+     * @param MAX_NEW_EVTS Max event to process.
+     * @throws Exception If failed.
+     */
+    private void generateJoinEvents(TreeMap<Long, ZookeeperClusterNode> curTop,
+        TreeMap<Long, String> alives,
+        final int MAX_NEW_EVTS) throws Exception
+    {
+       ZkBulkJoinContext joinCtx = new ZkBulkJoinContext();
+
+       for (Map.Entry<Long, String> e : alives.entrySet()) {
+           Long internalId = e.getKey();
+
+           if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
+               UUID rslvFutId = 
rtState.evtsData.communicationErrorResolveFutureId();
+
+               if (rslvFutId != null) {
+                   if (log.isInfoEnabled()) {
+                       log.info("Delay alive nodes change process while 
communication error resolve " +
+                           "is in progress [reqId=" + rslvFutId + ']');
+                   }
+
+                   break;
+               }
+
+               processJoinOnCoordinator(joinCtx, curTop, internalId, 
e.getValue());
+
+               if (joinCtx.nodes() == MAX_NEW_EVTS) {
+                   generateBulkJoinEvent(curTop, joinCtx);
+
+                   if (log.isInfoEnabled()) {
+                       log.info("Delay alive nodes change process, max event 
threshold reached [" +
+                           "newEvts=" + joinCtx.nodes() +
+                           ", totalEvts=" + rtState.evtsData.evts.size() + 
']');
+                   }
+
+                   throttleNewEventsGeneration();
+
+                   rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, 
rtState.watcher, rtState.watcher);
+
+                   return;
+               }
+           }
+       }
+
+       if (joinCtx.nodes() > 0)
+           generateBulkJoinEvent(curTop, joinCtx);
+    }
+
+    /**
+     * @param curTop Current topology.
+     * @param joinCtx Joined nodes context.
+     * @throws Exception If failed.
+     */
+    private void generateBulkJoinEvent(TreeMap<Long, ZookeeperClusterNode> 
curTop, ZkBulkJoinContext joinCtx)
+        throws Exception
+    {
+        rtState.evtsData.evtIdGen++;
+
+        long evtId = rtState.evtsData.evtIdGen;
+
+        List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes = 
joinCtx.nodes;
+
+        assert nodes != null && nodes.size() > 0;
+
+        int nodeCnt = nodes.size();
+
+        List<ZkJoinedNodeEvtData> joinedNodes = new ArrayList<>(nodeCnt);
+
+        Map<Long, byte[]> discoDataMap = U.newHashMap(nodeCnt);
+        Map<Long, Long> dupDiscoData = null;
+
+        for (int i = 0; i < nodeCnt; i++) {
+            T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData = 
nodes.get(i);
+
+            Map<Integer, Serializable> discoData = nodeEvtData.get2();
+
+            byte[] discoDataBytes = U.marshal(marsh, discoData);
+
+            Long dupDataNode = null;
+
+            for (Map.Entry<Long, byte[]> e : discoDataMap.entrySet()) {
+                if (Arrays.equals(discoDataBytes, e.getValue())) {
+                    dupDataNode = e.getKey();
+
+                    break;
+                }
+            }
+
+            long nodeTopVer = nodeEvtData.get1().topVer;
+
+            if (dupDataNode != null) {
+                if (dupDiscoData == null)
+                    dupDiscoData = new HashMap<>();
+
+                Long old = dupDiscoData.put(nodeTopVer, dupDataNode);
+
+                assert old == null : old;
+            }
+            else
+                discoDataMap.put(nodeTopVer, discoDataBytes);
+
+            joinedNodes.add(nodeEvtData.get1());
+        }
+
+        int overhead = 5;
+
+        ZkJoinEventDataForJoined dataForJoined = new ZkJoinEventDataForJoined(
+            new ArrayList<>(curTop.values()),
+            discoDataMap,
+            dupDiscoData);
+
+        byte[] dataForJoinedBytes = marshalZip(dataForJoined);
+
+        long addDataStart = System.currentTimeMillis();
+
+        int dataForJoinedPartCnt = 
saveData(zkPaths.joinEventDataPathForJoined(evtId),
+            dataForJoinedBytes,
+            overhead);
+
+        long addDataTime = System.currentTimeMillis() - addDataStart;
+
+        ZkDiscoveryNodeJoinEventData evtData = new 
ZkDiscoveryNodeJoinEventData(
+            evtId,
+            rtState.evtsData.topVer,
+            joinedNodes,
+            dataForJoinedPartCnt);
+
+        rtState.evtsData.addEvent(curTop.values(), evtData);
+
+        if (log.isInfoEnabled()) {
+            if (nodeCnt > 1) {
+                log.info("Generated NODE_JOINED bulk event [" +
+                    "nodeCnt=" + nodeCnt +
+                    ", dataForJoinedSize=" + dataForJoinedBytes.length +
+                    ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
+                    ", addDataTime=" + addDataTime +
+                    ", evt=" + evtData + ']');
+            }
+            else {
+                log.info("Generated NODE_JOINED event [" +
+                    "dataForJoinedSize=" + dataForJoinedBytes.length +
+                    ", dataForJoinedPartCnt=" + dataForJoinedPartCnt +
+                    ", addDataTime=" + addDataTime +
+                    ", evt=" + evtData + ']');
+            }
+        }
+
+        saveAndProcessNewEvents();
+    }
+
+    /**
+     *
+     */
+    private void throttleNewEventsGeneration() {
+        long delay = 
IgniteSystemProperties.getLong(IGNITE_ZOOKEEPER_DISCOVERY_SPI_EVTS_THROTTLE, 0);
+
+        if (delay > 0) {
+            if (log.isInfoEnabled())
+                log.info("Sleep delay before generate new events [delay=" + 
delay + ']');
+
+            try {
+                Thread.sleep(delay);
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param prefixId Path prefix.
+     * @return Join data.
+     * @throws Exception If failed.
+     */
+    private ZkJoiningNodeData unmarshalJoinData(UUID nodeId, UUID prefixId) 
throws Exception {
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
+
+        byte[] joinData = rtState.zkClient.getData(joinDataPath);
+
+        Object dataObj = unmarshalZip(joinData);
+
+        if (!(dataObj instanceof ZkJoiningNodeData))
+            throw new Exception("Invalid joined node data: " + dataObj);
+
+        ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
+
+        if (joiningNodeData.partCount() > 1) {
+            joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", 
joiningNodeData.partCount());
+
+            joiningNodeData = unmarshalZip(joinData);
+        }
+
+        return joiningNodeData;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param prefixId Path prefix.
+     * @param aliveNodePath Node path.
+     * @return Join data.
+     * @throws Exception If failed.
+     */
+    private Object unmarshalJoinDataOnCoordinator(UUID nodeId, UUID prefixId, 
String aliveNodePath) throws Exception {
+        String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, prefixId);
+
+        byte[] joinData = rtState.zkClient.getData(joinDataPath);
+
+        Object dataObj;
+
+        try {
+            dataObj = unmarshalZip(joinData);
+
+            if (dataObj instanceof ZkInternalJoinErrorMessage)
+                return dataObj;
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to unmarshal joining node data [nodePath=" + 
aliveNodePath + "']", e);
+
+            return new 
ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
+                "Failed to unmarshal join data: " + e);
+        }
+
+        assert dataObj instanceof ZkJoiningNodeData : dataObj;
+
+        ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)dataObj;
+
+        if (joiningNodeData.partCount() > 1) {
+            joinData = readMultipleParts(rtState.zkClient, joinDataPath + ":", 
joiningNodeData.partCount());
+
+            try {
+                joiningNodeData = unmarshalZip(joinData);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to unmarshal joining node data 
[nodePath=" + aliveNodePath + "']", e);
+
+                return new 
ZkInternalJoinErrorMessage(ZkIgnitePaths.aliveInternalId(aliveNodePath),
+                    "Failed to unmarshal join data: " + e);
+            }
+        }
+
+        assert joiningNodeData.node() != null : joiningNodeData;
+
+        return joiningNodeData;
+    }
+
+    /**
+     * @param joinCtx Joined nodes context.
+     * @param curTop Current nodes.
+     * @param internalId Joined node internal ID.
+     * @param aliveNodePath Joined node path.
+     * @throws Exception If failed.
+     */
+    private void processJoinOnCoordinator(
+        ZkBulkJoinContext joinCtx,
+        TreeMap<Long, ZookeeperClusterNode> curTop,
+        long internalId,
+        String aliveNodePath)
+        throws Exception
+    {
+        UUID nodeId = ZkIgnitePaths.aliveNodeId(aliveNodePath);
+        UUID prefixId = ZkIgnitePaths.aliveNodePrefixId(aliveNodePath);
+
+        Object data = unmarshalJoinDataOnCoordinator(nodeId, prefixId, 
aliveNodePath);
+
+        if (data instanceof ZkJoiningNodeData) {
+            ZkJoiningNodeData joiningNodeData = (ZkJoiningNodeData)data;
+
+            ZkNodeValidateResult validateRes = 
validateJoiningNode(joiningNodeData.node());
+
+            if (validateRes.err == null) {
+                ZookeeperClusterNode joinedNode = joiningNodeData.node();
+
+                assert nodeId.equals(joinedNode.id()) : joiningNodeData.node();
+
+                addJoinedNode(
+                    joinCtx,
+                    curTop,
+                    joiningNodeData,
+                    internalId,
+                    prefixId,
+                    validateRes.secSubjZipBytes);
+
+                watchAliveNodeData(aliveNodePath);
+            }
+            else {
+                ZkInternalJoinErrorMessage joinErr = new 
ZkInternalJoinErrorMessage(
+                    ZkIgnitePaths.aliveInternalId(aliveNodePath),
+                    validateRes.err);
+
+                processJoinError(aliveNodePath, nodeId, prefixId, joinErr);
+            }
+        }
+        else {
+            assert data instanceof ZkInternalJoinErrorMessage : data;
+
+            processJoinError(aliveNodePath, nodeId, prefixId, 
(ZkInternalJoinErrorMessage)data);
+        }
+    }
+
+    /**
+     * @param aliveNodePath Joined node path.
+     * @param nodeId Node ID.
+     * @param prefixId Path prefix ID.
+     * @param joinErr Join error message.
+     * @throws Exception If failed.
+     */
+    private void processJoinError(String aliveNodePath,
+        UUID nodeId,
+        UUID prefixId,
+        ZkInternalJoinErrorMessage joinErr) throws Exception {
+        ZookeeperClient client = rtState.zkClient;
+
+        if (joinErr.notifyNode) {
+            String joinDataPath = zkPaths.joiningNodeDataPath(nodeId, 
prefixId);
+
+            client.setData(joinDataPath, marshalZip(joinErr), -1);
+
+            client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, 
-1);
+        }
+        else {
+            if (log.isInfoEnabled())
+                log.info("Ignore join data, node was failed by previous 
coordinator: " + aliveNodePath);
+
+            client.deleteIfExists(zkPaths.aliveNodesDir + "/" + aliveNodePath, 
-1);
+        }
+    }
+
+    /**
+     * @param node Joining node.
+     * @return Validation result.
+     */
+    private ZkNodeValidateResult validateJoiningNode(ZookeeperClusterNode 
node) {
+        ZookeeperClusterNode node0 = rtState.top.nodesById.get(node.id());
+
+        if (node0 != null) {
+            U.error(log, "Failed to include node in cluster, node with the 
same ID already exists [joiningNode=" + node +
+                ", existingNode=" + node0 + ']');
+
+            // Note: exception message is checked in tests.
+            return new ZkNodeValidateResult("Node with the same ID already 
exists: " + node0);
+        }
+
+        ZkNodeValidateResult res = authenticateNode(node);
+
+        if (res.err != null)
+            return res;
+
+        IgniteNodeValidationResult err = 
spi.getSpiContext().validateNode(node);
+
+        if (err != null) {
+            LT.warn(log, err.message());
+
+            res.err = err.sendMessage();
+        }
+
+        return res;
+    }
+
+    /**
+     * @param node Node.
+     * @return Validation result.
+     */
+    private ZkNodeValidateResult authenticateNode(ZookeeperClusterNode node) {
+        DiscoverySpiNodeAuthenticator nodeAuth = spi.getAuthenticator();
+
+        if (nodeAuth == null)
+            return new ZkNodeValidateResult((byte[])null);
+
+        SecurityCredentials cred;
+
+        try {
+            cred = unmarshalCredentials(node);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to unmarshal node credentials: " + e, e);
+
+            return new ZkNodeValidateResult("Failed to unmarshal node 
credentials");
+        }
+
+        SecurityContext subj = nodeAuth.authenticateNode(node, cred);
+
+        if (subj == null) {
+            U.warn(log, "Authentication failed [nodeId=" + node.id() +
+                    ", addrs=" + U.addressesAsString(node) + ']',
+                "Authentication failed [nodeId=" + U.id8(node.id()) + ", 
addrs=" +
+                    U.addressesAsString(node) + ']');
+
+            // Note: exception message test is checked in tests.
+            return new ZkNodeValidateResult("Authentication failed");
+        }
+
+        if (!(subj instanceof Serializable)) {
+            U.warn(log, "Authentication subject is not Serializable [nodeId=" 
+ node.id() +
+                    ", addrs=" + U.addressesAsString(node) + ']',
+                "Authentication subject is not Serializable [nodeId=" + 
U.id8(node.id()) +
+                    ", addrs=" +
+                    U.addressesAsString(node) + ']');
+
+            return new ZkNodeValidateResult("Authentication subject is not 
serializable");
+        }
+
+        byte[] secSubjZipBytes;
+
+        try {
+            secSubjZipBytes = marshalZip(subj);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to marshal node security subject: " + e, e);
+
+            return new ZkNodeValidateResult("Failed to marshal node security 
subject");
+        }
+
+        return new ZkNodeValidateResult(secSubjZipBytes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void saveAndProcessNewEvents() throws Exception {
+        if (stopping())
+            return;
+
+        long start = System.currentTimeMillis();
+
+        byte[] evtsBytes = marshalZip(rtState.evtsData);
+
+        rtState.zkClient.setData(zkPaths.evtsPath, evtsBytes, -1);
+
+        long time = System.currentTimeMillis() - start;
+
+        if (prevSavedEvtsTopVer != rtState.evtsData.topVer) {
+            if (log.isInfoEnabled()) {
+                log.info("Discovery coordinator saved new topology events 
[topVer=" + rtState.evtsData.topVer +
+                    ", size=" + evtsBytes.length +
+                    ", evts=" + rtState.evtsData.evts.size() +
+                    ", lastEvt=" + rtState.evtsData.evtIdGen +
+                    ", saveTime=" + time + ']');
+            }
+
+            prevSavedEvtsTopVer = rtState.evtsData.topVer;
+        }
+        else if (log.isDebugEnabled()) {
+            log.debug("Discovery coordinator saved new topology events 
[topVer=" + rtState.evtsData.topVer +
+                ", size=" + evtsBytes.length +
+                ", evts=" + rtState.evtsData.evts.size() +
+                ", lastEvt=" + rtState.evtsData.evtIdGen +
+                ", saveTime=" + time + ']');
+        }
+
+        processNewEvents(rtState.evtsData);
+    }
+
+    /**
+     * @param curTop Current topology.
+     * @param failedNode Failed node.
+     */
+    private void generateNodeFail(TreeMap<Long, ZookeeperClusterNode> curTop, 
ZookeeperClusterNode failedNode) {
+        Object rmvd = curTop.remove(failedNode.order());
+
+        assert rmvd != null;
+
+        rtState.evtsData.topVer++;
+        rtState.evtsData.evtIdGen++;
+
+        ZkDiscoveryNodeFailEventData evtData = new 
ZkDiscoveryNodeFailEventData(
+            rtState.evtsData.evtIdGen,
+            rtState.evtsData.topVer,
+            failedNode.internalId());
+
+        rtState.evtsData.addEvent(curTop.values(), evtData);
+
+        if (log.isInfoEnabled())
+            log.info("Generated NODE_FAILED event [evt=" + evtData + ']');
+    }
+
+    /**
+     * @param curTop Current nodes.
+     * @param joiningNodeData Join data.
+     * @param internalId Joined node internal ID.
+     * @param prefixId Unique path prefix.
+     * @param secSubjZipBytes Marshalled security subject.
+     * @throws Exception If failed.
+     */
+    private void addJoinedNode(
+        ZkBulkJoinContext joinCtx,
+        TreeMap<Long, ZookeeperClusterNode> curTop,
+        ZkJoiningNodeData joiningNodeData,
+        long internalId,
+        UUID prefixId,
+        @Nullable byte[] secSubjZipBytes)
+        throws Exception
+    {
+        ZookeeperClusterNode joinedNode = joiningNodeData.node();
+
+        UUID nodeId = joinedNode.id();
+
+        rtState.evtsData.topVer++;
+
+        joinedNode.order(rtState.evtsData.topVer);
+        joinedNode.internalId(internalId);
+
+        DiscoveryDataBag joiningNodeBag = new DiscoveryDataBag(nodeId, 
joiningNodeData.node().isClient());
+
+        joiningNodeBag.joiningNodeData(joiningNodeData.discoveryData());
+
+        exchange.onExchange(joiningNodeBag);
+
+        DiscoveryDataBag collectBag = new DiscoveryDataBag(nodeId,
+            new HashSet<Integer>(),
+            joiningNodeData.node().isClient());
+
+        collectBag.joiningNodeData(joiningNodeBag.joiningNodeData());
+
+        exchange.collect(collectBag);
+
+        Map<Integer, Serializable> commonData = collectBag.commonData();
+
+        Object old = curTop.put(joinedNode.order(), joinedNode);
+
+        assert old == null;
+
+        int overhead = 5;
+
+        int secSubjPartCnt = 0;
+
+        if (secSubjZipBytes != null) {
+            secSubjPartCnt = 
saveData(zkPaths.joinEventSecuritySubjectPath(joinedNode.order()),
+                secSubjZipBytes,
+                overhead);
+
+            assert secSubjPartCnt > 0 : secSubjPartCnt;
+
+            setNodeSecuritySubject(joinedNode, secSubjZipBytes);
+        }
+
+        ZkJoinedNodeEvtData nodeEvtData = new ZkJoinedNodeEvtData(
+            rtState.evtsData.topVer,
+            joinedNode.id(),
+            joinedNode.internalId(),
+            prefixId,
+            joiningNodeData.partCount(),
+            secSubjPartCnt);
+
+        nodeEvtData.joiningNodeData = joiningNodeData;
+
+        joinCtx.addJoinedNode(nodeEvtData, commonData);
+
+        rtState.evtsData.onNodeJoin(joinedNode);
+    }
+
+    /**
+     * @param path Path to save.
+     * @param bytes Bytes to save.
+     * @param overhead Extra overhead.
+     * @return Parts count.
+     * @throws Exception If failed.
+     */
+    private int saveData(String path, byte[] bytes, int overhead) throws 
Exception {
+        int dataForJoinedPartCnt = 1;
+
+        if (rtState.zkClient.needSplitNodeData(path, bytes, overhead)) {
+            dataForJoinedPartCnt = saveMultipleParts(rtState.zkClient,
+                path,
+                rtState.zkClient.splitNodeData(path, bytes, overhead));
+        }
+        else {
+            rtState.zkClient.createIfNeeded(multipartPathName(path, 0),
+                bytes,
+                PERSISTENT);
+        }
+
+        return dataForJoinedPartCnt;
+    }
+
+    /**
+     * @param prevEvts Events from previous cluster.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void newClusterStarted(@Nullable ZkDiscoveryEventsData prevEvts) 
throws Exception {
+        assert !locNode.isClient() : locNode;
+
+        long locInternalId = rtState.internalOrder;
+
+        assert prevEvts == null || prevEvts.maxInternalOrder < locInternalId;
+
+        spi.getSpiContext().removeTimeoutObject(rtState.joinErrTo);
+
+        cleanupPreviousClusterData(prevEvts != null ? 
prevEvts.maxInternalOrder + 1 : -1L);
+
+        rtState.joined = true;
+        rtState.gridStartTime = System.currentTimeMillis();
+
+        rtState.evtsData = 
ZkDiscoveryEventsData.createForNewCluster(rtState.gridStartTime);
+
+        if (log.isInfoEnabled()) {
+            log.info("New cluster started [locId=" + locNode.id() +
+                ", clusterId=" + rtState.evtsData.clusterId +
+                ", startTime=" + rtState.evtsData.clusterStartTime + ']');
+        }
+
+        locNode.internalId(locInternalId);
+        locNode.order(1);
+
+        rtState.evtsData.onNodeJoin(locNode);
+
+        rtState.top.addNode(locNode);
+
+        final List<ClusterNode> topSnapshot = 
Collections.singletonList((ClusterNode)locNode);
+
+        lsnr.onDiscovery(EVT_NODE_JOINED,
+            1L,
+            locNode,
+            topSnapshot,
+            Collections.<Long, Collection<ClusterNode>>emptyMap(),
+            null);
+
+        // Reset events (this is also notification for clients left from 
previous cluster).
+        rtState.zkClient.setData(zkPaths.evtsPath, 
marshalZip(rtState.evtsData), -1);
+
+        joinFut.onDone();
+    }
+
+    /**
+     * @param startInternalOrder Starting internal order for cluster (znodes 
having lower order belong
+     *      to clients from previous cluster and should be removed).
+
+     * @throws Exception If failed.
+     */
+    private void cleanupPreviousClusterData(long startInternalOrder) throws 
Exception {
+        long start = System.currentTimeMillis();
+
+        ZookeeperClient client = rtState.zkClient;
+
+        // TODO ZK: use multi, better batching + max-size safe + 
NoNodeException safe.
+        List<String> evtChildren = 
rtState.zkClient.getChildren(zkPaths.evtsPath);
+
+        for (String evtPath : evtChildren) {
+            String evtDir = zkPaths.evtsPath + "/" + evtPath;
+
+            removeChildren(evtDir);
+        }
+
+        client.deleteAll(zkPaths.evtsPath, evtChildren, -1);
+
+        client.deleteAll(zkPaths.customEvtsDir,
+            client.getChildren(zkPaths.customEvtsDir),
+            -1);
+
+        rtState.zkClient.deleteAll(zkPaths.customEvtsPartsDir,
+            rtState.zkClient.getChildren(zkPaths.customEvtsPartsDir),
+            -1);
+
+        rtState.zkClient.deleteAll(zkPaths.customEvtsAcksDir,
+            rtState.zkClient.getChildren(zkPaths.customEvtsAcksDir),
+            -1);
+
+        if (startInternalOrder > 0) {
+            for (String alive : 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
+                if (ZkIgnitePaths.aliveInternalId(alive) < startInternalOrder)
+                    rtState.zkClient.deleteIfExists(zkPaths.aliveNodesDir + 
"/" + alive, -1);
+            }
+        }
+
+        long time = System.currentTimeMillis() - start;
+
+        if (time > 0) {
+            if (log.isInfoEnabled())
+                log.info("Previous cluster data cleanup time: " + time);
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @throws Exception If failed.
+     */
+    private void removeChildren(String path) throws Exception {
+        rtState.zkClient.deleteAll(path, rtState.zkClient.getChildren(path), 
-1);
+    }
+
+    /**
+     * @param zkClient Client.
+     * @param evtPath Event path.
+     * @param sndNodeId Sender node ID.
+     * @return Event data.
+     * @throws Exception If failed.
+     */
+    private byte[] readCustomEventData(ZookeeperClient zkClient, String 
evtPath, UUID sndNodeId) throws Exception {
+        int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
+
+        if (partCnt > 1) {
+            String partsBasePath = zkPaths.customEventPartsBasePath(
+                ZkIgnitePaths.customEventPrefix(evtPath), sndNodeId);
+
+            return readMultipleParts(zkClient, partsBasePath, partCnt);
+        }
+        else
+            return zkClient.getData(zkPaths.customEvtsDir + "/" + evtPath);
+    }
+
+    /**
+     * @param customEvtNodes ZK nodes representing custom events to process.
+     * @throws Exception If failed.
+     */
+    private void generateCustomEvents(List<String> customEvtNodes) throws 
Exception {
+        assert rtState.crd;
+
+        ZookeeperClient zkClient = rtState.zkClient;
+        ZkDiscoveryEventsData evtsData = rtState.evtsData;
+
+        TreeMap<Integer, String> unprocessedEvts = null;
+
+        for (int i = 0; i < customEvtNodes.size(); i++) {
+            String evtPath = customEvtNodes.get(i);
+
+            int evtSeq = ZkIgnitePaths.customEventSequence(evtPath);
+
+            if (evtSeq > evtsData.procCustEvt) {
+                if (unprocessedEvts == null)
+                    unprocessedEvts = new TreeMap<>();
+
+                unprocessedEvts.put(evtSeq, evtPath);
+            }
+        }
+
+        if (unprocessedEvts == null)
+            return;
+
+        for (Map.Entry<Integer, String> evtE : unprocessedEvts.entrySet()) {
+            evtsData.procCustEvt = evtE.getKey();
+
+            String evtPath = evtE.getValue();
+
+            UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
+
+            ZookeeperClusterNode sndNode = 
rtState.top.nodesById.get(sndNodeId);
+
+            if (sndNode != null) {
+                byte[] evtBytes = readCustomEventData(zkClient, evtPath, 
sndNodeId);
+
+                DiscoverySpiCustomMessage msg;
+
+                try {
+                    msg = unmarshalZip(evtBytes);
+                }
+                catch (Exception e) {
+                    U.error(log, "Failed to unmarshal custom discovery 
message: " + e, e);
+
+                    deleteCustomEventDataAsync(rtState.zkClient, evtPath);
+
+                    continue;
+                }
+
+                generateAndProcessCustomEventOnCoordinator(evtPath, sndNode, 
msg);
+            }
+            else {
+                U.warn(log, "Ignore custom event from unknown node: " + 
sndNodeId);
+
+                deleteCustomEventDataAsync(rtState.zkClient, evtPath);
+            }
+        }
+    }
+
+    /**
+     * @param evtPath Event data path.
+     * @param sndNode Sender node.
+     * @param msg Message instance.
+     * @throws Exception If failed.
+     */
+    private void generateAndProcessCustomEventOnCoordinator(String evtPath,
+        ZookeeperClusterNode sndNode,
+        DiscoverySpiCustomMessage msg) throws Exception
+    {
+        ZookeeperClient zkClient = rtState.zkClient;
+        ZkDiscoveryEventsData evtsData = rtState.evtsData;
+
+        ZookeeperClusterNode failedNode = null;
+
+        if (msg instanceof ZkForceNodeFailMessage) {
+            ZkForceNodeFailMessage msg0 = (ZkForceNodeFailMessage)msg;
+
+            failedNode = 
rtState.top.nodesByInternalId.get(msg0.nodeInternalId);
+
+            if (failedNode != null)
+                evtsData.topVer++;
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Ignore forcible node fail request for unknown 
node: " + msg0.nodeInternalId);
+
+                deleteCustomEventDataAsync(zkClient, evtPath);
+
+                return;
+            }
+        }
+        else if (msg instanceof ZkCommunicationErrorResolveStartMessage) {
+            ZkCommunicationErrorResolveStartMessage msg0 =
+                (ZkCommunicationErrorResolveStartMessage)msg;
+
+            if (evtsData.communicationErrorResolveFutureId() != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("Ignore communication error resolve message, 
resolve process " +
+                        "already started [sndNode=" + sndNode + ']');
+                }
+
+                deleteCustomEventDataAsync(zkClient, evtPath);
+
+                return;
+            }
+            else {
+                if (log.isInfoEnabled()) {
+                    log.info("Start cluster-wide communication error resolve 
[sndNode=" + sndNode +
+                        ", reqId=" + msg0.id +
+                        ", topVer=" + evtsData.topVer + ']');
+                }
+
+                
zkClient.createIfNeeded(zkPaths.distributedFutureBasePath(msg0.id),
+                    null,
+                    PERSISTENT);
+
+                evtsData.communicationErrorResolveFutureId(msg0.id);
+            }
+        }
+
+        evtsData.evtIdGen++;
+
+        ZkDiscoveryCustomEventData evtData = new ZkDiscoveryCustomEventData(
+            evtsData.evtIdGen,
+            0L,
+            evtsData.topVer,
+            sndNode.id(),
+            null,
+            evtPath);
+
+        evtData.resolvedMsg = msg;
+
+        if (log.isDebugEnabled())
+            log.debug("Generated CUSTOM event [evt=" + evtData + ", msg=" + 
msg + ']');
+
+        boolean fastStopProcess = false;
+
+        if (msg instanceof ZkInternalMessage)
+            processInternalMessage(evtData, (ZkInternalMessage)msg);
+        else {
+            notifyCustomEvent(evtData, msg);
+
+            if (msg.stopProcess()) {
+                if (log.isDebugEnabled())
+                    log.debug("Fast stop process custom event [evt=" + evtData 
+ ", msg=" + msg + ']');
+
+                fastStopProcess = true;
+
+                // No need to process this event on others nodes, skip this 
event.
+                evtsData.evts.remove(evtData.eventId());
+
+                evtsData.evtIdGen--;
+
+                DiscoverySpiCustomMessage ack = msg.ackMessage();
+
+                if (ack != null) {
+                    evtData = createAckEvent(ack, evtData);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Generated CUSTOM event (ack for fast stop 
process) [evt=" + evtData + ", msg=" + msg + ']');
+
+                    notifyCustomEvent(evtData, ack);
+                }
+                else
+                    evtData = null;
+            }
+        }
+
+        if (evtData != null) {
+            evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
+
+            rtState.locNodeInfo.lastProcEvt = evtData.eventId();
+
+            saveAndProcessNewEvents();
+
+            if (fastStopProcess)
+                deleteCustomEventDataAsync(zkClient, evtPath);
+
+            if (failedNode != null) {
+                deleteAliveNode(failedNode.internalId());
+
+                
handleProcessedEventsOnNodesFail(Collections.singletonList(failedNode));
+
+                rtState.updateAlives = true;
+            }
+        }
+    }
+
+    /**
+     * @param internalId Node internal ID.
+     * @throws Exception If failed.
+     */
+    private void deleteAliveNode(long internalId) throws Exception {
+        for (String child : 
rtState.zkClient.getChildren(zkPaths.aliveNodesDir)) {
+            if (ZkIgnitePaths.aliveInternalId(child) == internalId) {
+                // Need use sync delete to do not process again join of this 
node again.
+                rtState.zkClient.deleteIfExists(zkPaths.aliveNodesDir + "/" + 
child, -1);
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * @param zkClient Client.
+     * @param evtPath Event path.
+     */
+    private void deleteCustomEventDataAsync(ZookeeperClient zkClient, String 
evtPath) {
+        if (log.isDebugEnabled())
+            log.debug("Delete custom event data: " + evtPath);
+
+        String prefix = ZkIgnitePaths.customEventPrefix(evtPath);
+        UUID sndNodeId = ZkIgnitePaths.customEventSendNodeId(evtPath);
+        int partCnt = ZkIgnitePaths.customEventPartsCount(evtPath);
+
+        assert partCnt >= 1 : partCnt;
+
+        if (partCnt > 1) {
+            for (int i = 0; i < partCnt; i++) {
+                String path0 = zkPaths.customEventPartPath(prefix, sndNodeId, 
i);
+
+                zkClient.deleteIfExistsAsync(path0);
+            }
+        }
+
+        zkClient.deleteIfExistsAsync(zkPaths.customEvtsDir + "/" + evtPath);
+    }
+
+    /**
+     * @param data Marshalled events.
+     * @throws Exception If failed.
+     * @return Events.
+     */
+    @Nullable private ZkDiscoveryEventsData processNewEvents(byte[] data) 
throws Exception {
+        ZkDiscoveryEventsData newEvts = data.length > 0 ? 
(ZkDiscoveryEventsData)unmarshalZip(data) : null;
+
+        if (rtState.joined && (newEvts == null || 
!rtState.evtsData.clusterId.equals(newEvts.clusterId))) {
+            assert locNode.isClient() : locNode;
+
+            throw localNodeFail("All server nodes failed, client node 
disconnected (received events from new custer) " +
+                "[locId=" + locNode.id() + ']', true);
+        }
+
+        if (newEvts == null)
+            return null;
+
+        assert !rtState.crd;
+
+        // Need keep processed custom events since they contain message object 
which is needed to create ack.
+        if (!locNode.isClient() && rtState.evtsData != null) {
+            for (Map.Entry<Long, ZkDiscoveryEventData> e : 
rtState.evtsData.evts.entrySet()) {
+                ZkDiscoveryEventData evtData = e.getValue();
+
+                if (evtData.eventType() == 
ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT) {
+                    ZkDiscoveryCustomEventData evtData0 =
+                        
(ZkDiscoveryCustomEventData)newEvts.evts.get(evtData.eventId());
+
+                    if (evtData0 != null)
+                        evtData0.resolvedMsg = 
((ZkDiscoveryCustomEventData)evtData).resolvedMsg;
+                }
+            }
+        }
+
+        processNewEvents(newEvts);
+
+        if (rtState.joined)
+            rtState.evtsData = newEvts;
+
+        return newEvts;
+    }
+
+    /**
+     * @param evtsData Ev

<TRUNCATED>

Reply via email to