http://git-wip-us.apache.org/repos/asf/ignite/blob/26ffa0db/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java deleted file mode 100644 index 7c2f642..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ /dev/null @@ -1,4247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.spi.discovery.zk.internal; - -import java.io.File; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.test.InstanceSpec; -import org.apache.curator.test.TestingCluster; -import org.apache.curator.test.TestingZooKeeperServer; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.CommunicationProblemContext; -import org.apache.ignite.configuration.CommunicationProblemResolver; -import org.apache.ignite.configuration.DataRegionConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.DiscoverySpiTestListener; 
-import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.discovery.CustomEventListener; -import org.apache.ignite.internal.managers.discovery.DiscoCache; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; -import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; -import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; -import org.apache.ignite.internal.processors.security.SecurityContext; -import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; -import org.apache.ignite.internal.util.lang.GridAbsPredicate; -import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.internal.util.typedef.T3; -import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteCallable; -import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.logger.java.JavaLogger; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; -import org.apache.ignite.plugin.security.SecurityCredentials; -import org.apache.ignite.plugin.security.SecurityPermission; -import 
org.apache.ignite.plugin.security.SecuritySubject; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; -import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZkTestClientCnxnSocketNIO; -import org.apache.zookeeper.ZooKeeper; -import org.jetbrains.annotations.Nullable; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; -import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; -import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; -import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; -import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_CREDENTIALS; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_SUBJECT_V2; -import static org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD; -import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; - -/** - * TODO ZK: test with max client connections limit error. 
- */ -@SuppressWarnings("deprecation") -public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { - /** */ - private static final String IGNITE_ZK_ROOT = ZookeeperDiscoverySpi.DFLT_ROOT_PATH; - - /** */ - private static final int ZK_SRVS = 3; - - /** */ - private static TestingCluster zkCluster; - - /** */ - private static final boolean USE_TEST_CLUSTER = true; - - /** */ - private boolean client; - - /** */ - private static ThreadLocal<Boolean> clientThreadLoc = new ThreadLocal<>(); - - /** */ - private static ConcurrentHashMap<UUID, Map<Long, DiscoveryEvent>> evts = new ConcurrentHashMap<>(); - - /** */ - private static volatile boolean err; - - /** */ - private boolean testSockNio; - - /** */ - private boolean testCommSpi; - - /** */ - private long sesTimeout; - - /** */ - private long joinTimeout; - - /** */ - private boolean clientReconnectDisabled; - - /** */ - private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>(); - - /** */ - private Map<String, Object> userAttrs; - - /** */ - private boolean dfltConsistenId; - - /** */ - private UUID nodeId; - - /** */ - private boolean persistence; - - /** */ - private IgniteOutClosure<CommunicationProblemResolver> commProblemRslvr; - - /** */ - private IgniteOutClosure<DiscoverySpiNodeAuthenticator> auth; - - /** */ - private String zkRootPath; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(final String igniteInstanceName) throws Exception { - if (testSockNio) - System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, ZkTestClientCnxnSocketNIO.class.getName()); - - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - if (nodeId != null) - cfg.setNodeId(nodeId); - - if (!dfltConsistenId) - cfg.setConsistentId(igniteInstanceName); - - ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi(); - - if (joinTimeout != 0) - zkSpi.setJoinTimeout(joinTimeout); - - zkSpi.setSessionTimeout(sesTimeout > 0 ? 
sesTimeout : 10_000); - - zkSpi.setClientReconnectDisabled(clientReconnectDisabled); - - // Set authenticator for basic sanity tests. - if (auth != null) { - zkSpi.setAuthenticator(auth.apply()); - - zkSpi.setInternalListener(new IgniteDiscoverySpiInternalListener() { - @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) { - ZookeeperClusterNode locNode0 = (ZookeeperClusterNode)locNode; - - Map<String, Object> attrs = new HashMap<>(locNode0.getAttributes()); - - attrs.put(ATTR_SECURITY_CREDENTIALS, new SecurityCredentials(null, null, igniteInstanceName)); - - locNode0.setAttributes(attrs); - } - - @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { - return false; - } - }); - } - - spis.put(igniteInstanceName, zkSpi); - - if (USE_TEST_CLUSTER) { - assert zkCluster != null; - - zkSpi.setZkConnectionString(zkCluster.getConnectString()); - - if (zkRootPath != null) - zkSpi.setZkRootPath(zkRootPath); - } - else - zkSpi.setZkConnectionString("localhost:2181"); - - cfg.setDiscoverySpi(zkSpi); - - CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); - - ccfg.setWriteSynchronizationMode(FULL_SYNC); - - cfg.setCacheConfiguration(ccfg); - - Boolean clientMode = clientThreadLoc.get(); - - if (clientMode != null) - cfg.setClientMode(clientMode); - else - cfg.setClientMode(client); - - if (userAttrs != null) - cfg.setUserAttributes(userAttrs); - - Map<IgnitePredicate<? 
extends Event>, int[]> lsnrs = new HashMap<>(); - - lsnrs.put(new IgnitePredicate<Event>() { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - @Override public boolean apply(Event evt) { - try { - DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; - - UUID locId = ((IgniteKernal)ignite).context().localNodeId(); - - Map<Long, DiscoveryEvent> nodeEvts = evts.get(locId); - - if (nodeEvts == null) { - Object old = evts.put(locId, nodeEvts = new TreeMap<>()); - - assertNull(old); - - synchronized (nodeEvts) { - DiscoveryLocalJoinData locJoin = ((IgniteKernal)ignite).context().discovery().localJoin(); - - nodeEvts.put(locJoin.event().topologyVersion(), locJoin.event()); - } - } - - synchronized (nodeEvts) { - DiscoveryEvent old = nodeEvts.put(discoveryEvt.topologyVersion(), discoveryEvt); - - assertNull(old); - } - } - catch (Throwable e) { - error("Unexpected error [evt=" + evt + ", err=" + e + ']', e); - - err = true; - } - - return true; - } - }, new int[]{EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT}); - - cfg.setLocalEventListeners(lsnrs); - - if (persistence) { - DataStorageConfiguration memCfg = new DataStorageConfiguration() - .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024). - setPersistenceEnabled(true)) - .setPageSize(1024) - .setWalMode(WALMode.LOG_ONLY); - - cfg.setDataStorageConfiguration(memCfg); - } - - if (testCommSpi) - cfg.setCommunicationSpi(new ZkTestCommunicationSpi()); - - if (commProblemRslvr != null) - cfg.setCommunicationProblemResolver(commProblemRslvr.apply()); - - return cfg; - } - - /** - * @param clientMode Client mode flag for started nodes. - */ - private void clientMode(boolean clientMode) { - client = clientMode; - } - - /** - * @param clientMode Client mode flag for nodes started from current thread. 
- */ - private void clientModeThreadLocal(boolean clientMode) { - clientThreadLoc.set(clientMode); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - IgnitionEx.TEST_ZK = false; - } - - /** - * @param instances Number of instances. - * @return Cluster. - */ - private static TestingCluster createTestingCluster(int instances) { - String tmpDir = System.getProperty("java.io.tmpdir"); - - List<InstanceSpec> specs = new ArrayList<>(); - - for (int i = 0; i < instances; i++) { - File file = new File(tmpDir, "apacheIgniteTestZk-" + i); - - if (file.isDirectory()) - deleteRecursively0(file); - else { - if (!file.mkdirs()) - throw new IgniteException("Failed to create directory for test Zookeeper server: " + file.getAbsolutePath()); - } - - - specs.add(new InstanceSpec(file, -1, -1, -1, true, -1, 1000, -1)); - } - - return new TestingCluster(specs); - } - - /** - * @param file Directory to delete. - */ - private static void deleteRecursively0(File file) { - File[] files = file.listFiles(); - - if (files == null) - return; - - for (File f : files) { - if (f.isDirectory()) - deleteRecursively0(f); - else { - if (!f.delete()) - throw new IgniteException("Failed to delete file: " + f.getAbsolutePath()); - } - } - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopZkCluster(); - - super.afterTestsStopped(); - } - - /** - * - */ - private void stopZkCluster() { - if (zkCluster != null) { - try { - zkCluster.close(); - } - catch (Exception e) { - U.error(log, "Failed to stop Zookeeper client: " + e, e); - } - - zkCluster = null; - } - } - - /** - * - */ - private static void ackEveryEventSystemProperty() { - System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); - } - - /** - * - */ - private void clearAckEveryEventSystemProperty() { - System.setProperty(IGNITE_ZOOKEEPER_DISCOVERY_SPI_ACK_THRESHOLD, "1"); - } - - /** {@inheritDoc} */ - 
@Override protected void beforeTest() throws Exception { - super.beforeTest(); - - if (USE_TEST_CLUSTER && zkCluster == null) { - zkCluster = createTestingCluster(ZK_SRVS); - - zkCluster.start(); - } - - reset(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - clearAckEveryEventSystemProperty(); - - try { - assertFalse("Unexpected error, see log for details", err); - - checkEventsConsistency(); - - checkInternalStructuresCleanup(); - } - finally { - reset(); - - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - private void checkInternalStructuresCleanup() throws Exception { - for (Ignite node : G.allGrids()) { - final AtomicReference<?> res = GridTestUtils.getFieldValue(spi(node), "impl", "commErrProcFut"); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return res.get() == null; - } - }, 30_000); - - assertNull(res.get()); - } - } - - /** - * @throws Exception If failed. - */ - public void testZkRootNotExists() throws Exception { - zkRootPath = "/a/b/c"; - - for (int i = 0; i < 3; i++) { - reset(); - - startGridsMultiThreaded(5); - - waitForTopology(5); - - stopAllGrids(); - - checkEventsConsistency(); - } - } - - /** - * @throws Exception If failed. - */ - public void testMetadataUpdate() throws Exception { - startGrid(0); - - GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override public Void call() throws Exception { - ignite(0).configuration().getMarshaller().marshal(new C1()); - ignite(0).configuration().getMarshaller().marshal(new C2()); - - return null; - } - }, 64, "marshal"); - } - - /** - * @throws Exception If failed. 
- */ - public void testNodeAddresses() throws Exception { - startGridsMultiThreaded(3); - - clientMode(true); - - startGridsMultiThreaded(3, 3); - - waitForTopology(6); - - for (Ignite node : G.allGrids()) { - ClusterNode locNode0 = node.cluster().localNode(); - - assertTrue(locNode0.addresses().size() > 0); - assertTrue(locNode0.hostNames().size() > 0); - - for (ClusterNode node0 : node.cluster().nodes()) { - assertTrue(node0.addresses().size() > 0); - assertTrue(node0.hostNames().size() > 0); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testSetConsistentId() throws Exception { - startGridsMultiThreaded(3); - - clientMode(true); - - startGridsMultiThreaded(3, 3); - - waitForTopology(6); - - for (Ignite node : G.allGrids()) { - ClusterNode locNode0 = node.cluster().localNode(); - - assertEquals(locNode0.attribute(ATTR_IGNITE_INSTANCE_NAME), - locNode0.consistentId()); - - for (ClusterNode node0 : node.cluster().nodes()) { - assertEquals(node0.attribute(ATTR_IGNITE_INSTANCE_NAME), - node0.consistentId()); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testDefaultConsistentId() throws Exception { - dfltConsistenId = true; - - startGridsMultiThreaded(3); - - clientMode(true); - - startGridsMultiThreaded(3, 3); - - waitForTopology(6); - - for (Ignite node : G.allGrids()) { - ClusterNode locNode0 = node.cluster().localNode(); - - assertNotNull(locNode0.consistentId()); - - for (ClusterNode node0 : node.cluster().nodes()) - assertNotNull(node0.consistentId()); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testClientNodesStatus() throws Exception { - startGrid(0); - - for (Ignite node : G.allGrids()) { - assertEquals(0, node.cluster().forClients().nodes().size()); - assertEquals(1, node.cluster().forServers().nodes().size()); - } - - clientMode(true); - - startGrid(1); - - for (Ignite node : G.allGrids()) { - assertEquals(1, node.cluster().forClients().nodes().size()); - assertEquals(1, node.cluster().forServers().nodes().size()); - } - - clientMode(false); - - startGrid(2); - - clientMode(true); - - startGrid(3); - - for (Ignite node : G.allGrids()) { - assertEquals(2, node.cluster().forClients().nodes().size()); - assertEquals(2, node.cluster().forServers().nodes().size()); - } - - stopGrid(1); - - waitForTopology(3); - - for (Ignite node : G.allGrids()) { - assertEquals(1, node.cluster().forClients().nodes().size()); - assertEquals(2, node.cluster().forServers().nodes().size()); - } - - stopGrid(2); - - waitForTopology(2); - - for (Ignite node : G.allGrids()) { - assertEquals(1, node.cluster().forClients().nodes().size()); - assertEquals(1, node.cluster().forServers().nodes().size()); - } - } - - /** - * @throws Exception If failed. - */ - public void testLocalAuthenticationFails() throws Exception { - auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(0)); - - Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(0); - - return null; - } - }, IgniteCheckedException.class, null); - - IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); - - assertNotNull(spiErr); - assertTrue(spiErr.getMessage().contains("Authentication failed for local node")); - - startGrid(1); - startGrid(2); - - checkTestSecuritySubject(2); - } - - /** - * @throws Exception If failed. 
- */ - public void testAuthentication() throws Exception { - auth = ZkTestNodeAuthenticator.factory(getTestIgniteInstanceName(1), - getTestIgniteInstanceName(5)); - - startGrid(0); - - checkTestSecuritySubject(1); - - { - clientMode(false); - checkStartFail(1); - - clientMode(true); - checkStartFail(1); - - clientMode(false); - } - - startGrid(2); - - checkTestSecuritySubject(2); - - stopGrid(2); - - checkTestSecuritySubject(1); - - startGrid(2); - - checkTestSecuritySubject(2); - - stopGrid(0); - - checkTestSecuritySubject(1); - - checkStartFail(1); - - clientMode(false); - - startGrid(3); - - clientMode(true); - - startGrid(4); - - clientMode(false); - - startGrid(0); - - checkTestSecuritySubject(4); - - checkStartFail(1); - checkStartFail(5); - - clientMode(true); - - checkStartFail(1); - checkStartFail(5); - } - - /** - * @param nodeIdx Node index. - */ - private void checkStartFail(final int nodeIdx) { - Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(nodeIdx); - - return null; - } - }, IgniteCheckedException.class, null); - - IgniteSpiException spiErr = X.cause(err, IgniteSpiException.class); - - assertNotNull(spiErr); - assertTrue(spiErr.getMessage().contains("Authentication failed")); - } - - /** - * @param expNodes Expected nodes number. - * @throws Exception If failed. 
- */ - private void checkTestSecuritySubject(int expNodes) throws Exception { - waitForTopology(expNodes); - - List<Ignite> nodes = G.allGrids(); - - JdkMarshaller marsh = new JdkMarshaller(); - - for (Ignite ignite : nodes) { - Collection<ClusterNode> nodes0 = ignite.cluster().nodes(); - - assertEquals(nodes.size(), nodes0.size()); - - for (ClusterNode node : nodes0) { - byte[] secSubj = node.attribute(ATTR_SECURITY_SUBJECT_V2); - - assertNotNull(secSubj); - - ZkTestNodeAuthenticator.TestSecurityContext secCtx = marsh.unmarshal(secSubj, null); - - assertEquals(node.attribute(ATTR_IGNITE_INSTANCE_NAME), secCtx.nodeName); - } - } - } - - /** - * @throws Exception If failed. - */ - public void testStopNode_1() throws Exception { - startGrids(5); - - waitForTopology(5); - - stopGrid(3); - - waitForTopology(4); - - startGrid(3); - - waitForTopology(5); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEventsSimple1_SingleNode() throws Exception { - ackEveryEventSystemProperty(); - - Ignite srv0 = startGrid(0); - - srv0.createCache(new CacheConfiguration<>("c1")); - - waitForEventsAcks(srv0); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEventsSimple1_5_Nodes() throws Exception { - ackEveryEventSystemProperty(); - - Ignite srv0 = startGrids(5); - - srv0.createCache(new CacheConfiguration<>("c1")); - - awaitPartitionMapExchange(); - - waitForEventsAcks(srv0); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEvents_FastStopProcess_1() throws Exception { - customEvents_FastStopProcess(1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testCustomEvents_FastStopProcess_2() throws Exception { - customEvents_FastStopProcess(5, 5); - } - - /** - * @param srvs Servers number. - * @param clients Clients number. - * @throws Exception If failed. 
- */ - private void customEvents_FastStopProcess(int srvs, int clients) throws Exception { - ackEveryEventSystemProperty(); - - Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs = - new ConcurrentHashMap<>(); - - Ignite crd = startGrid(0); - - UUID crdId = crd.cluster().localNode().id(); - - if (srvs > 1) - startGridsMultiThreaded(1, srvs - 1); - - if (clients > 0) { - client = true; - - startGridsMultiThreaded(srvs, clients); - } - - awaitPartitionMapExchange(); - - List<Ignite> nodes = G.allGrids(); - - assertEquals(srvs + clients, nodes.size()); - - for (Ignite node : nodes) - registerTestEventListeners(node, rcvdMsgs); - - int payload = 0; - - AffinityTopologyVersion topVer = ((IgniteKernal)crd).context().discovery().topologyVersionEx(); - - for (Ignite node : nodes) { - UUID sndId = node.cluster().localNode().id(); - - info("Send from node: " + sndId); - - GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery(); - - { - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>(); - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = Collections.emptyList(); - - TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(false, payload++); - - expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg)); - - discoveryMgr.sendCustomEvent(msg); - - doSleep(200); // Wait some time to check extra messages are not received. 
- - checkEvents(crd, rcvdMsgs, expCrdMsgs); - - for (Ignite node0 : nodes) { - if (node0 != crd) - checkEvents(node0, rcvdMsgs, expNodesMsgs); - } - - rcvdMsgs.clear(); - } - { - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expCrdMsgs = new ArrayList<>(); - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expNodesMsgs = new ArrayList<>(); - - TestFastStopProcessCustomMessage msg = new TestFastStopProcessCustomMessage(true, payload++); - - expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, sndId, msg)); - - discoveryMgr.sendCustomEvent(msg); - - TestFastStopProcessCustomMessageAck ackMsg = new TestFastStopProcessCustomMessageAck(msg.payload); - - expCrdMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg)); - expNodesMsgs.add(new T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>(topVer, crdId, ackMsg)); - - doSleep(200); // Wait some time to check extra messages are not received. - - checkEvents(crd, rcvdMsgs, expCrdMsgs); - - for (Ignite node0 : nodes) { - if (node0 != crd) - checkEvents(node0, rcvdMsgs, expNodesMsgs); - } - - rcvdMsgs.clear(); - } - - waitForEventsAcks(crd); - } - } - - /** - * @param node Node to check. - * @param rcvdMsgs Received messages. - * @param expMsgs Expected messages. - * @throws Exception If failed. - */ - private void checkEvents( - Ignite node, - final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs, - final List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> expMsgs) throws Exception { - final UUID nodeId = node.cluster().localNode().id(); - - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> msgs = rcvdMsgs.get(nodeId); - - int size = msgs == null ? 
0 : msgs.size(); - - return size >= expMsgs.size(); - } - }, 5000)); - - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> msgs = rcvdMsgs.get(nodeId); - - if (msgs == null) - msgs = Collections.emptyList(); - - assertEqualsCollections(expMsgs, msgs); - } - - /** - * @param node Node. - * @param rcvdMsgs Map to store received events. - */ - private void registerTestEventListeners(Ignite node, - final Map<UUID, List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>>> rcvdMsgs) { - GridDiscoveryManager discoveryMgr = ((IgniteKernal)node).context().discovery(); - - final UUID nodeId = node.cluster().localNode().id(); - - discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessage.class, - new CustomEventListener<TestFastStopProcessCustomMessage>() { - @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessage msg) { - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId); - - if (list == null) - rcvdMsgs.put(nodeId, list = new ArrayList<>()); - - list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg)); - } - } - ); - discoveryMgr.setCustomEventListener(TestFastStopProcessCustomMessageAck.class, - new CustomEventListener<TestFastStopProcessCustomMessageAck>() { - @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, TestFastStopProcessCustomMessageAck msg) { - List<T3<AffinityTopologyVersion, UUID, DiscoveryCustomMessage>> list = rcvdMsgs.get(nodeId); - - if (list == null) - rcvdMsgs.put(nodeId, list = new ArrayList<>()); - - list.add(new T3<>(topVer, snd.id(), (DiscoveryCustomMessage)msg)); - } - } - ); - } - - /** - * - */ - static class TestFastStopProcessCustomMessage implements DiscoveryCustomMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteUuid id = IgniteUuid.randomUuid();; - - /** */ - private final boolean createAck; - - /** */ - private 
final int payload; - - /** - * @param createAck Create ack message flag. - * @param payload Payload. - */ - TestFastStopProcessCustomMessage(boolean createAck, int payload) { - this.createAck = createAck; - this.payload = payload; - - } - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return createAck ? new TestFastStopProcessCustomMessageAck(payload) : null; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean stopProcess() { - return true; - } - - /** {@inheritDoc} */ - @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, - AffinityTopologyVersion topVer, - DiscoCache discoCache) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TestFastStopProcessCustomMessage that = (TestFastStopProcessCustomMessage)o; - - return createAck == that.createAck && payload == that.payload; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return Objects.hash(createAck, payload); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TestFastStopProcessCustomMessage.class, this); - } - } - - /** - * - */ - static class TestFastStopProcessCustomMessageAck implements DiscoveryCustomMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteUuid id = IgniteUuid.randomUuid();; - - /** */ - private final int payload; - - /** - * @param payload Payload. 
- */ - TestFastStopProcessCustomMessageAck(int payload) { - this.payload = payload; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean stopProcess() { - return true; - } - - /** {@inheritDoc} */ - @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, - AffinityTopologyVersion topVer, - DiscoCache discoCache) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TestFastStopProcessCustomMessageAck that = (TestFastStopProcessCustomMessageAck)o; - return payload == that.payload; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return Objects.hash(payload); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TestFastStopProcessCustomMessageAck.class, this); - } - } - - /** - * @throws Exception If failed. - */ - public void testSegmentation1() throws Exception { - sesTimeout = 2000; - testSockNio = true; - - Ignite node0 = startGrid(0); - - final CountDownLatch l = new CountDownLatch(1); - - node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - l.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(true); - - for (int i = 0; i < 10; i++) { - Thread.sleep(1_000); - - if (l.getCount() == 0) - break; - } - - info("Allow connect"); - - c0.allowConnect(); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - } - - /** - * @throws Exception If failed. 
- */ - public void testSegmentation2() throws Exception { - sesTimeout = 2000; - - Ignite node0 = startGrid(0); - - final CountDownLatch l = new CountDownLatch(1); - - node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - l.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - - try { - zkCluster.close(); - - assertTrue(l.await(10, TimeUnit.SECONDS)); - } - finally { - zkCluster = createTestingCluster(ZK_SRVS); - - zkCluster.start(); - } - } - - /** - * @throws Exception If failed. - */ - public void testSegmentation3() throws Exception { - sesTimeout = 5000; - - Ignite node0 = startGrid(0); - - final CountDownLatch l = new CountDownLatch(1); - - node0.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - l.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - - List<TestingZooKeeperServer> srvs = zkCluster.getServers(); - - assertEquals(3, srvs.size()); - - try { - srvs.get(0).stop(); - srvs.get(1).stop(); - - assertTrue(l.await(20, TimeUnit.SECONDS)); - } - finally { - zkCluster.close(); - - zkCluster = createTestingCluster(ZK_SRVS); - - zkCluster.start(); - } - } - - /** - * @throws Exception If failed. - */ - public void testQuorumRestore() throws Exception { - sesTimeout = 15_000; - - startGrids(3); - - waitForTopology(3); - - List<TestingZooKeeperServer> srvs = zkCluster.getServers(); - - assertEquals(3, srvs.size()); - - try { - srvs.get(0).stop(); - srvs.get(1).stop(); - - U.sleep(2000); - - srvs.get(1).restart(); - - startGrid(4); - - waitForTopology(4); - } - finally { - zkCluster.close(); - - zkCluster = createTestingCluster(ZK_SRVS); - - zkCluster.start(); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testConnectionRestore1() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGrid(1); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore2() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGridsMultiThreaded(1, 5); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_NonCoordinator1() throws Exception { - connectionRestore_NonCoordinator(false); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_NonCoordinator2() throws Exception { - connectionRestore_NonCoordinator(true); - } - - /** - * @param failWhenDisconnected {@code True} if fail node while another node is disconnected. - * @throws Exception If failed. 
- */ - private void connectionRestore_NonCoordinator(boolean failWhenDisconnected) throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - Ignite node1 = startGrid(1); - - ZkTestClientCnxnSocketNIO c1 = ZkTestClientCnxnSocketNIO.forNode(node1); - - c1.closeSocket(true); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() { - try { - startGrid(2); - } - catch (Exception e) { - info("Start error: " + e); - } - - return null; - } - }, "start-node"); - - checkEvents(node0, joinEvent(3)); - - if (failWhenDisconnected) { - ZookeeperDiscoverySpi spi = spis.get(getTestIgniteInstanceName(2)); - - closeZkClient(spi); - - checkEvents(node0, failEvent(4)); - } - - c1.allowConnect(); - - checkEvents(ignite(1), joinEvent(3)); - - if (failWhenDisconnected) { - checkEvents(ignite(1), failEvent(4)); - - IgnitionEx.stop(getTestIgniteInstanceName(2), true, true); - } - - fut.get(); - - waitForTopology(failWhenDisconnected ? 2 : 3); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator1() throws Exception { - connectionRestore_Coordinator(1, 1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator1_1() throws Exception { - connectionRestore_Coordinator(1, 1, 1); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator2() throws Exception { - connectionRestore_Coordinator(1, 3, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator3() throws Exception { - connectionRestore_Coordinator(3, 3, 0); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore_Coordinator4() throws Exception { - connectionRestore_Coordinator(3, 3, 1); - } - - /** - * @param initNodes Number of initially started nodes. - * @param startNodes Number of nodes to start after coordinator loose connection. 
- * @param failCnt Number of nodes to stop after coordinator loose connection. - * @throws Exception If failed. - */ - private void connectionRestore_Coordinator(final int initNodes, int startNodes, int failCnt) throws Exception { - sesTimeout = 30_000; - testSockNio = true; - - Ignite node0 = startGrids(initNodes); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(true); - - final AtomicInteger nodeIdx = new AtomicInteger(initNodes); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() { - try { - startGrid(nodeIdx.getAndIncrement()); - } - catch (Exception e) { - error("Start failed: " + e); - } - - return null; - } - }, startNodes, "start-node"); - - int cnt = 0; - - DiscoveryEvent[] expEvts = new DiscoveryEvent[startNodes - failCnt]; - - int expEvtCnt = 0; - - sesTimeout = 1000; - - List<ZkTestClientCnxnSocketNIO> blockedC = new ArrayList<>(); - - final List<String> failedZkNodes = new ArrayList<>(failCnt); - - for (int i = initNodes; i < initNodes + startNodes; i++) { - final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i)); - - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - long internalOrder = GridTestUtils.getFieldValue(spi, "impl", "rtState", "internalOrder"); - - return internalOrder > 0; - } - }, 10_000)); - - if (cnt++ < failCnt) { - ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i)); - - c.closeSocket(true); - - blockedC.add(c); - - failedZkNodes.add(aliveZkNodePath(spi)); - } - else { - expEvts[expEvtCnt] = joinEvent(initNodes + expEvtCnt + 1); - - expEvtCnt++; - } - } - - waitNoAliveZkNodes(log, zkCluster.getConnectString(), failedZkNodes, 10_000); - - c0.allowConnect(); - - for (ZkTestClientCnxnSocketNIO c : blockedC) - c.allowConnect(); - - if (expEvts.length > 0) { - for (int i = 0; i < initNodes; i++) - 
checkEvents(ignite(i), expEvts); - } - - fut.get(); - - waitForTopology(initNodes + startNodes - failCnt); - } - - /** - * @param node Node. - * @return Corresponding znode. - */ - private static String aliveZkNodePath(Ignite node) { - return aliveZkNodePath(node.configuration().getDiscoverySpi()); - } - - /** - * @param spi SPI. - * @return Znode related to given SPI. - */ - private static String aliveZkNodePath(DiscoverySpi spi) { - String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath"); - - return path.substring(path.lastIndexOf('/') + 1); - } - - /** - * @param log Logger. - * @param connectString Zookeeper connect string. - * @param failedZkNodes Znodes which should be removed. - * @param timeout Timeout. - * @throws Exception If failed. - */ - private static void waitNoAliveZkNodes(final IgniteLogger log, - String connectString, - final List<String> failedZkNodes, - long timeout) - throws Exception - { - final ZookeeperClient zkClient = new ZookeeperClient(log, connectString, 10_000, null); - - try { - assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - List<String> c = zkClient.getChildren(IGNITE_ZK_ROOT + "/" + ZkIgnitePaths.ALIVE_NODES_DIR); - - for (String failedZkNode : failedZkNodes) { - if (c.contains(failedZkNode)) { - log.info("Alive node is not removed [node=" + failedZkNode + ", all=" + c + ']'); - - return false; - } - } - - return true; - } - catch (Exception e) { - e.printStackTrace(); - - fail(); - - return true; - } - } - }, timeout)); - } - finally { - zkClient.close(); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testConcurrentStartWithClient() throws Exception { - final int NODES = 20; - - for (int i = 0; i < 3; i++) { - info("Iteration: " + i); - - final int srvIdx = ThreadLocalRandom.current().nextInt(NODES); - - final AtomicInteger idx = new AtomicInteger(); - - GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override public Void call() throws Exception { - int threadIdx = idx.getAndIncrement(); - - clientModeThreadLocal(threadIdx == srvIdx || ThreadLocalRandom.current().nextBoolean()); - - startGrid(threadIdx); - - return null; - } - }, NODES, "start-node"); - - waitForTopology(NODES); - - stopAllGrids(); - - checkEventsConsistency(); - - evts.clear(); - } - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentStartStop1() throws Exception { - concurrentStartStop(1); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentStartStop2() throws Exception { - concurrentStartStop(5); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentStartStop2_EventsThrottle() throws Exception { - System.setProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS, "1"); - - try { - concurrentStartStop(5); - } - finally { - System.clearProperty(ZookeeperDiscoveryImpl.IGNITE_ZOOKEEPER_DISCOVERY_SPI_MAX_EVTS); - } - } - - /** - * @param initNodes Number of initially started nnodes. - * @throws Exception If failed. 
- */ - private void concurrentStartStop(final int initNodes) throws Exception { - startGrids(initNodes); - - final int NODES = 5; - - long topVer = initNodes; - - for (int i = 0; i < 10; i++) { - info("Iteration: " + i); - - DiscoveryEvent[] expEvts = new DiscoveryEvent[NODES]; - - startGridsMultiThreaded(initNodes, NODES); - - for (int j = 0; j < NODES; j++) - expEvts[j] = joinEvent(++topVer); - - checkEvents(ignite(0), expEvts); - - checkEventsConsistency(); - - final CyclicBarrier b = new CyclicBarrier(NODES); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - try { - b.await(); - - stopGrid(initNodes + idx); - } - catch (Exception e) { - e.printStackTrace(); - - fail(); - } - } - }, NODES, "stop-node"); - - for (int j = 0; j < NODES; j++) - expEvts[j] = failEvent(++topVer); - - checkEventsConsistency(); - } - } - - /** - * @throws Exception If failed. - */ - public void testClusterRestart() throws Exception { - startGridsMultiThreaded(3, false); - - stopAllGrids(); - - evts.clear(); - - startGridsMultiThreaded(3, false); - - waitForTopology(3); - } - - /** - * @throws Exception If failed. - */ - public void testConnectionRestore4() throws Exception { - testSockNio = true; - - Ignite node0 = startGrid(0); - - ZkTestClientCnxnSocketNIO c0 = ZkTestClientCnxnSocketNIO.forNode(node0); - - c0.closeSocket(false); - - startGrid(1); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_1_Node() throws Exception { - startGrid(0); - - waitForTopology(1); - - stopGrid(0); - } - - /** - * @throws Exception If failed. - */ - public void testRestarts_2_Nodes() throws Exception { - startGrid(0); - - for (int i = 0; i < 10; i++) { - info("Iteration: " + i); - - startGrid(1); - - waitForTopology(2); - - stopGrid(1); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testStartStop_2_Nodes_WithCache() throws Exception { - startGrids(2); - - for (Ignite node : G.allGrids()) { - IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME); - - assertNotNull(cache); - - for (int i = 0; i < 100; i++) { - cache.put(i, node.name()); - - assertEquals(node.name(), cache.get(i)); - } - } - - awaitPartitionMapExchange(); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop_2_Nodes() throws Exception { - ackEveryEventSystemProperty(); - - startGrid(0); - - waitForTopology(1); - - startGrid(1); - - waitForTopology(2); - - for (Ignite node : G.allGrids()) - node.compute().broadcast(new DummyCallable(null)); - - awaitPartitionMapExchange(); - - waitForEventsAcks(ignite(0)); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop1() throws Exception { - ackEveryEventSystemProperty(); - - startGridsMultiThreaded(5, false); - - waitForTopology(5); - - awaitPartitionMapExchange(); - - waitForEventsAcks(ignite(0)); - - stopGrid(0); - - waitForTopology(4); - - for (Ignite node : G.allGrids()) - node.compute().broadcast(new DummyCallable(null)); - - startGrid(0); - - waitForTopology(5); - - awaitPartitionMapExchange(); - - waitForEventsAcks(grid(CU.oldest(ignite(1).cluster().nodes()))); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop3() throws Exception { - startGrids(4); - - awaitPartitionMapExchange(); - - stopGrid(0); - - startGrid(5); - - awaitPartitionMapExchange(); - } - - /** - * @throws Exception If failed. 
- */ - public void testStartStop4() throws Exception { - startGrids(6); - - awaitPartitionMapExchange(); - - stopGrid(2); - - if (ThreadLocalRandom.current().nextBoolean()) - awaitPartitionMapExchange(); - - stopGrid(1); - - if (ThreadLocalRandom.current().nextBoolean()) - awaitPartitionMapExchange(); - - stopGrid(0); - - if (ThreadLocalRandom.current().nextBoolean()) - awaitPartitionMapExchange(); - - startGrid(7); - - awaitPartitionMapExchange(); - } - - /** - * @throws Exception If failed. - */ - public void testStartStop2() throws Exception { - startGridsMultiThreaded(10, false); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - stopGrid(idx); - } - }, 3, "stop-node-thread"); - - waitForTopology(7); - - startGridsMultiThreaded(0, 3); - - waitForTopology(10); - } - - /** - * @throws Exception If failed. - */ - public void testStartStopWithClients() throws Exception { - final int SRVS = 3; - - startGrids(SRVS); - - clientMode(true); - - final int THREADS = 30; - - for (int i = 0; i < 5; i++) { - info("Iteration: " + i); - - startGridsMultiThreaded(SRVS, THREADS); - - waitForTopology(SRVS + THREADS); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer idx) { - stopGrid(idx + SRVS); - } - }, THREADS, "stop-node"); - - waitForTopology(SRVS); - - checkEventsConsistency(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTopologyChangeMultithreaded() throws Exception { - topologyChangeWithRestarts(false, false); - } - - /** - * @throws Exception If failed. - */ - public void testTopologyChangeMultithreaded_RestartZk() throws Exception { - topologyChangeWithRestarts(true, false); - } - - /** - * @throws Exception If failed. 
- */ - public void testTopologyChangeMultithreaded_RestartZk_CloseClients() throws Exception { - topologyChangeWithRestarts(true, true); - } - - /** - * @param restartZk If {@code true} in background restarts on of ZK servers. - * @param closeClientSock If {@code true} in background closes zk clients' sockets. - * @throws Exception If failed. - */ - private void topologyChangeWithRestarts(boolean restartZk, boolean closeClientSock) throws Exception { - sesTimeout = 30_000; - - if (closeClientSock) - testSockNio = true; - - long stopTime = System.currentTimeMillis() + 60_000; - - AtomicBoolean stop = new AtomicBoolean(); - - IgniteInternalFuture<?> fut1 = restartZk ? startRestartZkServers(stopTime, stop) : null; - IgniteInternalFuture<?> fut2 = closeClientSock ? startCloseZkClientSocket(stopTime, stop) : null; - - int INIT_NODES = 10; - - startGridsMultiThreaded(INIT_NODES); - - final int MAX_NODES = 20; - - final List<Integer> startedNodes = new ArrayList<>(); - - for (int i = 0; i < INIT_NODES; i++) - startedNodes.add(i); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - final AtomicInteger startIdx = new AtomicInteger(INIT_NODES); - - try { - while (System.currentTimeMillis() < stopTime) { - if (startedNodes.size() >= MAX_NODES) { - int stopNodes = rnd.nextInt(5) + 1; - - log.info("Next, stop nodes: " + stopNodes); - - final List<Integer> idxs = new ArrayList<>(); - - while (idxs.size() < stopNodes) { - Integer stopIdx = rnd.nextInt(startedNodes.size()); - - if (!idxs.contains(stopIdx)) - idxs.add(startedNodes.get(stopIdx)); - } - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer threadIdx) { - int stopNodeIdx = idxs.get(threadIdx); - - info("Stop node: " + stopNodeIdx); - - stopGrid(stopNodeIdx); - } - }, stopNodes, "stop-node"); - - startedNodes.removeAll(idxs); - } - else { - int startNodes = rnd.nextInt(5) + 1; - - log.info("Next, start nodes: " + startNodes); - - 
GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override public Void call() throws Exception { - int idx = startIdx.incrementAndGet(); - - log.info("Start node: " + idx); - - startGrid(idx); - - synchronized (startedNodes) { - startedNodes.add(idx); - } - - return null; - } - }, startNodes, "start-node"); - } - - U.sleep(rnd.nextInt(100) + 1); - } - } - finally { - stop.set(true); - } - - if (fut1 != null) - fut1.get(); - - if (fut2 != null) - fut2.get(); - } - - /** - * @throws Exception If failed. - */ - public void testRandomTopologyChanges() throws Exception { - randomTopologyChanges(false, false); - } - - /** - * @throws Exception If failed. - */ - private void printZkNodes() throws Exception { - ZookeeperClient zkClient = new ZookeeperClient(new JavaLogger(), zkCluster.getConnectString(), 10_000, null); - - List<String> children = ZKUtil.listSubTreeBFS(zkClient.zk(), IGNITE_ZK_ROOT); - - info("Zookeeper nodes:"); - - for (String s : children) - info(s); - - zkClient.close(); - } - - /** - * @throws Exception If failed. - */ - public void testRandomTopologyChanges_RestartZk() throws Exception { - randomTopologyChanges(true, false); - } - - /** - * @throws Exception If failed. - */ - public void testRandomTopologyChanges_CloseClients() throws Exception { - randomTopologyChanges(false, true); - } - - /** - * @throws Exception If failed. - */ - public void testDeployService1() throws Exception { - startGridsMultiThreaded(3); - - grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl()); - } - - /** - * @throws Exception If failed. - */ - public void testDeployService2() throws Exception { - clientMode(false); - - startGrid(0); - - clientMode(true); - - startGrid(1); - - grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl()); - } - - /** - * @throws Exception If failed. 
- */ - public void testDeployService3() throws Exception { - IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - clientModeThreadLocal(true); - - startGrid(0); - - return null; - } - }, "start-node"); - - clientModeThreadLocal(false); - - startGrid(1); - - fut.get(); - - grid(0).services(grid(0).cluster()).deployNodeSingleton("test", new GridCacheAbstractFullApiSelfTest.DummyServiceImpl()); - } - - /** - * @throws Exception If failed. - */ - public void testLargeUserAttribute1() throws Exception { - initLargeAttribute(); - - startGrid(0); - - printZkNodes(); - - userAttrs = null; - - startGrid(1); - - waitForEventsAcks(ignite(0)); - - printZkNodes(); - - waitForTopology(2); - } - - /** - * @throws Exception If failed. - */ - public void testLargeUserAttribute2() throws Exception { - startGrid(0); - - initLargeAttribute(); - - startGrid(1); - - waitForEventsAcks(ignite(0)); - - printZkNodes(); - } - - /** - * @throws Exception If failed. - */ - public void testLargeUserAttribute3() throws Exception { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - long stopTime = System.currentTimeMillis() + 60_000; - - int nodes = 0; - - for (int i = 0; i < 25; i++) { - info("Iteration: " + i); - - if (rnd.nextBoolean()) - initLargeAttribute(); - else - userAttrs = null; - - clientMode(i > 5); - - startGrid(i); - - nodes++; - - if (System.currentTimeMillis() >= stopTime) - break; - } - - waitForTopology(nodes); - } - - /** - * - */ - private void initLargeAttribute() { - userAttrs = new HashMap<>(); - - int[] attr = new int[1024 * 1024 + ThreadLocalRandom.current().nextInt(1024)]; - - for (int i = 0; i < attr.length; i++) - attr[i] = i; - - userAttrs.put("testAttr", attr); - } - - /** - * @throws Exception If failed. - */ - public void testLargeCustomEvent() throws Exception { - Ignite srv0 = startGrid(0); - - // Send large message, single node in topology. 
- IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1")); - - for (int i = 0; i < 100; i++) - cache.put(i, i); - - assertEquals(1, cache.get(1)); - - waitForEventsAcks(ignite(0)); - - startGridsMultiThreaded(1, 3); - - srv0.destroyCache("c1"); - - // Send large message, multiple nodes in topology. - cache = srv0.createCache(largeCacheConfiguration("c1")); - - for (int i = 0; i < 100; i++) - cache.put(i, i); - - printZkNodes(); - - waitForTopology(4); - } - - /** - * @throws Exception If failed. - */ - public void testClientReconnectSessionExpire1_1() throws Exception { - clientReconnectSessionExpire(false); - } - - /** - * @throws Exception If failed. - */ - public void testClientReconnectSessionExpire1_2() throws Exception { - clientReconnectSessionExpire(true); - } - - /** - * @param closeSock Test mode flag. - * @throws Exception If failed. - */ - private void clientReconnectSessionExpire(boolean closeSock) throws Exception { - startGrid(0); - - sesTimeout = 2000; - clientMode(true); - testSockNio = true; - - Ignite client = startGrid(1); - - client.cache(DEFAULT_CACHE_NAME).put(1, 1); - - reconnectClientNodes(log, Collections.singletonList(client), null, closeSock); - - assertEquals(1, client.cache(DEFAULT_CACHE_NAME).get(1)); - - client.compute().broadcast(new DummyCallable(null)); - } - - /** - * @throws Exception If failed. - */ - public void testForceClientReconnect() throws Exception { - final int SRVS = 3; - - startGrids(SRVS); - - clientMode(true); - - startGrid(SRVS); - - reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable<Void>() { - @Override public Void call() throws Exception { - ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(SRVS)); - - spi.clientReconnect(); - - return null; - } - }); - - waitForTopology(SRVS + 1); - } - - /** - * @throws Exception If failed. 
- */ - public void testForcibleClientFail() throws Exception { - final int SRVS = 3; - - startGrids(SRVS); - - clientMode(true); - - startGrid(SRVS); - - reconnectClientNodes(Collections.singletonList(ignite(SRVS)), new Callable<Void>() { - @Override public Void call() throws Exception { - ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(0)); - - spi.failNode(ignite(SRVS).cluster().localNode().id(), "Test forcible node fail"); - - return null; - } - }); - - waitForTopology(SRVS + 1); - } - - /** - * @throws Exception If failed. - */ - public void testDuplicatedNodeId() throws Exception { - UUID nodeId0 = nodeId = UUID.randomUUID(); - - startGrid(0); - - int failingNodeIdx = 100; - - for (int i = 0; i < 5; i++) { - final int idx = failingNodeIdx++; - - nodeId = nodeId0; - - info("Start node with duplicated ID [iter=" + i + ", nodeId=" + nodeId + ']'); - - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(idx); - - return null; - } - }, IgniteCheckedException.class, null); - - nodeId = null; - - info("Start node with unique ID [iter=" + i + ']'); - - Ignite ignite = startGrid(idx); - - nodeId0 = ignite.cluster().localNode().id(); - - waitForTopology(i + 2); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testPing() throws Exception { - sesTimeout = 5000; - - startGrids(3); - - final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(1)); - - final UUID nodeId = ignite(2).cluster().localNode().id(); - - IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - assertTrue(spi.pingNode(nodeId)); - } - }, 32, "ping"); - - fut.get(); - - fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - spi.pingNode(nodeId); - } - }, 32, "ping"); - - U.sleep(100); - - stopGrid(2); - - fut.get(); - - fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - assertFalse(spi.pingNode(nodeId)); - } - }, 32, "ping"); - - fut.get(); - } - - /** - * @throws Exception If failed. - */ - public void testWithPersistence1() throws Exception { - startWithPersistence(false); - } - - /** - * @throws Exception If failed. - */ - public void testWithPersistence2() throws Exception { - startWithPersistence(true); - } - - /** - * @throws Exception If failed. - */ - public void testNoOpCommunicationErrorResolve_1() throws Exception { - communicationErrorResolve_Simple(2); - } - - /** - * @throws Exception If failed. - */ - public void testNoOpCommunicationErrorResolve_2() throws Exception { - communicationErrorResolve_Simple(10); - } - - /** - * @param nodes Nodes number. - * @throws Exception If failed. 
- */ - private void communicationErrorResolve_Simple(int nodes) throws Exception { - assert nodes > 1; - - sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; - - startGridsMultiThreaded(nodes); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - for (int i = 0; i < 3; i++) { - info("Iteration: " + i); - - int idx1 = rnd.nextInt(nodes); - - int idx2; - - do { - idx2 = rnd.nextInt(nodes); - } - while (idx1 == idx2); - - ZookeeperDiscoverySpi spi = spi(ignite(idx1)); - - spi.resolveCommunicationError(ignite(idx2).cluster().localNode(), new Exception("test")); - - checkInternalStructuresCleanup(); - } - } - - /** - * Tests case when one node fails before sending communication status. - * - * @throws Exception If failed. - */ - public void testNoOpCommunicationErrorResolve_3() throws Exception { - sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; - - startGridsMultiThreaded(3); - - sesTimeout = 10_000; - - testSockNio = true; - sesTimeout = 5000; - - startGrid(3); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() { - ZookeeperDiscoverySpi spi = spi(ignite(0)); - - spi.resolveCommunicationError(ignite(1).cluster().localNode(), new Exception("test")); - - return null; - } - }); - - U.sleep(1000); - - ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(ignite(3)); - - nio.closeSocket(true); - - try { - stopGrid(3); - - fut.get(); - } - finally { - nio.allowConnect(); - } - - waitForTopology(3); - } - - /** - * Tests case when Coordinator fails while resolve process is in progress. - * - * @throws Exception If failed. 
- */ - public void testNoOpCommunicationErrorResolve_4() throws Exception { - testCommSpi = true; - - sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; - - startGrid(0); - - startGridsMultiThreaded(1, 3); - - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3)); - - commSpi.pingLatch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() { - ZookeeperDiscoverySpi spi = spi(ignite(1)); - - spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test")); - - return null; - } - }); - - U.sleep(1000); - - assertFalse(fut.isDone()); - - stopGrid(0); - - commSpi.pingLatch.countDown(); - - fut.get(); - - waitForTopology(3); - } - - /** - * Tests that nodes join is delayed while resolve is in progress. - * - * @throws Exception If failed. - */ - public void testNoOpCommunicationErrorResolve_5() throws Exception { - testCommSpi = true; - - sesTimeout = 2000; - commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY; - - startGrid(0); - - startGridsMultiThreaded(1, 3); - - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3)); - - commSpi.pingStartLatch = new CountDownLatch(1); - commSpi.pingLatch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() { - ZookeeperDiscoverySpi spi = spi(ignite(1)); - - spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test")); - - return null; - } - }); - - assertTrue(commSpi.pingStartLatch.await(10, SECONDS)); - - try { - assertFalse(fut.isDone()); - - final AtomicInteger nodeIdx = new AtomicInteger(3); - - IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(nodeIdx.incrementAndGet()); - - return null; - } - }, 3, "start-node"); - - U.sleep(1000); - - 
assertFalse(startFut.isDone()); - - assertEquals(4, ignite(0).cluster().nodes().size()); - - commSpi.pingLatch.countDown(); - - startFut.get(); - fut.get(); - - waitForTopology(7); - } - finally { - commSpi.pingLatch.countDown(); - } - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillNode_1() throws Exception { - communicationErrorResolve_KillNodes(2, Collections.singleton(2L)); - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillNode_2() throws Exception { - communicationErrorResolve_KillNodes(3, Collections.singleton(2L)); - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillNode_3() throws Exception { - communicationErrorResolve_KillNodes(10, Arrays.asList(2L, 4L, 6L)); - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillCoordinator_1() throws Exception { - communicationErrorResolve_KillNodes(2, Collections.singleton(1L)); - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillCoordinator_2() throws Exception { - communicationErrorResolve_KillNodes(3, Collections.singleton(1L)); - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillCoordinator_3() throws Exception { - communicationErrorResolve_KillNodes(10, Arrays.asList(1L, 4L, 6L)); - } - - /** - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillCoordinator_4() throws Exception { - communicationErrorResolve_KillNodes(10, Arrays.asList(1L, 2L, 3L)); - } - - /** - * @param startNodes Number of nodes to start. - * @param killNodes Nodes to kill by resolve process. - * @throws Exception If failed. 
- */ - private void communicationErrorResolve_KillNodes(int startNodes, Collection<Long> killNodes) throws Exception { - testCommSpi = true; - - commProblemRslvr = TestNodeKillCommunicationProblemResolver.factory(killNodes); - - startGrids(startNodes); - - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(0)); - - commSpi.checkRes = new BitSet(startNodes); - - ZookeeperDiscoverySpi spi = null; - UUID killNodeId = null; - - for (Ignite node : G.allGrids()) { - ZookeeperDiscoverySpi spi0 = spi(node); - - if (!killNodes.contains(node.cluster().localNode().order())) - spi = spi0; - else - killNodeId = node.cluster().localNode().id(); - } - - assertNotNull(spi); - assertNotNull(killNodeId); - - try { - spi.resolveCommunicationError(spi.getNode(killNodeId), new Exception("test")); - - fail("Exception is not thrown"); - } - catch (IgniteSpiException e) { - assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException); - } - - int expNodes = startNodes - killNodes.size(); - - waitForTopology(expNodes); - - for (Ignite node : G.allGrids()) - assertFalse(killNodes.contains(node.cluster().localNode().order())); - - startGrid(startNodes); - - waitForTopology(expNodes + 1); - } - - /** - * @throws Exception If failed. 
- */ - public void testCommunicationErrorResolve_KillCoordinator_5() throws Exception { - sesTimeout = 2000; - - testCommSpi = true; - commProblemRslvr = KillCoordinatorCommunicationProblemResolver.FACTORY; - - startGrids(10); - - /* Index of the grid whose id is reported as the problem node; advanced by one at the end of every iteration. */ int crd = 0; - - /* Index used for the replacement node started in each iteration. */ int nodeIdx = 10; - - for (int i = 0; i < 10; i++) { - info("Iteration: " + i); - - for (Ignite node : G.allGrids()) - ZkTestCommunicationSpi.spi(node).initCheckResult(10); - - UUID crdId = ignite(crd).cluster().localNode().id(); - - /* The error is reported through the SPI of the grid with the next index. */ ZookeeperDiscoverySpi spi = spi(ignite(crd + 1)); - - try { - spi.resolveCommunicationError(spi.getNode(crdId), new Exception("test")); - - fail("Exception is not thrown"); - } - catch (IgniteSpiException e) { - assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException); - } - - /* Topology is expected to drop to 9 nodes and return to 10 after the replacement node starts. */ waitForTopology(9); - - startGrid(nodeIdx++); - - waitForTopology(10); - - crd++; - } - } - - /** - * Starts 10 grids, then enables client mode and starts more grids (presumably 5 starting at index 10, - * which matches the next node index of 15 used below). For 10 iterations it reports a communication - * error through the SPI of the last grid returned by {@code G.allGrids()}, ignoring any - * {@code IgniteSpiException}, then starts one more node with a randomly chosen client mode flag - * and awaits partition map exchange. - * - * @throws Exception If failed. - */ - public void testCommunicationErrorResolve_KillRandom() throws Exception { - sesTimeout = 2000; - - testCommSpi = true; - commProblemRslvr = KillRandomCommunicationProblemResolver.FACTORY; - - startGridsMultiThreaded(10); - - clientMode(true); - - startGridsMultiThreaded(10, 5); - - int nodeIdx = 15; - - for (int i = 0; i < 10; i++) { - info("Iteration: " + i); - - ZookeeperDiscoverySpi spi = null; - - for (Ignite node : G.allGrids()) { - ZkTestCommunicationSpi.spi(node).initCheckResult(100); - - spi = spi(node); - } - - assert spi != null; - - try { - spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test")); - } - catch (IgniteSpiException ignore) { - // No-op. - } - - clientMode(ThreadLocalRandom.current().nextBoolean()); - - startGrid(nodeIdx++); - - awaitPartitionMapExchange(); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testDefaultCommunicationErrorResolver1() throws Exception { - testCommSpi = true; - sesTimeout = 5000; - - startGrids(3); - - /* Grids 0 and 1 get check arguments (3, 0, 1) and grid 2 gets (3, 2); presumably the trailing values are the node indices reported as reachable (TODO confirm in initCheckResult). */ ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 0, 1); - ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(3, 0, 1); - ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(3, 2); - - UUID killedId = nodeId(2); - - /* Grid 2 must be visible from grid 0 before the error is reported and gone afterwards. */ assertNotNull(ignite(0).cluster().node(killedId)); - - ZookeeperDiscoverySpi spi = spi(ignite(0)); - - spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); - - waitForTopology(2); - - assertNull(ignite(0).cluster().node(killedId)); - } - - /** - * Starts 3 grids, enables client mode and starts more grids (presumably 2 starting at index 3), - * configures check results on grids 0 to 4, reports a communication error from grid 0 against - * grid 1 and waits for a topology of 2 nodes. - * - * @throws Exception If failed. - */ - public void testDefaultCommunicationErrorResolver2() throws Exception { - testCommSpi = true; - sesTimeout = 5000; - - startGrids(3); - - clientMode(true); - - startGridsMultiThreaded(3, 2); - - ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(5, 0, 1); - ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(5, 0, 1); - ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(5, 2, 3, 4); - ZkTestCommunicationSpi.spi(ignite(3)).initCheckResult(5, 2, 3, 4); - ZkTestCommunicationSpi.spi(ignite(4)).initCheckResult(5, 2, 3, 4); - - ZookeeperDiscoverySpi spi = spi(ignite(0)); - - spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); - - waitForTopology(2); - } - - /** - * Runs the break-communication scenario with 3 started nodes, breaking the node with index 1. - * - * @throws Exception If failed. - */ - public void testDefaultCommunicationErrorResolver3() throws Exception { - defaultCommunicationErrorResolver_BreakCommunication(3, 1); - } - - /** - * Runs the break-communication scenario with 3 started nodes, breaking the node with index 0. - * - * @throws Exception If failed. - */ - public void testDefaultCommunicationErrorResolver4() throws Exception { - defaultCommunicationErrorResolver_BreakCommunication(3, 0); - } - - /** - * @throws Exception If failed. 
- */ - public void testDefaultCommunicationErrorResolver5() throws Exception { - defaultCommunicationErrorResolver_BreakCommunication(10, 1, 3, 6); - } - - /** - * Starts the given number of nodes, then calls {@code simulateNodeFailure()} on the communication SPI - * of every listed node from one thread per node, and finally waits until the topology size equals - * the started node count minus the number of broken nodes. - * - * @param startNodes Initial nodes number. - * @param breakNodes Node indices where communication server is closed. - * @throws Exception If failed. - */ - private void defaultCommunicationErrorResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception { - sesTimeout = 5000; - - startGridsMultiThreaded(startNodes); - - /* Barrier is sized to the number of worker threads, so they all pass it together before breaking communication. */ final CyclicBarrier b = new CyclicBarrier(breakNodes.length); - - GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { - @Override public void apply(Integer threadIdx) { - try { - b.await(); - - /* Each thread handles the node index stored at its own thread index. */ int nodeIdx = breakNodes[threadIdx]; - - info("Close communication: " + nodeIdx); - - ((TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi()).simulateNodeFailure(); - } - catch (Exception e) { - fail("Unexpected error: " + e); - } - } - }, breakNodes.length, "break-communication"); - - waitForTopology(startNodes - breakNodes.length); - } - - /** - * Starts 5 nodes and, for each of them, calls {@code checkConnection} on its communication SPI with - * the node's cluster node list, asserting that bits 0 to 4 of the returned {@code BitSet} are set. - * - * @throws Exception If failed. - */ - public void testConnectionCheck() throws Exception { - final int NODES = 5; - - startGridsMultiThreaded(NODES); - - for (int i = 0; i < NODES; i++) { - Ignite node = ignite(i); - - TcpCommunicationSpi spi = (TcpCommunicationSpi)node.configuration().getCommunicationSpi(); - - List<ClusterNode> nodes = new ArrayList<>(node.cluster().nodes()); - - BitSet res = spi.checkConnection(nodes).get(); - - for (int j = 0; j < NODES; j++) - assertTrue(res.get(j)); - } - } - - /** - * @throws Exception If failed. 
- */ - public void testReconnectDisabled_ConnectionLost() throws Exception { - clientReconnectDisabled = true; - - startGrid(0); - - /* These settings are changed after grid 0 is started and before grid 1 is started. */ sesTimeout = 3000; - testSockNio = true; - client = true; - - Ignite client = startGrid(1); - - /* Latch is released by the first EVT_NODE_SEGMENTED event observed on grid 1. */ final CountDownLatch latch = new CountDownLatch(1); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - - ZkTestClientCnxnSocketNIO nio = ZkTestClientCnxnSocketNIO.forNode(client); - - nio.closeSocket(true); - - /* allowConnect() is called in the finally block, so it runs whether or not the wait below succeeds. */ try { - waitNoAliveZkNodes(log, - zkCluster.getConnectString(), - Collections.singletonList(aliveZkNodePath(client)), - 10_000); - } - finally { - nio.allowConnect(); - } - - assertTrue(latch.await(10, SECONDS)); - } - - /** - * Starts grid 0, sets the join timeout to 3000, starts more grids in client mode (presumably 5 - * starting at index 1) and registers an EVT_NODE_SEGMENTED listener on grids 1 to 5. Grid 0 is then - * stopped and the event is expected on every listener within 10 seconds. - * - * @throws Exception If failed. - */ - public void testServersLeft_FailOnTimeout() throws Exception { - startGrid(0); - - final int CLIENTS = 5; - - joinTimeout = 3000; - - clientMode(true); - - startGridsMultiThreaded(1, CLIENTS); - - waitForTopology(CLIENTS + 1); - - final CountDownLatch latch = new CountDownLatch(CLIENTS); - - for (int i = 0; i < CLIENTS; i++) { - Ignite node = ignite(i + 1); - - node.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return false; - } - }, EventType.EVT_NODE_SEGMENTED); - } - - stopGrid(getTestIgniteInstanceName(0), true, false); - - assertTrue(latch.await(10, SECONDS)); - - evts.clear(); - } - - /** - * Starts grid 0 in client mode with join timeout 3000 while this test has started no other grid. - * The start is expected to throw {@code IgniteCheckedException} no earlier than the join timeout - * after the attempt began, with an {@code IgniteSpiException} in the cause chain whose message - * mentions the configured connect timeout. - */ - public void testStartNoServers_FailOnTimeout() { - joinTimeout = 3000; - - clientMode(true); - - long start = System.currentTimeMillis(); - - Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - startGrid(0); - - return null; - } - }, IgniteCheckedException.class, null); - - assertTrue(System.currentTimeMillis() >= start + joinTimeout); - - IgniteSpiException spiErr = X.cause(err, 
IgniteSpiException.class); - - assertNotNull(spiErr); - assertTrue(spiErr.getMessage().contains("Failed to connect to cluster within configured timeout")); - } - - /** - * Runs the wait-for-servers scenario with join timeout 0. - * - * @throws Exception If failed. - */ - public void testStartNoServer_WaitForServers1() throws Exception { - startNoServer_WaitForServers(0); - } - - /** - * Runs the wait-for-servers scenario with join timeout 10_000. - * - * @throws Exception If failed. - */ - public void testStartNoServer_WaitForServers2() throws Exception { - startNoServer_WaitForServers(10_000); - } - - /** - * Starts grid 0 asynchronously with thread-local client mode enabled, sleeps 3000 ms and waits for - * that grid's SPI. It then starts grid 1 with thread-local client mode disabled and expects the - * asynchronous start to complete and the topology to reach 2 nodes. - * - * @param joinTimeout Join timeout. - * @throws Exception If failed. - */ - private void startNoServer_WaitForServers(long joinTimeout) throws Exception { - this.joinTimeout = joinTimeout; - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - clientModeThreadLocal(true); - - startGrid(0); - - return null; - } - }); - - U.sleep(3000); - - waitSpi(getTestIgniteInstanceName(0)); - - clientModeThreadLocal(false); - - startGrid(1); - - fut.get(); - - waitForTopology(2); - } - - /** - * Delegates to {@code disconnectOnServersLeft(1, 1)}. - * - * @throws Exception If failed. - */ - public void testDisconnectOnServersLeft_1() throws Exception { - disconnectOnServersLeft(1, 1); - } - - /** - * @throws Exception If f
<TRUNCATED>
