http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
new file mode 100644
index 0000000..fb12c3a
--- /dev/null
+++ 
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTest.java
@@ -0,0 +1,4847 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingZooKeeperServer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CommunicationFailureContext;
+import org.apache.ignite.configuration.CommunicationFailureResolver;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.DiscoverySpiTestListener;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import 
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.security.SecurityCredentials;
+import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.plugin.security.SecuritySubject;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
+import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestSuite2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZkTestClientCnxnSocketNIO;
+import org.apache.zookeeper.ZooKeeper;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2;
+import static 
org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD;
+import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
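+/**
+ * Tests for {@link ZookeeperDiscoverySpi}: node start/stop, node attributes, authentication,
+ * custom discovery events and segmentation.
+ */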
+@SuppressWarnings("deprecation")
+public class ZookeeperDiscoverySpiTest extends GridCommonAbstractTest {
+    /** Default ZooKeeper root path of {@link ZookeeperDiscoverySpi}. */
+    private static final String IGNITE_ZK_ROOT = 
ZookeeperDiscoverySpi.DFLT_ROOT_PATH;
+
+    /** Number of servers requested when the testing ZooKeeper cluster is created. */
+    private static final int ZK_SRVS = 3;
+
+    /** Testing ZooKeeper cluster; created lazily before a test and closed after all tests have stopped. */
+    private static TestingCluster zkCluster;
+
+    /** If {@code false}, nodes connect to a real local ZK at {@code localhost:2181} instead of the testing cluster. */
+    private static final boolean USE_TEST_CLUSTER = true;
+
+    /** Client mode flag for started nodes; used when no thread-local override is set. */
+    private boolean client;
+
+    /** Per-thread client mode override; takes precedence over {@link #client} when set. */
+    private static ThreadLocal<Boolean> clientThreadLoc = new ThreadLocal<>();
+
+    /** Discovery events recorded by the local event listener: local node ID -> (topology version -> event). */
+    private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = 
new ConcurrentHashMap<>();
+
+    /** Set when the local event listener hits an unexpected error; asserted to be {@code false} after each test. */
+    private static volatile boolean err;
+
+    /** If set, {@link ZkTestClientCnxnSocketNIO} is installed as the ZooKeeper client connection socket class. */
+    private boolean testSockNio;
+
+    /** If set, {@code ZkTestCommunicationSpi} is used as the communication SPI. */
+    private boolean testCommSpi;
+
+    /** Session timeout passed to the SPI; {@code 10_000} is used when this value is not positive. */
+    private long sesTimeout;
+
+    /** Join timeout passed to the SPI when non-zero. */
+    private long joinTimeout;
+
+    /** Value passed to {@code ZookeeperDiscoverySpi.setClientReconnectDisabled}. */
+    private boolean clientReconnectDisabled;
+
+    /** Discovery SPIs created for node configurations, keyed by Ignite instance name. */
+    private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new 
ConcurrentHashMap<>();
+
+    /** User attributes to set on node configuration, if not {@code null}. */
+    private Map<String, Object> userAttrs;
+
+    /** If {@code false}, the Ignite instance name is set as the node's consistent ID. */
+    private boolean dfltConsistenId;
+
+    /** Node ID to set on node configuration, if not {@code null}. */
+    private UUID nodeId;
+
+    /** If set, nodes are configured with a persistence-enabled default data region. */
+    private boolean persistence;
+
+    /** Factory of the communication failure resolver to set on node configuration, if not {@code null}. */
+    private IgniteOutClosure<CommunicationFailureResolver> commFailureRslvr;
+
+    /** Factory of the node authenticator to set on the discovery SPI, if not {@code null}. */
+    private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth;
+
+    /** ZooKeeper root path to set on the SPI, if not {@code null} (applied only with the testing cluster). */
+    private String zkRootPath;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Builds the node configuration around a fresh {@link ZookeeperDiscoverySpi} tuned by this test's fields:
+     * optional node ID, consistent ID, join/session timeouts, client reconnect flag, authenticator, ZK root path,
+     * client mode (thread-local override first, then {@link #client}), user attributes, persistence,
+     * test communication SPI and communication failure resolver.
+     * <p>
+     * Also registers a local listener for node joined/failed/left events which records every event into
+     * {@link #evts} by topology version and raises {@link #err} on any unexpected error.
+     *
+     * @param igniteInstanceName Ignite instance name; also used as the key under which the created SPI is stored.
+     * @return Node configuration.
+     * @throws Exception If failed.
+     */
+    @Override protected IgniteConfiguration getConfiguration(final String 
igniteInstanceName) throws Exception {
+        if (testSockNio)
+            System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, 
ZkTestClientCnxnSocketNIO.class.getName());
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (nodeId != null)
+            cfg.setNodeId(nodeId);
+
+        if (!dfltConsistenId)
+            cfg.setConsistentId(igniteInstanceName);
+
+        ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+
+        if (joinTimeout != 0)
+            zkSpi.setJoinTimeout(joinTimeout);
+
+        zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000); // 10_000 unless a positive timeout is set.
+
+        zkSpi.setClientReconnectDisabled(clientReconnectDisabled);
+
+        // Set authenticator for basic sanity tests.
+        if (auth != null) {
+            zkSpi.setAuthenticator(auth.apply());
+
+            zkSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() 
{
+                @Override public void beforeJoin(ClusterNode locNode, 
IgniteLogger log) {
+                    ZookeeperClusterNode locNode0 = 
(ZookeeperClusterNode)locNode;
+
+                    Map<String, Object> attrs = new 
HashMap<>(locNode0.getAttributes());
+
+                    attrs.put(ATTR_SECURITY_CREDENTIALS, new 
SecurityCredentials(null, null, igniteInstanceName)); // Instance name is the only credential value set.
+
+                    locNode0.setAttributes(attrs);
+                }
+
+                @Override public boolean beforeSendCustomEvent(DiscoverySpi 
spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
+                    return false;
+                }
+            });
+        }
+
+        spis.put(igniteInstanceName, zkSpi); // Remember the SPI by instance name.
+
+        if (USE_TEST_CLUSTER) {
+            assert zkCluster != null;
+
+            zkSpi.setZkConnectionString(zkCluster.getConnectString());
+
+            if (zkRootPath != null)
+                zkSpi.setZkRootPath(zkRootPath);
+        }
+        else
+            zkSpi.setZkConnectionString("localhost:2181");
+
+        cfg.setDiscoverySpi(zkSpi);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        Boolean clientMode = clientThreadLoc.get(); // Thread-local value, when set, overrides the 'client' field.
+
+        if (clientMode != null)
+            cfg.setClientMode(clientMode);
+        else
+            cfg.setClientMode(client);
+
+        if (userAttrs != null)
+            cfg.setUserAttributes(userAttrs);
+
+        Map<IgnitePredicate<? extends Event>, int[]> lsnrs = new HashMap<>();
+
+        lsnrs.put(new IgnitePredicate<Event>() {
+            /** Ignite instance this listener belongs to (annotated for resource injection). */
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+            @Override public boolean apply(Event evt) {
+                try {
+                    DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt;
+
+                    UUID locId = 
((IgniteKernal)ignite).context().localNodeId();
+
+                    Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId);
+
+                    if (nodeEvts == null) { // First event seen on this node: seed the map with the local join event.
+                        Object old = evts.put(locId, nodeEvts = new 
TreeMap<>());
+
+                        assertNull(old);
+
+                        synchronized (nodeEvts) {
+                            DiscoveryLocalJoinData locJoin = 
((IgniteKernal)ignite).context().discovery().localJoin();
+
+                            nodeEvts.put(locJoin.event().topologyVersion(), 
locJoin.event());
+                        }
+                    }
+
+                    synchronized (nodeEvts) {
+                        DiscoveryEvent old = 
nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt);
+
+                        assertNull(old); // At most one event is expected per topology version.
+                    }
+                }
+                catch (Throwable e) {
+                    error("Unexpected error [evt=" + evt + ", err=" + e + ']', 
e);
+
+                    err = true; // Asserted to be false in afterTest().
+                }
+
+                return true;
+            }
+        }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT});
+
+        cfg.setLocalEventListeners(lsnrs);
+
+        if (persistence) {
+            DataStorageConfiguration memCfg = new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setMaxSize(100 * 1024 * 1024).
+                    setPersistenceEnabled(true))
+                .setPageSize(1024)
+                .setWalMode(WALMode.LOG_ONLY);
+
+            cfg.setDataStorageConfiguration(memCfg);
+        }
+
+        if (testCommSpi)
+            cfg.setCommunicationSpi(new ZkTestCommunicationSpi());
+
+        if (commFailureRslvr != null)
+            cfg.setCommunicationFailureResolver(commFailureRslvr.apply());
+
+        return cfg;
+    }
+
+    /**
+     * Switches the client mode flag used for nodes started afterwards (unless a thread-local
+     * client mode is set for the starting thread).
+     *
+     * @param clientMode {@code true} to configure subsequently started nodes as clients.
+     */
+    private void clientMode(boolean clientMode) {
+        this.client = clientMode;
+    }
+
+    /**
+     * Sets the client mode for nodes whose configuration is created from the calling thread.
+     *
+     * @param clientMode Client mode flag stored in the thread-local override.
+     */
+    private void clientModeThreadLocal(boolean clientMode) {
+        clientThreadLoc.set(Boolean.valueOf(clientMode));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sets the {@code IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT} system property to {@code "1000"} for the
+     * whole test class; the property is cleared again in {@link #afterTestsStopped()}.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        
System.setProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT,
 "1000");
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the testing ZooKeeper cluster and clears the ack timeout system property set in
+     * {@link #beforeTestsStarted()} before delegating to the parent class.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopZkCluster();
+
+        
System.clearProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_TIMEOUT);
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Closes the testing ZooKeeper cluster, if one was started, and drops the reference to it so that
+     * the next test creates a fresh cluster. A failure to close is logged and otherwise ignored.
+     */
+    private void stopZkCluster() {
+        if (zkCluster == null)
+            return;
+
+        try {
+            zkCluster.close();
+        }
+        catch (Exception e) {
+            // Best effort: the reference is dropped below even if close failed.
+            U.error(log, "Failed to stop Zookeeper cluster: " + e, e);
+        }
+
+        zkCluster = null;
+    }
+
+    /**
+     * Sets the {@code IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD} system property to {@code "1"}.
+     * Judging by the method name this presumably makes every discovery event be acknowledged -
+     * confirm against {@code ZookeeperDiscoveryImpl}.
+     */
+    private static void ackEveryEventSystemProperty() {
+        System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1");
+    }
+
+    /**
+     * Removes the {@code IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD} system property set by
+     * {@link #ackEveryEventSystemProperty()}, so that the threshold override does not leak into
+     * tests which did not ask for it.
+     */
+    private void clearAckEveryEventSystemProperty() {
+        System.clearProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Lazily creates and starts the testing ZooKeeper cluster (when {@link #USE_TEST_CLUSTER} is set and no
+     * cluster exists yet), then calls {@code reset()} before the test body runs.
+     */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (USE_TEST_CLUSTER && zkCluster == null) {
+            zkCluster = 
ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
+
+            zkCluster.start();
+        }
+
+        reset();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After each test: calls {@link #clearAckEveryEventSystemProperty()}, fails if the event listener reported
+     * an error, and checks events consistency and internal structures cleanup. {@code reset()} and
+     * {@code stopAllGrids()} always run, even if one of the checks fails.
+     */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        clearAckEveryEventSystemProperty();
+
+        try {
+            assertFalse("Unexpected error, see log for details", err);
+
+            checkEventsConsistency();
+
+            checkInternalStructuresCleanup();
+
+            //TODO uncomment when 
https://issues.apache.org/jira/browse/IGNITE-8193 is fixed
+//            checkZkNodesCleanup();
+        }
+        finally {
+            reset();
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * For every started node waits (up to 30 seconds) until the {@code commErrProcFut} reference
+     * of the discovery SPI implementation becomes empty, then asserts that it is.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkInternalStructuresCleanup() throws Exception {
+        for (Ignite ignite : G.allGrids()) {
+            final AtomicReference<?> commErrFutRef = GridTestUtils.getFieldValue(spi(ignite), "impl", "commErrProcFut");
+
+            GridAbsPredicate refCleared = new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return commErrFutRef.get() == null;
+                }
+            };
+
+            GridTestUtils.waitForCondition(refCleared, 30_000);
+
+            assertNull(commErrFutRef.get());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testZkRootNotExists() throws Exception {
+        zkRootPath = "/a/b/c";
+
+        for (int i = 0; i < 3; i++) {
+            reset();
+
+            startGridsMultiThreaded(5);
+
+            waitForTopology(5);
+
+            stopAllGrids();
+
+            checkEventsConsistency();
+        }
+    }
+
+    /**
+     * Marshals instances of {@code C1} and {@code C2} with node 0's marshaller from 64 concurrent threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMetadataUpdate() throws Exception {
+        startGrid(0);
+
+        Callable<Void> marshalTask = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite(0).configuration().getMarshaller().marshal(new C1());
+                ignite(0).configuration().getMarshaller().marshal(new C2());
+
+                return null;
+            }
+        };
+
+        GridTestUtils.runMultiThreaded(marshalTask, 64, "marshal");
+    }
+
+    /**
+     * Starts 3 servers and 3 clients and checks that every node, as seen locally and from every
+     * other node, reports at least one address and one host name.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeAddresses() throws Exception {
+        startGridsMultiThreaded(3);
+
+        clientMode(true);
+
+        startGridsMultiThreaded(3, 3);
+
+        waitForTopology(6);
+
+        for (Ignite ignite : G.allGrids()) {
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            assertFalse(locNode.addresses().isEmpty());
+            assertFalse(locNode.hostNames().isEmpty());
+
+            for (ClusterNode rmtView : ignite.cluster().nodes()) {
+                assertFalse(rmtView.addresses().isEmpty());
+                assertFalse(rmtView.hostNames().isEmpty());
+            }
+        }
+    }
+
+    /**
+     * Starts 3 servers and 3 clients and checks that the consistent ID of every node equals its
+     * Ignite instance name attribute, both for the local node and for all nodes seen in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSetConsistentId() throws Exception {
+        startGridsMultiThreaded(3);
+
+        clientMode(true);
+
+        startGridsMultiThreaded(3, 3);
+
+        waitForTopology(6);
+
+        for (Ignite ignite : G.allGrids()) {
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            assertEquals(locNode.attribute(ATTR_IGNITE_INSTANCE_NAME), locNode.consistentId());
+
+            for (ClusterNode topNode : ignite.cluster().nodes())
+                assertEquals(topNode.attribute(ATTR_IGNITE_INSTANCE_NAME), topNode.consistentId());
+        }
+    }
+
+    /**
+     * Starts 3 servers and 3 clients without an explicitly configured consistent ID and checks that
+     * every node still reports a non-null consistent ID.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaultConsistentId() throws Exception {
+        dfltConsistenId = true;
+
+        startGridsMultiThreaded(3);
+
+        clientMode(true);
+
+        startGridsMultiThreaded(3, 3);
+
+        waitForTopology(6);
+
+        for (Ignite ignite : G.allGrids()) {
+            assertNotNull(ignite.cluster().localNode().consistentId());
+
+            for (ClusterNode topNode : ignite.cluster().nodes()) {
+                assertNotNull(topNode.consistentId());
+            }
+        }
+    }
+
+    /**
+     * Starts and stops server and client nodes one by one and, after each step, checks the number of
+     * client and server nodes reported by every started node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientNodesStatus() throws Exception {
+        startGrid(0);
+
+        checkClientAndServerCounts(0, 1);
+
+        clientMode(true);
+
+        startGrid(1);
+
+        checkClientAndServerCounts(1, 1);
+
+        clientMode(false);
+
+        startGrid(2);
+
+        clientMode(true);
+
+        startGrid(3);
+
+        checkClientAndServerCounts(2, 2);
+
+        stopGrid(1);
+
+        waitForTopology(3);
+
+        checkClientAndServerCounts(1, 2);
+
+        stopGrid(2);
+
+        waitForTopology(2);
+
+        checkClientAndServerCounts(1, 1);
+    }
+
+    /**
+     * Asserts the client and server node counts visible from every started node.
+     *
+     * @param expClients Expected number of client nodes.
+     * @param expSrvs Expected number of server nodes.
+     */
+    private void checkClientAndServerCounts(int expClients, int expSrvs) {
+        for (Ignite ignite : G.allGrids()) {
+            assertEquals(expClients, ignite.cluster().forClients().nodes().size());
+            assertEquals(expSrvs, ignite.cluster().forServers().nodes().size());
+        }
+    }
+
+    /**
+     * Expects start of node 0 to fail with a local authentication error, then starts nodes 1 and 2
+     * and checks their security subjects.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testLocalAuthenticationFails() throws Exception {
+        auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(0));
+
+        Callable<Void> startFirstNode = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(0);
+
+                return null;
+            }
+        };
+
+        Throwable startErr = GridTestUtils.assertThrows(log, startFirstNode, IgniteCheckedException.class, null);
+
+        IgniteSpiException spiErr = X.cause(startErr, IgniteSpiException.class);
+
+        assertNotNull(spiErr);
+        assertTrue(spiErr.getMessage().contains("Authentication failed for local node"));
+
+        startGrid(1);
+        startGrid(2);
+
+        checkTestSecuritySubject(2);
+    }
+
+    /**
+     * Authentication scenario: the authenticator factory is created with the instance names of nodes 1 and 5,
+     * and the test expects start of exactly those nodes to fail (in both server and client mode) while
+     * other nodes start, stop and restart normally. After each topology change the security subject
+     * attribute of all nodes is verified via {@link #checkTestSecuritySubject(int)}.
+     * <p>
+     * NOTE(review): that the names passed to the factory are the rejected ones is inferred from the
+     * expectations of this test - confirm against {@code ZkTestNodeAuthenticator}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAuthentication() throws Exception {
+        auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(1),
+            getTestIgniteInstanceName(5));
+
+        startGrid(0);
+
+        checkTestSecuritySubject(1);
+
+        {
+            clientMode(false);
+            checkStartFail(1);
+
+            clientMode(true);
+            checkStartFail(1);
+
+            clientMode(false);
+        }
+
+        startGrid(2);
+
+        checkTestSecuritySubject(2);
+
+        stopGrid(2);
+
+        checkTestSecuritySubject(1);
+
+        startGrid(2);
+
+        checkTestSecuritySubject(2);
+
+        stopGrid(0); // Stops the node that was started first.
+
+        checkTestSecuritySubject(1);
+
+        checkStartFail(1);
+
+        clientMode(false);
+
+        startGrid(3);
+
+        clientMode(true);
+
+        startGrid(4);
+
+        clientMode(false);
+
+        startGrid(0);
+
+        checkTestSecuritySubject(4);
+
+        checkStartFail(1);
+        checkStartFail(5);
+
+        clientMode(true);
+
+        checkStartFail(1);
+        checkStartFail(5);
+    }
+
+    /**
+     * Asserts that start of the given node fails with an {@link IgniteCheckedException} caused by an
+     * {@link IgniteSpiException} whose message mentions failed authentication.
+     *
+     * @param nodeIdx Node index.
+     */
+    private void checkStartFail(final int nodeIdx) {
+        Callable<Void> startNode = new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                startGrid(nodeIdx);
+
+                return null;
+            }
+        };
+
+        Throwable startErr = GridTestUtils.assertThrows(log, startNode, IgniteCheckedException.class, null);
+
+        IgniteSpiException spiErr = X.cause(startErr, IgniteSpiException.class);
+
+        assertNotNull(spiErr);
+        assertTrue(spiErr.getMessage().contains("Authentication failed"));
+    }
+
+    /**
+     * Waits for the expected topology size, then checks on every started node that each topology node
+     * carries a security subject attribute which unmarshals into a test security context whose node
+     * name equals the node's Ignite instance name attribute.
+     *
+     * @param expNodes Expected nodes number.
+     * @throws Exception If failed.
+     */
+    private void checkTestSecuritySubject(int expNodes) throws Exception {
+        waitForTopology(expNodes);
+
+        List<Ignite> started = G.allGrids();
+
+        JdkMarshaller marsh = new JdkMarshaller();
+
+        for (Ignite ignite : started) {
+            Collection<ClusterNode> topNodes = ignite.cluster().nodes();
+
+            assertEquals(started.size(), topNodes.size());
+
+            for (ClusterNode topNode : topNodes) {
+                byte[] subjBytes = topNode.attribute(ATTR_SECURITY_SUBJECT_V2);
+
+                assertNotNull(subjBytes);
+
+                ZkTestNodeAuthenticator.TestSecurityContext secCtx = marsh.unmarshal(subjBytes, null);
+
+                assertEquals(topNode.attribute(ATTR_IGNITE_INSTANCE_NAME), secCtx.nodeName);
+            }
+        }
+    }
+
+    /**
+     * Starts 5 nodes, stops node 3, starts it again, and waits for the expected topology size
+     * (5, 4, then 5 nodes) after each step.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStopNode_1() throws Exception {
+        startGrids(5);
+
+        waitForTopology(5);
+
+        stopGrid(3);
+
+        waitForTopology(4);
+
+        startGrid(3);
+
+        waitForTopology(5);
+    }
+
+    /**
+     * On a single node with the ack threshold system property set, creates cache {@code c1} and
+     * waits for events acks on that node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_SingleNode() throws Exception {
+        ackEveryEventSystemProperty();
+
+        Ignite srv0 = startGrid(0);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+
+        waitForEventsAcks(srv0);
+    }
+
+    /**
+     * On a 5-node cluster with the ack threshold system property set, creates cache {@code c1},
+     * awaits partition map exchange and waits for events acks on the first started node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCustomEventsSimple1_5_Nodes() throws Exception {
+        ackEveryEventSystemProperty();
+
+        Ignite srv0 = startGrids(5);
+
+        srv0.createCache(new CacheConfiguration<>("c1"));
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(srv0);
+    }
+
+    /**
+     * Runs the fast-stop-process custom events scenario with 1 server and no clients.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCustomEvents_FastStopProcess_1() throws Exception {
+        customEvents_FastStopProcess(1, 0);
+    }
+
+    /**
+     * Runs the fast-stop-process custom events scenario with 5 servers and 5 clients.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCustomEvents_FastStopProcess_2() throws Exception {
+        customEvents_FastStopProcess(5, 5);
+    }
+
+    /**
+     * Starts the requested number of servers and clients (node 0 is started first and is treated as the
+     * coordinator by this test), registers test listeners on all nodes and then, for every node as a sender,
+     * sends two {@code TestFastStopProcessCustomMessage}s:
+     * <ul>
+     * <li>one created with {@code false}: only the coordinator is expected to receive it, other nodes are
+     * expected to receive nothing;</li>
+     * <li>one created with {@code true}: the coordinator is expected to receive the message followed by a
+     * {@code TestFastStopProcessCustomMessageAck} sent by the coordinator, other nodes are expected to
+     * receive only that ack.</li>
+     * </ul>
+     * Received messages are cleared after each check and events acks are awaited on the coordinator after
+     * each sender.
+     * <p>
+     * NOTE(review): the meaning of the boolean constructor argument is inferred from the expectations
+     * above - confirm against {@code TestFastStopProcessCustomMessage}.
+     *
+     * @param srvs Servers number.
+     * @param clients Clients number.
+     * @throws Exception If failed.
+     */
+    private void customEvents_FastStopProcess(int srvs, int clients) throws 
Exception {
+        ackEveryEventSystemProperty();
+
+        Map<UUID, List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>>> rcvdMsgs =
+            new ConcurrentHashMap<>();
+
+        Ignite crd = startGrid(0);
+
+        UUID crdId = crd.cluster().localNode().id();
+
+        if (srvs > 1)
+            startGridsMultiThreaded(1, srvs - 1);
+
+        if (clients > 0) {
+            client = true;
+
+            startGridsMultiThreaded(srvs, clients);
+        }
+
+        awaitPartitionMapExchange();
+
+        List<Ignite> nodes = G.allGrids();
+
+        assertEquals(srvs + clients, nodes.size());
+
+        for (Ignite node : nodes)
+            registerTestEventListeners(node, rcvdMsgs);
+
+        int payload = 0;
+
+        AffinityTopologyVersion topVer = 
((IgniteKernal)crd).context().discovery().topologyVersionEx();
+
+        for (Ignite node : nodes) {
+            UUID sndId = node.cluster().localNode().id();
+
+            info("Send from node: " + sndId);
+
+            GridDiscoveryManager discoveryMgr = 
((IgniteKernal)node).context().discovery();
+
+            {
+                List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
+                List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>> expNodesMsgs = Collections.emptyList();
+
+                TestFastStopProcessCustomMessage msg = new 
TestFastStopProcessCustomMessage(false, payload++);
+
+                expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>(topVer, sndId, msg));
+
+                discoveryMgr.sendCustomEvent(msg);
+
+                doSleep(200); // Wait some time to check extra messages are 
not received.
+
+                checkEvents(crd, rcvdMsgs, expCrdMsgs);
+
+                for (Ignite node0 : nodes) {
+                    if (node0 != crd)
+                        checkEvents(node0, rcvdMsgs, expNodesMsgs);
+                }
+
+                rcvdMsgs.clear();
+            }
+            {
+                List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>();
+                List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>> expNodesMsgs = new ArrayList<>();
+
+                TestFastStopProcessCustomMessage msg = new 
TestFastStopProcessCustomMessage(true, payload++);
+
+                expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>(topVer, sndId, msg));
+
+                discoveryMgr.sendCustomEvent(msg);
+
+                TestFastStopProcessCustomMessageAck ackMsg = new 
TestFastStopProcessCustomMessageAck(msg.payload);
+
+                expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>(topVer, crdId, ackMsg));
+                expNodesMsgs.add(new T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>(topVer, crdId, ackMsg));
+
+                doSleep(200); // Wait some time to check extra messages are 
not received.
+
+                checkEvents(crd, rcvdMsgs, expCrdMsgs);
+
+                for (Ignite node0 : nodes) {
+                    if (node0 != crd)
+                        checkEvents(node0, rcvdMsgs, expNodesMsgs);
+                }
+
+                rcvdMsgs.clear();
+            }
+
+            waitForEventsAcks(crd);
+        }
+    }
+
+    /**
+     * Waits up to 5 seconds until the node has received at least as many messages as expected and then
+     * asserts that the received messages equal the expected ones (no messages at all counts as an
+     * empty list).
+     *
+     * @param node Node to check.
+     * @param rcvdMsgs Received messages.
+     * @param expMsgs Expected messages.
+     * @throws Exception If failed.
+     */
+    private void checkEvents(
+        Ignite node,
+        final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs,
+        final List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expMsgs) throws Exception {
+        final UUID locId = node.cluster().localNode().id();
+
+        GridAbsPredicate enoughRcvd = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> rcvd = rcvdMsgs.get(locId);
+
+                if (rcvd == null)
+                    return 0 >= expMsgs.size();
+
+                return rcvd.size() >= expMsgs.size();
+            }
+        };
+
+        assertTrue(GridTestUtils.waitForCondition(enoughRcvd, 5000));
+
+        List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> actual = rcvdMsgs.get(locId);
+
+        if (actual == null)
+            actual = Collections.emptyList();
+
+        assertEqualsCollections(expMsgs, actual);
+    }
+
+    /**
+     * @param node Node.
+     * @param rcvdMsgs Map to store received events.
+     */
+    private void registerTestEventListeners(Ignite node,
+        final Map<UUID, List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>>> rcvdMsgs) {
+        GridDiscoveryManager discoveryMgr = 
((IgniteKernal)node).context().discovery();
+
+        final UUID nodeId = node.cluster().localNode().id();
+
+        
discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessage.class,
+            new CustomEventListener<TestFastStopProcessCustomMessage>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion 
topVer, ClusterNode snd, TestFastStopProcessCustomMessage msg) {
+                    List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId);
+
+                    if (list == null)
+                        rcvdMsgs.put(nodeId, list = new ArrayList<>());
+
+                    list.add(new T3<>(topVer, snd.id(), 
(DiscoveryCustomMessage)msg));
+                }
+            }
+        );
+        
discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessageAck.class,
+            new CustomEventListener<TestFastStopProcessCustomMessageAck>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion 
topVer, ClusterNode snd, TestFastStopProcessCustomMessageAck msg) {
+                    List<T3<AffinityTopologyVersion, UUID, 
DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId);
+
+                    if (list == null)
+                        rcvdMsgs.put(nodeId, list = new ArrayList<>());
+
+                    list.add(new T3<>(topVer, snd.id(), 
(DiscoveryCustomMessage)msg));
+                }
+            }
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSegmentation1() throws Exception {
+        sesTimeout = 2000;
+        testSockNio = true;
+
+        Ignite node0 = startGrid(0);
+
+        final CountDownLatch l = new CountDownLatch(1);
+
+        node0.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                l.countDown();
+
+                return false;
+            }
+        }, EventType.EVT_NODE_SEGMENTED);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1_000);
+
+            if (l.getCount() == 0)
+                break;
+        }
+
+        info("Allow connect");
+
+        c0.allowConnect();
+
+        assertTrue(l.await(10, TimeUnit.SECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSegmentation2() throws Exception {
+        sesTimeout = 2000;
+
+        Ignite ignite = startGrid(0);
+
+        final CountDownLatch segmented = new CountDownLatch(1);
+
+        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                segmented.countDown();
+
+                return false;
+            }
+        };
+
+        ignite.events().localListen(lsnr, EventType.EVT_NODE_SEGMENTED);
+
+        try {
+            // Close the whole ZooKeeper cluster; the segmentation event must arrive within 10 seconds.
+            zkCluster.close();
+
+            boolean evtFired = segmented.await(10, TimeUnit.SECONDS);
+
+            assertTrue(evtFired);
+        }
+        finally {
+            // Replace the closed cluster with a freshly created and started one.
+            zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
+
+            zkCluster.start();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSegmentation3() throws Exception {
+        // Test is muted until the linked issue is resolved.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8183");
+
+        sesTimeout = 5000;
+
+        Ignite node0 = startGrid(0);
+
+        final CountDownLatch l = new CountDownLatch(1);
+
+        node0.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                l.countDown();
+
+                return false;
+            }
+        }, EventType.EVT_NODE_SEGMENTED);
+
+        List<TestingZooKeeperServer> srvs = zkCluster.getServers();
+
+        assertEquals(3, srvs.size());
+
+        try {
+            // Stop two of the three ZooKeeper servers and expect the node to get segmented.
+            srvs.get(0).stop();
+            srvs.get(1).stop();
+
+            assertTrue(l.await(20, TimeUnit.SECONDS));
+        }
+        finally {
+            // Recreate the ZooKeeper cluster from scratch.
+            zkCluster.close();
+
+            zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
+
+            zkCluster.start();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQuorumRestore() throws Exception {
+        // Test is muted until the linked issue is resolved.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8180");
+
+        sesTimeout = 15_000;
+
+        startGrids(3);
+
+        waitForTopology(3);
+
+        List<TestingZooKeeperServer> srvs = zkCluster.getServers();
+
+        assertEquals(3, srvs.size());
+
+        try {
+            // Stop two of the three ZooKeeper servers.
+            srvs.get(0).stop();
+            srvs.get(1).stop();
+
+            U.sleep(2000);
+
+            // Bring one of the stopped servers back.
+            srvs.get(1).restart();
+
+            U.sleep(4000);
+
+            // One more node must be able to join after the restart.
+            startGrid(4);
+
+            waitForTopology(4);
+        }
+        finally {
+            // Recreate the ZooKeeper cluster from scratch.
+            zkCluster.close();
+
+            zkCluster = ZookeeperDiscoverySpiTestSuite2.createTestingCluster(ZK_SRVS);
+
+            zkCluster.start();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore1() throws Exception {
+        testSockNio = true;
+
+        // Close the ZK client socket of the first node (closeSocket(false)), then start one more node.
+        ZkTestClientCnxnSocketNIO.forNode(startGrid(0)).closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore2() throws Exception {
+        testSockNio = true;
+
+        // Close the ZK client socket of the first node (closeSocket(false)), then start five more nodes at once.
+        ZkTestClientCnxnSocketNIO.forNode(startGrid(0)).closeSocket(false);
+
+        startGridsMultiThreaded(1, 5);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator1() throws Exception {
+        // Variant where no node is failed while another node is disconnected.
+        final boolean failWhenDisconnected = false;
+
+        connectionRestore_NonCoordinator(failWhenDisconnected);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_NonCoordinator2() throws Exception {
+        // Variant where a node is failed while another node is disconnected.
+        final boolean failWhenDisconnected = true;
+
+        connectionRestore_NonCoordinator(failWhenDisconnected);
+    }
+
+    /**
+     * @param failWhenDisconnected {@code True} to fail a node while another
+     *      node is disconnected.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception {
+        testSockNio = true;
+
+        Ignite srv0 = startGrid(0);
+        Ignite srv1 = startGrid(1);
+
+        // Close the ZK client socket of the second node; it is allowed to reconnect later via allowConnect().
+        ZkTestClientCnxnSocketNIO sock1 = ZkTestClientCnxnSocketNIO.forNode(srv1);
+
+        sock1.closeSocket(true);
+
+        // Third node is started in background; a start failure is only logged.
+        Callable<Void> startThirdNode = new Callable<Void>() {
+            @Override public Void call() {
+                try {
+                    startGrid(2);
+                }
+                catch (Exception e) {
+                    info("Start error: " + e);
+                }
+
+                return null;
+            }
+        };
+
+        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(startThirdNode, "start-node");
+
+        // The first node must see the join at topology version 3.
+        checkEvents(srv0, joinEvent(3));
+
+        if (failWhenDisconnected) {
+            ZookeeperDiscoverySpi thirdSpi = spis.get(getTestIgniteInstanceName(2));
+
+            closeZkClient(thirdSpi);
+
+            checkEvents(srv0, failEvent(4));
+        }
+
+        sock1.allowConnect();
+
+        // After reconnect the second node must observe the same sequence of events.
+        checkEvents(ignite(1), joinEvent(3));
+
+        if (failWhenDisconnected) {
+            checkEvents(ignite(1), failEvent(4));
+
+            IgnitionEx.stop(getTestIgniteInstanceName(2), true, true);
+        }
+
+        startFut.get();
+
+        int expTopSize = failWhenDisconnected ? 2 : 3;
+
+        waitForTopology(expTopSize);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1() throws Exception {
+        final int initNodes = 1;
+        final int startNodes = 1;
+        final int failCnt = 0;
+
+        connectionRestore_Coordinator(initNodes, startNodes, failCnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator1_1() throws Exception {
+        final int initNodes = 1;
+        final int startNodes = 1;
+        final int failCnt = 1;
+
+        connectionRestore_Coordinator(initNodes, startNodes, failCnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator2() throws Exception {
+        final int initNodes = 1;
+        final int startNodes = 3;
+        final int failCnt = 0;
+
+        connectionRestore_Coordinator(initNodes, startNodes, failCnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator3() throws Exception {
+        final int initNodes = 3;
+        final int startNodes = 3;
+        final int failCnt = 0;
+
+        connectionRestore_Coordinator(initNodes, startNodes, failCnt);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore_Coordinator4() throws Exception {
+        final int initNodes = 3;
+        final int startNodes = 3;
+        final int failCnt = 1;
+
+        connectionRestore_Coordinator(initNodes, startNodes, failCnt);
+    }
+
+    /**
+     * @param initNodes Number of initially started nodes.
+     * @param startNodes Number of nodes to start after the coordinator loses
+     *      connection.
+     * @param failCnt Number of nodes to stop after the coordinator loses
+     *      connection.
+     * @throws Exception If failed.
+     */
+    private void connectionRestore_Coordinator(final int initNodes, int 
startNodes, int failCnt) throws Exception {
+        // Scenario: close the ZK socket of the first started node, start 'startNodes' more nodes in background,
+        // close the sockets of the first 'failCnt' of them, wait until their alive znodes are gone, then allow
+        // all connections again and verify the join events and the resulting topology size.
+        sesTimeout = 30_000;
+        testSockNio = true;
+
+        Ignite node0 = startGrids(initNodes);
+
+        ZkTestClientCnxnSocketNIO c0 = 
ZkTestClientCnxnSocketNIO.forNode(node0);
+
+        c0.closeSocket(true);
+
+        // Indexes of the nodes started in background continue right after the initial ones.
+        final AtomicInteger nodeIdx = new AtomicInteger(initNodes);
+
+        // Start failures are only logged here; the final waitForTopology() call decides the outcome.
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new 
Callable<Void>() {
+            @Override public Void call() {
+                try {
+                    startGrid(nodeIdx.getAndIncrement());
+                }
+                catch (Exception e) {
+                    error("Start failed: " + e);
+                }
+
+                return null;
+            }
+        }, startNodes, "start-node");
+
+        int cnt = 0;
+
+        // Join events are expected only for the nodes which are not blocked below.
+        DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt];
+
+        int expEvtCnt = 0;
+
+        // NOTE(review): assigned after the start threads were launched; which session timeout the starting
+        // nodes actually pick up is not visible from this method - confirm.
+        sesTimeout = 1000;
+
+        List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>();
+
+        final List<String> failedZkNodes = new ArrayList<>(failCnt);
+
+        for (int i = initNodes; i < initNodes + startNodes; i++) {
+            final ZookeeperDiscoverySpi spi = 
waitSpi(getTestIgniteInstanceName(i));
+
+            // Wait up to 10 seconds until the SPI's 'impl.rtState.internalOrder' field becomes positive.
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Object spiImpl = GridTestUtils.getFieldValue(spi, "impl");
+
+                    if (spiImpl == null)
+                        return false;
+
+                    long internalOrder = GridTestUtils.getFieldValue(spiImpl, 
"rtState", "internalOrder");
+
+                    return internalOrder > 0;
+                }
+            }, 10_000));
+
+            // The first 'failCnt' nodes get their sockets closed and their alive znode names remembered.
+            if (cnt++ < failCnt) {
+                ZkTestClientCnxnSocketNIO c = 
ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
+
+                c.closeSocket(true);
+
+                blockedC.add(c);
+
+                failedZkNodes.add(aliveZkNodePath(spi));
+            }
+            else {
+                // Expected topology versions continue right after the initial nodes.
+                expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1);
+
+                expEvtCnt++;
+            }
+        }
+
+        // Wait up to 30 seconds until alive znodes of the blocked nodes are no longer listed.
+        waitNoAliveZkNodes(log, zkCluster.getConnectString(), failedZkNodes, 
30_000);
+
+        c0.allowConnect();
+
+        for (ZkTestClientCnxnSocketNIO c : blockedC)
+            c.allowConnect();
+
+        // Every initially started node must have seen the expected join events.
+        if (expEvts.length > 0) {
+            for (int i = 0; i < initNodes; i++)
+                checkEvents(ignite(i), expEvts);
+        }
+
+        fut.get();
+
+        waitForTopology(initNodes + startNodes - failCnt);
+    }
+
+    /**
+     * @param node Node.
+     * @return Corresponding znode.
+     */
+    private static String aliveZkNodePath(Ignite node) {
+        // Delegate to the SPI-based overload using the discovery SPI from the node configuration.
+        DiscoverySpi discoSpi = node.configuration().getDiscoverySpi();
+
+        return aliveZkNodePath(discoSpi);
+    }
+
+    /**
+     * @param spi SPI.
+     * @return Znode related to given SPI.
+     */
+    private static String aliveZkNodePath(DiscoverySpi spi) {
+        // Read the 'impl.rtState.locNodeZkPath' field chain of the SPI reflectively.
+        String fullPath = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath");
+
+        // Keep only the last path segment, i.e. everything after the final '/'.
+        int nameStart = fullPath.lastIndexOf('/') + 1;
+
+        return fullPath.substring(nameStart);
+    }
+
+    /**
+     * @param log Logger.
+     * @param connectString Zookeeper connect string.
+     * @param failedZkNodes Znodes which should be removed.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
+     */
+    private static void waitNoAliveZkNodes(final IgniteLogger log,
+        String connectString,
+        final List<String> failedZkNodes,
+        long timeout)
+        throws Exception
+    {
+        // Dedicated client, used only to list children of the alive nodes directory; closed in 'finally'.
+        final ZookeeperClient zkClient = new ZookeeperClient(log, connectString, 10_000, null);
+
+        try {
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    try {
+                        List<String> c = zkClient.getChildren(IGNITE_ZK_ROOT + "/" + ZkIgnitePaths.ALIVE_NODES_DIR);
+
+                        // Keep waiting while at least one of the given znodes is still listed.
+                        for (String failedZkNode : failedZkNodes) {
+                            if (c.contains(failedZkNode)) {
+                                log.info("Alive node is not removed [node=" + failedZkNode + ", all=" + c + ']');
+
+                                return false;
+                            }
+                        }
+
+                        return true;
+                    }
+                    catch (Exception e) {
+                        // Report through the supplied logger and keep the cause in the failure message
+                        // instead of printing the stack trace to stderr and failing without any message.
+                        log.error("Failed to list alive znodes: " + e, e);
+
+                        fail("Failed to list alive znodes: " + e);
+
+                        // Not reached when fail() throws; present only to satisfy the compiler.
+                        return true;
+                    }
+                }
+            }, timeout));
+        }
+        finally {
+            zkClient.close();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartWithClient() throws Exception {
+        final int NODES = 20;
+
+        int iter = 0;
+
+        while (iter < 3) {
+            info("Iteration: " + iter);
+
+            // The node with this index is always started with client mode 'true'; every other node picks
+            // its mode randomly.
+            // NOTE(review): this value was originally named 'srvIdx', which suggests the intent might have
+            // been to force a server instead - confirm.
+            final int forcedClientIdx = ThreadLocalRandom.current().nextInt(NODES);
+
+            final AtomicInteger nextIdx = new AtomicInteger();
+
+            Callable<Void> starter = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int nodeIdx = nextIdx.getAndIncrement();
+
+                    boolean client = nodeIdx == forcedClientIdx || ThreadLocalRandom.current().nextBoolean();
+
+                    clientModeThreadLocal(client);
+
+                    startGrid(nodeIdx);
+
+                    return null;
+                }
+            };
+
+            GridTestUtils.runMultiThreaded(starter, NODES, "start-node");
+
+            waitForTopology(NODES);
+
+            stopAllGrids();
+
+            checkEventsConsistency();
+
+            // Drop recorded events before the next iteration.
+            evts.clear();
+
+            iter++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStart() throws Exception {
+        final int NODES = 20;
+
+        for (int iter = 0; iter < 3; iter++) {
+            info("Iteration: " + iter);
+
+            final AtomicInteger nextIdx = new AtomicInteger();
+
+            // All starter threads meet at the barrier before any of them starts its node.
+            final CyclicBarrier startBarrier = new CyclicBarrier(NODES);
+
+            Callable<Void> starter = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startBarrier.await();
+
+                    startGrid(nextIdx.getAndIncrement());
+
+                    return null;
+                }
+            };
+
+            GridTestUtils.runMultiThreaded(starter, NODES, "start-node");
+
+            waitForTopology(NODES);
+
+            stopAllGrids();
+
+            checkEventsConsistency();
+
+            // Drop recorded events before the next iteration.
+            evts.clear();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartStop1() throws Exception {
+        final int initNodes = 1;
+
+        concurrentStartStop(initNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartStop2() throws Exception {
+        final int initNodes = 5;
+
+        concurrentStartStop(initNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentStartStop2_EventsThrottle() throws Exception {
+        final String maxEvtsProp = ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS;
+
+        // Run the scenario with the max-events system property set to "1"; the property is cleared afterwards.
+        System.setProperty(maxEvtsProp, "1");
+
+        try {
+            concurrentStartStop(5);
+        }
+        finally {
+            System.clearProperty(maxEvtsProp);
+        }
+    }
+
+    /**
+     * @param initNodes Number of initially started nodes.
+     * @throws Exception If failed.
+     */
+    private void concurrentStartStop(final int initNodes) throws Exception {
+        startGrids(initNodes);
+
+        final int NODES = 5;
+
+        // Topology version after the initial nodes have started, one version per node.
+        long curTopVer = initNodes;
+
+        int iter = 0;
+
+        while (iter < 10) {
+            info("Iteration: " + iter);
+
+            DiscoveryEvent[] expected = new DiscoveryEvent[NODES];
+
+            startGridsMultiThreaded(initNodes, NODES);
+
+            // Each started node is expected to produce a join event with the next topology version.
+            for (int k = 0; k < expected.length; k++)
+                expected[k] = joinEvent(++curTopVer);
+
+            checkEvents(ignite(0), expected);
+
+            checkEventsConsistency();
+
+            final CyclicBarrier stopBarrier = new CyclicBarrier(NODES);
+
+            IgniteInClosure<Integer> stopper = new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    try {
+                        // Stop all just-started nodes at the same moment.
+                        stopBarrier.await();
+
+                        stopGrid(initNodes + idx);
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+
+                        fail();
+                    }
+                }
+            };
+
+            GridTestUtils.runMultiThreaded(stopper, NODES, "stop-node");
+
+            // Advance the topology version by one per stopped node.
+            // NOTE(review): the fail events are built but never passed to checkEvents() - confirm whether
+            // a check is missing here.
+            for (int k = 0; k < expected.length; k++)
+                expected[k] = failEvent(++curTopVer);
+
+            checkEventsConsistency();
+
+            iter++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClusterRestart() throws Exception {
+        final int nodes = 3;
+
+        // First cluster incarnation.
+        startGridsMultiThreaded(nodes, false);
+
+        stopAllGrids();
+
+        // Forget events recorded for the first incarnation.
+        evts.clear();
+
+        // Second cluster incarnation.
+        startGridsMultiThreaded(nodes, false);
+
+        waitForTopology(nodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConnectionRestore4() throws Exception {
+        testSockNio = true;
+
+        // Close the ZK client socket of the first node (closeSocket(false)), then start one more node.
+        ZkTestClientCnxnSocketNIO.forNode(startGrid(0)).closeSocket(false);
+
+        startGrid(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_1_Node() throws Exception {
+        final int idx = 0;
+
+        // Start a single node, wait until it forms a topology of one, then stop it.
+        startGrid(idx);
+
+        waitForTopology(1);
+
+        stopGrid(idx);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestarts_2_Nodes() throws Exception {
+        startGrid(0);
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            startGrid(1);
+
+            waitForTopology(2);
+
+            stopGrid(1);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_2_Nodes_WithCache() throws Exception {
+        startGrids(2);
+
+        for (Ignite node : G.allGrids()) {
+            IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME);
+
+            assertNotNull(cache);
+
+            for (int i = 0; i < 100; i++) {
+                cache.put(i, node.name());
+
+                assertEquals(node.name(), cache.get(i));
+            }
+        }
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop_2_Nodes() throws Exception {
+        ackEveryEventSystemProperty();
+
+        // Start two nodes one by one, waiting for the topology after each start.
+        for (int idx = 0; idx < 2; idx++) {
+            startGrid(idx);
+
+            waitForTopology(idx + 1);
+        }
+
+        // Broadcast a dummy job from every started node.
+        for (Ignite ignite : G.allGrids())
+            ignite.compute().broadcast(new DummyCallable(null));
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultipleClusters() throws Exception {
+        // First cluster: a single node started with the current ZK root path.
+        Ignite cluster1 = startGrid(0);
+
+        // Second cluster: five nodes (indexes 1..5) under "/cluster2".
+        zkRootPath = "/cluster2";
+
+        Ignite cluster2 = startGridsMultiThreaded(1, 5);
+
+        // Third cluster: three nodes (indexes 6..8) under "/cluster3".
+        zkRootPath = "/cluster3";
+
+        Ignite cluster3 = startGridsMultiThreaded(6, 3);
+
+        checkNodesNumber(cluster1, 1);
+        checkNodesNumber(cluster2, 5);
+        checkNodesNumber(cluster3, 3);
+
+        // Stopping a node of the second cluster must not affect the other clusters.
+        stopGrid(2);
+
+        checkNodesNumber(cluster1, 1);
+        checkNodesNumber(cluster2, 4);
+        checkNodesNumber(cluster3, 3);
+
+        // Stop the whole third cluster.
+        for (int idx = 6; idx < 9; idx++)
+            stopGrid(idx);
+
+        checkNodesNumber(cluster1, 1);
+        checkNodesNumber(cluster2, 4);
+
+        // Start the third cluster again, this time with two nodes.
+        cluster3 = startGridsMultiThreaded(6, 2);
+
+        checkNodesNumber(cluster1, 1);
+        checkNodesNumber(cluster2, 4);
+        checkNodesNumber(cluster3, 2);
+
+        evts.clear();
+    }
+
+    /**
+     * @param node Node.
+     * @param expNodes Expected number of nodes in cluster.
+     * @throws Exception If failed.
+     */
+    private void checkNodesNumber(final Ignite node, final int expNodes) throws Exception {
+        GridAbsPredicate sizeMatches = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expNodes == node.cluster().nodes().size();
+            }
+        };
+
+        // Give the node up to 5 seconds to reach the expected size; the result of the wait is deliberately
+        // ignored because the assertion below reports the actual size on mismatch.
+        GridTestUtils.waitForCondition(sizeMatches, 5000);
+
+        int actualNodes = node.cluster().nodes().size();
+
+        assertEquals(expNodes, actualNodes);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop1() throws Exception {
+        ackEveryEventSystemProperty();
+
+        startGridsMultiThreaded(5, false);
+
+        waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(ignite(0));
+
+        stopGrid(0);
+
+        waitForTopology(4);
+
+        for (Ignite node : G.allGrids())
+            node.compute().broadcast(new DummyCallable(null));
+
+        startGrid(0);
+
+        waitForTopology(5);
+
+        awaitPartitionMapExchange();
+
+        waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes())));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop3() throws Exception {
+        // Four nodes with indexes 0..3.
+        startGrids(4);
+
+        awaitPartitionMapExchange();
+
+        // Replace node 0 with a new node that has index 5.
+        stopGrid(0);
+
+        startGrid(5);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop4() throws Exception {
+        startGrids(6);
+
+        awaitPartitionMapExchange();
+
+        // Stop nodes 2, 1, 0 in that order; after each stop randomly decide whether to wait for exchange.
+        for (int idx = 2; idx >= 0; idx--) {
+            stopGrid(idx);
+
+            if (ThreadLocalRandom.current().nextBoolean())
+                awaitPartitionMapExchange();
+        }
+
+        startGrid(7);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStop2() throws Exception {
+        final int total = 10;
+        final int restarted = 3;
+
+        startGridsMultiThreaded(total, false);
+
+        // Stop nodes 0..2 concurrently, one node per thread.
+        IgniteInClosure<Integer> stopper = new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                stopGrid(idx);
+            }
+        };
+
+        GridTestUtils.runMultiThreaded(stopper, restarted, "stop-node-thread");
+
+        waitForTopology(total - restarted);
+
+        // Start the same three nodes again.
+        startGridsMultiThreaded(0, restarted);
+
+        waitForTopology(total);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartStopWithClients() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        clientMode(true);
+
+        final int THREADS = 30;
+
+        for (int i = 0; i < 5; i++) {
+            info("Iteration: " + i);
+
+            startGridsMultiThreaded(SRVS, THREADS);
+
+            waitForTopology(SRVS + THREADS);
+
+            GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer idx) {
+                    stopGrid(idx + SRVS);
+                }
+            }, THREADS, "stop-node");
+
+            waitForTopology(SRVS);
+
+            checkEventsConsistency();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyChangeMultithreaded() throws Exception {
+        // Plain variant: no ZK server restarts and no client socket closing in background.
+        final boolean restartZk = false;
+        final boolean closeClientSock = false;
+
+        topologyChangeWithRestarts(restartZk, closeClientSock);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyChangeMultithreaded_RestartZk() throws Exception {
+        // Test is muted until the linked issue is resolved.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8184");
+
+        try {
+            topologyChangeWithRestarts(true, false);
+        }
+        finally {
+            // Stop the ZooKeeper cluster and drop the reference to it.
+            zkCluster.stop();
+
+            zkCluster = null;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception {
+        // Test is muted until the linked issue is resolved.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8184");
+
+        try {
+            topologyChangeWithRestarts(true, true);
+        }
+        finally {
+            // Stop the ZooKeeper cluster and drop the reference to it.
+            zkCluster.stop();
+
+            zkCluster = null;
+        }
+    }
+
+    /**
+     * @param restartZk If {@code true}, restarts one of the ZK servers in
+     *      background.
+     * @param closeClientSock If {@code true}, closes ZK clients' sockets in
+     *      background.
+     * @throws Exception If failed.
+     */
+    private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception {
+        sesTimeout = 30_000;
+
+        if (closeClientSock)
+            testSockNio = true;
+
+        // The start/stop loop below runs for one minute.
+        long stopTime = System.currentTimeMillis() + 60_000;
+
+        // Raised in the 'finally' block and handed to the background tasks together with the stop time.
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut1 = null;
+
+        IgniteInternalFuture<?> fut2 = null;
+
+        try {
+            fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null;
+            fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null;
+
+            int INIT_NODES = 10;
+
+            startGridsMultiThreaded(INIT_NODES);
+
+            final int MAX_NODES = 20;
+
+            // Indexes of currently running nodes.
+            final List<Integer> startedNodes = new ArrayList<>();
+
+            for (int i = 0; i < INIT_NODES; i++)
+                startedNodes.add(i);
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            final AtomicInteger startIdx = new AtomicInteger(INIT_NODES);
+
+            while (System.currentTimeMillis() < stopTime) {
+                if (startedNodes.size() >= MAX_NODES) {
+                    int stopNodes = rnd.nextInt(5) + 1;
+
+                    log.info("Next, stop nodes: " + stopNodes);
+
+                    final List<Integer> idxs = new ArrayList<>();
+
+                    // Pick distinct running nodes. The duplicate check has to compare node indexes:
+                    // comparing the random list position against the already chosen node indexes
+                    // allowed the same node to be picked twice and stopped from two threads.
+                    while (idxs.size() < stopNodes) {
+                        Integer stopNodeIdx = startedNodes.get(rnd.nextInt(startedNodes.size()));
+
+                        if (!idxs.contains(stopNodeIdx))
+                            idxs.add(stopNodeIdx);
+                    }
+
+                    GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+                        @Override public void apply(Integer threadIdx) {
+                            int stopNodeIdx = idxs.get(threadIdx);
+
+                            info("Stop node: " + stopNodeIdx);
+
+                            stopGrid(stopNodeIdx);
+                        }
+                    }, stopNodes, "stop-node");
+
+                    startedNodes.removeAll(idxs);
+                }
+                else {
+                    int startNodes = rnd.nextInt(5) + 1;
+
+                    log.info("Next, start nodes: " + startNodes);
+
+                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            int idx = startIdx.incrementAndGet();
+
+                            log.info("Start node: " + idx);
+
+                            startGrid(idx);
+
+                            // Starter threads run concurrently, so the shared list is guarded.
+                            synchronized (startedNodes) {
+                                startedNodes.add(idx);
+                            }
+
+                            return null;
+                        }
+                    }, startNodes, "start-node");
+                }
+
+                U.sleep(rnd.nextInt(100) + 1);
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        // Wait for the background tasks (if any were started) to finish.
+        if (fut1 != null)
+            fut1.get();
+
+        if (fut2 != null)
+            fut2.get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomTopologyChanges() throws Exception {
+        // Plain variant: both flags of the shared scenario are switched off.
+        final boolean firstFlag = false;
+        final boolean secondFlag = false;
+
+        randomTopologyChanges(firstFlag, secondFlag);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkZkNodesCleanup() throws Exception {
+        // Dedicated client used to list the znode tree; closed in 'finally'.
+        final ZookeeperClient zkClient = new ZookeeperClient(getTestResources().getLogger(),
+            zkCluster.getConnectString(),
+            30_000,
+            null);
+
+        final String basePath = IGNITE_ZK_ROOT + "/";
+
+        final String aliveDir = basePath + ZkIgnitePaths.ALIVE_NODES_DIR + "/";
+
+        try {
+            List<String> znodes = listSubTree(zkClient.zk(), IGNITE_ZK_ROOT);
+
+            boolean foundAlive = false;
+
+            for (String znode : znodes) {
+                if (znode.startsWith(aliveDir)) {
+                    foundAlive = true;
+
+                    break;
+                }
+            }
+
+            // Sanity check to make sure we check correct directory.
+            assertTrue(foundAlive);
+
+            // Wait up to 10 seconds until only alive znodes, root directories and 'jd/' entries remain.
+            assertTrue("Failed to wait for unused znodes cleanup",
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        try {
+                            List<String> znodes = listSubTree(zkClient.zk(), IGNITE_ZK_ROOT);
+
+                            for (String znode : znodes) {
+                                if (znode.startsWith(aliveDir) || znode.length() < basePath.length())
+                                    continue;
+
+                                // Make the path relative to the base path.
+                                znode = znode.substring(basePath.length());
+
+                                if (!znode.contains("/")) // Ignore roots.
+                                    continue;
+
+                                // TODO ZK: https://issues.apache.org/jira/browse/IGNITE-8193
+                                if (znode.startsWith("jd/"))
+                                    continue;
+
+                                log.info("Found unexpected znode: " + znode);
+
+                                return false;
+                            }
+
+                            return true;
+                        }
+                        catch (Exception e) {
+                            error("Unexpected error: " + e, e);
+
+                            fail("Unexpected error: " + e);
+                        }
+
+                        return false;
+                    }
+                }, 10_000));
+        }
+        finally {
+            zkClient.close();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomTopologyChanges_RestartZk() throws Exception {
+        // Variant with only the first flag of the shared scenario switched on.
+        final boolean firstFlag = true;
+        final boolean secondFlag = false;
+
+        randomTopologyChanges(firstFlag, secondFlag);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomTopologyChanges_CloseClients() throws Exception {
+        // Test is muted until the linked issue is resolved.
+        fail("https://issues.apache.org/jira/browse/IGNITE-8182");
+
+        randomTopologyChanges(false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployService1() throws Exception {
+        startGridsMultiThreaded(3);
+
+        final String svcName = "test";
+
+        // Deploy a node singleton over the cluster group of node 0.
+        grid(0).services(grid(0).cluster()).deployNodeSingleton(
+            svcName,
+            new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployService2() throws Exception {
+        // Node 0 is started with client mode off, node 1 with client mode on.
+        clientMode(false);
+
+        startGrid(0);
+
+        clientMode(true);
+
+        startGrid(1);
+
+        final String svcName = "test";
+
+        // Deploy a node singleton over the cluster group of node 0.
+        grid(0).services(grid(0).cluster()).deployNodeSingleton(
+            svcName,
+            new GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeployService3() throws Exception {
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                clientModeThreadLocal(true);
+
+                startGrid(0);
+
+                return null;
+            }
+        }, "start-node");
+
+        clientModeThreadLocal(false);
+
+        startGrid(1);
+
+        fut.get();
+
+        grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new 
GridCacheAbstractFullApiSelfTest.DummyServiceImpl());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLargeUserAttribute1() throws Exception {
+        // First node is started while the large user attribute is set.
+        initLargeAttribute();
+
+        startGrid(0);
+
+        checkZkNodesCleanup();
+
+        // Second node is started with user attributes reset to null.
+        userAttrs = null;
+
+        startGrid(1);
+
+        waitForEventsAcks(ignite(0));
+
+        final int expNodes = 2;
+
+        waitForTopology(expNodes);
+    }
+
+    /**
+     * Starts node 0 first, then populates {@code userAttrs} via {@link #initLargeAttribute()}
+     * before starting node 1, waits for event acks on node 0 and runs the ZooKeeper nodes
+     * cleanup check.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLargeUserAttribute2() throws Exception {
+        startGrid(0);
+
+        // Large attribute is set only after node 0 is already up.
+        initLargeAttribute();
+
+        startGrid(1);
+
+        waitForEventsAcks(ignite(0));
+
+        checkZkNodesCleanup();
+    }
+
+    /**
+     * Starts up to 25 nodes one by one, randomly choosing for each node whether
+     * {@code userAttrs} carries the large attribute or is {@code null}. Nodes with index
+     * greater than 5 are started with client mode switched on. Stops starting new nodes once
+     * 60 seconds have elapsed, then waits for a topology of as many nodes as were started.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLargeUserAttribute3() throws Exception {
+        final long deadline = System.currentTimeMillis() + 60_000;
+
+        // Number of nodes started so far; also the index of the next node to start.
+        int started = 0;
+
+        while (started < 25) {
+            info("Iteration: " + started);
+
+            boolean largeAttr = ThreadLocalRandom.current().nextBoolean();
+
+            if (!largeAttr)
+                userAttrs = null;
+            else
+                initLargeAttribute();
+
+            clientMode(started > 5);
+
+            startGrid(started);
+
+            started++;
+
+            if (System.currentTimeMillis() >= deadline)
+                break;
+        }
+
+        waitForTopology(started);
+    }
+
+    /**
+     * Replaces {@code userAttrs} with a new map holding a single entry {@code "testAttr"}.
+     * The value is an {@code int[]} of {@code 1024 * 1024} elements plus a random extra of
+     * 0 to 1023 elements, where every element equals its own index.
+     */
+    private void initLargeAttribute() {
+        userAttrs = new HashMap<>();
+
+        int[] attr = new int[1024 * 1024 + ThreadLocalRandom.current().nextInt(1024)];
+
+        for (int i = 0; i < attr.length; i++)
+            attr[i] = i;
+
+        userAttrs.put("testAttr", attr);
+    }
+
+    /**
+     * Creates cache {@code "c1"} from {@code largeCacheConfiguration(...)} first on a single-node
+     * topology and then, after three more nodes have joined and the cache has been destroyed,
+     * again on a four-node topology. Finally creates cache {@code "c2"} through node 3.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLargeCustomEvent() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        // Send large message, single node in topology.
+        IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1"));
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        assertEquals(1, cache.get(1));
+
+        waitForEventsAcks(ignite(0));
+
+        startGridsMultiThreaded(1, 3);
+
+        srv0.destroyCache("c1");
+
+        // Send large message, multiple nodes in topology.
+        cache = srv0.createCache(largeCacheConfiguration("c1"));
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, i);
+
+        waitForTopology(4);
+
+        ignite(3).createCache(largeCacheConfiguration("c2"));
+    }
+
+    /**
+     * Runs {@link #clientReconnectSessionExpire(boolean)} with {@code closeSock = false}.
+     * <p>
+     * The test is currently muted: it fails on its first statement with a link to IGNITE-8131,
+     * so the scenario call below is not reached until that {@code fail(...)} is removed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectSessionExpire1_1() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-8131");
+
+        clientReconnectSessionExpire(false);
+    }
+
+    /**
+     * Runs {@link #clientReconnectSessionExpire(boolean)} with {@code closeSock = true}.
+     * <p>
+     * The test is currently muted: it fails on its first statement with a link to IGNITE-8131,
+     * so the scenario call below is not reached until that {@code fail(...)} is removed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectSessionExpire1_2() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-8131");
+
+        clientReconnectSessionExpire(true);
+    }
+
+    /**
+     * Starts a server node, then a client node with {@code sesTimeout = 2000} and
+     * {@code testSockNio} enabled, puts a value into the default cache, forces the client
+     * through {@code reconnectClientNodes(...)} and verifies that the cached value is still
+     * readable and that a compute broadcast can be issued afterwards.
+     *
+     * @param closeSock Test mode flag, passed through to {@code reconnectClientNodes(...)}.
+     * @throws Exception If failed.
+     */
+    private void clientReconnectSessionExpire(boolean closeSock) throws Exception {
+        startGrid(0);
+
+        // These settings are changed after the server is up, before the client starts.
+        sesTimeout = 2000;
+        clientMode(true);
+        testSockNio = true;
+
+        Ignite client = startGrid(1);
+
+        client.cache(DEFAULT_CACHE_NAME).put(1, 1);
+
+        reconnectClientNodes(log, Collections.singletonList(client), closeSock);
+
+        assertEquals(1, client.cache(DEFAULT_CACHE_NAME).get(1));
+
+        client.compute().broadcast(new DummyCallable(null));
+    }
+
+    /**
+     * Starts three servers and one client, then drives {@code reconnectClientNodes(...)} with a
+     * closure that calls {@code clientReconnect()} on the client's own discovery SPI. Afterwards
+     * waits for the topology to contain all four nodes again.
+     *
+     * @throws Exception If failed.
+     */
+    public void testForceClientReconnect() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        clientMode(true);
+
+        // The client gets index SRVS, i.e. the first index after the servers.
+        startGrid(SRVS);
+
+        reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(SRVS));
+
+                spi.clientReconnect();
+
+                return null;
+            }
+        });
+
+        waitForTopology(SRVS + 1);
+    }
+
+    /**
+     * Starts three servers and one client, then drives {@code reconnectClientNodes(...)} with a
+     * closure that asks the discovery SPI of server 0 to fail the client node by its ID.
+     * Afterwards waits for the topology to contain all four nodes again.
+     *
+     * @throws Exception If failed.
+     */
+    public void testForcibleClientFail() throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        clientMode(true);
+
+        // The client gets index SRVS, i.e. the first index after the servers.
+        startGrid(SRVS);
+
+        reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(0));
+
+                spi.failNode(ignite(SRVS).cluster().localNode().id(), "Test forcible node fail");
+
+                return null;
+            }
+        });
+
+        waitForTopology(SRVS + 1);
+    }
+
+    /**
+     * Checks that a node cannot join with an ID that is already present in the topology.
+     * <p>
+     * In each of five iterations the test first tries to start a node with {@code nodeId} set to
+     * the ID of an already running node and expects an {@code IgniteCheckedException} caused by
+     * an {@code IgniteSpiException} whose message contains
+     * {@code "Node with the same ID already exists"}. It then starts the same node index with
+     * {@code nodeId} reset to {@code null} and reuses that node's ID as the duplicate for the
+     * next iteration.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDuplicatedNodeId() throws Exception {
+        UUID nodeId0 = nodeId = UUID.randomUUID();
+
+        startGrid(0);
+
+        int failingNodeIdx = 100;
+
+        for (int i = 0; i < 5; i++) {
+            final int idx = failingNodeIdx++;
+
+            nodeId = nodeId0;
+
+            info("Start node with duplicated ID [iter=" + i + ", nodeId=" + nodeId + ']');
+
+            Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startGrid(idx);
+
+                    return null;
+                }
+            }, IgniteCheckedException.class, null);
+
+            IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class);
+
+            assertNotNull(spiErr);
+            assertTrue(spiErr.getMessage().contains("Node with the same ID already exists"));
+
+            nodeId = null;
+
+            info("Start node with unique ID [iter=" + i + ']');
+
+            Ignite ignite = startGrid(idx);
+
+            // The ID of the node that just joined becomes the duplicate for the next iteration.
+            nodeId0 = ignite.cluster().localNode().id();
+
+            waitForTopology(i + 2);
+        }
+    }
+
+    /**
+     * Exercises {@code pingNode} on the discovery SPI of node 1 against node 2 from 32 threads
+     * in three phases: while node 2 is alive (ping must succeed), while node 2 is being stopped
+     * (result is ignored), and after node 2 has been stopped (ping must fail).
+     *
+     * @throws Exception If failed.
+     */
+    public void testPing() throws Exception {
+        sesTimeout = 5000;
+
+        startGrids(3);
+
+        final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(1));
+
+        final UUID nodeId = ignite(2).cluster().localNode().id();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                assertTrue(spi.pingNode(nodeId));
+            }
+        }, 32, "ping");
+
+        fut.get();
+
+        // Pings race with the node stop below, so either outcome is acceptable here.
+        fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                spi.pingNode(nodeId);
+            }
+        }, 32, "ping");
+
+        U.sleep(100);
+
+        stopGrid(2);
+
+        fut.get();
+
+        fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                assertFalse(spi.pingNode(nodeId));
+            }
+        }, 32, "ping");
+
+        fut.get();
+    }
+
+    /**
+     * Runs the {@code startWithPersistence} scenario with its flag set to {@code false}.
+     * The helper is declared elsewhere in this class; see it for the meaning of the flag.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWithPersistence1() throws Exception {
+        startWithPersistence(false);
+    }
+
+    /**
+     * Runs the {@code startWithPersistence} scenario with its flag set to {@code true}.
+     * The helper is declared elsewhere in this class; see it for the meaning of the flag.
+     *
+     * @throws Exception If failed.
+     */
+    public void testWithPersistence2() throws Exception {
+        startWithPersistence(true);
+    }
+
+    /**
+     * Runs {@link #communicationFailureResolve_Simple(int)} on a two-node cluster.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationFailureResolve_1() throws Exception {
+        communicationFailureResolve_Simple(2);
+    }
+
+    /**
+     * Runs {@link #communicationFailureResolve_Simple(int)} on a ten-node cluster.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_2() throws Exception {
+        communicationFailureResolve_Simple(10);
+    }
+
+    /**
+     * Starts the given number of nodes with {@code NoOpCommunicationFailureResolver.FACTORY} and
+     * {@code sesTimeout = 2000}, then three times picks two distinct random nodes, asks the
+     * discovery SPI of the first to resolve a communication failure with the second and runs
+     * {@code checkInternalStructuresCleanup()}.
+     *
+     * @param nodes Nodes number, must be greater than 1 so that two distinct nodes can be picked.
+     * @throws Exception If failed.
+     */
+    private void communicationFailureResolve_Simple(int nodes) throws Exception {
+        assert nodes > 1;
+
+        sesTimeout = 2000;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
+
+        startGridsMultiThreaded(nodes);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 3; i++) {
+            info("Iteration: " + i);
+
+            int idx1 = rnd.nextInt(nodes);
+
+            int idx2;
+
+            // Re-draw until the second index differs from the first one.
+            do {
+                idx2 = rnd.nextInt(nodes);
+            }
+            while (idx1 == idx2);
+
+            ZookeeperDiscoverySpi spi = spi(ignite(idx1));
+
+            spi.resolveCommunicationFailure(ignite(idx2).cluster().localNode(), new Exception("test"));
+
+            checkInternalStructuresCleanup();
+        }
+    }
+
+    /**
+     * Tests case when one node fails before sending communication status.
+     * <p>
+     * Three nodes are started with {@code sesTimeout = 2000}; a fourth one is started with
+     * {@code testSockNio} enabled and {@code sesTimeout = 5000}. While node 0 resolves a
+     * communication failure with node 1 in the background, the ZooKeeper socket of node 3 is
+     * closed and node 3 is stopped. The resolve must still complete and the topology must
+     * shrink to three nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_3() throws Exception {
+        sesTimeout = 2000;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
+
+        startGridsMultiThreaded(3);
+
+        // Changed only after the first three nodes are up, right before node 3 is started.
+        testSockNio = true;
+        sesTimeout = 5000;
+
+        startGrid(3);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                ZookeeperDiscoverySpi spi = spi(ignite(0));
+
+                spi.resolveCommunicationFailure(ignite(1).cluster().localNode(), new Exception("test"));
+
+                return null;
+            }
+        });
+
+        U.sleep(1000);
+
+        ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(ignite(3));
+
+        nio.closeSocket(true);
+
+        try {
+            stopGrid(3);
+
+            fut.get();
+        }
+        finally {
+            // Always re-enable connections, even if the stop or the resolve failed.
+            nio.allowConnect();
+        }
+
+        waitForTopology(3);
+    }
+
+    /**
+     * Tests case when Coordinator fails while resolve process is in progress.
+     * <p>
+     * Node 0 is started first, followed by nodes 1-3. A latch is installed as {@code pingLatch}
+     * on the test communication SPI of node 3, node 1 starts resolving a communication failure
+     * with node 2 in the background, and node 0 is stopped while that resolve is expected to be
+     * still pending. After the latch is released the resolve must finish and the topology must
+     * consist of three nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_4() throws Exception {
+        testCommSpi = true;
+
+        sesTimeout = 2000;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
+
+        startGrid(0);
+
+        startGridsMultiThreaded(1, 3);
+
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3));
+
+        commSpi.pingLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                ZookeeperDiscoverySpi spi = spi(ignite(1));
+
+                spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test"));
+
+                return null;
+            }
+        });
+
+        try {
+            U.sleep(1000);
+
+            assertFalse(fut.isDone());
+
+            stopGrid(0);
+
+            commSpi.pingLatch.countDown();
+
+            fut.get();
+
+            waitForTopology(3);
+        }
+        finally {
+            // Release the latch on every exit path so that a failed assertion cannot leave
+            // a waiter on pingLatch blocked; an extra countDown() on an open latch is a no-op.
+            commSpi.pingLatch.countDown();
+        }
+    }
+
+    /**
+     * Tests that nodes join is delayed while resolve is in progress.
+     * <p>
+     * Four nodes are started and two latches ({@code pingStartLatch}, {@code pingLatch}) are
+     * installed on the test communication SPI of node 3. Once {@code pingStartLatch} has been
+     * released, three more nodes are started concurrently; one second later their start must
+     * still be pending and node 0 must still see four nodes. After {@code pingLatch} is released
+     * both the joins and the resolve must finish, giving a seven-node topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_5() throws Exception {
+        testCommSpi = true;
+
+        sesTimeout = 2000;
+        commFailureRslvr = NoOpCommunicationFailureResolver.FACTORY;
+
+        startGrid(0);
+
+        startGridsMultiThreaded(1, 3);
+
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(3));
+
+        commSpi.pingStartLatch = new CountDownLatch(1);
+        commSpi.pingLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                ZookeeperDiscoverySpi spi = spi(ignite(1));
+
+                spi.resolveCommunicationFailure(ignite(2).cluster().localNode(), new Exception("test"));
+
+                return null;
+            }
+        });
+
+        assertTrue(commSpi.pingStartLatch.await(10, SECONDS));
+
+        try {
+            assertFalse(fut.isDone());
+
+            // Starts at 3 so that the three joining threads get indices 4, 5 and 6.
+            final AtomicInteger nodeIdx = new AtomicInteger(3);
+
+            IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startGrid(nodeIdx.incrementAndGet());
+
+                    return null;
+                }
+            }, 3, "start-node");
+
+            U.sleep(1000);
+
+            assertFalse(startFut.isDone());
+
+            assertEquals(4, ignite(0).cluster().nodes().size());
+
+            commSpi.pingLatch.countDown();
+
+            startFut.get();
+            fut.get();
+
+            waitForTopology(7);
+        }
+        finally {
+            // Release the latch even if one of the checks above failed.
+            commSpi.pingLatch.countDown();
+        }
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 2 started nodes and the node
+     * with order 2 to be killed by the resolve process.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillNode_1() throws Exception {
+        communicationFailureResolve_KillNodes(2, Collections.singleton(2L));
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 3 started nodes and the node
+     * with order 2 to be killed by the resolve process.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillNode_2() throws Exception {
+        communicationFailureResolve_KillNodes(3, Collections.singleton(2L));
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 10 started nodes and the nodes
+     * with orders 2, 4 and 6 to be killed by the resolve process.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillNode_3() throws Exception {
+        communicationFailureResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L));
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 2 started nodes and the node
+     * with order 1 (the coordinator, going by the test name) to be killed by the resolve process.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillCoordinator_1() throws Exception {
+        communicationFailureResolve_KillNodes(2, Collections.singleton(1L));
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 3 started nodes and the node
+     * with order 1 (the coordinator, going by the test name) to be killed by the resolve process.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillCoordinator_2() throws Exception {
+        communicationFailureResolve_KillNodes(3, Collections.singleton(1L));
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 10 started nodes and the nodes
+     * with orders 1, 4 and 6 to be killed by the resolve process. Order 1 is the coordinator,
+     * going by the test name.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillCoordinator_3() throws Exception {
+        communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L));
+    }
+
+    /**
+     * Runs {@code communicationFailureResolve_KillNodes} with 10 started nodes and the nodes
+     * with orders 1, 2 and 3 to be killed by the resolve process. Order 1 is the coordinator,
+     * going by the test name.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationErrorResolve_KillCoordinator_4() throws Exception {
+        communicationFailureResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L));
+    }
+
+    /**
+     * Starts {@code startNodes} nodes with a {@code TestNodeKillCommunicationFailureResolver}
+     * configured for the given node orders, then triggers communication failure resolution from
+     * a node that is not in {@code killNodes} against a node that is. The call is expected to
+     * throw an {@code IgniteSpiException} caused by {@code ClusterTopologyCheckedException}.
+     * Afterwards the topology must contain none of the killed orders, and one more node must be
+     * able to join.
+     *
+     * @param startNodes Number of nodes to start.
+     * @param killNodes Nodes to kill by resolve process, identified by node order.
+     * @throws Exception If failed.
+     */
+    private void communicationFailureResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception {
+        testCommSpi = true;
+
+        commFailureRslvr = TestNodeKillCommunicationFailureResolver.factory(killNodes);
+
+        startGrids(startNodes);
+
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.testSpi(ignite(0));
+
+        commSpi.checkRes = new BitSet(startNodes);
+
+        ZookeeperDiscoverySpi spi = null;
+        UUID killNodeId = null;
+
+        // Pick the SPI of any surviving node and the ID of any node that is going to be killed.
+        for (Ignite node : G.allGrids()) {
+            ZookeeperDiscoverySpi spi0 = spi(node);
+
+            if (!killNodes.contains(node.cluster().localNode().order()))
+                spi = spi0;
+            else
+                killNodeId = node.cluster().localNode().id();
+        }
+
+        assertNotNull(spi);
+        assertNotNull(killNodeId);
+
+        try {
+            spi.resolveCommunicationFailure(spi.getNode(killNodeId), new Exception("test"));
+
+            fail("Exception is not thrown");
+        }
+        catch (IgniteSpiException e) {
+            assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
+        }
+
+        int expNodes = startNodes - killNodes.size();
+
+        waitForTopology(expNodes);
+
+        for (Ignite node : G.allGrids())
+            assertFalse(killNodes.contains(node.cluster().localNode().order()));
+
+        startGrid(startNodes);
+
+        waitForTopology(expNodes + 1);
+    }
+
+    /**
+     * Starts ten nodes with {@code KillCoordinatorCommunicationFailureResolver.FACTORY} and then,
+     * ten times in a row, triggers communication failure resolution from node {@code crd + 1}
+     * against node {@code crd}. Each call is expected to throw an {@code IgniteSpiException}
+     * caused by {@code ClusterTopologyCheckedException} and to shrink the topology to nine nodes;
+     * a fresh node is then started to bring it back to ten and {@code crd} moves to the next index.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationFailureResolve_KillCoordinator_5() throws Exception {
+        sesTimeout = 2000;
+
+        testCommSpi = true;
+        commFailureRslvr = KillCoordinatorCommunicationFailureResolver.FACTORY;
+
+        startGrids(10);
+
+        // Index of the node treated as the current coordinator; advanced after every iteration.
+        int crd = 0;
+
+        int nodeIdx = 10;
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            for (Ignite node : G.allGrids())
+                ZkTestCommunicationSpi.testSpi(node).initCheckResult(10);
+
+            UUID crdId = ignite(crd).cluster().localNode().id();
+
+            ZookeeperDiscoverySpi spi = spi(ignite(crd + 1));
+
+            try {
+                spi.resolveCommunicationFailure(spi.getNode(crdId), new Exception("test"));
+
+                fail("Exception is not thrown");
+            }
+            catch (IgniteSpiException e) {
+                assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException);
+            }
+
+            waitForTopology(9);
+
+            startGrid(nodeIdx++);
+
+            waitForTopology(10);
+
+            crd++;
+        }
+    }
+
+    /**
+     * Starts ten servers and five clients with {@code KillRandomCommunicationFailureResolver.FACTORY}
+     * and then, ten times, triggers communication failure resolution from one of the running
+     * nodes against its first remote node, starts one more node with a randomly chosen client
+     * mode and waits for partition map exchange.
+     * <p>
+     * The test is currently muted: it fails on its first statement with a link to IGNITE-8179,
+     * so nothing below is reached until that {@code fail(...)} is removed.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationFailureResolve_KillRandom() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-8179");
+
+        sesTimeout = 2000;
+
+        testCommSpi = true;
+        commFailureRslvr = KillRandomCommunicationFailureResolver.FACTORY;
+
+        startGridsMultiThreaded(10);
+
+        clientMode(true);
+
+        startGridsMultiThreaded(10, 5);
+
+        int nodeIdx = 15;
+
+        for (int i = 0; i < 10; i++) {
+            info("Iteration: " + i);
+
+            ZookeeperDiscoverySpi spi = null;
+
+            // Re-arm every node's test communication SPI; the SPI of the last node is used below.
+            for (Ignite node : G.allGrids()) {
+                ZkTestCommunicationSpi.testSpi(node).initCheckResult(100);
+
+                spi = spi(node);
+            }
+
+            // JUnit assertion instead of 'assert' so the check also runs without -ea.
+            assertNotNull(spi);
+
+            try {
+                spi.resolveCommunicationFailure(spi.getRemoteNodes().iterator().next(), new Exception("test"));
+            }
+            catch (IgniteSpiException ignore) {
+                // No-op.
+            }
+
+            clientMode(ThreadLocalRandom.current().nextBoolean());
+
+            startGrid(nodeIdx++);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * Starts three nodes with the test communication SPI, initializes the check results so that
+     * nodes 0 and 1 get indices {@code 0, 1} and node 2 gets index {@code 2}, then triggers
+     * communication failure resolution from node 0 against node 1. Expects the topology to
+     * shrink to two nodes and node 2 to be no longer visible from node 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationFailureResolver1() throws Exception {
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        startGrids(3);
+
+        ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(3, 2);
+
+        UUID killedId = nodeId(2);
+
+        assertNotNull(ignite(0).cluster().node(killedId));
+
+        ZookeeperDiscoverySpi spi = spi(ignite(0));
+
+        spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test"));
+
+        waitForTopology(2);
+
+        assertNull(ignite(0).cluster().node(killedId));
+    }
+
+    /**
+     * Starts three servers and two clients with the test communication SPI, initializes the check
+     * results so that nodes 0 and 1 get indices {@code 0, 1} and nodes 2-4 get indices
+     * {@code 2, 3, 4}, then triggers communication failure resolution from node 0 against node 1
+     * and expects the topology to shrink to two nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationFailureResolver2() throws Exception {
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        startGrids(3);
+
+        clientMode(true);
+
+        startGridsMultiThreaded(3, 2);
+
+        ZkTestCommunicationSpi.testSpi(ignite(0)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(1)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.testSpi(ignite(2)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.testSpi(ignite(3)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.testSpi(ignite(4)).initCheckResult(5, 2, 3, 4);
+
+        ZookeeperDiscoverySpi spi = spi(ignite(0));
+
+        spi.resolveCommunicationFailure(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test"));
+
+        waitForTopology(2);
+    }
+
+    /**
+     * Runs {@code defaultCommunicationFailureResolver_BreakCommunication} with 3 started nodes,
+     * breaking communication on node index 1.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationFailureResolver3() throws Exception {
+        defaultCommunicationFailureResolver_BreakCommunication(3, 1);
+    }
+
+    /**
+     * Runs {@code defaultCommunicationFailureResolver_BreakCommunication} with 3 started nodes,
+     * breaking communication on node index 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationFailureResolver4() throws Exception {
+        defaultCommunicationFailureResolver_BreakCommunication(3, 0);
+    }
+
+    /**
+     * Runs {@code defaultCommunicationFailureResolver_BreakCommunication} with 10 started nodes,
+     * breaking communication on node indices 1, 3 and 6.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationFailureResolver5() throws Exception {
+        defaultCommunicationFailureResolver_BreakCommunication(10, 1, 3, 6);
+    }
+
+    /**
+     * Starts {@code startNodes} nodes and then, from one thread per entry of {@code breakNodes},
+     * calls {@code simulateNodeFailure()} on the {@code TcpCommunicationSpi} of the corresponding
+     * node. A barrier makes all threads proceed together. Finally waits for the topology to
+     * shrink by the number of broken nodes.
+     *
+     * @param startNodes Initial nodes number.
+     * @param breakNodes Node indices where communication server is closed.
+     * @throws Exception If failed.
+     */
+    private void defaultCommunicationFailureResolver_BreakCommunication(int startNodes, final int... breakNodes)
+        throws Exception {
+        sesTimeout = 5000;
+
+        startGridsMultiThreaded(startNodes);
+
+        final CyclicBarrier b = new CyclicBarrier(breakNodes.length);
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer threadIdx) {
+                try {
+                    // Wait until every breaking thread is ready.
+                    b.await();
+
+                    int nodeIdx = breakNodes[threadIdx];
+
+                    info("Close communication: " + nodeIdx);
+
+                    ((TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi()).simulateNodeFailure();
+                }
+                catch (Exception e) {
+                    fail("Unexpected error: " + e);
+                }
+            }
+        }, breakNodes.length, "break-communication");
+
+        waitForTopology(startNodes - breakNodes.length);
+    }
+
+    /**
+     * Verifies, through {@code checkResolverCachesInfo(...)}, the per-cache information seen by a
+     * shared {@code CacheInfoCommunicationFailureResolver} as caches are created and nodes join
+     * and leave. Each expected entry is a {@code T3} of three integers whose first two values
+     * match the partition count and backups configured for that cache in this test; the meaning
+     * of the third value is defined by {@code checkResolverCachesInfo(...)}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCommunicationFailureResolve_CachesInfo1() throws Exception {
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        final CacheInfoCommunicationFailureResolver rslvr = new CacheInfoCommunicationFailureResolver();
+
+        // Every node gets the same resolver instance.
+        commFailureRslvr = new IgniteOutClosure<CommunicationFailureResolver>() {
+            @Override public CommunicationFailureResolver apply() {
+                return rslvr;
+            }
+        };
+
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        Map<String, T3<Integer, Integer, Integer>> expCaches = new HashMap<>();
+
+        expCaches.put(DEFAULT_CACHE_NAME, new T3<>(RendezvousAffinityFunction.DFLT_PARTITION_COUNT, 0, 1));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        List<CacheConfiguration> caches = new ArrayList<>();
+
+        CacheConfiguration c1 = new CacheConfiguration("c1");
+        c1.setBackups(1);
+        c1.setAffinity(new RendezvousAffinityFunction(false, 64));
+        caches.add(c1);
+
+        CacheConfiguration c2 = new CacheConfiguration("c2");
+        c2.setBackups(2);
+        c2.setAffinity(new RendezvousAffinityFunction(false, 128));
+        caches.add(c2);
+
+        CacheConfiguration c3 = new CacheConfiguration("c3");
+        c3.setCacheMode(CacheMode.REPLICATED);
+        c3.setAffinity(new RendezvousAffinityFunction(false, 256));
+        caches.add(c3);
+
+        ignite(0).createCaches(caches);
+
+        expCaches.put("c1", new T3<>(64, 1, 2));
+        expCaches.put("c2", new T3<>(128, 2, 2));
+        expCaches.put("c3", new T3<>(256, 1, 2));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        startGrid(2);
+        startGrid(3);
+
+        awaitPartitionMapExchange();
+
+        expCaches.put("c2", new T3<>(128, 2, 3));
+        expCaches.put("c3", new T3<>(256, 1, 4));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        CacheConfiguration c4 = new CacheConfiguration("c4");
+        c4.setCacheMode(CacheMode.PARTITIONED);
+        c4.setBackups(0);
+        c4.setAffinity(new RendezvousAffinityFunction(false, 256));
+        c4.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0), getTestIgniteInstanceName(1)));
+
+        ignite(2).createCache(c4);
+
+        expCaches.put("c4", new T3<>(256, 0, 1));
+
+        checkResolverCachesInfo(ignite(0), expCaches);
+
+        // Stop current coordinator, check new coordinator will initialize required caches information.
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        expCaches.put("c3", new T3<>(256, 1, 3));
+
+        checkResolverCachesInfo(ignite(1), expCaches);
+
+        startGrid(0);
+
+        expCaches.put("c3", new T3<>(256, 1, 4));
+
+        checkResolverCachesInfo(ignite(1), expCaches);
+
+        stopGrid(1);
+
+        expCaches.put("c3", new T3<>(256, 1, 3));
+
+        checkResolverCachesInfo(ignite(3), expCaches);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCommunicationFailureResolve_CachesInfo2() throws Exception 
{
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        final CacheInfoCommunicationFailureResolver rslvr = new 
CacheInfoCommunicationFailureResolver();
+
+        commFailureRslvr = new 
IgniteOutClosure<CommunicationFailureResolver>() {
+            @Override public CommunicationFailureResolver apply() {
+                return rslvr;
+            }
+        };
+
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration ccfg = new CacheConfiguration("c1

<TRUNCATED>

Reply via email to