Repository: ignite
Updated Branches:
  refs/heads/ignite-zk 69aedc5b5 -> fdd2c530a


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdd2c530
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdd2c530
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdd2c530

Branch: refs/heads/ignite-zk
Commit: fdd2c530a370c815226d5eb60fc28e0c28325f65
Parents: 69aedc5
Author: sboikov <[email protected]>
Authored: Mon Dec 25 16:36:42 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Dec 25 16:59:10 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 34 ++--------
 .../ZookeeperDiscoverySpiBasicTest.java         | 71 ++++++++++++++++++--
 2 files changed, 71 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd2c530/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index d7e0a76..04eb607 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -32,7 +32,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -480,7 +479,7 @@ public class ZookeeperDiscoveryImpl {
     private void onSegmented(Exception e) {
         rtState.errForClose = e;
 
-        if (rtState.joined) {
+        if (rtState.joined || joinFut.isDone()) {
             synchronized (stateMux) {
                 connState = ConnectionState.STOPPED;
             }
@@ -495,12 +494,15 @@ public class ZookeeperDiscoveryImpl {
      *
      */
     private void notifySegmented() {
-        assert rtState.evtsData != null;
+        List<ClusterNode> nodes = rtState.top.topologySnapshot();
+
+        if (nodes.isEmpty())
+            nodes = Collections.singletonList((ClusterNode)locNode);
 
         lsnr.onDiscovery(EventType.EVT_NODE_SEGMENTED,
-            rtState.evtsData.topVer,
+            rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
             locNode,
-            rtState.top.topologySnapshot(),
+            nodes,
             Collections.<Long, Collection<ClusterNode>>emptyMap(),
             null);
     }
@@ -992,9 +994,6 @@ public class ZookeeperDiscoveryImpl {
         catch (IgniteCheckedException | ZookeeperClientFailedException e) {
             throw new IgniteSpiException("Failed to initialize Zookeeper nodes", e);
         }
-        finally {
-            connStartLatch.countDown();
-        }
     }
 
     /**
@@ -1196,18 +1195,6 @@ public class ZookeeperDiscoveryImpl {
         }
     }
 
-    /** TODO ZK */
-    private final CountDownLatch connStartLatch = new CountDownLatch(1);
-
-    /**
-     * For testing only.
-     *
-     * @throws Exception If failed.
-     */
-    void waitConnectStart() throws Exception {
-        connStartLatch.await();
-    }
-
     /**
      * @param aliveNodes Alive nodes.
      * @throws Exception If failed.
@@ -3846,13 +3833,6 @@ public class ZookeeperDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override void onPreviousNodeFail() {
-            // TODO ZK:
-//        if (locInternalId == crdInternalId + 1) {
-//            if (log.isInfoEnabled())
-//                log.info("Previous discovery coordinator failed [locId=" + locNode.id() + ']');
-//
-//            onBecomeCoordinator(aliveNodes, locInternalId);
-//        }
             if (log.isInfoEnabled())
                 log.info("Previous server node failed, check is node new coordinator [locId=" + locNode.id() + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdd2c530/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index b31555e..a542a7a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -157,6 +157,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     private long joinTimeout;
 
     /** */
+    private boolean clientReconnectDisabled;
+
+    /** */
     private ConcurrentHashMap<String, ZookeeperDiscoverySpi> spis = new ConcurrentHashMap<>();
 
     /** */
@@ -200,6 +203,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         zkSpi.setSessionTimeout(sesTimeout > 0 ? sesTimeout : 10_000);
 
+        zkSpi.setClientReconnectDisabled(clientReconnectDisabled);
+
         // Set authenticator for basic sanity tests.
         if (auth != null) {
             zkSpi.setAuthenticator(auth.apply());
@@ -821,7 +826,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         final CountDownLatch l = new CountDownLatch(1);
 
         node0.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event event) {
+            @Override public boolean apply(Event evt) {
                 l.countDown();
 
                 return false;
@@ -857,7 +862,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         final CountDownLatch l = new CountDownLatch(1);
 
         node0.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event event) {
+            @Override public boolean apply(Event evt) {
                 l.countDown();
 
                 return false;
@@ -887,7 +892,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         final CountDownLatch l = new CountDownLatch(1);
 
         node0.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event event) {
+            @Override public boolean apply(Event evt) {
                 l.countDown();
 
                 return false;
@@ -1085,7 +1090,7 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      * @param failCnt Number of nodes to stop after coordinator loose connection.
      * @throws Exception If failed.
      */
-    private void connectionRestore_Coordinator(int initNodes, int startNodes, int failCnt) throws Exception {
+    private void connectionRestore_Coordinator(final int initNodes, int startNodes, int failCnt) throws Exception {
         sesTimeout = 30_000;
         testSockNio = true;
 
@@ -1123,11 +1128,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
         final List<String> failedZkNodes = new ArrayList<>(failCnt);
 
         for (int i = initNodes; i < initNodes + startNodes; i++) {
-            ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
+            final ZookeeperDiscoverySpi spi = waitSpi(getTestIgniteInstanceName(i));
 
-            ZookeeperDiscoveryImpl impl = GridTestUtils.getFieldValue(spi, "impl");
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    long internalOrder = GridTestUtils.getFieldValue(spi, "impl", "rtState", "internalOrder");
 
-            impl.waitConnectStart();
+                    return internalOrder > 0;
+                }
+            }, 10_000));
 
             if (cnt++ < failCnt) {
                 ZkTestClientCnxnSocketNIO c = ZkTestClientCnxnSocketNIO.forNode(getTestIgniteInstanceName(i));
@@ -2581,6 +2590,45 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testServersLeft_FailOnTimeout() throws Exception {
+        startGrid(0);
+
+        final int CLIENTS = 5;
+
+        joinTimeout = 3000;
+
+        clientMode(true);
+
+        startGridsMultiThreaded(1, CLIENTS);
+
+        waitForTopology(CLIENTS + 1);
+
+        final CountDownLatch l = new CountDownLatch(CLIENTS);
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite node = ignite(i + 1);
+
+            node.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    info("Segmented!");
+
+                    l.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_SEGMENTED);
+        }
+
+        stopGrid(getTestIgniteInstanceName(0), true, false);
+
+        assertTrue(l.await(10, SECONDS));
+
+        evts.clear();
+    }
+
+    /**
      *
      */
     public void testStartNoServers_FailOnTimeout() {
@@ -2679,6 +2727,15 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDisconnectOnServersLeft_5() throws Exception {
+        joinTimeout = 10_000;
+
+        disconnectOnServersLeft(5, 10);
+    }
+
+    /**
      * @param srvs Number of servers.
      * @param clients Number of clients.
      * @throws Exception If failed.

Reply via email to