This is an automated email from the ASF dual-hosted git repository.
jokser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 037b38e IGNITE-11852 Fixed several coordinator failure issues during
PME. - Fixes #6539.
037b38e is described below
commit 037b38e43b2698407554e2c94a7676250fa6af59
Author: Pavel Kovalenko <[email protected]>
AuthorDate: Tue Oct 15 17:59:15 2019 +0300
IGNITE-11852 Fixed several coordinator failure issues during PME. - Fixes
#6539.
Signed-off-by: Pavel Kovalenko <[email protected]>
---
.../cache/CacheAffinitySharedManager.java | 24 ++-
.../cache/GridCachePartitionExchangeManager.java | 12 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 10 +-
.../dht/preloader/InitNewCoordinatorFuture.java | 23 ++-
.../PartitionsExchangeCoordinatorFailoverTest.java | 203 ++++++++++++++++++---
5 files changed, 233 insertions(+), 39 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f00b6b1..2e54a30 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1532,7 +1532,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
CacheGroupContext grp =
cctx.cache().cacheGroup(holder.groupId());
if (affReq != null && affReq.contains(aff.groupId())) {
- assert
AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();
+ assert resTopVer.compareTo(aff.lastVersion()) >= 0 :
aff.lastVersion();
CacheGroupAffinityMessage affMsg =
receivedAff.get(aff.groupId());
@@ -1702,7 +1702,7 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
* @param fut Current exchange future.
* @return Computed difference with ideal affinity.
*/
- private Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
+ public Map<Integer, CacheGroupAffinityMessage> onReassignmentEnforced(
final GridDhtPartitionsExchangeFuture fut) {
final ExchangeDiscoveryEvents evts = fut.context().events();
@@ -1710,12 +1710,16 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws
IgniteCheckedException {
AffinityTopologyVersion topVer = evts.topologyVersion();
- CacheGroupHolder cache = getOrCreateGroupHolder(topVer, desc);
+ CacheGroupHolder grpHolder = getOrCreateGroupHolder(topVer,
desc);
+
+ // Already calculated.
+ if (grpHolder.affinity().lastVersion().equals(topVer))
+ return;
- List<List<ClusterNode>> assign =
cache.affinity().calculate(topVer, evts, evts.discoveryCache()).assignment();
+ List<List<ClusterNode>> assign =
grpHolder.affinity().calculate(topVer, evts,
evts.discoveryCache()).assignment();
- if (!cache.rebalanceEnabled ||
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
- cache.affinity().initialize(topVer, assign);
+ if (!grpHolder.rebalanceEnabled ||
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
+ grpHolder.affinity().initialize(topVer, assign);
fut.timeBag().finishLocalStage("Affinity initialization
(enforced) " +
"[grp=" + desc.cacheOrGroupName() + "]");
@@ -2196,7 +2200,13 @@ public class CacheAffinitySharedManager<K, V> extends
GridCacheSharedManagerAdap
@Override public void applyx(CacheGroupDescriptor desc) throws
IgniteCheckedException {
CacheGroupHolder grpHolder =
getOrCreateGroupHolder(evts.topologyVersion(), desc);
- boolean latePrimary = grpHolder.rebalanceEnabled;
+ CacheGroupHolder cache =
getOrCreateGroupHolder(evts.topologyVersion(), desc);
+
+ // Already calculated.
+ if
(cache.affinity().lastVersion().equals(evts.topologyVersion()))
+ return;
+
+ boolean latePrimary = cache.rebalanceEnabled;
boolean grpAdded = evts.nodeJoined(desc.receivedFrom());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 19299a4..dd28286 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2532,7 +2532,10 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
* @param curFut Current active exchange future.
* @return {@code False} if need wait messages for merged exchanges.
*/
- public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture
curFut) {
+ public boolean mergeExchangesOnCoordinator(
+ GridDhtPartitionsExchangeFuture curFut,
+ @Nullable AffinityTopologyVersion threshold
+ ) {
if (IGNITE_EXCHANGE_MERGE_DELAY > 0) {
try {
U.sleep(IGNITE_EXCHANGE_MERGE_DELAY);
@@ -2558,6 +2561,13 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
DiscoveryEvent evt = fut.firstEvent();
+ if (threshold != null &&
fut.initialVersion().compareTo(threshold) > 0) {
+ if (log.isInfoEnabled())
+ log.info("Stop merge, threshold is exceed: " + evt
+ ", threshold = " + threshold);
+
+ break;
+ }
+
if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
if (log.isInfoEnabled())
log.info("Stop merge, custom event found: " + evt);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a1fc523..88399e7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3385,7 +3385,15 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (log.isInfoEnabled())
log.info("Coordinator received all messages, try merge
[ver=" + initialVersion() + ']');
- boolean finish =
cctx.exchange().mergeExchangesOnCoordinator(this);
+ AffinityTopologyVersion threshold = newCrdFut != null ?
newCrdFut.resultTopologyVersion() : null;
+
+ if (threshold != null) {
+ assert newCrdFut.fullMessage() == null :
+ "There is full message in new coordinator future, but
exchange was not finished using it: "
+ + newCrdFut.fullMessage();
+ }
+
+ boolean finish =
cctx.exchange().mergeExchangesOnCoordinator(this, threshold);
timeBag.finishGlobalStage("Exchanges merge");
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
index f311817..1ae3aea 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java
@@ -75,6 +75,9 @@ public class InitNewCoordinatorFuture extends
GridCompoundFuture implements Igni
private AffinityTopologyVersion initTopVer;
/** */
+ private AffinityTopologyVersion resTopVer;
+
+ /** */
private Map<UUID, GridDhtPartitionExchangeId> joinedNodes;
/** */
@@ -223,6 +226,15 @@ public class InitNewCoordinatorFuture extends
GridCompoundFuture implements Igni
}
/**
+ * @return Result topology version from nodes that already finished this
exchange.
+ */
+ AffinityTopologyVersion resultTopologyVersion() {
+ synchronized (this) {
+ return resTopVer;
+ }
+ }
+
+ /**
* @param node Node.
* @param msg Message.
*/
@@ -242,9 +254,16 @@ public class InitNewCoordinatorFuture extends
GridCompoundFuture implements Igni
GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage();
if (fullMsg0 != null && fullMsg0.resultTopologyVersion() !=
null) {
- assert fullMsg == null ||
fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
+ if (node.isClient() || node.isDaemon()) {
+ assert resTopVer == null ||
resTopVer.equals(fullMsg0.resultTopologyVersion());
+
+ resTopVer = fullMsg0.resultTopologyVersion();
+ }
+ else {
+ assert fullMsg == null ||
fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion());
- fullMsg = fullMsg0;
+ fullMsg = fullMsg0;
+ }
}
else
msgs.put(node, msg);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index 6c1d85b..38fb088 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -34,6 +37,7 @@ import
org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.util.typedef.G;
@@ -42,9 +46,9 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -56,8 +60,17 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
/** */
private static final String CACHE_NAME = "cache";
+ /** Coordinator node name. */
+ private static final String CRD_NONE = "crd";
+
+ /** */
+ private volatile Supplier<TcpCommunicationSpi> spiFactory =
TcpCommunicationSpi::new;
+
/** */
- private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+ private boolean newCaches = true;
+
+ /** Node client mode. */
+ private boolean client;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
@@ -65,7 +78,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
cfg.setConsistentId(igniteInstanceName);
- cfg.setCommunicationSpi(spiFactory.get());
+ cfg.setCommunicationSpi(spiFactory.get().setName("tcp"));
cfg.setCacheConfiguration(
new CacheConfiguration(CACHE_NAME)
@@ -74,7 +87,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
);
// Add cache that exists only on coordinator node.
- if (igniteInstanceName.equals("crd")) {
+ if (newCaches && igniteInstanceName.equals(CRD_NONE)) {
IgnitePredicate<ClusterNode> nodeFilter = node ->
node.consistentId().equals(igniteInstanceName);
cfg.setCacheConfiguration(
@@ -85,6 +98,8 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
);
}
+ cfg.setClientMode(client);
+
return cfg;
}
@@ -107,7 +122,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
public void testNewCoordinatorCompletedExchange() throws Exception {
spiFactory = TestRecordingCommunicationSpi::new;
- IgniteEx crd = (IgniteEx) startGrid("crd");
+ IgniteEx crd = startGrid(CRD_NONE);
IgniteEx newCrd = startGrid(1);
@@ -122,13 +137,13 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
// Block FullMessage for newly joined nodes.
TestRecordingCommunicationSpi spi =
TestRecordingCommunicationSpi.spi(crd);
- final CountDownLatch sendFullMsgLatch = new CountDownLatch(1);
+ final CountDownLatch sndFullMsgLatch = new CountDownLatch(1);
// Delay sending full message to newly joined nodes.
spi.blockMessages((node, msg) -> {
if (msg instanceof GridDhtPartitionsFullMessage && node.order() >
2) {
try {
- sendFullMsgLatch.await();
+ sndFullMsgLatch.await();
}
catch (Throwable ignored) { }
@@ -156,13 +171,13 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
getTestTimeout()
);
- IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() ->
stopGrid("crd", true, false));
+ IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() ->
stopGrid(CRD_NONE, true, false));
// Magic sleep to make sure that coordinator stop process has started.
U.sleep(1000);
// Resume full messages sending to unblock coordinator stopping
process.
- sendFullMsgLatch.countDown();
+ sndFullMsgLatch.countDown();
// Coordinator stop should succeed.
stopCrdFut.get();
@@ -193,7 +208,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws
Exception {
spiFactory = TestRecordingCommunicationSpi::new;
- IgniteEx crd = startGrid("crd");
+ IgniteEx crd = startGrid(CRD_NONE);
IgniteEx newCrd = startGrid(1);
@@ -203,7 +218,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
awaitPartitionMapExchange();
- blockSendingFullMessage(crd, problemNode);
+ blockSendingFullMessage(crd, node ->
node.equals(problemNode.localNode()));
IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() ->
startGrid(3));
@@ -211,11 +226,11 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
U.sleep(5000);
- blockSendingFullMessage(newCrd, problemNode);
+ blockSendingFullMessage(newCrd, node ->
node.equals(problemNode.localNode()));
- IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() ->
stopGrid("crd"));
+ IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() ->
stopGrid(CRD_NONE));
- stopCoordinatorFut.get();
+ stopCrdFut.get();
U.sleep(5000);
@@ -238,9 +253,9 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
final int delay = 5_000;
if (msg instanceof GridDhtPartitionDemandMessage) {
- GridDhtPartitionDemandMessage demandMessage =
(GridDhtPartitionDemandMessage) msg;
+ GridDhtPartitionDemandMessage demandMsg =
(GridDhtPartitionDemandMessage) msg;
- if (demandMessage.groupId() ==
GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+ if (demandMsg.groupId() ==
GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
return 0;
return delay;
@@ -249,7 +264,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
return 0;
});
- final IgniteEx crd = startGrid("crd");
+ final IgniteEx crd = startGrid(CRD_NONE);
startGrid(1);
@@ -294,7 +309,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
U.sleep(2_500);
// And then stop coordinator node.
- stopGrid("crd", true);
+ stopGrid(CRD_NONE, true);
startNodeFut.get();
@@ -315,11 +330,143 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
}
/**
+ * Test checks that changing coordinator to a node that is joining the cluster
at the moment works correctly
+ * in case of exchanges merge and completed exchange on other joining
nodes.
+ */
+ @Test
+ @WithSystemProperty(key =
IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value =
"true")
+ public void testChangeCoordinatorToLocallyJoiningNode() throws Exception {
+ newCaches = false;
+
+ spiFactory = TestRecordingCommunicationSpi::new;
+
+ IgniteEx crd = startGrid(CRD_NONE);
+
+ final int newCrdNodeIdx = 1;
+
+ // A full message shouldn't be sent to the new coordinator.
+ blockSendingFullMessage(crd, node ->
node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
+
+ CountDownLatch joiningNodeSentSingleMsg = new CountDownLatch(1);
+
+ // For next joining node delay sending single message to emulate
exchanges merge.
+ spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+ final int delay = 5_000;
+
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMsg =
(GridDhtPartitionsSingleMessage) msg;
+
+ if (singleMsg.exchangeId() != null) {
+ joiningNodeSentSingleMsg.countDown();
+
+ return delay;
+ }
+ }
+
+ return 0;
+ });
+
+ IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() ->
startGrid(newCrdNodeIdx));
+
+ // Wait until the new coordinator node has sent its single message.
+ joiningNodeSentSingleMsg.await();
+
+ spiFactory = TcpCommunicationSpi::new;
+
+ // Additionally start 2 new nodes. Their exchange should be merged
with exchange on join new coordinator node.
+ startGridsMultiThreaded(2, 2);
+
+ Assert.assertFalse("New coordinator join shouldn't be happened before
stopping old coordinator.",
+ newCrdJoinFut.isDone());
+
+ // Stop coordinator.
+ stopGrid(CRD_NONE);
+
+ // New coordinator join process should succeed after that.
+ newCrdJoinFut.get();
+
+ awaitPartitionMapExchange();
+
+ // Check that affinity assignments are equal on all nodes.
+ AffinityTopologyVersion affVer = ((IgniteEx)
ignite(1)).cachex(CACHE_NAME)
+ .context().shared().exchange().readyAffinityVersion();
+
+ List<List<ClusterNode>> expAssignment = null;
+ IgniteEx expAssignmentNode = null;
+
+ for (Ignite node : G.allGrids()) {
+ IgniteEx nodeEx = (IgniteEx) node;
+
+ List<List<ClusterNode>> assignment =
nodeEx.cachex(CACHE_NAME).context().affinity().assignments(affVer);
+
+ if (expAssignment == null) {
+ expAssignment = assignment;
+ expAssignmentNode = nodeEx;
+ }
+ else
+ Assert.assertEquals("Affinity assignments are different " +
+ "[expectedNode=" + expAssignmentNode + ", actualNode=" +
nodeEx + "]", expAssignment, assignment);
+ }
+ }
+
+ /**
+ * Test checks that changing coordinator to a node that is joining the cluster
at the moment works correctly
+ * in case of completed exchange on client nodes.
+ */
+ @Test
+ @WithSystemProperty(key =
IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value =
"true")
+ public void testChangeCoordinatorToLocallyJoiningNode2() throws Exception {
+ newCaches = false;
+
+ spiFactory = TestRecordingCommunicationSpi::new;
+
+ IgniteEx crd = startGrid(CRD_NONE);
+
+ client = true;
+
+ // Start several clients.
+ IgniteEx clientNode = (IgniteEx) startGridsMultiThreaded(2, 2);
+
+ client = false;
+
+ awaitPartitionMapExchange();
+
+ final int newCrdNodeIdx = 1;
+
+ // A full message shouldn't be sent to the new coordinator.
+ blockSendingFullMessage(crd, node ->
node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
+
+ IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() ->
startGrid(newCrdNodeIdx));
+
+ // Wait till client node will receive full message and finish exchange
on node join.
+ GridTestUtils.waitForCondition(() -> {
+ GridDhtPartitionsExchangeFuture fut =
clientNode.cachex(CACHE_NAME)
+ .context().shared().exchange().lastFinishedFuture();
+
+ return fut != null && fut.topologyVersion().equals(new
AffinityTopologyVersion(4, 0));
+ }, 60_000
+ );
+
+ Assert.assertFalse("New coordinator join shouldn't be happened before
stopping old coordinator.",
+ newCrdJoinFut.isDone());
+
+ // Stop coordinator.
+ stopGrid(CRD_NONE);
+
+ // New coordinator join process should succeed after that.
+ newCrdJoinFut.get();
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
* Blocks sending full message from coordinator to non-coordinator node.
+ *
* @param from Coordinator node.
- * @param to Non-coordinator node.
+ * @param pred Non-coordinator node predicate.
+ * If predicate returns {@code true}, a full message will
not be sent to that node.
*/
- private void blockSendingFullMessage(IgniteEx from, IgniteEx to) {
+ private void blockSendingFullMessage(IgniteEx from, Predicate<ClusterNode>
pred) {
// Block FullMessage for newly joined nodes.
TestRecordingCommunicationSpi spi =
TestRecordingCommunicationSpi.spi(from);
@@ -328,8 +475,8 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
if (msg instanceof GridDhtPartitionsFullMessage) {
GridDhtPartitionsFullMessage fullMsg =
(GridDhtPartitionsFullMessage) msg;
- if (fullMsg.exchangeId() != null && node.order() ==
to.localNode().order()) {
- log.warning("Blocked sending " + msg + " to " +
to.localNode());
+ if (fullMsg.exchangeId() != null && pred.test(node)) {
+ log.warning("Blocked sending " + msg + " to " + node);
return true;
}
@@ -342,9 +489,9 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
/**
* Communication SPI that allows to delay sending message by predicate.
*/
- class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
+ static class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
/** Function that returns delay in milliseconds for given message. */
- private final Function<Message, Integer> delayMessageFunc;
+ private final Function<Message, Integer> delayMsgFunc;
/** */
DynamicDelayingCommunicationSpi() {
@@ -352,10 +499,10 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
}
/**
- * @param delayMessageFunc Function to calculate delay for message.
+ * @param delayMsgFunc Function to calculate delay for message.
*/
- DynamicDelayingCommunicationSpi(final Function<Message, Integer>
delayMessageFunc) {
- this.delayMessageFunc = delayMessageFunc;
+ DynamicDelayingCommunicationSpi(final Function<Message, Integer>
delayMsgFunc) {
+ this.delayMsgFunc = delayMsgFunc;
}
/** {@inheritDoc} */
@@ -364,7 +511,7 @@ public class PartitionsExchangeCoordinatorFailoverTest
extends GridCommonAbstrac
try {
GridIoMessage ioMsg = (GridIoMessage)msg;
- int delay = delayMessageFunc.apply(ioMsg.message());
+ int delay = delayMsgFunc.apply(ioMsg.message());
if (delay > 0) {
log.warning(String.format("Delay sending %s to %s", msg,
node));