http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
new file mode 100644
index 0000000..21703c6
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClient.java
@@ -0,0 +1,1219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Zookeeper Client.
+ */
+public class ZookeeperClient implements Watcher {
+    /** */
+    private static final long RETRY_TIMEOUT =
+        
IgniteSystemProperties.getLong("IGNITE_ZOOKEEPER_DISCOVERY_RETRY_TIMEOUT", 
2000);
+
+    /** */
+    private static final int MAX_RETRY_COUNT =
+        
IgniteSystemProperties.getInteger("IGNITE_ZOOKEEPER_DISCOVERY_MAX_RETRY_COUNT", 
10);
+
+    /** */
+    private final AtomicInteger retryCount = new AtomicInteger();
+
+    /** */
+    private static final int MAX_REQ_SIZE = 1048528;
+
+    /** */
+    private static final List<ACL> ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    /** */
+    private static final byte[] EMPTY_BYTES = {};
+
+    /** */
+    private final ZooKeeper zk;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private ConnectionState state = ConnectionState.Disconnected;
+
+    /** */
+    private long connLossTimeout;
+
+    /** */
+    private volatile long connStartTime;
+
+    /** */
+    private final Object stateMux = new Object();
+
+    /** */
+    private final IgniteRunnable connLostC;
+
+    /** */
+    private final Timer connTimer;
+
+    /** */
+    private final ArrayDeque<ZkAsyncOperation> retryQ = new ArrayDeque<>();
+
+    /** */
+    private volatile boolean closing;
+
+    /**
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param sesTimeout ZK session timeout.
+     * @param connLostC Lost connection callback.
+     * @throws Exception If failed.
+     */
+    ZookeeperClient(IgniteLogger log, String connectString, int sesTimeout, 
IgniteRunnable connLostC) throws Exception {
+        this(null, log, connectString, sesTimeout, connLostC);
+    }
+
+    /**
+     * @param igniteInstanceName Ignite instance name.
+     * @param log Logger.
+     * @param connectString ZK connection string.
+     * @param sesTimeout ZK session timeout.
+     * @param connLostC Lost connection callback.
+     * @throws Exception If failed.
+     */
+    ZookeeperClient(String igniteInstanceName,
+        IgniteLogger log,
+        String connectString,
+        int sesTimeout,
+        IgniteRunnable connLostC)
+        throws Exception
+    {
+        this.log = log.getLogger(getClass());
+        this.connLostC = connLostC;
+
+        connLossTimeout = sesTimeout;
+
+        long connStartTime = this.connStartTime = System.currentTimeMillis();
+
+        connTimer = new Timer("zk-client-timer-" + igniteInstanceName);
+
+        String threadName = Thread.currentThread().getName();
+
+        // ZK generates internal threads' names using current thread name.
+        Thread.currentThread().setName("zk-" + igniteInstanceName);
+
+        try {
+            zk = new ZooKeeper(connectString, sesTimeout, this);
+        }
+        finally {
+            Thread.currentThread().setName(threadName);
+        }
+
+        synchronized (stateMux) {
+            if (connStartTime == this.connStartTime && state == 
ConnectionState.Disconnected)
+                scheduleConnectionCheck();
+        }
+    }
+
+    /**
+     * @return Zookeeper client.
+     */
+    ZooKeeper zk() {
+        return zk;
+    }
+
+    /**
+     * @return {@code True} if connected to ZooKeeper.
+     */
+    boolean connected() {
+        synchronized (stateMux) {
+            return state == ConnectionState.Connected;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void process(WatchedEvent evt) {
+        if (closing)
+            return;
+
+        if (evt.getType() == Event.EventType.None) {
+            ConnectionState newState;
+
+            synchronized (stateMux) {
+                if (state == ConnectionState.Lost) {
+                    U.warn(log, "Received event after connection was lost 
[evtState=" + evt.getState() + "]");
+
+                    return;
+                }
+
+                if (!zk.getState().isAlive())
+                    return;
+
+                Event.KeeperState zkState = evt.getState();
+
+                switch (zkState) {
+                    case SaslAuthenticated:
+                        return; // No-op.
+
+                    case AuthFailed:
+                        newState = state;
+
+                        break;
+
+                    case Disconnected:
+                        newState = ConnectionState.Disconnected;
+
+                        break;
+
+                    case SyncConnected:
+                        newState = ConnectionState.Connected;
+
+                        break;
+
+                    case Expired:
+                        U.warn(log, "Session expired, changing state to Lost");
+
+                        newState = ConnectionState.Lost;
+
+                        break;
+
+                    default:
+                        U.error(log, "Unexpected state for ZooKeeper client, 
close connection: " + zkState);
+
+                        newState = ConnectionState.Lost;
+                }
+
+                if (newState != state) {
+                    if (log.isInfoEnabled())
+                        log.info("ZooKeeper client state changed [prevState=" 
+ state + ", newState=" + newState + ']');
+
+                    state = newState;
+
+                    if (newState == ConnectionState.Disconnected) {
+                        connStartTime = System.currentTimeMillis();
+
+                        scheduleConnectionCheck();
+                    }
+                    else if (newState == ConnectionState.Connected) {
+                        retryCount.set(0);
+
+                        stateMux.notifyAll();
+                    }
+                    else
+                        assert state == ConnectionState.Lost : state;
+                }
+                else
+                    return;
+            }
+
+            if (newState == ConnectionState.Lost) {
+                closeClient();
+
+                notifyConnectionLost();
+            }
+            else if (newState == ConnectionState.Connected) {
+                for (ZkAsyncOperation op : retryQ)
+                    op.execute();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private void notifyConnectionLost() {
+        if (!closing && state == ConnectionState.Lost && connLostC != null)
+            connLostC.run();
+
+        connTimer.cancel();
+    }
+
+    /**
+     * @param path Path.
+     * @return {@code True} if node exists.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    boolean exists(String path) throws ZookeeperClientFailedException, 
InterruptedException {
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                return zk.exists(path, false) != null;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     *
+     * @param paths Paths to create.
+     * @param createMode Create mode.
+     * @throws KeeperException.NodeExistsException If at least one of target 
node already exists.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void createAll(List<String> paths, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException, 
KeeperException.NodeExistsException
+    {
+        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8188
+        List<Op> ops = new ArrayList<>(paths.size());
+
+        for (String path : paths)
+            ops.add(Op.create(path, EMPTY_BYTES, ZK_ACL, createMode));
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.multi(ops);
+
+                return;
+            }
+            catch (KeeperException.NodeExistsException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param overhead Extra overhead.
+     * @return {@code True} If data size exceeds max request size and should 
be splitted into multiple parts.
+     */
+    boolean needSplitNodeData(String path, byte[] data, int overhead) {
+        return requestOverhead(path) + data.length + overhead > MAX_REQ_SIZE;
+    }
+
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param overhead Extra overhead.
+     * @return Splitted data.
+     */
+    List<byte[]> splitNodeData(String path, byte[] data, int overhead) {
+        int partSize = MAX_REQ_SIZE - requestOverhead(path) - overhead;
+
+        int partCnt = data.length / partSize;
+
+        if (data.length % partSize != 0)
+            partCnt++;
+
+        assert partCnt > 1 : "Do not need split";
+
+        List<byte[]> parts = new ArrayList<>(partCnt);
+
+        int remaining = data.length;
+
+        for (int i = 0; i < partCnt; i++) {
+            int partSize0 = Math.min(remaining, partSize);
+
+            byte[] part = new byte[partSize0];
+
+            System.arraycopy(data, i * partSize, part, 0, part.length);
+
+            remaining -= partSize0;
+
+            parts.add(part);
+        }
+
+        assert remaining == 0 : remaining;
+
+        return parts;
+    }
+
+    /**
+     * TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8187
+     * @param path Request path.
+     * @return Marshalled request overhead.
+     */
+    private int requestOverhead(String path) {
+        return path.length();
+    }
+
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param createMode Create mode.
+     * @return Created path.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    String createIfNeeded(String path, byte[] data, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert !createMode.isSequential() : createMode;
+
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                return zk.create(path, data, ZK_ACL, createMode);
+            }
+            catch (KeeperException.NodeExistsException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Node already exists: " + path);
+
+                return path;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param checkPrefix Unique prefix to check in case of retry.
+     * @param parentPath Parent node path.
+     * @param path Node to create.
+     * @param data Node data.
+     * @param createMode Create mode.
+     * @return Create path.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    String createSequential(String checkPrefix, String parentPath, String 
path, byte[] data, CreateMode createMode)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        assert createMode.isSequential() : createMode;
+
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        boolean first = true;
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                if (!first) {
+                    List<String> children = zk.getChildren(parentPath, false);
+
+                    for (int i = 0; i < children.size(); i++) {
+                        String child = children.get(i);
+
+                        if (children.get(i).startsWith(checkPrefix)) {
+                            String resPath = parentPath + "/" + child;
+
+                            if (log.isDebugEnabled())
+                                log.debug("Check before retry, node already 
created: " + resPath);
+
+                            return resPath;
+                        }
+                    }
+                }
+
+                return zk.create(path, data, ZK_ACL, createMode);
+            }
+            catch (KeeperException.NodeExistsException e) {
+                assert !createMode.isSequential() : createMode;
+
+                if (log.isDebugEnabled())
+                    log.debug("Node already exists: " + path);
+
+                return path;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+
+            first = false;
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @return Children nodes.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    List<String> getChildren(String path)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                return zk.getChildren(path, false);
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException In case of error.
+     * @return {@code True} if given path exists.
+     */
+    boolean existsNoRetry(String path) throws InterruptedException, 
KeeperException {
+        return zk.exists(path, false) != null;
+    }
+
+    /**
+     * @param path Path.
+     * @param ver Expected version.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException In case of error.
+     */
+    void deleteIfExistsNoRetry(String path, int ver) throws 
InterruptedException, KeeperException {
+        try {
+            zk.delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException e) {
+            // No-op if znode does not exist.
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @param ver Version.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void deleteIfExists(String path, int ver)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        try {
+            delete(path, ver);
+        }
+        catch (KeeperException.NoNodeException e) {
+            // No-op if znode does not exist.
+        }
+    }
+
+    /**
+     * @param parent Parent path.
+     * @param paths Children paths.
+     * @param ver Version.
+     * @throws KeeperException.NoNodeException If at least one of nodes does 
not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    void deleteAll(@Nullable String parent, List<String> paths, int ver)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
+    {
+        if (paths.isEmpty())
+            return;
+
+        // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8188
+        List<Op> ops = new ArrayList<>(paths.size());
+
+        for (String path : paths) {
+            String path0 = parent != null ? parent + "/" + path : path;
+
+            ops.add(Op.delete(path0, ver));
+        }
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.multi(ops);
+
+                return;
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @param ver Version.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void delete(String path, int ver)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
+    {
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.delete(path, ver);
+
+                return;
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param ver Version.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     * @throws KeeperException.NoNodeException If node does not exist.
+     * @throws KeeperException.BadVersionException If version does not match.
+     */
+    void setData(String path, byte[] data, int ver)
+        throws ZookeeperClientFailedException, InterruptedException, 
KeeperException.NoNodeException,
+        KeeperException.BadVersionException
+    {
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                zk.setData(path, data, ver);
+
+                return;
+            }
+            catch (KeeperException.BadVersionException | 
KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @param stat Optional {@link Stat} instance to return znode state.
+     * @return Data.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    byte[] getData(String path, @Nullable Stat stat)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException {
+        for (;;) {
+            long connStartTime = this.connStartTime;
+
+            try {
+                return zk.getData(path, false, stat);
+            }
+            catch (KeeperException.NoNodeException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                onZookeeperError(connStartTime, e);
+            }
+        }
+    }
+
+    /**
+     * @param path Path.
+     * @return Data.
+     * @throws KeeperException.NoNodeException If target node does not exist.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    byte[] getData(String path)
+        throws KeeperException.NoNodeException, 
ZookeeperClientFailedException, InterruptedException
+    {
+        return getData(path, null);
+    }
+
+    /**
+     * @param path Path.
+     */
+    void deleteIfExistsAsync(String path) {
+        new DeleteIfExistsOperation(path).execute();
+    }
+
+    /**
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
+    void existsAsync(String path, Watcher watcher, AsyncCallback.StatCallback 
cb) {
+        ExistsOperation op = new ExistsOperation(path, watcher, cb);
+
+        zk.exists(path, watcher, new StatCallbackWrapper(op), null);
+    }
+
+    /**
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
+    void getChildrenAsync(String path, Watcher watcher, 
AsyncCallback.Children2Callback cb) {
+        GetChildrenOperation op = new GetChildrenOperation(path, watcher, cb);
+
+        zk.getChildren(path, watcher, new ChildrenCallbackWrapper(op), null);
+    }
+
+    /**
+     * @param path Path.
+     * @param watcher Watcher.
+     * @param cb Callback.
+     */
+    void getDataAsync(String path, Watcher watcher, AsyncCallback.DataCallback 
cb) {
+        GetDataOperation op = new GetDataOperation(path, watcher, cb);
+
+        zk.getData(path, watcher, new DataCallbackWrapper(op), null);
+    }
+
+    /**
+     * @param path Path.
+     * @param data Data.
+     * @param createMode Create mode.
+     * @param cb Callback.
+     */
+    private void createAsync(String path, byte[] data, CreateMode createMode, 
AsyncCallback.StringCallback cb) {
+        if (data == null)
+            data = EMPTY_BYTES;
+
+        CreateOperation op = new CreateOperation(path, data, createMode, cb);
+
+        zk.create(path, data, ZK_ACL, createMode, new 
CreateCallbackWrapper(op), null);
+    }
+
+    /**
+     *
+     */
+    void onCloseStart() {
+        closing = true;
+
+        synchronized (stateMux) {
+            stateMux.notifyAll();
+        }
+    }
+
+    /**
+     *
+     */
+    public void close() {
+        closeClient();
+    }
+
+    /**
+     * @param prevConnStartTime Time when connection was established.
+     * @param e Error.
+     * @throws ZookeeperClientFailedException If connection to zk was lost.
+     * @throws InterruptedException If interrupted.
+     */
+    private void onZookeeperError(long prevConnStartTime, Exception e)
+        throws ZookeeperClientFailedException, InterruptedException
+    {
+        ZookeeperClientFailedException err = null;
+
+        synchronized (stateMux) {
+            if (closing)
+                throw new ZookeeperClientFailedException("ZooKeeper client is 
closed.");
+
+            U.warn(log, "Failed to execute ZooKeeper operation [err=" + e + ", 
state=" + state + ']');
+
+            if (state == ConnectionState.Lost) {
+                U.error(log, "Operation failed with unexpected error, 
connection lost: " + e, e);
+
+                throw new ZookeeperClientFailedException(e);
+            }
+
+            boolean retry = (e instanceof KeeperException) && 
needRetry(((KeeperException)e).code().intValue());
+
+            if (retry) {
+                long remainingTime;
+
+                if (state == ConnectionState.Connected && connStartTime == 
prevConnStartTime) {
+                    state = ConnectionState.Disconnected;
+
+                    connStartTime = System.currentTimeMillis();
+
+                    remainingTime = connLossTimeout;
+                }
+                else {
+                    assert connStartTime != 0;
+
+                    assert state == ConnectionState.Disconnected : state;
+
+                    remainingTime = connLossTimeout - 
(System.currentTimeMillis() - connStartTime);
+
+                    if (remainingTime <= 0) {
+                        state = ConnectionState.Lost;
+
+                        U.warn(log, "Failed to establish ZooKeeper connection, 
close client " +
+                            "[timeout=" + connLossTimeout + ']');
+
+                        err = new ZookeeperClientFailedException(e);
+                    }
+                }
+
+                if (err == null) {
+                    U.warn(log, "ZooKeeper operation failed, will retry [err=" 
+ e +
+                        ", retryTimeout=" + RETRY_TIMEOUT +
+                        ", connLossTimeout=" + connLossTimeout +
+                        ", path=" + ((KeeperException)e).getPath() +
+                        ", remainingWaitTime=" + remainingTime + ']');
+
+                    stateMux.wait(RETRY_TIMEOUT);
+
+                    if (closing)
+                        throw new ZookeeperClientFailedException("ZooKeeper 
client is closed.");
+                }
+            }
+            else {
+                U.error(log, "Operation failed with unexpected error, close 
ZooKeeper client: " + e, e);
+
+                state = ConnectionState.Lost;
+
+                err = new ZookeeperClientFailedException(e);
+            }
+        }
+
+        if (err != null) {
+            closeClient();
+
+            notifyConnectionLost();
+
+            throw err;
+        }
+    }
+
+    /**
+     * @param code Zookeeper error code.
+     * @return {@code True} if can retry operation.
+     */
+    private boolean needRetry(int code) {
+        boolean retryByErrorCode = code == 
KeeperException.Code.CONNECTIONLOSS.intValue() ||
+            code == KeeperException.Code.SESSIONMOVED.intValue() ||
+            code == KeeperException.Code.OPERATIONTIMEOUT.intValue();
+
+        if (retryByErrorCode) {
+            if (MAX_RETRY_COUNT <= 0 || retryCount.incrementAndGet() < 
MAX_RETRY_COUNT)
+                return true;
+            else
+                return false;
+        }
+        else
+            return false;
+    }
+
+    /**
+     *
+     */
+    private void closeClient() {
+        try {
+            zk.close();
+        }
+        catch (Exception closeErr) {
+            U.warn(log, "Failed to close ZooKeeper client: " + closeErr, 
closeErr);
+        }
+
+        connTimer.cancel();
+    }
+
+    /**
+     *
+     */
+    private void scheduleConnectionCheck() {
+        assert state == ConnectionState.Disconnected : state;
+
+        connTimer.schedule(new ConnectionTimeoutTask(connStartTime), 
connLossTimeout);
+    }
+
+    /**
+     *
+     */
+    interface ZkAsyncOperation {
+        /**
+         *
+         */
+        void execute();
+    }
+
+    /**
+     *
+     */
+    class GetChildrenOperation implements ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /** */
+        private final Watcher watcher;
+
+        /** */
+        private final AsyncCallback.Children2Callback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        GetChildrenOperation(String path, Watcher watcher, 
AsyncCallback.Children2Callback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            getChildrenAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     *
+     */
+    class GetDataOperation implements ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /** */
+        private final Watcher watcher;
+
+        /** */
+        private final AsyncCallback.DataCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        GetDataOperation(String path, Watcher watcher, 
AsyncCallback.DataCallback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            getDataAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     *
+     */
+    class ExistsOperation implements ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /** */
+        private final Watcher watcher;
+
+        /** */
+        private final AsyncCallback.StatCallback cb;
+
+        /**
+         * @param path Path.
+         * @param watcher Watcher.
+         * @param cb Callback.
+         */
+        ExistsOperation(String path, Watcher watcher, 
AsyncCallback.StatCallback cb) {
+            this.path = path;
+            this.watcher = watcher;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            existsAsync(path, watcher, cb);
+        }
+    }
+
+    /**
+     *
+     */
+    class CreateOperation implements ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /** */
+        private final byte[] data;
+
+        /** */
+        private final CreateMode createMode;
+
+        /** */
+        private final AsyncCallback.StringCallback cb;
+
+        /**
+         * @param path path.
+         * @param data Data.
+         * @param createMode Create mode.
+         * @param cb Callback.
+         */
+        CreateOperation(String path, byte[] data, CreateMode createMode, 
AsyncCallback.StringCallback cb) {
+            this.path = path;
+            this.data = data;
+            this.createMode = createMode;
+            this.cb = cb;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            createAsync(path, data, createMode, cb);
+        }
+    }
+
+    /**
+     *
+     */
+    class DeleteIfExistsOperation implements AsyncCallback.VoidCallback, 
ZkAsyncOperation {
+        /** */
+        private final String path;
+
+        /**
+         * @param path Path.
+         */
+        DeleteIfExistsOperation(String path) {
+            this.path = path;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute() {
+            zk.delete(path, -1, this, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx) {
+            if (closing)
+                return;
+
+            if (rc == KeeperException.Code.NONODE.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [" +
+                    "path=" + path + ']');
+
+                retryQ.add(this);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else
+                assert rc == 0 : KeeperException.Code.get(rc);
+        }
+    }
+
+    /**
+     *
+     */
+    class CreateCallbackWrapper implements AsyncCallback.StringCallback {
+        /** */
+        final CreateOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        CreateCallbackWrapper(CreateOperation op) {
+            this.op = op;
+        }
+
+        @Override public void processResult(int rc, String path, Object ctx, 
String name) {
+            if (closing)
+                return;
+
+            if (rc == KeeperException.Code.NODEEXISTS.intValue())
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else {
+                if (op.cb != null)
+                    op.cb.processResult(rc, path, ctx, name);
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    class ChildrenCallbackWrapper implements AsyncCallback.Children2Callback {
+        /** */
+        private final GetChildrenOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private ChildrenCallbackWrapper(GetChildrenOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+            if (closing)
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else
+                op.cb.processResult(rc, path, ctx, children, stat);
+        }
+    }
+
+    /**
+     *
+     */
+    class DataCallbackWrapper implements AsyncCallback.DataCallback {
+        /** */
+        private final GetDataOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private DataCallbackWrapper(GetDataOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
+            if (closing)
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else
+                op.cb.processResult(rc, path, ctx, data, stat);
+        }
+    }
+
+    /**
+     *
+     */
+    class StatCallbackWrapper implements AsyncCallback.StatCallback {
+        /** */
+        private final ExistsOperation op;
+
+        /**
+         * @param op Operation.
+         */
+        private StatCallbackWrapper(ExistsOperation op) {
+            this.op = op;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void processResult(int rc, String path, Object ctx, 
Stat stat) {
+            if (closing)
+                return;
+
+            if (needRetry(rc)) {
+                U.warn(log, "Failed to execute async operation, connection 
lost. Will retry after connection restore [path=" + path + ']');
+
+                retryQ.add(op);
+            }
+            else if (rc == KeeperException.Code.SESSIONEXPIRED.intValue())
+                U.warn(log, "Failed to execute async operation, connection 
lost [path=" + path + ']');
+            else
+                op.cb.processResult(rc, path, ctx, stat);
+        }
+    }
+
+    /**
+     *
+     */
+    private class ConnectionTimeoutTask extends TimerTask {
+        /** */
+        private final long connectStartTime;
+
+        /**
+         * @param connectStartTime Time was connection started.
+         */
+        ConnectionTimeoutTask(long connectStartTime) {
+            this.connectStartTime = connectStartTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            boolean connLoss = false;
+
+            synchronized (stateMux) {
+                if (closing)
+                    return;
+
+                if (state == ConnectionState.Disconnected &&
+                    ZookeeperClient.this.connStartTime == connectStartTime) {
+
+                    state = ConnectionState.Lost;
+
+                    U.warn(log, "Failed to establish ZooKeeper connection, 
close client " +
+                        "[timeout=" + connLossTimeout + ']');
+
+                    connLoss = true;
+                }
+            }
+
+            if (connLoss) {
+                closeClient();
+
+                notifyConnectionLost();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private enum ConnectionState {
+        /** */
+        Connected,
+        /** */
+        Disconnected,
+        /** */
+        Lost
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
new file mode 100644
index 0000000..01d011b
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClientFailedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+/**
+ *
+ */
+class ZookeeperClientFailedException extends Exception {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Message.
+     */
+    ZookeeperClientFailedException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param cause Cause.
+     */
+    ZookeeperClientFailedException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
new file mode 100644
index 0000000..3cb5fad
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperClusterNode.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.managers.discovery.IgniteClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
+
+/**
+ * Zookeeper Cluster Node.
+ */
+public class ZookeeperClusterNode implements IgniteClusterNode, Serializable, 
Comparable<ZookeeperClusterNode> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final byte CLIENT_NODE_MASK = 0x01;
+
+    /** */
+    private UUID id;
+
+    /** */
+    private Serializable consistentId;
+
+    /** */
+    private long internalId;
+
+    /** */
+    private long order;
+
+    /** */
+    private IgniteProductVersion ver;
+
+    /** Node attributes. */
+    private Map<String, Object> attrs;
+
+    /** Internal discovery addresses as strings. */
+    private Collection<String> addrs;
+
+    /** Internal discovery host names as strings. */
+    private Collection<String> hostNames;
+
+    /** */
+    private long sesTimeout;
+
+    /** Metrics provider. */
+    private transient DiscoveryMetricsProvider metricsProvider;
+
+    /** */
+    private transient boolean loc;
+
+    /** */
+    private transient volatile ClusterMetrics metrics;
+
+    /** Node cache metrics. */
+    @GridToStringExclude
+    private transient volatile Map<Integer, CacheMetrics> cacheMetrics;
+
+    /** */
+    private byte flags;
+
+    /** Daemon node flag. */
+    @GridToStringExclude
+    private transient boolean daemon;
+
+    /** Daemon node initialization flag. */
+    @GridToStringExclude
+    private transient volatile boolean daemonInit;
+
+    /**
+     * @param id Node ID.
+     * @param addrs Node addresses.
+     * @param hostNames Node host names.
+     * @param ver Node version.
+     * @param attrs Node attributes.
+     * @param consistentId Consistent ID.
+     * @param sesTimeout Zookeeper session timeout.
+     * @param client Client node flag.
+     * @param metricsProvider Metrics provider.
+     */
+    public ZookeeperClusterNode(
+        UUID id,
+        Collection<String> addrs,
+        Collection<String> hostNames,
+        IgniteProductVersion ver,
+        Map<String, Object> attrs,
+        Serializable consistentId,
+        long sesTimeout,
+        boolean client,
+        DiscoveryMetricsProvider metricsProvider
+    ) {
+        assert id != null;
+        assert consistentId != null;
+
+        this.id = id;
+        this.ver = ver;
+        this.attrs = Collections.unmodifiableMap(attrs);
+        this.addrs = addrs;
+        this.hostNames = hostNames;
+        this.consistentId = consistentId;
+        this.sesTimeout = sesTimeout;
+        this.metricsProvider = metricsProvider;
+
+        if (client)
+            flags |= CLIENT_NODE_MASK;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object consistentId() {
+        return consistentId;
+    }
+
+    /** {@inheritDoc} */
+    public void setConsistentId(Serializable consistentId) {
+        this.consistentId = consistentId;
+
+        final Map<String, Object> map = new HashMap<>(attrs);
+
+        map.put(ATTR_NODE_CONSISTENT_ID, consistentId);
+
+        attrs = Collections.unmodifiableMap(map);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isCacheClient() {
+        return isClient();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T attribute(String name) {
+        // Even though discovery SPI removes this attribute after 
authentication, keep this check for safety.
+        if (IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(name))
+            return null;
+
+        return (T)attrs.get(name);
+    }
+
+    /**
+     * Sets node attributes.
+     *
+     * @param attrs Node attributes.
+     */
+    void setAttributes(Map<String, Object> attrs) {
+        this.attrs = U.sealMap(attrs);
+    }
+
+    /**
+     * Gets node attributes without filtering.
+     *
+     * @return Node attributes without filtering.
+     */
+    Map<String, Object> getAttributes() {
+        return attrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterMetrics metrics() {
+        if (metricsProvider != null) {
+            ClusterMetrics metrics0 = metricsProvider.metrics();
+
+            assert metrics0 != null;
+
+            metrics = metrics0;
+
+            return metrics0;
+        }
+
+        return metrics;
+    }
+
+    /** {@inheritDoc} */
+    public void setMetrics(ClusterMetrics metrics) {
+        assert metrics != null;
+
+        this.metrics = metrics;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+        if (metricsProvider != null) {
+            Map<Integer, CacheMetrics> cacheMetrics0 = 
metricsProvider.cacheMetrics();
+
+            cacheMetrics = cacheMetrics0;
+
+            return cacheMetrics0;
+        }
+
+        return cacheMetrics;
+    }
+
+    /** {@inheritDoc} */
+    public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) {
+        this.cacheMetrics = cacheMetrics != null ? cacheMetrics : 
Collections.<Integer, CacheMetrics>emptyMap();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> attributes() {
+        // Even though discovery SPI removes this attribute after 
authentication, keep this check for safety.
+        return F.view(attrs, new IgnitePredicate<String>() {
+            @Override public boolean apply(String s) {
+                return 
!IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS.equals(s);
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> addresses() {
+        return addrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<String> hostNames() {
+        return hostNames;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long order() {
+        return order;
+    }
+
+    /**
+     * @return Internal ID corresponds to Zookeeper sequential node.
+     */
+    long internalId() {
+        return internalId;
+    }
+
+    /**
+     * @param internalId Internal ID corresponds to Zookeeper sequential node.
+     */
+    void internalId(long internalId) {
+        this.internalId = internalId;
+    }
+
+    /**
+     * @param order Node order.
+     */
+    void order(long order) {
+        assert order > 0 : order;
+
+        this.order = order;
+    }
+
+    /**
+     * @param newId New node ID.
+     */
+    public void onClientDisconnected(UUID newId) {
+        id = newId;
+    }
+
+    /**
+     * @return Session timeout.
+     */
+    long sessionTimeout() {
+        return sesTimeout;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteProductVersion version() {
+        return ver;
+    }
+
+    /**
+     * @param loc Local node flag.
+     */
+    public void local(boolean loc) {
+        this.loc = loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocal() {
+        return loc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDaemon() {
+        if (!daemonInit) {
+            daemon = "true".equalsIgnoreCase((String)attribute(ATTR_DAEMON));
+
+            daemonInit = true;
+        }
+
+        return daemon;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClient() {
+        return (CLIENT_NODE_MASK & flags) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@Nullable ZookeeperClusterNode node) {
+        if (node == null)
+            return 1;
+
+        int res = Long.compare(order, node.order);
+
+        if (res == 0) {
+            assert id().equals(node.id()) : "Duplicate order [this=" + this + 
", other=" + node + ']';
+
+            res = id().compareTo(node.id());
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return id.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return F.eqNodes(this, obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ZookeeperClusterNode [id=" + id +
+            ", addrs=" + addrs +
+            ", order=" + order +
+            ", loc=" + loc +
+            ", client=" + isClient() + ']';
+    }
+}

Reply via email to