zk

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e419ec83
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e419ec83
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e419ec83

Branch: refs/heads/ignite-zk
Commit: e419ec83df5a986a968cd5a63b5b1bb8cf76673f
Parents: 4525b92
Author: sboikov <[email protected]>
Authored: Tue Dec 19 14:51:43 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue Dec 19 15:00:39 2017 +0300

----------------------------------------------------------------------
 .../DefaultCommunicationProblemResolver.java    | 45 +++++++---
 .../zk/internal/ZookeeperDiscoveryImpl.java     |  9 +-
 .../ZookeeperDiscoverySpiBasicTest.java         | 89 ++++++++++++--------
 .../testframework/junits/GridAbstractTest.java  |  2 +-
 4 files changed, 95 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
index 1e973d3..ed2ddb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DefaultCommunicationProblemResolver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.configuration;
 import java.util.BitSet;
 import java.util.List;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /**
  *
@@ -44,6 +45,27 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
     /**
      *
      */
+    private static class ClusterSearch {
+        /** Number of non-client nodes found in this cluster so far. */
+        int srvCnt;
+
+        /** Total number of nodes (client and non-client) found in this cluster so far. */
+        int nodeCnt;
+
+        /** Bit set where the bit at a node's index is set for every node found in this cluster. */
+        final BitSet nodesBitSet;
+
+        /**
+         * @param nodes Total nodes; used as the initial size of the nodes bit set.
+         */
+        ClusterSearch(int nodes) {
+            nodesBitSet = new BitSet(nodes);
+        }
+    }
+
+    /**
+     *
+     */
     private static class ClusterGraph {
         /** */
         private final static int WORD_IDX_SHIFT = 6;
@@ -95,26 +117,21 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
          * @return Cluster nodes bit set.
          */
         BitSet findLargestIndependentCluster() {
-            BitSet maxCluster = null;
-            int maxClusterSize = 0;
+            ClusterSearch maxCluster = null;
 
             for (int i = 0; i < nodeCnt; i++) {
                 if (getBit(visitBitSet, i))
                     continue;
 
-                BitSet cluster = new BitSet(nodeCnt);
+                ClusterSearch cluster = new ClusterSearch(nodeCnt);
 
                 search(cluster, i);
 
-                int size = cluster.cardinality();
-
-                if (maxCluster == null || size > maxClusterSize) {
+                if (maxCluster == null || cluster.srvCnt > maxCluster.srvCnt)
                     maxCluster = cluster;
-                    maxClusterSize = size;
-                }
             }
 
-            return maxCluster;
+            return maxCluster != null ? maxCluster.nodesBitSet : null;
         }
 
         /**
@@ -154,13 +171,19 @@ public class DefaultCommunicationProblemResolver implements CommunicationProblem
          * @param cluster Current cluster bit set.
          * @param idx Node index.
          */
-        void search(BitSet cluster, int idx) {
+        void search(ClusterSearch cluster, int idx) {
+            assert !getBit(visitBitSet, idx);
+
             setBit(visitBitSet, idx);
 
-            cluster.set(idx);
+            cluster.nodesBitSet.set(idx);
+            cluster.nodeCnt++;
 
             ClusterNode node1 = nodes.get(idx);
 
+            if (!CU.clientNode(node1))
+                cluster.srvCnt++;
+
             for (int i = 0; i < nodeCnt; i++) {
                 if (i == idx || getBit(visitBitSet, i))
                     continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index f1ad869..7032fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -2088,7 +2088,7 @@ public class ZookeeperDiscoveryImpl {
         assert node != null : msg.nodeId;
 
         if (node.isLocal())
-            throw localNodeFail("Received force EVT_NODE_FAILED event for 
local node.");
+            throw localNodeFail("Received force EVT_NODE_FAILED event for 
local node.", true);
         else
             notifyNodeFail(node.internalId(), evtData.topologyVersion());
     }
@@ -2137,7 +2137,7 @@ public class ZookeeperDiscoveryImpl {
                             deleteAliveNodes(res.killedNodes);
 
                         throw localNodeFail("Local node is forced to stop by 
communication error resolver " +
-                            "[nodeId=" + locNode.id() + ']');
+                            "[nodeId=" + locNode.id() + ']', false);
                     }
 
                     ZookeeperClusterNode node = 
rtState.top.nodesByInternalId.get(internalId);
@@ -2527,14 +2527,15 @@ public class ZookeeperDiscoveryImpl {
 
     /**
      * @param msg Message to log.
+     * @param clientReconnect {@code True} if allow client reconnect.
      * @return Exception to be thrown.
      */
-    private Exception localNodeFail(String msg) {
+    private Exception localNodeFail(String msg, boolean clientReconnect) {
         U.warn(log, msg);
 
         rtState.onCloseStart();
 
-        if (clientReconnectEnabled) {
+        if (clientReconnect && clientReconnectEnabled) {
             assert locNode.isClient() : locNode;
 
             boolean reconnect = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 44e48f9..0630af4 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -1718,23 +1718,6 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
-     * TODO ZK
-     *
-     * @throws Exception If failed.
-     */
-    public void _testCommunicationFailure() throws Exception {
-        Ignite srv0 = startGrid(0);
-
-        Ignite srv1 = startGrid(1);
-
-        info("Close communication");
-
-        
((TcpCommunicationSpi)srv1.configuration().getCommunicationSpi()).simulateNodeFailure();
-
-        Thread.sleep(60_000);
-    }
-
-    /**
      * @throws Exception If failed.
      */
     public void testPing() throws Exception {
@@ -2035,23 +2018,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         startGrids(3);
 
-        {
-            ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.forNode(ignite(0));
-            commSpi.checkRes = new BitSet(3);
-            commSpi.checkRes.set(0);
-            commSpi.checkRes.set(1);
-        }
-        {
-            ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.forNode(ignite(1));
-            commSpi.checkRes = new BitSet(3);
-            commSpi.checkRes.set(0);
-            commSpi.checkRes.set(1);
-        }
-        {
-            ZkTestCommunicationSpi commSpi = 
ZkTestCommunicationSpi.forNode(ignite(2));
-            commSpi.checkRes = new BitSet(3);
-            commSpi.checkRes.set(2);
-        }
+        ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(3, 0, 1);
+        ZkTestCommunicationSpi.forNode(ignite(2)).initCheckResult(3, 2); // Node 2 reports only bit 2.
 
         UUID killedId = nodeId(2);
 
@@ -2069,6 +2038,47 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDefaultCommunicationErrorResolver2() throws Exception {
+        testCommSpi = true;
+        sesTimeout = 5000;
+
+        startGrids(3);
+
+        client = true;
+
+        startGridsMultiThreaded(3, 2);
+
+        ZkTestCommunicationSpi.forNode(ignite(0)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.forNode(ignite(1)).initCheckResult(5, 0, 1);
+        ZkTestCommunicationSpi.forNode(ignite(2)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.forNode(ignite(3)).initCheckResult(5, 2, 3, 4);
+        ZkTestCommunicationSpi.forNode(ignite(4)).initCheckResult(5, 2, 3, 4);
+
+        ZookeeperDiscoverySpi spi = spi(ignite(0));
+
+        
spi.resolveCommunicationError(spi.getNode(ignite(1).cluster().localNode().id()),
 new Exception("test"));
+
+        waitForTopology(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDefaultCommunicationErrorResolver3() throws Exception {
+        sesTimeout = 5000;
+
+        startGridsMultiThreaded(3);
+
+        info("Close communication");
+
+        
((TcpCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).simulateNodeFailure();
+
+        waitForTopology(2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConnectionCheck() throws Exception {
        final int NODES = 5;
 
@@ -2832,6 +2842,17 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
             return 
(ZkTestCommunicationSpi)ignite.configuration().getCommunicationSpi();
         }
 
+        /**
+         * @param nodes Number of nodes; used as the initial size of the new check result bit set.
+         * @param setBitIdxs Indexes of the bits to set in the new check result.
+         */
+        void initCheckResult(int nodes, Integer... setBitIdxs) {
+            checkRes = new BitSet(nodes); // Replaces any previously assigned check result.
+
+            for (Integer bitIdx : setBitIdxs)
+                checkRes.set(bitIdx); // Auto-unboxes the index; a null element would throw NullPointerException.
+        }
+
         /** {@inheritDoc} */
         @Override public IgniteFuture<BitSet> 
checkConnection(List<ClusterNode> nodes) {
             CountDownLatch pingLatch = this.pingLatch;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e419ec83/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 8afbdb7..4b62534 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -2188,7 +2188,7 @@ public abstract class GridAbstractTest extends TestCase {
 
                 return true;
             }
-        }, 15_000));
+        }, 30_000));
     }
 
     /**

Reply via email to