Repository: ignite
Updated Branches:
  refs/heads/ignite-zk a3a625699 -> 8735efda6


zk


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8735efda
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8735efda
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8735efda

Branch: refs/heads/ignite-zk
Commit: 8735efda660e707cb0c50e83d15891650eeccc78
Parents: a3a6256
Author: sboikov <[email protected]>
Authored: Wed Dec 20 12:57:40 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Dec 20 13:21:47 2017 +0300

----------------------------------------------------------------------
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 14 ++++
 .../ZookeeperDiscoverySpiBasicTest.java         | 80 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8735efda/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index c54d7c6..91d8e3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -1209,6 +1209,17 @@ public class ZookeeperDiscoveryImpl {
             Integer internalId = e.getKey();
 
             if (!rtState.top.nodesByInternalId.containsKey(internalId)) {
+                UUID rslvFutId = rtState.evtsData.communicationErrorResolveFutureId();
+
+                if (rslvFutId != null) {
+                    if (log.isInfoEnabled()) {
+                        log.info("Delay alive nodes change process while communication error resolve " +
+                            "is in progress [reqId=" + rslvFutId + ']');
+                    }
+
+                    break;
+                }
+
                 if (processJoinOnCoordinator(curTop, internalId, e.getValue())) {
                     newEvts++;
 
@@ -2467,6 +2478,9 @@ public class ZookeeperDiscoveryImpl {
         evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);
 
         saveAndProcessNewEvents();
+
+        // Need re-check alive nodes in case join was delayed.
+        rtState.zkClient.getChildrenAsync(zkPaths.aliveNodesDir, rtState.watcher, rtState.watcher);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8735efda/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
index 6d61ac2..829d3a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiBasicTest.java
@@ -94,6 +94,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
@@ -1841,10 +1842,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests case when one node fails before sending communication status.
+     *
      * @throws Exception If failed.
      */
     public void testNoOpCommunicationErrorResolve_3() throws Exception {
-        // One node fails before sending communication status.
         sesTimeout = 2000;
         commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
 
@@ -1886,10 +1888,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests case when Coordinator fails while resolve process is in progress.
+     *
      * @throws Exception If failed.
      */
     public void testNoOpCommunicationErrorResolve_4() throws Exception {
-        // Coordinator fails while resolve process is in progress.
         testCommSpi = true;
 
         sesTimeout = 2000;
@@ -1927,6 +1930,69 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that nodes join is delayed while resolve is in progress.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNoOpCommunicationErrorResolve_5() throws Exception {
+        testCommSpi = true;
+
+        sesTimeout = 2000;
+        commProblemRslvr = NoOpCommunicationProblemResolver.FACTORY;
+
+        startGrid(0);
+
+        startGridsMultiThreaded(1, 3);
+
+        ZkTestCommunicationSpi commSpi = ZkTestCommunicationSpi.spi(ignite(3));
+
+        commSpi.pingStartLatch = new CountDownLatch(1);
+        commSpi.pingLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                ZookeeperDiscoverySpi spi = spi(ignite(1));
+
+                spi.resolveCommunicationError(ignite(2).cluster().localNode(), new Exception("test"));
+
+                return null;
+            }
+        });
+
+        assertTrue(commSpi.pingStartLatch.await(10, SECONDS));
+
+        try {
+            assertFalse(fut.isDone());
+
+            final AtomicInteger nodeIdx = new AtomicInteger(3);
+
+            IgniteInternalFuture<?> startFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startGrid(nodeIdx.incrementAndGet());
+
+                    return null;
+                }
+            }, 3, "start-node");
+
+            U.sleep(1000);
+
+            assertFalse(startFut.isDone());
+
+            assertEquals(4, ignite(0).cluster().nodes().size());
+
+            commSpi.pingLatch.countDown();
+
+            startFut.get();
+            fut.get();
+
+            waitForTopology(7);
+        }
+        finally {
+            commSpi.pingLatch.countDown();
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testCommunicationErrorResolve_KillNode_1() throws Exception {
@@ -2099,6 +2165,8 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
                 spi = spi(node);
             }
 
+            assert spi != null;
+
             try {
                 spi.resolveCommunicationError(spi.getRemoteNodes().iterator().next(), new Exception("test"));
             }
@@ -3028,6 +3096,9 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
      */
     static class ZkTestCommunicationSpi extends TcpCommunicationSpi {
         /** */
+        private volatile CountDownLatch pingStartLatch;
+
+        /** */
         private volatile CountDownLatch pingLatch;
 
         /** */
@@ -3054,6 +3125,11 @@ public class ZookeeperDiscoverySpiBasicTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
+            CountDownLatch pingStartLatch = this.pingStartLatch;
+
+            if (pingStartLatch != null)
+                pingStartLatch.countDown();
+
             CountDownLatch pingLatch = this.pingLatch;
 
             try {

Reply via email to