Repository: ignite Updated Branches: refs/heads/ignite-zk d6ec00c0e -> a3a625699
zk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3a62569 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3a62569 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3a62569 Branch: refs/heads/ignite-zk Commit: a3a6256996586cfebe97b964b56be864277226f5 Parents: d6ec00c Author: sboikov <[email protected]> Authored: Wed Dec 20 11:45:12 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 20 12:49:35 2017 +0300 ---------------------------------------------------------------------- .../DefaultCommunicationProblemResolver.java | 108 ++++++-- .../communication/tcp/TcpCommunicationSpi.java | 12 +- .../TcpCommunicationConnectionCheckFuture.java | 3 +- .../ZookeeperDiscoverySpiBasicTest.java | 273 ++++++++++++++++--- 4 files changed, 333 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java index ed2ddb8..ca7bcd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java @@ -19,27 +19,92 @@ package org.apache.ignite.configuration; import java.util.BitSet; import java.util.List; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.LoggerResource; /** * */ public class DefaultCommunicationProblemResolver implements CommunicationProblemResolver { + @LoggerResource + private IgniteLogger log; + /** {@inheritDoc} */ @Override public void resolve(CommunicationProblemContext ctx) { - ClusterGraph graph = new ClusterGraph(ctx); + ClusterGraph graph = new ClusterGraph(log, ctx); - BitSet cluster = graph.findLargestIndependentCluster(); + ClusterSearch cluster = graph.findLargestIndependentCluster(); List<ClusterNode> nodes = ctx.topologySnapshot(); - if (graph.checkFullyConnected(cluster) && cluster.cardinality() < nodes.size()) { - for (int i = 0; i < nodes.size(); i++) { - if (!cluster.get(i)) - ctx.killNode(nodes.get(i)); + assert nodes.size() > 0; + assert cluster != null; + + if (graph.checkFullyConnected(cluster.nodesBitSet)) { + assert cluster.nodeCnt <= nodes.size(); + + if (cluster.nodeCnt < nodes.size()) { + if (log.isInfoEnabled()) { + log.info("Communication problem resolver found fully connected independent cluster [" + + "clusterSrvCnt=" + cluster.srvCnt + + ", clusterTotalNodes=" + cluster.nodeCnt + + ", totalAliveNodes=" + nodes.size() + "]"); + } + + for (int i = 0; i < nodes.size(); i++) { + if (!cluster.nodesBitSet.get(i)) + ctx.killNode(nodes.get(i)); + } } + else + U.warn(log, "All alive nodes are fully connected, this should be resolved automatically."); + } + else { + if (log.isInfoEnabled()) { + log.info("Communication problem resolver failed to find fully connected independent cluster."); + } + } + } + + /** + * @param cluster Cluster nodes mask. + * @param nodes Nodes. + * @param limit IDs limit. + * @return Cluster node IDs string. + */ + private static String clusterNodeIds(BitSet cluster, List<ClusterNode> nodes, int limit) { + int startIdx = 0; + + StringBuilder builder = new StringBuilder(); + + int cnt = 0; + + for (;;) { + int idx = cluster.nextSetBit(startIdx); + + if (idx == -1) + break; + + startIdx = idx + 1; + + if (builder.length() == 0) { + builder.append('['); + } + else + builder.append(", "); + + builder.append(nodes.get(idx).id()); + + if (cnt++ > limit) + builder.append(", ..."); } + + builder.append(']'); + + return builder.toString(); } /** @@ -70,13 +135,8 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem /** */ private final static int WORD_IDX_SHIFT = 6; - /** - * @param bitIndex Bit index. - * @return Word index containing bit with given index. - */ - private static int wordIndex(int bitIndex) { - return bitIndex >> WORD_IDX_SHIFT; - } + /** */ + private final IgniteLogger log; /** */ private final int nodeCnt; @@ -91,9 +151,11 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem private final List<ClusterNode> nodes; /** + * @param log Logger. * @param ctx Context. */ - ClusterGraph(CommunicationProblemContext ctx) { + ClusterGraph(IgniteLogger log, CommunicationProblemContext ctx) { + this.log = log; this.ctx = ctx; nodes = ctx.topologySnapshot(); @@ -106,6 +168,14 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem } /** + * @param bitIndex Bit index. + * @return Word index containing bit with given index. + */ + private static int wordIndex(int bitIndex) { + return bitIndex >> WORD_IDX_SHIFT; + } + + /** * @param bitCnt Number of bits. * @return Bit set words. */ @@ -116,7 +186,7 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem /** * @return Cluster nodes bit set. */ - BitSet findLargestIndependentCluster() { + ClusterSearch findLargestIndependentCluster() { ClusterSearch maxCluster = null; for (int i = 0; i < nodeCnt; i++) { @@ -127,11 +197,17 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem search(cluster, i); + if (log.isInfoEnabled()) { + log.info("Communication problem resolver found cluster [srvCnt=" + cluster.srvCnt + + ", totalNodeCnt=" + cluster.nodeCnt + + ", nodeIds=" + clusterNodeIds(cluster.nodesBitSet, nodes, 1000) + "]"); + } + if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt) maxCluster = cluster; } - return maxCluster != null ? maxCluster.nodesBitSet : null; + return maxCluster; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 37be29f..4b7199d 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2595,7 +2595,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati sendMessage0(node, msg, null); } - /** {@inheritDoc} */ + /** + * @param nodes Nodes to check connection with. + * @return Result future (each bit in result BitSet contains connection status to corresponding node). + */ public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture( this, @@ -2603,7 +2606,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati nioSrvr, nodes); - fut.init(failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout); + long timeout = failureDetectionTimeoutEnabled() ? failureDetectionTimeout() : connTimeout; + + if (log.isInfoEnabled()) + log.info("Start check connection process [nodeCnt=" + nodes.size() + ", timeout=" + timeout + ']'); + + fut.init(timeout); return new IgniteFutureImpl<>(fut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java index 170ee44..9c161d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@ -334,6 +334,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit /** * */ + @SuppressWarnings("unchecked") void cancel() { if (finish(false)) nioSrvr.cancelConnect(ch, sesMeta); @@ -440,7 +441,7 @@ public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<Bit futs[idx++] = fut; - if (done()) + if (resCnt == Integer.MAX_VALUE) return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a3a62569/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 1373da8..6d61ac2 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -25,8 +25,10 @@ import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.Callable; @@ -49,6 +51,8 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.CommunicationProblemContext; +import org.apache.ignite.configuration.CommunicationProblemResolver; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -77,10 +81,9 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.logger.java.JavaLogger; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.configuration.CommunicationProblemContext; -import org.apache.ignite.configuration.CommunicationProblemResolver; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; @@ -103,6 +106,7 @@ import static org.apache.zookeeper.ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET; /** * TODO ZK: test with max client connections limit error. */ +@SuppressWarnings("deprecation") public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** */ private static final String IGNITE_ZK_ROOT = ZookeeperDiscoverySpi.DFLT_ROOT_PATH; @@ -199,6 +203,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { @IgniteInstanceResource private Ignite ignite; + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public boolean apply(Event evt) { try { DiscoveryEvent discoveryEvt = (DiscoveryEvent)evt; @@ -773,7 +778,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { c1.closeSocket(true); IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { + @Override public Void call() { try { startGrid(2); } @@ -864,7 +869,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { final AtomicInteger nodeIdx = new AtomicInteger(initNodes); IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { + @Override public Void call() { try { startGrid(nodeIdx.getAndIncrement()); } @@ -929,16 +934,16 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * @param node - * @return + * @param node Node. + * @return Corresponding znode. */ private static String aliveZkNodePath(Ignite node) { return aliveZkNodePath(node.configuration().getDiscoverySpi()); } /** - * @param spi - * @return + * @param spi SPI. + * @return Znode related to given SPI. */ private static String aliveZkNodePath(DiscoverySpi spi) { String path = GridTestUtils.getFieldValue(spi, "impl", "rtState", "locNodeZkPath"); @@ -947,14 +952,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * @param log - * @param connectString - * @param failedZkNodes - * @param timeout - * @throws Exception + * @param log Logger. + * @param connectString Zookeeper connect string. + * @param failedZkNodes Znodes which should be removed. + * @param timeout Timeout. + * @throws Exception If failed. */ private static void waitNoAliveZkNodes(final IgniteLogger log, - String connectString, final List<String> failedZkNodes, + String connectString, + final List<String> failedZkNodes, long timeout) throws Exception { @@ -1563,7 +1569,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { Ignite srv0 = startGrid(0); // Send large message, single node in topology. - IgniteCache cache = srv0.createCache(largeCacheConfiguration("c1")); + IgniteCache<Object, Object> cache = srv0.createCache(largeCacheConfiguration("c1")); for (int i = 0; i < 100; i++) cache.put(i, i); @@ -1893,7 +1899,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGridsMultiThreaded(1, 3); - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(3)); + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3)); commSpi.pingLatch = new CountDownLatch(1); @@ -1970,8 +1976,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * TODO ZK: kill random, kill coordinator multiple times. - * * @param startNodes Number of nodes to start. * @param killNodes Nodes to kill by resolve process. * @throws Exception If failed. @@ -1983,7 +1987,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGrids(startNodes); - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0)); + ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(0)); commSpi.checkRes = new BitSet(startNodes); @@ -2026,15 +2030,102 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCommunicationErrorResolve_KillCoordinator_5() throws Exception { + sesTimeout = 2000; + + testCommSpi = true; + commProblemRslvr = KillCoordinatorCommunicationProblemResolver.FACTORY; + + startGrids(10); + + int crd = 0; + + int nodeIdx = 10; + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + for (Ignite node : G.allGrids()) + ZkTestCommunicationSpi.spi(node).initCheckResult(10); + + UUID crdId = ignite(crd).cluster().localNode().id(); + + ZookeeperDiscoverySpi spi = spi(ignite(crd + 1)); + + try { + spi.resolveCommunicationError(spi.getNode(crdId), new Exception("test")); + + fail("Exception is not thrown"); + } + catch (IgniteSpiException e) { + assertTrue("Unexpected exception: " + e, e.getCause() instanceof ClusterTopologyCheckedException); + } + + waitForTopology(9); + + startGrid(nodeIdx++); + + waitForTopology(10); + + crd++; + } + } + + /** + * @throws Exception If failed. + */ + public void testCommunicationErrorResolve_KillRandom() throws Exception { + sesTimeout = 2000; + + testCommSpi = true; + commProblemRslvr = KillRandomCommunicationProblemResolver.FACTORY; + + startGridsMultiThreaded(10); + + client = true; + + startGridsMultiThreaded(10, 5); + + int nodeIdx = 15; + + for (int i = 0; i < 10; i++) { + info("Iteration: " + i); + + ZookeeperDiscoverySpi spi = null; + + for (Ignite node : G.allGrids()) { + ZkTestCommunicationSpi.spi(node).initCheckResult(100); + + spi = spi(node); + } + + try { + spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test")); + } + catch (IgniteSpiException ignore) { + // No-op. + } + + client = ThreadLocalRandom.current().nextBoolean(); + + startGrid(nodeIdx++); + + awaitPartitionMapExchange(); + } + } + + /** + * @throws Exception If failed. + */ public void testDefaultCommunicationErrorResolver1() throws Exception { testCommSpi = true; sesTimeout = 5000; startGrids(3); - ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 0, 1); - ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(3, 0, 1); - ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 2); + ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(3, 2); UUID killedId = nodeId(2); @@ -2062,11 +2153,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGridsMultiThreaded(3, 2); - ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(5, 0, 1); - ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(5, 0, 1); - ZkTestCommunicationSpi.forNode(ignite(2)).initCheckResult(5, 2, 3, 4); - ZkTestCommunicationSpi.forNode(ignite(3)).initCheckResult(5, 2, 3, 4); - ZkTestCommunicationSpi.forNode(ignite(4)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.spi(ignite(0)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.spi(ignite(1)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.spi(ignite(2)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.spi(ignite(3)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.spi(ignite(4)).initCheckResult(5, 2, 3, 4); ZookeeperDiscoverySpi spi = spi(ignite(0)); @@ -2079,15 +2170,53 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDefaultCommunicationErrorResolver3() throws Exception { + defaultCommunicationErrorResolver_BreakCommunication(3, 1); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationErrorResolver4() throws Exception { + defaultCommunicationErrorResolver_BreakCommunication(3, 0); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationErrorResolver5() throws Exception { + defaultCommunicationErrorResolver_BreakCommunication(10, 1, 3, 6); + } + + /** + * @param startNodes Initial nodes number. + * @param breakNodes Node indices where communication server is closed. + * @throws Exception If failed. + */ + private void defaultCommunicationErrorResolver_BreakCommunication(int startNodes, final int...breakNodes) throws Exception { sesTimeout = 5000; - startGridsMultiThreaded(3); + startGridsMultiThreaded(startNodes); - info("Close communication"); + final CyclicBarrier b = new CyclicBarrier(breakNodes.length); - ((TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).simulateNodeFailure(); + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer threadIdx) { + try { + b.await(); - waitForTopology(2); + int nodeIdx = breakNodes[threadIdx]; + + info("Close communication: " + nodeIdx); + + ((TcpCommunicationSpi)ignite(nodeIdx).configuration().getCommunicationSpi()).simulateNodeFailure(); + } + catch (Exception e) { + fail("Unexpected error: " + e); + } + } + }, breakNodes.length, "break-communication"); + + waitForTopology(startNodes - breakNodes.length); } /** @@ -2414,6 +2543,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private void checkEventsConsistency() { for (Map.Entry<UUID, Map<Long, DiscoveryEvent>> nodeEvtEntry : evts.entrySet()) { UUID nodeId = nodeEvtEntry.getKey(); @@ -2517,6 +2647,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { */ private void checkEvents(final UUID nodeId, final DiscoveryEvent...expEvts) throws Exception { assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @Override public boolean apply() { Map<Long, DiscoveryEvent> nodeEvts = evts.get(nodeId); @@ -2548,17 +2679,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * @param node Node. - */ - private static void closeZkClient(Ignite node) { - DiscoverySpi spi = node.configuration().getDiscoverySpi(); - - assertTrue(spi.getClass().getName(), spi instanceof ZookeeperDiscoverySpi); - - closeZkClient((ZookeeperDiscoverySpi)spi); - } - - /** * @param spi Spi instance. */ private static void closeZkClient(ZookeeperDiscoverySpi spi) { @@ -2717,7 +2837,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @param cacheName Cache name. * @return Configuration. */ - private CacheConfiguration largeCacheConfiguration(String cacheName) { + private CacheConfiguration<Object, Object> largeCacheConfiguration(String cacheName) { CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(cacheName); ccfg.setAffinity(new TestAffinityFunction(1024 * 1024)); @@ -2802,6 +2922,71 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * */ + static class KillCoordinatorCommunicationProblemResolver implements CommunicationProblemResolver { + /** */ + static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() { + @Override public CommunicationProblemResolver apply() { + return new KillCoordinatorCommunicationProblemResolver(); + } + }; + + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void resolve(CommunicationProblemContext ctx) { + List<ClusterNode> nodes = ctx.topologySnapshot(); + + ClusterNode node = nodes.get(0); + + log.info("Resolver kills node: " + node.id()); + + ctx.killNode(node); + } + } + + /** + * + */ + static class KillRandomCommunicationProblemResolver implements CommunicationProblemResolver { + /** */ + static final IgniteOutClosure<CommunicationProblemResolver> FACTORY = new IgniteOutClosure<CommunicationProblemResolver>() { + @Override public CommunicationProblemResolver apply() { + return new KillRandomCommunicationProblemResolver(); + } + }; + + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void resolve(CommunicationProblemContext ctx) { + List<ClusterNode> nodes = ctx.topologySnapshot(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int killNodes = rnd.nextInt(nodes.size() / 2); + + log.info("Resolver kills nodes [total=" + nodes.size() + ", kill=" + killNodes + ']'); + + Set<Integer> idxs = new HashSet<>(); + + while (idxs.size() < killNodes) + idxs.add(rnd.nextInt(nodes.size())); + + for (int idx : idxs) { + ClusterNode node = nodes.get(idx); + + log.info("Resolver kills node: " + node.id()); + + ctx.killNode(node); + } + } + } + + /** + * + */ static class TestNodeKillCommunicationProblemResolver implements CommunicationProblemResolver { /** * @param killOrders Killed nodes order. @@ -2852,7 +3037,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { * @param ignite Node. * @return Node's communication SPI. */ - static ZkTestCommunicationSpi forNode(Ignite ignite) { + static ZkTestCommunicationSpi spi(Ignite ignite) { return (ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi(); }
