zk
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e419ec83 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e419ec83 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e419ec83 Branch: refs/heads/ignite-zk Commit: e419ec83df5a986a968cd5a63b5b1bb8cf76673f Parents: 4525b92 Author: sboikov <[email protected]> Authored: Tue Dec 19 14:51:43 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 19 15:00:39 2017 +0300 ---------------------------------------------------------------------- .../DefaultCommunicationProblemResolver.java | 45 +++++++--- .../zk/internal/ZookeeperDiscoveryImpl.java | 9 +- .../ZookeeperDiscoverySpiBasicTest.java | 89 ++++++++++++-------- .../testframework/junits/GridAbstractTest.java | 2 +- 4 files changed, 95 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java index 1e973d3..ed2ddb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java @@ -20,6 +20,7 @@ package org.apache.ignite.configuration; import java.util.BitSet; import java.util.List; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.util.typedef.internal.CU; /** * @@ -44,6 +45,27 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem /** * */ + private static class ClusterSearch { + /** */ + int srvCnt; + + /** */ + int nodeCnt; + + /** */ + final BitSet nodesBitSet; + + /** + * @param nodes Total nodes. + */ + ClusterSearch(int nodes) { + nodesBitSet = new BitSet(nodes); + } + } + + /** + * + */ private static class ClusterGraph { /** */ private final static int WORD_IDX_SHIFT = 6; @@ -95,26 +117,21 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem * @return Cluster nodes bit set. */ BitSet findLargestIndependentCluster() { - BitSet maxCluster = null; - int maxClusterSize = 0; + ClusterSearch maxCluster = null; for (int i = 0; i < nodeCnt; i++) { if (getBit(visitBitSet, i)) continue; - BitSet cluster = new BitSet(nodeCnt); + ClusterSearch cluster = new ClusterSearch(nodeCnt); search(cluster, i); - int size = cluster.cardinality(); - - if (maxCluster == null || size > maxClusterSize) { + if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt) maxCluster = cluster; - maxClusterSize = size; - } } - return maxCluster; + return maxCluster != null ? maxCluster.nodesBitSet : null; } /** @@ -154,13 +171,19 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem * @param cluster Current cluster bit set. * @param idx Node index. */ - void search(BitSet cluster, int idx) { + void search(ClusterSearch cluster, int idx) { + assert !getBit(visitBitSet, idx); + setBit(visitBitSet, idx); - cluster.set(idx); + cluster.nodesBitSet.set(idx); + cluster.nodeCnt++; ClusterNode node1 = nodes.get(idx); + if (!CU.clientNode(node1)) + cluster.srvCnt++; + for (int i = 0; i < nodeCnt; i++) { if (i == idx || getBit(visitBitSet, i)) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index f1ad869..7032fd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -2088,7 +2088,7 @@ public class ZookeeperDiscoveryImpl { assert node != null : msg.nodeId; if (node.isLocal()) - throw localNodeFail("Received force EVT_NODE_FAILED event for local node."); + throw localNodeFail("Received force EVT_NODE_FAILED event for local node.", true); else notifyNodeFail(node.internalId(), evtData.topologyVersion()); } @@ -2137,7 +2137,7 @@ public class ZookeeperDiscoveryImpl { deleteAliveNodes(res.killedNodes); throw localNodeFail("Local node is forced to stop by communication error resolver " + - "[nodeId=" + locNode.id() + ']'); + "[nodeId=" + locNode.id() + ']', false); } ZookeeperClusterNode node = rtState.top.nodesByInternalId.get(internalId); @@ -2527,14 +2527,15 @@ public class ZookeeperDiscoveryImpl { /** * @param msg Message to log. + * @param clientReconnect {@code True} if allow client reconnect. * @return Exception to be thrown. */ - private Exception localNodeFail(String msg) { + private Exception localNodeFail(String msg, boolean clientReconnect) { U.warn(log, msg); rtState.onCloseStart(); - if (clientReconnectEnabled) { + if (clientReconnect && clientReconnectEnabled) { assert locNode.isClient() : locNode; boolean reconnect = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java index 44e48f9..0630af4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java @@ -1718,23 +1718,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { } /** - * TODO ZK - * - * @throws Exception If failed. - */ - public void _testCommunicationFailure() throws Exception { - Ignite srv0 = startGrid(0); - - Ignite srv1 = startGrid(1); - - info("Close communication"); - - ((TcpCommunicationSpi)srv1.configuration().getCommunicationSpi()).simulateNodeFailure(); - - Thread.sleep(60_000); - } - - /** * @throws Exception If failed. */ public void testPing() throws Exception { @@ -2035,23 +2018,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { startGrids(3); - { - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(0)); - commSpi.checkRes = new BitSet(3); - commSpi.checkRes.set(0); - commSpi.checkRes.set(1); - } - { - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(1)); - commSpi.checkRes = new BitSet(3); - commSpi.checkRes.set(0); - commSpi.checkRes.set(1); - } - { - ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.forNode(ignite(2)); - commSpi.checkRes = new BitSet(3); - commSpi.checkRes.set(2); - } + ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(3, 0, 1); + ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 2); UUID killedId = nodeId(2); @@ -2069,6 +2038,47 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testDefaultCommunicationErrorResolver2() throws Exception { + testCommSpi = true; + sesTimeout = 5000; + + startGrids(3); + + client = true; + + startGridsMultiThreaded(3, 2); + + ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(5, 0, 1); + ZkTestCommunicationSpi.forNode(ignite(2)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.forNode(ignite(3)).initCheckResult(5, 2, 3, 4); + ZkTestCommunicationSpi.forNode(ignite(4)).initCheckResult(5, 2, 3, 4); + + ZookeeperDiscoverySpi spi = spi(ignite(0)); + + spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()), new Exception("test")); + + waitForTopology(2); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultCommunicationErrorResolver3() throws Exception { + sesTimeout = 5000; + + startGridsMultiThreaded(3); + + info("Close communication"); + + ((TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).simulateNodeFailure(); + + waitForTopology(2); + } + + /** + * @throws Exception If failed. + */ public void testConnectionCheck() throws Exception { final int NODES = 5; @@ -2832,6 +2842,17 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest { return (ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi(); } + /** + * @param nodes Number of nodes. + * @param setBitIdxs Bits indexes to set in check result. + */ + void initCheckResult(int nodes, Integer... setBitIdxs) { + checkRes = new BitSet(nodes); + + for (Integer bitIdx : setBitIdxs) + checkRes.set(bitIdx); + } + /** {@inheritDoc} */ @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { CountDownLatch pingLatch = this.pingLatch; http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 8afbdb7..4b62534 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -2188,7 +2188,7 @@ public abstract class GridAbstractTest extends TestCase { return true; } - }, 15_000)); + }, 30_000)); } /**
