This is an automated email from the ASF dual-hosted git repository.

jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 037b38e  IGNITE-11852 Fixed several coordinator failure issues during PME. - Fixes #6539.
037b38e is described below

commit 037b38e43b2698407554e2c94a7676250fa6af59
Author: Pavel Kovalenko <[email protected]>
AuthorDate: Tue Oct 15 17:59:15 2019 +0300

    IGNITE-11852 Fixed several coordinator failure issues during PME. - Fixes #6539.
    
    Signed-off-by: Pavel Kovalenko <[email protected]>
---
 .../cache/CacheAffinitySharedManager.java          |  24 ++-
 .../cache/GridCachePartitionExchangeManager.java   |  12 +-
 .../preloader/GridDhtPartitionsExchangeFuture.java |  10 +-
 .../dht/preloader/InitNewCoordinatorFuture.java    |  23 ++-
 .../PartitionsExchangeCoordinatorFailoverTest.java | 203 ++++++++++++++++++---
 5 files changed, 233 insertions(+), 39 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f00b6b1..2e54a30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1532,7 +1532,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 CacheGroupContext grp = 
cctx.cache().cacheGroup(holder.groupId());
 
                 if (affReq != null && affReq.contains(aff.groupId())) {
-                    assert 
AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();
+                    assert resTopVer.compareTo(aff.lastVersion()) >= 0 : 
aff.lastVersion();
 
                     CacheGroupAffinityMessage affMsg = 
receivedAff.get(aff.groupId());
 
@@ -1702,7 +1702,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @param fut Current exchange future.
      * @return Computed difference with ideal affinity.
      */
-    private Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
+    public Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
         final GridDhtPartitionsExchangeFuture fut) {
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
@@ -1710,12 +1710,16 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 AffinityTopologyVersion topVer = evts.topologyVersion();
 
-                CacheGroupHolder cache = getOrCreateGroupHolder(topVer, desc);
+                CacheGroupHolder grpHolder = getOrCreateGroupHolder(topVer, 
desc);
+
+                // Already calculated.
+                if (grpHolder.affinity().lastVersion().equals(topVer))
+                    return;
 
-                List<List<ClusterNode>> assign = 
cache.affinity().calculate(topVer, evts, evts.discoveryCache()).assignment();
+                List<List<ClusterNode>> assign = 
grpHolder.affinity().calculate(topVer, evts, 
evts.discoveryCache()).assignment();
 
-                if (!cache.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
-                    cache.affinity().initialize(topVer, assign);
+                if (!grpHolder.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
+                    grpHolder.affinity().initialize(topVer, assign);
 
                 fut.timeBag().finishLocalStage("Affinity initialization 
(enforced) " +
                     "[grp=" + desc.cacheOrGroupName() + "]");
@@ -2196,7 +2200,13 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 CacheGroupHolder grpHolder = 
getOrCreateGroupHolder(evts.topologyVersion(), desc);
 
-                boolean latePrimary = grpHolder.rebalanceEnabled;
+                CacheGroupHolder cache = 
getOrCreateGroupHolder(evts.topologyVersion(), desc);
+
+                // Already calculated.
+                if 
(cache.affinity().lastVersion().equals(evts.topologyVersion()))
+                    return;
+
+                boolean latePrimary = cache.rebalanceEnabled;
 
                 boolean grpAdded = evts.nodeJoined(desc.receivedFrom());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 19299a4..dd28286 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2532,7 +2532,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @param curFut Current active exchange future.
      * @return {@code False} if need wait messages for merged exchanges.
      */
-    public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture 
curFut) {
+    public boolean mergeExchangesOnCoordinator(
+        GridDhtPartitionsExchangeFuture curFut,
+        @Nullable AffinityTopologyVersion threshold
+    ) {
         if (IGNITE_EXCHANGE_MERGE_DELAY > 0) {
             try {
                 U.sleep(IGNITE_EXCHANGE_MERGE_DELAY);
@@ -2558,6 +2561,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                     DiscoveryEvent evt = fut.firstEvent();
 
+                    if (threshold != null && 
fut.initialVersion().compareTo(threshold) > 0) {
+                        if (log.isInfoEnabled())
+                            log.info("Stop merge, threshold is exceed: " + evt 
+ ", threshold = " + threshold);
+
+                        break;
+                    }
+
                     if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
                         if (log.isInfoEnabled())
                             log.info("Stop merge, custom event found: " + evt);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a1fc523..88399e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3385,7 +3385,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (log.isInfoEnabled())
                     log.info("Coordinator received all messages, try merge 
[ver=" + initialVersion() + ']');
 
-                boolean finish = 
cctx.exchange().mergeExchangesOnCoordinator(this);
+                AffinityTopologyVersion threshold = newCrdFut != null ? 
newCrdFut.resultTopologyVersion() : null;
+
+                if (threshold != null) {
+                    assert newCrdFut.fullMessage() == null :
+                        "There is full message in new coordinator future, but 
exchange was not finished using it: "
+                        + newCrdFut.fullMessage();
+                }
+
+                boolean finish = 
cctx.exchange().mergeExchangesOnCoordinator(this, threshold);
 
                 timeBag.finishGlobalStage("Exchanges merge");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index f311817..1ae3aea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -75,6 +75,9 @@ public class InitNewCoordinatorFuture extends 
GridCompoundFuture implements Igni
     private AffinityTopologyVersion initTopVer;
 
     /** */
+    private AffinityTopologyVersion resTopVer;
+
+    /** */
     private Map<UUID, GridDhtPartitionExchangeId> joinedNodes;
 
     /** */
@@ -223,6 +226,15 @@ public class InitNewCoordinatorFuture extends 
GridCompoundFuture implements Igni
     }
 
     /**
+     * @return Result topology version from nodes that already finished this 
exchange.
+     */
+    AffinityTopologyVersion resultTopologyVersion() {
+        synchronized (this) {
+            return resTopVer;
+        }
+    }
+
+    /**
      * @param node Node.
      * @param msg Message.
      */
@@ -242,9 +254,16 @@ public class InitNewCoordinatorFuture extends 
GridCompoundFuture implements Igni
                 GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
 
                 if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != 
null) {
-                    assert fullMsg == null || 
fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
+                    if (node.isClient() || node.isDaemon()) {
+                        assert resTopVer == null || 
resTopVer.equals(fullMsg0.resultTopologyVersion());
+
+                        resTopVer = fullMsg0.resultTopologyVersion();
+                    }
+                    else {
+                        assert fullMsg == null || 
fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
 
-                    fullMsg = fullMsg0;
+                        fullMsg = fullMsg0;
+                    }
                 }
                 else
                     msgs.put(node, msg);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index 6c1d85b..38fb088 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -34,6 +37,7 @@ import 
org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.G;
@@ -42,9 +46,9 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -56,8 +60,17 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
     /** */
     private static final String CACHE_NAME = "cache";
 
+    /** Coordinator node name. */
+    private static final String CRD_NONE = "crd";
+
+    /** */
+    private volatile Supplier<TcpCommunicationSpi> spiFactory = 
TcpCommunicationSpi::new;
+
     /** */
-    private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+    private boolean newCaches = true;
+
+    /** Node client mode. */
+    private boolean client;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -65,7 +78,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
 
         cfg.setConsistentId(igniteInstanceName);
 
-        cfg.setCommunicationSpi(spiFactory.get());
+        cfg.setCommunicationSpi(spiFactory.get().setName("tcp"));
 
         cfg.setCacheConfiguration(
                 new CacheConfiguration(CACHE_NAME)
@@ -74,7 +87,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
         );
 
         // Add cache that exists only on coordinator node.
-        if (igniteInstanceName.equals("crd")) {
+        if (newCaches && igniteInstanceName.equals(CRD_NONE)) {
             IgnitePredicate<ClusterNode> nodeFilter = node -> 
node.consistentId().equals(igniteInstanceName);
 
             cfg.setCacheConfiguration(
@@ -85,6 +98,8 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             );
         }
 
+        cfg.setClientMode(client);
+
         return cfg;
     }
 
@@ -107,7 +122,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
     public void testNewCoordinatorCompletedExchange() throws Exception {
         spiFactory = TestRecordingCommunicationSpi::new;
 
-        IgniteEx crd = (IgniteEx) startGrid("crd");
+        IgniteEx crd = startGrid(CRD_NONE);
 
         IgniteEx newCrd = startGrid(1);
 
@@ -122,13 +137,13 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
         // Block FullMessage for newly joined nodes.
         TestRecordingCommunicationSpi spi = 
TestRecordingCommunicationSpi.spi(crd);
 
-        final CountDownLatch sendFullMsgLatch = new CountDownLatch(1);
+        final CountDownLatch sndFullMsgLatch = new CountDownLatch(1);
 
         // Delay sending full message to newly joined nodes.
         spi.blockMessages((node, msg) -> {
             if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 
2) {
                 try {
-                    sendFullMsgLatch.await();
+                    sndFullMsgLatch.await();
                 }
                 catch (Throwable ignored) { }
 
@@ -156,13 +171,13 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             getTestTimeout()
         );
 
-        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> 
stopGrid("crd", true, false));
+        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> 
stopGrid(CRD_NONE, true, false));
 
         // Magic sleep to make sure that coordinator stop process has started.
         U.sleep(1000);
 
         // Resume full messages sending to unblock coordinator stopping 
process.
-        sendFullMsgLatch.countDown();
+        sndFullMsgLatch.countDown();
 
         // Coordinator stop should succeed.
         stopCrdFut.get();
@@ -193,7 +208,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
     public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws 
Exception {
         spiFactory = TestRecordingCommunicationSpi::new;
 
-        IgniteEx crd = startGrid("crd");
+        IgniteEx crd = startGrid(CRD_NONE);
 
         IgniteEx newCrd = startGrid(1);
 
@@ -203,7 +218,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
 
         awaitPartitionMapExchange();
 
-        blockSendingFullMessage(crd, problemNode);
+        blockSendingFullMessage(crd, node -> 
node.equals(problemNode.localNode()));
 
         IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> 
startGrid(3));
 
@@ -211,11 +226,11 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
 
         U.sleep(5000);
 
-        blockSendingFullMessage(newCrd, problemNode);
+        blockSendingFullMessage(newCrd, node -> 
node.equals(problemNode.localNode()));
 
-        IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() -> 
stopGrid("crd"));
+        IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> 
stopGrid(CRD_NONE));
 
-        stopCoordinatorFut.get();
+        stopCrdFut.get();
 
         U.sleep(5000);
 
@@ -238,9 +253,9 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             final int delay = 5_000;
 
             if (msg instanceof GridDhtPartitionDemandMessage) {
-                GridDhtPartitionDemandMessage demandMessage = 
(GridDhtPartitionDemandMessage) msg;
+                GridDhtPartitionDemandMessage demandMsg = 
(GridDhtPartitionDemandMessage) msg;
 
-                if (demandMessage.groupId() == 
GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+                if (demandMsg.groupId() == 
GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
                     return 0;
 
                 return delay;
@@ -249,7 +264,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             return 0;
         });
 
-        final IgniteEx crd = startGrid("crd");
+        final IgniteEx crd = startGrid(CRD_NONE);
 
         startGrid(1);
 
@@ -294,7 +309,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
         U.sleep(2_500);
 
         // And then stop coordinator node.
-        stopGrid("crd", true);
+        stopGrid(CRD_NONE, true);
 
         startNodeFut.get();
 
@@ -315,11 +330,143 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
     }
 
     /**
+     * Test checks that changing coordinator to a node that joining to cluster 
at the moment works correctly
+     * in case of exchanges merge and completed exchange on other joining 
nodes.
+     */
+    @Test
+    @WithSystemProperty(key = 
IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = 
"true")
+    public void testChangeCoordinatorToLocallyJoiningNode() throws Exception {
+        newCaches = false;
+
+        spiFactory = TestRecordingCommunicationSpi::new;
+
+        IgniteEx crd = startGrid(CRD_NONE);
+
+        final int newCrdNodeIdx = 1;
+
+        // A full message shouldn't be send to new coordinator.
+        blockSendingFullMessage(crd, node -> 
node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
+
+        CountDownLatch joiningNodeSentSingleMsg = new CountDownLatch(1);
+
+        // For next joining node delay sending single message to emulate 
exchanges merge.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 5_000;
+
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = 
(GridDhtPartitionsSingleMessage) msg;
+
+                if (singleMsg.exchangeId() != null) {
+                    joiningNodeSentSingleMsg.countDown();
+
+                    return delay;
+                }
+            }
+
+            return 0;
+        });
+
+        IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> 
startGrid(newCrdNodeIdx));
+
+        // Wait till new coordinator node sent single message.
+        joiningNodeSentSingleMsg.await();
+
+        spiFactory = TcpCommunicationSpi::new;
+
+        // Additionally start 2 new nodes. Their exchange should be merged 
with exchange on join new coordinator node.
+        startGridsMultiThreaded(2, 2);
+
+        Assert.assertFalse("New coordinator join shouldn't be happened before 
stopping old coordinator.",
+            newCrdJoinFut.isDone());
+
+        // Stop coordinator.
+        stopGrid(CRD_NONE);
+
+        // New coordinator join process should succeed after that.
+        newCrdJoinFut.get();
+
+        awaitPartitionMapExchange();
+
+        // Check that affinity are equal on all nodes.
+        AffinityTopologyVersion affVer = ((IgniteEx) 
ignite(1)).cachex(CACHE_NAME)
+            .context().shared().exchange().readyAffinityVersion();
+
+        List<List<ClusterNode>> expAssignment = null;
+        IgniteEx expAssignmentNode = null;
+
+        for (Ignite node : G.allGrids()) {
+            IgniteEx nodeEx = (IgniteEx) node;
+
+            List<List<ClusterNode>> assignment = 
nodeEx.cachex(CACHE_NAME).context().affinity().assignments(affVer);
+
+            if (expAssignment == null) {
+                expAssignment = assignment;
+                expAssignmentNode = nodeEx;
+            }
+            else
+                Assert.assertEquals("Affinity assignments are different " +
+                    "[expectedNode=" + expAssignmentNode + ", actualNode=" + 
nodeEx + "]", expAssignment, assignment);
+        }
+    }
+
+    /**
+     * Test checks that changing coordinator to a node that joining to cluster 
at the moment works correctly
+     * in case of completed exchange on client nodes.
+     */
+    @Test
+    @WithSystemProperty(key = 
IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = 
"true")
+    public void testChangeCoordinatorToLocallyJoiningNode2() throws Exception {
+        newCaches = false;
+
+        spiFactory = TestRecordingCommunicationSpi::new;
+
+        IgniteEx crd = startGrid(CRD_NONE);
+
+        client = true;
+
+        // Start several clients.
+        IgniteEx clientNode = (IgniteEx) startGridsMultiThreaded(2, 2);
+
+        client = false;
+
+        awaitPartitionMapExchange();
+
+        final int newCrdNodeIdx = 1;
+
+        // A full message shouldn't be send to new coordinator.
+        blockSendingFullMessage(crd, node -> 
node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
+
+        IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> 
startGrid(newCrdNodeIdx));
+
+        // Wait till client node will receive full message and finish exchange 
on node join.
+        GridTestUtils.waitForCondition(() -> {
+                GridDhtPartitionsExchangeFuture fut = 
clientNode.cachex(CACHE_NAME)
+                    .context().shared().exchange().lastFinishedFuture();
+
+                return fut != null && fut.topologyVersion().equals(new 
AffinityTopologyVersion(4, 0));
+            }, 60_000
+        );
+
+        Assert.assertFalse("New coordinator join shouldn't be happened before 
stopping old coordinator.",
+            newCrdJoinFut.isDone());
+
+        // Stop coordinator.
+        stopGrid(CRD_NONE);
+
+        // New coordinator join process should succeed after that.
+        newCrdJoinFut.get();
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
      * Blocks sending full message from coordinator to non-coordinator node.
+     *
      * @param from Coordinator node.
-     * @param to Non-coordinator node.
+     * @param pred Non-coordinator node predicate.
+     *                  If predicate returns {@code true} a full message will 
not be send to that node.
      */
-    private void blockSendingFullMessage(IgniteEx from, IgniteEx to) {
+    private void blockSendingFullMessage(IgniteEx from, Predicate<ClusterNode> 
pred) {
         // Block FullMessage for newly joined nodes.
         TestRecordingCommunicationSpi spi = 
TestRecordingCommunicationSpi.spi(from);
 
@@ -328,8 +475,8 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             if (msg instanceof GridDhtPartitionsFullMessage) {
                 GridDhtPartitionsFullMessage fullMsg = 
(GridDhtPartitionsFullMessage) msg;
 
-                if (fullMsg.exchangeId() != null && node.order() == 
to.localNode().order()) {
-                    log.warning("Blocked sending " + msg + " to " + 
to.localNode());
+                if (fullMsg.exchangeId() != null && pred.test(node)) {
+                    log.warning("Blocked sending " + msg + " to " + node);
 
                     return true;
                 }
@@ -342,9 +489,9 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
     /**
      * Communication SPI that allows to delay sending message by predicate.
      */
-    class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
+    static class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
         /** Function that returns delay in milliseconds for given message. */
-        private final Function<Message, Integer> delayMessageFunc;
+        private final Function<Message, Integer> delayMsgFunc;
 
         /** */
         DynamicDelayingCommunicationSpi() {
@@ -352,10 +499,10 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
         }
 
         /**
-         * @param delayMessageFunc Function to calculate delay for message.
+         * @param delayMsgFunc Function to calculate delay for message.
          */
-        DynamicDelayingCommunicationSpi(final Function<Message, Integer> 
delayMessageFunc) {
-            this.delayMessageFunc = delayMessageFunc;
+        DynamicDelayingCommunicationSpi(final Function<Message, Integer> 
delayMsgFunc) {
+            this.delayMsgFunc = delayMsgFunc;
         }
 
         /** {@inheritDoc} */
@@ -364,7 +511,7 @@ public class PartitionsExchangeCoordinatorFailoverTest 
extends GridCommonAbstrac
             try {
                 GridIoMessage ioMsg = (GridIoMessage)msg;
 
-                int delay = delayMessageFunc.apply(ioMsg.message());
+                int delay = delayMsgFunc.apply(ioMsg.message());
 
                 if (delay > 0) {
                     log.warning(String.format("Delay sending %s to %s", msg, 
node));

Reply via email to