http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 54b3a78..e89a4c8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -17,11 +17,21 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.GridAbstractCommunicationSelfTest;
@@ -85,6 +95,67 @@ abstract class GridTcpCommunicationSpiAbstractTest extends 
GridAbstractCommunica
         }
     }
 
+    /**
+     * Sequentially calls {@code checkConnection} on every SPI in {@code spis} for a copy of {@code nodes}
+     * (100 rounds) and asserts that the bit for each checked node is set in the returned {@link BitSet}.
+     */
+    public void testCheckConnection1() {
+        for (int i = 0; i < 100; i++) {
+            for (Map.Entry<UUID, CommunicationSpi<Message>> entry : 
spis.entrySet()) {
+                TcpCommunicationSpi spi = 
(TcpCommunicationSpi)entry.getValue();
+
+                List<ClusterNode> checkNodes = new ArrayList<>(nodes);
+
+                assert checkNodes.size() > 1;
+
+                IgniteFuture<BitSet> fut = spi.checkConnection(checkNodes);
+
+                BitSet res = fut.get();
+
+                for (int n = 0; n < checkNodes.size(); n++)
+                    assertTrue(res.get(n));
+            }
+        }
+    }
+
+    /**
+     * Starts one asynchronous task per SPI in {@code spis}. The tasks meet on a {@link CyclicBarrier} sized
+     * to the number of SPIs, then each calls {@code checkConnection} 100 times for a copy of {@code nodes}
+     * and asserts that the bit for each checked node is set in the result. The test then waits for every
+     * task's future.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCheckConnection2() throws Exception {
+        final int THREADS = spis.size();
+
+        final CyclicBarrier b = new CyclicBarrier(THREADS);
+
+        List<IgniteInternalFuture> futs = new ArrayList<>();
+
+        for (Map.Entry<UUID, CommunicationSpi<Message>> entry : 
spis.entrySet()) {
+            final TcpCommunicationSpi spi = 
(TcpCommunicationSpi)entry.getValue();
+
+            futs.add(GridTestUtils.runAsync(new Callable() {
+                @Override public Object call() throws Exception {
+                    List<ClusterNode> checkNodes = new ArrayList<>(nodes);
+
+                    assert checkNodes.size() > 1;
+
+                    b.await();
+
+                    for (int i = 0; i < 100; i++) {
+                        IgniteFuture<BitSet> fut = 
spi.checkConnection(checkNodes);
+
+                        BitSet res = fut.get();
+
+                        for (int n = 0; n < checkNodes.size(); n++)
+                            assertTrue(res.get(n));
+                    }
+
+                    return null;
+                }
+            }));
+        }
+
+        for (IgniteInternalFuture f : futs)
+            f.get();
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
index 54b48e5..9a45d2d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/FilterDataForClientNodeDiscoveryTest.java
@@ -206,6 +206,11 @@ public class FilterDataForClientNodeDiscoveryTest extends 
GridCommonAbstractTest
         }
 
         /** {@inheritDoc} */
+        @Override public boolean stopProcess() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
         @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, 
AffinityTopologyVersion topVer,
             DiscoCache discoCache) {
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index ca05288..51dcb23 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -613,6 +613,16 @@ public class GridSpiTestContext implements 
IgniteSpiContext {
         return Collections.emptyMap();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean communicationFailureResolveSupported() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resolveCommunicationFailure(ClusterNode node, 
Exception err) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
index 4507572..e2594ca 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/config/GridTestProperties.java
@@ -83,6 +83,15 @@ public final class GridTestProperties {
     /** "True value" enables {@link BinaryBasicNameMapper} in {@link 
BinaryTypeConfiguration#getNameMapper()}  */
     public static final String BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER = 
"binary.marshaller.use.simple.name.mapper";
 
+    /**
+     * Name of class which provides static method 
preprocessConfiguration(IgniteConfiguration cfg) to
+     * alter {@link org.apache.ignite.configuration.IgniteConfiguration} 
before node is started.
+     * <p>
+     * Note: this pre-processor is invoked only if the test starts a node using 
one of GridAbstractTest's startGrid
+     * methods.
+     */
+    public static final String IGNITE_CFG_PREPROCESSOR_CLS = 
"ignite.cfg.preprocessor.class";
+
     /** */
     static {
         // Initialize IGNITE_HOME system property.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index c3b262c..f5784eb 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -46,6 +46,7 @@ import junit.framework.TestCase;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
@@ -84,6 +85,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerContextTestImpl;
@@ -92,6 +94,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
@@ -124,6 +127,7 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
 import static 
org.apache.ignite.testframework.config.GridTestProperties.BINARY_MARSHALLER_USE_SIMPLE_NAME_MAPPER;
+import static 
org.apache.ignite.testframework.config.GridTestProperties.IGNITE_CFG_PREPROCESSOR_CLS;
 
 /**
  * Common abstract test for Ignite tests.
@@ -203,13 +207,15 @@ public abstract class GridAbstractTest extends TestCase {
         if (BINARY_MARSHALLER)
             
GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, 
BinaryMarshaller.class.getName());
 
-        Thread timer = new Thread(new GridTestClockTimer(), 
"ignite-clock-for-tests");
+        if (GridTestClockTimer.startTestTimer()) {
+            Thread timer = new Thread(new GridTestClockTimer(), 
"ignite-clock-for-tests");
 
-        timer.setDaemon(true);
+            timer.setDaemon(true);
 
-        timer.setPriority(10);
+            timer.setPriority(10);
 
-        timer.start();
+            timer.start();
+        }
     }
 
     /** */
@@ -838,6 +844,7 @@ public abstract class GridAbstractTest extends TestCase {
     protected Ignite startGrid(String igniteInstanceName, 
GridSpringResourceContext ctx) throws Exception {
         return startGrid(igniteInstanceName, 
optimize(getConfiguration(igniteInstanceName)), ctx);
     }
+
     /**
      * Starts new grid with given name.
      *
@@ -852,12 +859,33 @@ public abstract class GridAbstractTest extends TestCase {
             startingIgniteInstanceName.set(igniteInstanceName);
 
             try {
+                String cfgProcClsName = 
System.getProperty(IGNITE_CFG_PREPROCESSOR_CLS);
+
+                if (cfgProcClsName != null) {
+                    try {
+                        Class<?> cfgProc = Class.forName(cfgProcClsName);
+
+                        Method method = 
cfgProc.getMethod("preprocessConfiguration", IgniteConfiguration.class);
+
+                        if (!Modifier.isStatic(method.getModifiers()))
+                            throw new Exception("Non-static pre-processor 
method in pre-processor class: " + cfgProcClsName);
+
+                        method.invoke(null, cfg);
+                    }
+                    catch (Exception e) {
+                        log.error("Failed to pre-process IgniteConfiguration 
using pre-processor class: " + cfgProcClsName);
+
+                        throw new IgniteException(e);
+                    }
+                }
+
                 Ignite node = IgnitionEx.start(cfg, ctx);
 
                 IgniteConfiguration nodeCfg = node.configuration();
 
                 log.info("Node started with the following configuration [id=" 
+ node.cluster().localNode().id()
                     + ", marshaller=" + nodeCfg.getMarshaller()
+                    + ", discovery=" + nodeCfg.getDiscoverySpi()
                     + ", binaryCfg=" + nodeCfg.getBinaryConfiguration()
                     + ", lateAff=" + nodeCfg.isLateAffinityAssignment() + "]");
 
@@ -967,6 +995,26 @@ public abstract class GridAbstractTest extends TestCase {
         if (cfg == null)
             cfg = optimize(getConfiguration(igniteInstanceName));
 
+        if (locNode != null) {
+            DiscoverySpi discoverySpi = 
locNode.configuration().getDiscoverySpi();
+
+            if (discoverySpi != null && !(discoverySpi instanceof 
TcpDiscoverySpi)) {
+                try {
+                    // Clone added to support ZookeeperDiscoverySpi.
+                    Method m = 
discoverySpi.getClass().getDeclaredMethod("cloneSpiConfiguration");
+
+                    m.setAccessible(true);
+
+                    cfg.setDiscoverySpi((DiscoverySpi) m.invoke(discoverySpi));
+
+                    resetDiscovery = false;
+                }
+                catch (NoSuchMethodException e) {
+                    // Ignore.
+                }
+            }
+        }
+
         return new IgniteProcessProxy(cfg, log, locNode, resetDiscovery);
     }
 
@@ -1075,7 +1123,9 @@ public abstract class GridAbstractTest extends TestCase {
             for (Ignite g : srvs)
                 stopGrid(g.name(), cancel, false);
 
-            assert G.allGrids().isEmpty();
+            List<Ignite> nodes = G.allGrids();
+
+            assert nodes.isEmpty() : nodes;
         }
         finally {
             IgniteProcessProxy.killAll(); // In multi-JVM case.
@@ -1177,6 +1227,14 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param nodeIdx Node index.
+     * @return Node ID.
+     */
+    protected final UUID nodeId(int nodeIdx) {
+        return ignite(nodeIdx).cluster().localNode().id();
+    }
+
+    /**
      * Gets grid for given test.
      *
      * @return Grid for given test.
@@ -1217,7 +1275,11 @@ public abstract class GridAbstractTest extends TestCase {
      * @throws Exception If failed.
      */
     protected Ignite startGrid(String igniteInstanceName, String 
springCfgPath) throws Exception {
-        return startGrid(igniteInstanceName, loadConfiguration(springCfgPath));
+        IgniteConfiguration cfg = loadConfiguration(springCfgPath);
+
+        cfg.setGridLogger(getTestResources().getLogger());
+
+        return startGrid(igniteInstanceName, cfg);
     }
 
     /**
@@ -2142,6 +2204,50 @@ public abstract class GridAbstractTest extends TestCase {
             }
         }
     }
+    /**
+     * Waits until all started nodes report the expected cluster size and fails the test otherwise. The
+     * condition holds when {@code G.allGrids()} returns exactly {@code expSize} nodes, no node has an
+     * unfinished client reconnect future, and each node itself sees {@code expSize} cluster nodes. A node
+     * that throws {@link IgniteClientDisconnectedException} counts as not ready. The wait is bounded by the
+     * {@code 30_000} timeout passed to {@code GridTestUtils.waitForCondition} (presumably milliseconds).
+     *
+     * @param expSize Expected nodes number.
+     * @throws Exception If failed.
+     */
+    protected void waitForTopology(final int expSize) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                List<Ignite> nodes = G.allGrids();
+
+                if (nodes.size() != expSize) {
+                    info("Wait all nodes [size=" + nodes.size() + ", exp=" + 
expSize + ']');
+
+                    return false;
+                }
+
+                for (Ignite node: nodes) {
+                    try {
+                        IgniteFuture<?> reconnectFut = 
node.cluster().clientReconnectFuture();
+
+                        if (reconnectFut != null && !reconnectFut.isDone()) {
+                            info("Wait for size on node, reconnect is in 
progress [node=" + node.name() + ']');
+
+                            return false;
+                        }
+
+                        int sizeOnNode = node.cluster().nodes().size();
+
+                        if (sizeOnNode != expSize) {
+                            info("Wait for size on node [node=" + node.name() 
+ ", size=" + sizeOnNode + ", exp=" + expSize + ']');
+
+                            return false;
+                        }
+                    }
+                    catch (IgniteClientDisconnectedException e) {
+                        info("Wait for size on node, node disconnected [node=" 
+ node.name() + ']');
+
+                        return false;
+                    }
+                }
+
+                return true;
+            }
+        }, 30_000));
+    }
 
     /**
      * @param millis Time to sleep.
@@ -2172,6 +2278,17 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @return {@code True} if nodes use {@link TcpDiscoverySpi}.
+     */
+    protected static boolean tcpDiscovery() {
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse("There are no nodes", nodes.isEmpty());
+
+        return nodes.get(0).configuration().getDiscoverySpi() instanceof 
TcpDiscoverySpi;
+    }
+
+    /**
      *
      */
     private static interface WriteReplaceOwner {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
index d7be576..2b3a19c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java
@@ -162,6 +162,8 @@ public class IgniteNodeRunner {
                 cfg.setDiscoverySpi(disco);
             }
 
+            X.println("Configured discovery: " + 
cfg.getDiscoverySpi().getClass().getName());
+
             return cfg;
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 55fab8d..14eb296 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.ClusterNodeMetricsSelfTest;
+import org.apache.ignite.internal.ClusterNodeMetricsUpdateTest;
 import org.apache.ignite.internal.GridAffinityNoCacheSelfTest;
 import org.apache.ignite.internal.GridAffinitySelfTest;
 import org.apache.ignite.internal.GridAlwaysFailoverSpiFailSelfTest;
@@ -122,6 +123,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(GridAlwaysFailoverSpiFailSelfTest.class);
         suite.addTestSuite(GridTaskInstanceExecutionSelfTest.class);
         suite.addTestSuite(ClusterNodeMetricsSelfTest.class);
+        suite.addTestSuite(ClusterNodeMetricsUpdateTest.class);
         suite.addTestSuite(GridNonHistoryMetricsSelfTest.class);
         suite.addTestSuite(GridCancelledJobsMetricsSelfTest.class);
         suite.addTestSuite(GridCollisionJobsContextSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
index b9ef1e4..e26b211 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
@@ -117,7 +117,7 @@ public class IgniteCacheDistributedQueryCancelSelfTest 
extends GridCommonAbstrac
                     }
 
                     for (Ignite g : G.allGrids())
-                        if 
(!g.configuration().getDiscoverySpi().isClientMode())
+                        if (!g.configuration().isClientMode())
                             stopGrid(g.name(), true);
                 }
             }, 1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
index 97720d5..bd3b093 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java
@@ -89,11 +89,14 @@ public abstract class DynamicIndexAbstractBasicSelfTest 
extends DynamicIndexAbst
      * @param mode Mode.
      * @param atomicityMode Atomicity mode.
      * @param near Near flag.
+     * @throws Exception If failed.
      */
     private void initialize(CacheMode mode, CacheAtomicityMode atomicityMode, 
boolean near)
-        throws IgniteCheckedException {
+        throws Exception {
         createSqlCache(node(), cacheConfiguration(mode, atomicityMode, near));
 
+        awaitPartitionMapExchange();
+
         grid(IDX_CLI_NEAR_ONLY).getOrCreateNearCache(CACHE_NAME, new 
NearCacheConfiguration<>());
 
         assertNoIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
 
b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
index a181068..5cad167 100644
--- 
a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
+++ 
b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
@@ -44,13 +44,7 @@ public class GridJtaTransactionManagerSelfTest extends 
GridCommonAbstractTest {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).
             
setCacheConfiguration(defaultCacheConfiguration().setCacheMode(PARTITIONED));
 
-        cfg.getTransactionConfiguration().setTxManagerFactory(new 
Factory<TransactionManager>() {
-            private static final long serialVersionUID = 0L;
-
-            @Override public TransactionManager create() {
-                return jotm.getTransactionManager();
-            }
-        });
+        cfg.getTransactionConfiguration().setTxManagerFactory(new 
TestTxManagerFactory());
 
         return cfg;
     }
@@ -205,4 +199,17 @@ public class GridJtaTransactionManagerSelfTest extends 
GridCommonAbstractTest {
             cache.removeAll();
         }
     }
+
+    /**
+     * Named factory that returns the transaction manager of the {@code jotm} instance visible from the
+     * enclosing test class. It replaces the anonymous factory previously passed to
+     * {@code setTxManagerFactory}.
+     */
+    static class TestTxManagerFactory implements Factory<TransactionManager> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public TransactionManager create() {
+            return jotm.getTransactionManager();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
 
b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
index f6fd5c7..14b7fae 100644
--- 
a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
+++ 
b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/jta/GridPartitionedCacheJtaFactorySelfTest.java
@@ -30,12 +30,19 @@ public class GridPartitionedCacheJtaFactorySelfTest extends 
AbstractCacheJtaSelf
     @Override protected void configureJta(IgniteConfiguration cfg) {
         TransactionConfiguration txCfg = cfg.getTransactionConfiguration();
 
-        txCfg.setTxManagerFactory(new Factory<TransactionManager>() {
-            private static final long serialVersionUID = 0L;
+        txCfg.setTxManagerFactory(new TestTxManagerFactory());
+    }
+
+    /**
+     * Named factory that returns the transaction manager of the {@code jotm} instance visible from the
+     * enclosing test class. It replaces the anonymous factory previously created in {@code configureJta}.
+     */
+    static class TestTxManagerFactory implements Factory<TransactionManager> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-            @Override public TransactionManager create() {
-                return jotm.getTransactionManager();
-            }
-        });
+        /** {@inheritDoc} */
+        @Override public TransactionManager create() {
+            return jotm.getTransactionManager();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index fce47a6..d87ea0a 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.processors.cache.query.QueryCursorEx
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
 import org.apache.ignite.lang.IgniteUuid
 import org.apache.ignite.spark.impl._
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -91,8 +92,14 @@ class IgniteRDD[K, V] (
     override protected[spark] def getPreferredLocations(split: Partition): 
Seq[String] = {
         ensureCache()
 
-        
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+        if 
(ic.ignite().configuration().getDiscoverySpi().isInstanceOf[TcpDiscoverySpi]) {
+          
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
             
.map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList
+        }
+        else {
+          
ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+            .flatten(_.hostNames).toSeq
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
 
b/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
index 75128fc..b453858 100644
--- 
a/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
+++ 
b/modules/spring/src/test/java/org/apache/ignite/internal/GridFactorySelfTest.java
@@ -998,7 +998,8 @@ public class GridFactorySelfTest extends 
GridCommonAbstractTest {
 
             startGrid("1", c);
 
-            assert ((TcpDiscoverySpi)c.getDiscoverySpi()).started();
+            if (tcpDiscovery())
+                assert ((TcpDiscoverySpi)c.getDiscoverySpi()).started();
 
             try {
                 startGrid("2", c);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
 
b/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
index b861e19..46da3cc 100644
--- 
a/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
+++ 
b/modules/spring/src/test/java/org/apache/ignite/p2p/GridP2PUserVersionChangeSelfTest.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testsuites.IgniteIgnore;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_TASK_UNDEPLOYED;
 
@@ -255,12 +256,12 @@ public class GridP2PUserVersionChangeSelfTest extends 
GridCommonAbstractTest {
 
             ignite2.events().localListen(new IgnitePredicate<Event>() {
                 @Override public boolean apply(Event evt) {
-                    if (evt.type() == EVT_NODE_LEFT)
+                    if (evt.type() == EVT_NODE_LEFT || evt.type() == 
EVT_NODE_FAILED)
                         discoLatch.countDown();
 
                     return true;
                 }
-            }, EVT_NODE_LEFT);
+            }, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
             Integer res1 = (Integer)ignite1.compute().execute(task1, 
ignite2.cluster().localNode().id());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/yardstick/pom-standalone.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/pom-standalone.xml 
b/modules/yardstick/pom-standalone.xml
index 577a95e..6905d94 100644
--- a/modules/yardstick/pom-standalone.xml
+++ b/modules/yardstick/pom-standalone.xml
@@ -54,6 +54,12 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-zookeeper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-log4j</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/yardstick/pom.xml
----------------------------------------------------------------------
diff --git a/modules/yardstick/pom.xml b/modules/yardstick/pom.xml
index 8cad24b..9923bb7 100644
--- a/modules/yardstick/pom.xml
+++ b/modules/yardstick/pom.xml
@@ -55,6 +55,12 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-zookeeper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-log4j</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml
index c3c3679..2d47ece 100644
--- a/modules/zookeeper/pom.xml
+++ b/modules/zookeeper/pom.xml
@@ -49,6 +49,12 @@
 
         <dependency>
             <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
             <artifactId>curator-x-discovery</artifactId>
             <version>${curator.version}</version>
         </dependency>
@@ -109,6 +115,13 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-log4j</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
@@ -122,16 +135,43 @@
         </dependency>
 
         <dependency>
+            <groupId>com.thoughtworks.xstream</groupId>
+            <artifactId>xstream</artifactId>
+            <version>1.4.8</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
             <!-- Generate the OSGi MANIFEST.MF for this bundle. -->
             <plugin>
                 <groupId>org.apache.felix</groupId>

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
new file mode 100644
index 0000000..860c71c
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.curator.utils.PathUtils;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiConfiguration;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
+import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
+import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+
+/**
+ * Zookeeper Discovery Spi.
+ */
+@IgniteSpiMultipleInstancesSupport(true)
+@DiscoverySpiOrderSupport(true)
+@DiscoverySpiHistorySupport(true)
+@DiscoverySpiMutableCustomMessageSupport(false)
+public class ZookeeperDiscoverySpi extends IgniteSpiAdapter implements 
DiscoverySpi, IgniteDiscoverySpi {
+    /** Default base path in ZooKeeper for znodes created by SPI. */
+    public static final String DFLT_ROOT_PATH = "/apacheIgnite";
+
+    /** Default cluster join timeout ({@code 0} means wait forever). */
+    public static final long DFLT_JOIN_TIMEOUT = 0;
+
+    /** Base path in ZooKeeper for znodes created by SPI. */
+    @GridToStringInclude
+    private String zkRootPath = DFLT_ROOT_PATH;
+
+    /** ZooKeeper connection string. */
+    @GridToStringInclude
+    private String zkConnectionString;
+
+    /** Cluster join timeout. */
+    private long joinTimeout = DFLT_JOIN_TIMEOUT;
+
+    /** ZooKeeper session timeout, if {@code 0} on start then failure detection timeout is used. */
+    @GridToStringInclude
+    private long sesTimeout;
+
+    /** Client reconnect disabled flag. */
+    private boolean clientReconnectDisabled;
+
+    /** Discovery listener. */
+    @GridToStringExclude
+    private DiscoverySpiListener lsnr;
+
+    /** Discovery data exchange. */
+    @GridToStringExclude
+    private DiscoverySpiDataExchange exchange;
+
+    /** Node authenticator. */
+    @GridToStringExclude
+    private DiscoverySpiNodeAuthenticator nodeAuth;
+
+    /** Metrics provider. */
+    @GridToStringExclude
+    private DiscoveryMetricsProvider metricsProvider;
+
+    /** Discovery implementation, created on SPI start. */
+    @GridToStringExclude
+    private ZookeeperDiscoveryImpl impl;
+
+    /** Local node attributes. */
+    @GridToStringExclude
+    private Map<String, Object> locNodeAttrs;
+
+    /** Local node version. */
+    @GridToStringExclude
+    private IgniteProductVersion locNodeVer;
+
+    /** Local node consistent ID, initialized on first call of {@link #consistentId()}. */
+    @GridToStringExclude
+    private Serializable consistentId;
+
+    /** Local node addresses, resolved on demand by {@link #initAddresses()}. */
+    private IgniteBiTuple<Collection<String>, Collection<String>> addrs;
+
+    /** Logger. */
+    @LoggerResource
+    @GridToStringExclude
+    private IgniteLogger log;
+
+    /** Internal listener set before discovery impl is created, passed to impl on SPI start. */
+    private IgniteDiscoverySpiInternalListener internalLsnr;
+
+    /**
+     * @return Base path in ZK for znodes created by SPI.
+     */
+    public String getZkRootPath() {
+        return zkRootPath;
+    }
+
+    /**
+     * @param zkRootPath Base path in ZooKeeper for znodes created by SPI (trimmed, trailing '/' removed on start).
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public ZookeeperDiscoverySpi setZkRootPath(String zkRootPath) {
+        this.zkRootPath = zkRootPath;
+
+        return this;
+    }
+
+    /**
+     * @return ZooKeeper session timeout.
+     */
+    public long getSessionTimeout() {
+        return sesTimeout;
+    }
+
+    /**
+     * @param sesTimeout ZooKeeper session timeout ({@code 0} means failure detection timeout is used on start).
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public ZookeeperDiscoverySpi setSessionTimeout(long sesTimeout) {
+        this.sesTimeout = sesTimeout;
+
+        return this;
+    }
+
+    /**
+     * @return Cluster join timeout.
+     */
+    public long getJoinTimeout() {
+        return joinTimeout;
+    }
+
+    /**
+     * @param joinTimeout Cluster join timeout ({@code 0} means wait forever).
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public ZookeeperDiscoverySpi setJoinTimeout(long joinTimeout) {
+        this.joinTimeout = joinTimeout;
+
+        return this;
+    }
+
+    /**
+     * @return ZooKeeper connection string.
+     */
+    public String getZkConnectionString() {
+        return zkConnectionString;
+    }
+
+    /**
+     * @param zkConnectionString ZooKeeper connection string, must not be {@code null} or empty on SPI start.
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = false)
+    public ZookeeperDiscoverySpi setZkConnectionString(String 
zkConnectionString) {
+        this.zkConnectionString = zkConnectionString;
+
+        return this;
+    }
+
+    /**
+     * If {@code true} client does not try to reconnect.
+     *
+     * @return Client reconnect disabled flag.
+     */
+    public boolean isClientReconnectDisabled() {
+        return clientReconnectDisabled;
+    }
+
+    /**
+     * Sets client reconnect disabled flag.
+     *
+     * @param clientReconnectDisabled Client reconnect disabled flag.
+     * @return {@code this} for chaining.
+     */
+    @IgniteSpiConfiguration(optional = true)
+    public ZookeeperDiscoverySpi setClientReconnectDisabled(boolean 
clientReconnectDisabled) {
+        this.clientReconnectDisabled = clientReconnectDisabled;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientReconnectSupported() {
+        return !clientReconnectDisabled;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clientReconnect() {
+        impl.reconnect();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean knownNode(UUID nodeId) {
+        return impl.knownNode(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean supportsCommunicationFailureResolve() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resolveCommunicationFailure(ClusterNode node, 
Exception err) {
+        impl.resolveCommunicationError(node, err);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable consistentId() throws 
IgniteSpiException {
+        if (consistentId == null) {
+            consistentId = ignite.configuration().getConsistentId();
+
+            if (consistentId == null) {
+                initAddresses();
+
+                final List<String> sortedAddrs = new ArrayList<>(addrs.get1());
+
+                Collections.sort(sortedAddrs);
+
+                if (getBoolean(IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT))
+                    consistentId = U.consistentId(sortedAddrs);
+                else {
+                    Integer commPort = null;
+
+                    if (locNodeAttrs != null) {
+                        commPort = (Integer)locNodeAttrs.get(
+                            TcpCommunicationSpi.class.getSimpleName() + "." + 
TcpCommunicationSpi.ATTR_PORT);
+                    }
+                    else {
+                        CommunicationSpi commSpi = 
ignite.configuration().getCommunicationSpi();
+
+                        if (commSpi instanceof TcpCommunicationSpi) {
+                            commPort = 
((TcpCommunicationSpi)commSpi).boundPort();
+
+                            if (commPort == -1)
+                                commPort = null;
+                        }
+                    }
+
+                    if (commPort == null) {
+                        U.warn(log, "Can not initialize default consistentId, 
TcpCommunicationSpi port is not initialized.");
+
+                        consistentId = ignite.configuration().getNodeId();
+                    }
+                    else
+                        consistentId = U.consistentId(sortedAddrs, commPort);
+                }
+            }
+        }
+
+        return consistentId;
+    }
+
+    /**
+     * Resolves local host and its addresses into {@link #addrs} if they were not resolved yet.
+     */
+    private void initAddresses() {
+        if (addrs == null) {
+            String locHost = ignite != null ? 
ignite.configuration().getLocalHost() : null;
+
+            InetAddress locAddr;
+
+            try {
+                locAddr = U.resolveLocalHost(locHost);
+            }
+            catch (IOException e) {
+                throw new IgniteSpiException("Unknown local address: " + 
locHost, e);
+            }
+
+            try {
+                addrs = U.resolveLocalAddresses(locAddr);
+            }
+            catch (Exception e) {
+                throw new IgniteSpiException("Failed to resolve local host to 
set of external addresses: " + locHost,
+                    e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> getRemoteNodes() {
+        return impl.remoteNodes();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode getLocalNode() {
+        return impl != null ? impl.localNode() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode getNode(UUID nodeId) {
+        return impl.node(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        return impl.pingNode(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNodeAttributes(Map<String, Object> attrs, 
IgniteProductVersion ver) {
+        assert locNodeAttrs == null;
+        assert locNodeVer == null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Node attributes to set: " + attrs);
+            log.debug("Node version to set: " + ver);
+        }
+
+        locNodeAttrs = attrs;
+        locNodeVer = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDataExchange(DiscoverySpiDataExchange exchange) {
+        this.exchange = exchange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setMetricsProvider(DiscoveryMetricsProvider 
metricsProvider) {
+        this.metricsProvider = metricsProvider;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void disconnect() throws IgniteSpiException {
+        impl.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAuthenticator(DiscoverySpiNodeAuthenticator auth) 
{
+        this.nodeAuth = auth;
+    }
+
+    /**
+     * @return Authenticator.
+     */
+    public DiscoverySpiNodeAuthenticator getAuthenticator() {
+        return nodeAuth;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getGridStartTime() {
+        return impl.gridStartTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+        IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr;
+
+        if (internalLsnr != null) {
+            if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
+                return;
+        }
+
+        impl.sendCustomMessage(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        impl.failNode(nodeId, warning);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClientMode() throws IllegalStateException {
+        return impl.localNode().isClient();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) 
throws IgniteSpiException {
+        super.onContextInitialized0(spiCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStart(@Nullable String igniteInstanceName) throws 
IgniteSpiException {
+        if (sesTimeout == 0)
+            sesTimeout = 
ignite.configuration().getFailureDetectionTimeout().intValue();
+
+        assertParameter(sesTimeout > 0, "sessionTimeout > 0");
+
+        A.notNullOrEmpty(zkConnectionString, "zkConnectionString can not be 
empty");
+
+        A.notNullOrEmpty(zkRootPath, "zkRootPath can not be empty");
+
+        zkRootPath = zkRootPath.trim();
+
+        if (zkRootPath.endsWith("/"))
+            zkRootPath = zkRootPath.substring(0, zkRootPath.length() - 1);
+
+        try {
+            PathUtils.validatePath(zkRootPath);
+        }
+        catch (IllegalArgumentException e) {
+            throw new IgniteSpiException("zkRootPath is invalid: " + 
zkRootPath, e);
+        }
+
+        ZookeeperClusterNode locNode = initLocalNode();
+
+        if (log.isInfoEnabled()) {
+            log.info("Start Zookeeper discovery [zkConnectionString=" + 
zkConnectionString +
+                ", sessionTimeout=" + sesTimeout +
+                ", zkRootPath=" + zkRootPath + ']');
+        }
+
+        impl = new ZookeeperDiscoveryImpl(
+            this,
+            igniteInstanceName,
+            log,
+            zkRootPath,
+            locNode,
+            lsnr,
+            exchange,
+            internalLsnr);
+
+        try {
+            impl.startJoinAndWait();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteSpiException("Failed to join cluster, thread was 
interrupted", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+        if (impl != null)
+            impl.internalLsnr = lsnr;
+        else
+            internalLsnr = lsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void simulateNodeFailure() {
+        impl.simulateNodeFailure();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void spiStop() throws IgniteSpiException {
+        if (impl != null)
+            impl.stop();
+    }
+
+    /**
+     * @return Local node instance.
+     */
+    private ZookeeperClusterNode initLocalNode() {
+        assert ignite != null;
+
+        initAddresses();
+
+        ZookeeperClusterNode locNode = new ZookeeperClusterNode(
+            ignite.configuration().getNodeId(),
+            addrs.get1(),
+            addrs.get2(),
+            locNodeVer,
+            locNodeAttrs,
+            consistentId(),
+            sesTimeout,
+            ignite.configuration().isClientMode(),
+            metricsProvider);
+
+        locNode.local(true);
+
+        DiscoverySpiListener lsnr = this.lsnr;
+
+        if (lsnr != null)
+            lsnr.onLocalNodeInitialized(locNode);
+
+        if (log.isDebugEnabled())
+            log.debug("Local node initialized: " + locNode);
+
+        if (metricsProvider != null) {
+            locNode.setMetrics(metricsProvider.metrics());
+            locNode.setCacheMetrics(metricsProvider.cacheMetrics());
+        }
+
+        return locNode;
+    }
+
+    /**
+     * Used in tests (called via reflection).
+     *
+     * @return Copy of SPI.
+     */
+    private ZookeeperDiscoverySpi cloneSpiConfiguration() {
+        ZookeeperDiscoverySpi spi = new ZookeeperDiscoverySpi();
+
+        spi.setZkRootPath(zkRootPath);
+        spi.setZkConnectionString(zkConnectionString);
+        spi.setSessionTimeout(sesTimeout);
+        spi.setJoinTimeout(joinTimeout);
+        spi.setClientReconnectDisabled(clientReconnectDisabled);
+
+        return spi;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZookeeperDiscoverySpi.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
new file mode 100644
index 0000000..b80a9dd
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractCallabck.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+
+/**
+ * Base class for ZooKeeper callbacks: processing is guarded by the busy lock of discovery impl.
+ */
+abstract class ZkAbstractCallabck {
+    /** Runtime state. */
+    final ZkRuntimeState rtState;
+
+    /** Discovery impl. */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** Busy lock taken from discovery impl. */
+    private final GridSpinBusyLock busyLock;
+
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkAbstractCallabck(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        this.rtState = rtState;
+        this.impl = impl;
+
+        busyLock = impl.busyLock;
+    }
+
+    /**
+     * @return {@code True} if processing can start: no close error is set and busy lock was entered.
+     */
+    final boolean onProcessStart() {
+        boolean start = rtState.errForClose == null && busyLock.enterBusy();
+
+        if (!start) {
+            assert rtState.errForClose != null;
+
+            onStartFailed();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Called from {@link #onProcessStart()} when processing can not be started, no-op by default.
+     */
+    void onStartFailed() {
+        // No-op.
+    }
+
+    /**
+     * Leaves busy lock entered in {@link #onProcessStart()}.
+     */
+    final void onProcessEnd() {
+        busyLock.leaveBusy();
+    }
+
+    /**
+     * @param e Error, passed to discovery impl as fatal error together with busy lock.
+     */
+    final void onProcessError(Throwable e) {
+        impl.onFatalError(busyLock, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
new file mode 100644
index 0000000..2292e35
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractChildrenCallback.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.List;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Base class for children callbacks, result is processed only if {@link #onProcessStart()} returns {@code true}.
+ */
+abstract class ZkAbstractChildrenCallback extends ZkAbstractCallabck 
implements AsyncCallback.Children2Callback {
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkAbstractChildrenCallback(ZkRuntimeState rtState, ZookeeperDiscoveryImpl 
impl) {
+        super(rtState, impl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void processResult(int rc, String path, Object ctx, 
List<String> children, Stat stat) {
+        if (!onProcessStart())
+            return;
+
+        try {
+            processResult0(rc, path, ctx, children, stat);
+
+            onProcessEnd();
+        }
+        catch (Throwable e) {
+            onProcessError(e);
+        }
+    }
+
+    /**
+     * @param rc Result code of the call.
+     * @param path Path passed to the asynchronous call.
+     * @param ctx Context object passed to the asynchronous call.
+     * @param children Children of the node on the given path.
+     * @param stat Stat of the node on the given path.
+     * @throws Exception If failed.
+     */
+    abstract void processResult0(int rc, String path, Object ctx, List<String> 
children, Stat stat)
+        throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
new file mode 100644
index 0000000..9098d05
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAbstractWatcher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Base class for watchers, event is processed only if {@link #onProcessStart()} returns {@code true}.
+ */
+abstract class ZkAbstractWatcher extends ZkAbstractCallabck implements Watcher 
{
+    /**
+     * @param rtState Runtime state.
+     * @param impl Discovery impl.
+     */
+    ZkAbstractWatcher(ZkRuntimeState rtState, ZookeeperDiscoveryImpl impl) {
+        super(rtState, impl);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void process(WatchedEvent evt) {
+        if (!onProcessStart())
+            return;
+
+        try {
+            process0(evt);
+
+            onProcessEnd();
+        }
+        catch (Throwable e) {
+            onProcessError(e);
+        }
+    }
+
+    /**
+     * @param evt Event.
+     * @throws Exception If failed.
+     */
+    protected abstract void process0(WatchedEvent evt) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
new file mode 100644
index 0000000..d824377
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkAliveNodeData.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Zk Alive Node Data.
+ */
+public class ZkAliveNodeData implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Presumably ID of the last processed event, {@code -1} until set (TODO confirm against usages). */
+    long lastProcEvt = -1;
+
+    /** Local-only flag, not serialized since it is {@code transient}. */
+    transient boolean needUpdate;
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkAliveNodeData.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
new file mode 100644
index 0000000..a186aed
--- /dev/null
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Collects joined nodes together with their discovery data.
+ */
+class ZkBulkJoinContext {
+    /** Joined nodes paired with their discovery data; {@code null} until the first node is added. */
+    List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes;
+
+    /**
+     * @param nodeEvtData Node event data.
+     * @param discoData Discovery data for node.
+     */
+    void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Serializable> discoData) {
+        if (nodes == null)
+            nodes = new ArrayList<>();
+
+        nodes.add(new T2<>(nodeEvtData, discoData));
+    }
+
+    /**
+     * @return Number of joined nodes ({@code 0} if none was added yet).
+     */
+    int nodes() {
+        return nodes != null ? nodes.size() : 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
new file mode 100644
index 0000000..7e2ea7b
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkClusterNodes.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Zk Cluster Nodes.
+ */
+public class ZkClusterNodes {
+    /** */
+    final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByOrder = new 
ConcurrentSkipListMap<>();
+
+    /** */
+    final ConcurrentSkipListMap<Long, ZookeeperClusterNode> nodesByInternalId 
= new ConcurrentSkipListMap<>();
+
+    /** */
+    final ConcurrentHashMap<UUID, ZookeeperClusterNode> nodesById = new 
ConcurrentHashMap<>();
+
+    /**
+     * @return Remote nodes.
+     */
+    public Collection<ClusterNode> remoteNodes() {
+        List<ClusterNode> nodes = new ArrayList<>();
+
+        for (ClusterNode node : nodesById.values()) {
+            if (!node.isLocal())
+                nodes.add(node);
+        }
+
+        return nodes;
+    }
+
+    /**
+     * @return Current nodes in topology.
+     */
+    @SuppressWarnings("unchecked")
+    List<ClusterNode> topologySnapshot() {
+        return new ArrayList<>((Collection)nodesByOrder.values());
+    }
+
+    /**
+     * @param node New node.
+     */
+    void addNode(ZookeeperClusterNode node) {
+        assert node.id() != null : node;
+        assert node.order() > 0 : node;
+
+        ZookeeperClusterNode old = nodesById.put(node.id(), node);
+
+        assert old == null : old;
+
+        old = nodesByOrder.put(node.order(), node);
+
+        assert old == null : old;
+
+        old = nodesByInternalId.put(node.internalId(), node);
+
+        assert old == null : old;
+    }
+
+    /**
+     * @param internalId Node internal ID.
+     * @return Removed node.
+     */
+    ZookeeperClusterNode removeNode(long internalId) {
+        ZookeeperClusterNode node = nodesByInternalId.remove(internalId);
+
+        assert node != null : internalId;
+        assert node.order() > 0 : node;
+
+        Object rvmd = nodesByOrder.remove(node.order());
+
+        assert rvmd != null;
+
+        rvmd = nodesById.remove(node.id());
+
+        assert rvmd != null;
+
+        return node;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
new file mode 100644
index 0000000..9c21f13
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorNodeState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ * Outcome of a node's connection check: the communication state bit set, or the error raised while getting it.
+ */
+class ZkCommunicationErrorNodeState implements Serializable {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Communication state; may be {@code null} when {@link #err} is set. */
+    final BitSet commState;
+
+    /** Error raised while getting the communication state; may be {@code null} when {@link #commState} is set. */
+    final Exception err;
+
+    /**
+     * @param commState Communication state.
+     * @param err Error if failed to get communication state.
+     */
+    ZkCommunicationErrorNodeState(BitSet commState, Exception err) {
+        assert commState != null || err != null;
+
+        this.commState = commState;
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
new file mode 100644
index 0000000..accda6e
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorProcessFuture.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future is created on each node when either a connection error occurs or a resolve communication error request
+ * is received.
+ */
+class ZkCommunicationErrorProcessFuture extends GridFutureAdapter<Void> implements IgniteSpiTimeoutObject, Runnable {
+    /** Discovery implementation. */
+    private final ZookeeperDiscoveryImpl impl;
+
+    /** Logger taken from the discovery implementation. */
+    private final IgniteLogger log;
+
+    /** Node status futures keyed by node order (JDK map; the imported ConcurrentHashMap is Netty-internal). */
+    private final Map<Long, GridFutureAdapter<Boolean>> nodeFuts = new java.util.concurrent.ConcurrentHashMap<>();
+
+    /** Timeout end time in milliseconds; {@code 0} unless created in {@link State#WAIT_TIMEOUT} state. */
+    private final long endTime;
+
+    /** Timeout object ID; {@code null} unless created in {@link State#WAIT_TIMEOUT} state. */
+    private final IgniteUuid id;
+
+    /** Current state; after construction it is read and written under {@code synchronized (this)}. */
+    private State state;
+
+    /** Topology version passed to {@link #onStartResolveRequest(long)}; {@code 0} until then. */
+    private long resolveTopVer;
+
+    /** Nodes failed as result of the resolve process; set by {@link #onFinishResolve(Set)}. */
+    private Set<Long> resFailedNodes;
+
+    /** Error this future was completed with, if any. */
+    private Exception resErr;
+
+    /** Future collecting nodes' communication status. */
+    private ZkDistributedCollectDataFuture collectResFut;
+
+    /**
+     * @param impl Discovery impl.
+     * @param timeout Timeout to wait before initiating resolve process, must be positive.
+     * @return Future in {@link State#WAIT_TIMEOUT} state.
+     */
+    static ZkCommunicationErrorProcessFuture createOnCommunicationError(ZookeeperDiscoveryImpl impl, long timeout) {
+        return new ZkCommunicationErrorProcessFuture(impl, State.WAIT_TIMEOUT, timeout);
+    }
+
+    /**
+     * @param impl Discovery impl.
+     * @return Future in {@link State#RESOLVE_STARTED} state.
+     */
+    static ZkCommunicationErrorProcessFuture createOnStartResolveRequest(ZookeeperDiscoveryImpl impl) {
+        return new ZkCommunicationErrorProcessFuture(impl, State.RESOLVE_STARTED, 0);
+    }
+
+    /**
+     * @param impl Discovery implementation.
+     * @param state Initial state, must not be {@link State#DONE}.
+     * @param timeout Wait timeout before initiating communication errors resolve.
+     */
+    private ZkCommunicationErrorProcessFuture(ZookeeperDiscoveryImpl impl, State state, long timeout) {
+        assert state != State.DONE;
+
+        this.impl = impl;
+        this.log = impl.log();
+
+        if (state == State.WAIT_TIMEOUT) {
+            assert timeout > 0 : timeout;
+
+            id = IgniteUuid.fromUuid(impl.localNode().id());
+            endTime = System.currentTimeMillis() + timeout;
+        }
+        else {
+            id = null;
+            endTime = 0;
+        }
+
+        this.state = state;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteLogger logger() {
+        return log;
+    }
+
+    /**
+     * @param collectResFut Collect nodes' communication status future, may be set only once.
+     */
+    void nodeResultCollectFuture(ZkDistributedCollectDataFuture collectResFut) {
+        assert this.collectResFut == null : collectResFut;
+
+        this.collectResFut = collectResFut;
+    }
+
+    /**
+     * @param top New topology; futures of nodes missing from it are completed with {@code false}.
+     * @throws Exception If failed.
+     */
+    void onTopologyChange(ZkClusterNodes top) throws Exception {
+        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : nodeFuts.entrySet()) {
+            if (!top.nodesByOrder.containsKey(e.getKey()))
+                e.getValue().onDone(false);
+        }
+
+        if (collectResFut != null)
+            collectResFut.onTopologyChange(top);
+    }
+
+    /**
+     * @param rtState Runtime state.
+     * @param futPath Future path the local node's result is saved under.
+     * @param nodes Nodes to ping.
+     */
+    void checkConnection(final ZkRuntimeState rtState, final String futPath, List<ClusterNode> nodes) {
+        final TcpCommunicationSpi spi = (TcpCommunicationSpi)impl.spi.ignite().configuration().getCommunicationSpi();
+
+        IgniteFuture<BitSet> fut = spi.checkConnection(nodes);
+
+        fut.listen(new IgniteInClosure<IgniteFuture<BitSet>>() {
+            @Override public void apply(final IgniteFuture<BitSet> fut) {
+                // Future completed either from NIO thread or timeout worker, save result from another thread.
+                impl.runInWorkerThread(new ZkRunnable(rtState, impl) {
+                    @Override public void run0() throws Exception {
+                        BitSet commState = null;
+                        Exception err = null;
+
+                        try {
+                            commState = fut.get();
+                        }
+                        catch (Exception e) {
+                            err = e;
+                        }
+
+                        ZkCommunicationErrorNodeState state = new ZkCommunicationErrorNodeState(commState, err);
+
+                        ZkDistributedCollectDataFuture.saveNodeResult(futPath,
+                            rtState.zkClient,
+                            impl.localNode().order(),
+                            impl.marshalZip(state));
+                    }
+
+                    @Override void onStartFailed() {
+                        onError(rtState.errForClose);
+                    }
+                });
+
+            }
+        });
+    }
+
+    /**
+     * Registers this future as a timeout object if it is still in {@link State#WAIT_TIMEOUT} state.
+     */
+    void scheduleCheckOnTimeout() {
+        synchronized (this) {
+            if (state == State.WAIT_TIMEOUT)
+                impl.spi.getSpiContext().addTimeoutObject(this);
+        }
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code False} if future was already completed and need create another future instance.
+     */
+    boolean onStartResolveRequest(long topVer) {
+        synchronized (this) {
+            if (state == State.DONE)
+                return false;
+
+            if (state == State.WAIT_TIMEOUT)
+                impl.spi.getSpiContext().removeTimeoutObject(this);
+
+            assert resolveTopVer == 0 : resolveTopVer;
+
+            resolveTopVer = topVer;
+
+            state = State.RESOLVE_STARTED;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param err Error to complete this future and all node futures with; ignored if already done.
+     */
+    void onError(Exception err) {
+        assert err != null;
+
+        Map<Long, GridFutureAdapter<Boolean>> futs;
+
+        synchronized (this) {
+            if (state == State.DONE) {
+                assert resErr != null;
+
+                return;
+            }
+
+            state = State.DONE;
+
+            resErr = err;
+
+            futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE.
+        }
+
+        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet())
+            e.getValue().onDone(err);
+
+        onDone(err);
+    }
+
+    /**
+     * @param failedNodes Nodes failed as result of resolve process; other nodes' futures complete with {@code true}.
+     */
+    void onFinishResolve(Set<Long> failedNodes) {
+        Map<Long, GridFutureAdapter<Boolean>> futs;
+
+        synchronized (this) {
+            if (state == State.DONE) {
+                assert resErr != null;
+
+                return;
+            }
+
+            assert state == State.RESOLVE_STARTED : state;
+
+            state = State.DONE;
+
+            resFailedNodes = failedNodes;
+
+            futs = nodeFuts; // nodeFuts should not be modified after state changed to DONE.
+        }
+
+        for (Map.Entry<Long, GridFutureAdapter<Boolean>> e : futs.entrySet()) {
+            Boolean res = !F.contains(resFailedNodes, e.getKey());
+
+            e.getValue().onDone(res);
+        }
+
+        onDone();
+    }
+
+    /**
+     * @param node Node.
+     * @return Future finished when communication error resolve is done or {@code null} if another
+     *      resolve process should be started.
+     */
+    @Nullable IgniteInternalFuture<Boolean> nodeStatusFuture(ClusterNode node) {
+        GridFutureAdapter<Boolean> fut;
+
+        synchronized (this) {
+            if (state == State.DONE) {
+                if (resolveTopVer != 0 && node.order() <= resolveTopVer) {
+                    Boolean res = !F.contains(resFailedNodes, node.order());
+
+                    return new GridFinishedFuture<>(res);
+                }
+                else
+                    return null;
+            }
+
+            fut = nodeFuts.get(node.order());
+
+            if (fut == null)
+                nodeFuts.put(node.order(), fut = new GridFutureAdapter<>());
+        }
+
+        if (impl.node(node.order()) == null)
+            fut.onDone(false);
+
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        // Run from zk discovery worker pool after timeout.
+        if (needProcessTimeout()) {
+            try {
+                UUID reqId = UUID.randomUUID();
+
+                if (log.isInfoEnabled()) {
+                    log.info("Initiate cluster-wide communication error resolve process [reqId=" + reqId +
+                        ", errNodes=" + nodeFuts.size() + ']');
+                }
+
+                impl.sendCustomMessage(new ZkCommunicationErrorResolveStartMessage(reqId));
+            }
+            catch (Exception e) {
+                Collection<GridFutureAdapter<Boolean>> futs;
+
+                synchronized (this) {
+                    if (state != State.WAIT_TIMEOUT)
+                        return;
+
+                    state = State.DONE;
+                    resErr = e;
+
+                    futs = nodeFuts.values(); // nodeFuts should not be modified after state changed to DONE.
+                }
+
+                for (GridFutureAdapter<Boolean> fut : futs)
+                    fut.onDone(e);
+
+                onDone(e);
+            }
+        }
+    }
+
+    /**
+     * @return {@code True} if need initiate resolve process after timeout expired.
+     */
+    private boolean needProcessTimeout() {
+        synchronized (this) {
+            if (state != State.WAIT_TIMEOUT)
+                return false;
+
+            for (GridFutureAdapter<Boolean> fut : nodeFuts.values()) {
+                if (!fut.isDone())
+                    return true;
+            }
+
+            state = State.DONE;
+        }
+
+        onDone(null, null);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return endTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (needProcessTimeout())
+            impl.runInWorkerThread(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            impl.clearCommunicationErrorProcessFuture(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkCommunicationErrorProcessFuture.class, this);
+    }
+
+    /**
+     * Lifecycle state of the future.
+     */
+    enum State {
+        /** Finished: by resolve result, by error, or on timeout when no node future was left pending. */
+        DONE,
+
+        /** Waiting for the timeout before initiating the resolve process. */
+        WAIT_TIMEOUT,
+
+        /** Resolve process was started. */
+        RESOLVE_STARTED
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
new file mode 100644
index 0000000..9b7476c
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery custom message describing a finished communication error resolve process.
+ */
+class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    final UUID futId;
+
+    /** Topology version when resolve process finished. */
+    final long topVer;
+
+    /** Resolve result; transient, so it is not written by Java serialization. */
+    transient ZkCommunicationErrorResolveResult res;
+
+    /**
+     * @param futId Future ID.
+     * @param topVer Topology version when resolve process finished.
+     */
+    ZkCommunicationErrorResolveFinishMessage(UUID futId, long topVer) {
+        this.futId = futId;
+        this.topVer = topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ZkCommunicationErrorResolveFinishMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
new file mode 100644
index 0000000..23495aa
--- /dev/null
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.GridLongList;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Serializable result of a communication error resolve process: the killed nodes and/or an error.
+ */
+class ZkCommunicationErrorResolveResult implements Serializable {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Killed nodes; may be {@code null}. */
+    final GridLongList killedNodes;
+
+    /** Error; presumably {@code null} on success (TODO confirm - the constructor does not annotate it). */
+    final Exception err;
+
+    /**
+     * @param killedNodes Killed nodes.
+     * @param err Error.
+     */
+    ZkCommunicationErrorResolveResult(@Nullable GridLongList killedNodes, Exception err) {
+        this.killedNodes = killedNodes;
+        this.err = err;
+    }
+}

Reply via email to