This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch gg-18540 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 8baee8b280626e04157caa937e81a8b8b2ebbd8e Author: Pavel Kovalenko <[email protected]> AuthorDate: Mon May 6 12:21:52 2019 +0300 GG-17478 Fixed wrong assert on affinity initialization on node join Signed-off-by: Pavel Kovalenko <[email protected]> --- .../cache/CacheAffinitySharedManager.java | 2 +- .../PartitionsExchangeCoordinatorFailoverTest.java | 147 +++++++++++++++++---- 2 files changed, 120 insertions(+), 29 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 45f55da..29b4d81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1543,7 +1543,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupContext grp = cctx.cache().cacheGroup(holder.groupId()); if (affReq != null && affReq.contains(aff.groupId())) { - assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion(); + assert resTopVer.compareTo(aff.lastVersion()) >= 0 : aff.lastVersion(); CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java index fdef983..5cff138 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java @@ -16,12 +16,15 @@ package org.apache.ignite.internal.processors.cache; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -41,9 +44,9 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; import org.junit.Test; @@ -55,8 +58,14 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac /** */ private static final String CACHE_NAME = "cache"; + /** Coordinator node name. */ + private static final String CRD_NONE = "crd"; + + /** */ + private volatile Supplier<TcpCommunicationSpi> spiFactory = TcpCommunicationSpi::new; + /** */ - private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new; + private boolean newCaches = true; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -64,7 +73,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac cfg.setConsistentId(igniteInstanceName); - cfg.setCommunicationSpi(spiFactory.get()); + cfg.setCommunicationSpi(spiFactory.get().setName("tcp")); cfg.setCacheConfiguration( new CacheConfiguration(CACHE_NAME) @@ -73,7 +82,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac ); // Add cache that exists only on coordinator node. - if (igniteInstanceName.equals("crd")) { + if (newCaches && igniteInstanceName.equals(CRD_NONE)) { IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName); cfg.setCacheConfiguration( @@ -106,7 +115,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac public void testNewCoordinatorCompletedExchange() throws Exception { spiFactory = TestRecordingCommunicationSpi::new; - IgniteEx crd = (IgniteEx) startGrid("crd"); + IgniteEx crd = startGrid(CRD_NONE); IgniteEx newCrd = startGrid(1); @@ -121,13 +130,13 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac // Block FullMessage for newly joined nodes. TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd); - final CountDownLatch sendFullMsgLatch = new CountDownLatch(1); + final CountDownLatch sndFullMsgLatch = new CountDownLatch(1); // Delay sending full message to newly joined nodes. spi.blockMessages((node, msg) -> { if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 2) { try { - sendFullMsgLatch.await(); + sndFullMsgLatch.await(); } catch (Throwable ignored) { } @@ -155,13 +164,13 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac getTestTimeout() ); - IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid("crd", true, false)); + IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE, true, false)); // Magic sleep to make sure that coordinator stop process has started. U.sleep(1000); // Resume full messages sending to unblock coordinator stopping process. - sendFullMsgLatch.countDown(); + sndFullMsgLatch.countDown(); // Coordinator stop should succeed. stopCrdFut.get(); @@ -192,7 +201,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception { spiFactory = TestRecordingCommunicationSpi::new; - IgniteEx crd = startGrid("crd"); + IgniteEx crd = startGrid(CRD_NONE); IgniteEx newCrd = startGrid(1); @@ -202,7 +211,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac awaitPartitionMapExchange(); - blockSendingFullMessage(crd, problemNode); + blockSendingFullMessage(crd, node -> node.equals(problemNode.localNode())); IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> startGrid(3)); @@ -210,11 +219,11 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac U.sleep(5000); - blockSendingFullMessage(newCrd, problemNode); + blockSendingFullMessage(newCrd, node -> node.equals(problemNode.localNode())); - IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() -> stopGrid("crd")); + IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE)); - stopCoordinatorFut.get(); + stopCrdFut.get(); U.sleep(5000); @@ -237,9 +246,9 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac final int delay = 5_000; if (msg instanceof GridDhtPartitionDemandMessage) { - GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage) msg; + GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) msg; - if (demandMessage.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME)) + if (demandMsg.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME)) return 0; return delay; @@ -248,7 +257,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac return 0; }); - final IgniteEx crd = startGrid("crd"); + final IgniteEx crd = startGrid(CRD_NONE); startGrid(1); @@ -293,7 +302,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac U.sleep(2_500); // And then stop coordinator node. - stopGrid("crd", true); + stopGrid(CRD_NONE, true); startNodeFut.get(); @@ -314,11 +323,93 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac } /** + * Test checks that changing coordinator to a node that joining to cluster at the moment works correctly + * in case of exchanges merge and completed exchange on other joining nodes. + */ + @Test + @WithSystemProperty(key = IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true") + public void testChangeCoordinatorToLocallyJoiningNode() throws Exception { + newCaches = false; + + spiFactory = TestRecordingCommunicationSpi::new; + + IgniteEx crd = startGrid(CRD_NONE); + + final int newCrdNodeIdx = 1; + + // A full message shouldn't be send to new coordinator. + blockSendingFullMessage(crd, node -> node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx))); + + CountDownLatch joiningNodeSentSingleMsg = new CountDownLatch(1); + + // For next joining node delay sending single message to emulate exchanges merge. + spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> { + final int delay = 5_000; + + if (msg instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg; + + if (singleMsg.exchangeId() != null) { + joiningNodeSentSingleMsg.countDown(); + + return delay; + } + } + + return 0; + }); + + IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> startGrid(newCrdNodeIdx)); + + // Wait till new coordinator node sent single message. + joiningNodeSentSingleMsg.await(); + + spiFactory = TcpCommunicationSpi::new; + + // Additionally start 2 new nodes. Their exchange should be merged with exchange on join new coordinator node. + startGridsMultiThreaded(2, 2); + + Assert.assertFalse("New coordinator join shouldn't be happened before stopping old coordinator.", + newCrdJoinFut.isDone()); + + // Stop coordinator. + stopGrid(CRD_NONE); + + // New coordinator join process should succeed after that. + newCrdJoinFut.get(); + + awaitPartitionMapExchange(); + + // Check that affinity are equal on all nodes. + AffinityTopologyVersion affVer = ((IgniteEx) ignite(1)).cachex(CACHE_NAME) + .context().shared().exchange().readyAffinityVersion(); + + List<List<ClusterNode>> expAssignment = null; + IgniteEx expAssignmentNode = null; + + for (Ignite node : G.allGrids()) { + IgniteEx nodeEx = (IgniteEx) node; + + List<List<ClusterNode>> assignment = nodeEx.cachex(CACHE_NAME).context().affinity().assignments(affVer); + + if (expAssignment == null) { + expAssignment = assignment; + expAssignmentNode = nodeEx; + } + else + Assert.assertEquals("Affinity assignments are different " + + "[expectedNode=" + expAssignmentNode + ", actualNode=" + nodeEx + "]", expAssignment, assignment); + } + } + + /** * Blocks sending full message from coordinator to non-coordinator node. + * * @param from Coordinator node. - * @param to Non-coordinator node. + * @param pred Non-coordinator node predicate. + * If predicate returns {@code true} a full message will not be send to that node. */ - private void blockSendingFullMessage(IgniteEx from, IgniteEx to) { + private void blockSendingFullMessage(IgniteEx from, Predicate<ClusterNode> pred) { // Block FullMessage for newly joined nodes. TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(from); @@ -327,8 +418,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac if (msg instanceof GridDhtPartitionsFullMessage) { GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage) msg; - if (fullMsg.exchangeId() != null && node.order() == to.localNode().order()) { - log.warning("Blocked sending " + msg + " to " + to.localNode()); + if (fullMsg.exchangeId() != null && pred.test(node)) { + log.warning("Blocked sending " + msg + " to " + node); return true; } @@ -341,9 +432,9 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac /** * Communication SPI that allows to delay sending message by predicate. */ - class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi { + static class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi { /** Function that returns delay in milliseconds for given message. */ - private final Function<Message, Integer> delayMessageFunc; + private final Function<Message, Integer> delayMsgFunc; /** */ DynamicDelayingCommunicationSpi() { @@ -351,10 +442,10 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac } /** - * @param delayMessageFunc Function to calculate delay for message. + * @param delayMsgFunc Function to calculate delay for message. */ - DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMessageFunc) { - this.delayMessageFunc = delayMessageFunc; + DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMsgFunc) { + this.delayMsgFunc = delayMsgFunc; } /** {@inheritDoc} */ @@ -363,7 +454,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac try { GridIoMessage ioMsg = (GridIoMessage)msg; - int delay = delayMessageFunc.apply(ioMsg.message()); + int delay = delayMsgFunc.apply(ioMsg.message()); if (delay > 0) { log.warning(String.format("Delay sending %s to %s", msg, node));
