Repository: ignite Updated Branches: refs/heads/ignite-10189 6ae44a775 -> c77e82e37
IGNITE-10589 Correctly handle exchanges merge when calculating last affinity change topology version - Fixes #5609. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d9be16f2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d9be16f2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d9be16f2 Branch: refs/heads/ignite-10189 Commit: d9be16f23f4df6405ea7e523c255dbe44e21c853 Parents: 86a788f Author: Ilya Lantukh <[email protected]> Authored: Mon Dec 10 10:23:20 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Dec 10 10:23:20 2018 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 13 ++- .../affinity/AffinityTopologyVersion.java | 9 +++ .../GridCachePartitionExchangeManager.java | 14 +++- .../cache/CacheNoAffinityExchangeTest.java | 84 ++++++++++++++++++++ 4 files changed, 113 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 5abe63c..bbe0c78 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2050,10 +2050,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion lastAffChangedTopVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(topVer); - DiscoCache lastAffChangedDiscoCache = discoCacheHist.get(lastAffChangedTopVer); + if (!lastAffChangedTopVer.equals(topVer)) { + assert lastAffChangedTopVer.compareTo(topVer) < 0; - if (lastAffChangedDiscoCache != null) - return lastAffChangedDiscoCache; + for (Map.Entry<AffinityTopologyVersion, DiscoCache> e : discoCacheHist.descendingEntrySet()) { + if (e.getKey().isBetween(lastAffChangedTopVer, topVer)) + return e.getValue(); + + if (e.getKey().compareTo(lastAffChangedTopVer) < 0) + break; + } + } CacheGroupDescriptor desc = ctx.cache().cacheGroupDescriptors().get(grpId); http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java index 333841d..2c02f26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java @@ -112,6 +112,15 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi return cmp; } + /** + * @param lower Lower bound. + * @param upper Upper bound. + * @return {@code True} if this topology version is within provided bounds (inclusive). + */ + public boolean isBetween(AffinityTopologyVersion lower, AffinityTopologyVersion upper) { + return compareTo(lower) >= 0 && compareTo(upper) <= 0; + } + /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0a0e709..88f6f0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -2854,12 +2854,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (lastFut != null) { if (!lastFut.changedAffinity()) { - AffinityTopologyVersion lastAffVer = cctx.exchange().lastAffinityChangedTopologyVersion(lastFut.initialVersion()); - - cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), lastAffVer); + // If lastFut corresponds to merged exchange, it is essential to use + // topologyVersion() instead of initialVersion() - nodes joined in this PME + // will have DiscoCache only for the last version. + AffinityTopologyVersion lastAffVer = cctx.exchange() + .lastAffinityChangedTopologyVersion(lastFut.topologyVersion()); + + cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), + lastAffVer); } else - cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), lastFut.initialVersion()); + cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), + lastFut.topologyVersion()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java index 15cd5ee..7eada30 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java @@ -23,10 +23,19 @@ import java.util.concurrent.locks.Lock; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -63,6 +72,8 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + cfg.setDiscoverySpi(new TestDiscoverySpi().setIpFinder(IP_FINDER)); if (startClient) { @@ -199,6 +210,79 @@ public class CacheNoAffinityExchangeTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testNoAffinityChangeOnClientLeftWithMergedExchanges() throws Exception { + System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, "1000"); + + try { + Ignite ig = startGridsMultiThreaded(4); + + IgniteCache<Integer, Integer> atomicCache = ig.createCache(new CacheConfiguration<Integer, Integer>() + .setName("atomic").setAtomicityMode(CacheAtomicityMode.ATOMIC).setCacheMode(CacheMode.REPLICATED)); + + IgniteCache<Integer, Integer> txCache = ig.createCache(new CacheConfiguration<Integer, Integer>() + .setName("tx").setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setCacheMode(CacheMode.REPLICATED)); + + startClient = true; + + Ignite client = startGrid("client"); + + startClient = false; + + stopGrid(1); + stopGrid(2); + stopGrid(3); + + awaitPartitionMapExchange(); + + atomicCache.put(-1, -1); + txCache.put(-1, -1); + + TestRecordingCommunicationSpi.spi(ig).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message message) { + return message instanceof GridDhtPartitionSupplyMessageV2; + } + }); + + startGridsMultiThreaded(1, 3); + + CountDownLatch latch = new CountDownLatch(1); + for (Ignite ignite : G.allGrids()) { + if (ignite.cluster().localNode().order() == 9) { + TestDiscoverySpi discoSpi = + (TestDiscoverySpi)((IgniteEx)ignite).context().discovery().getInjectedDiscoverySpi(); + + discoSpi.latch = latch; + + break; + } + } + + client.close(); + + for (int k = 0; k < 100; k++) { + atomicCache.put(k, k); + txCache.put(k, k); + + Lock lock = txCache.lock(k); + lock.lock(); + lock.unlock(); + } + + for (int k = 0; k < 100; k++) { + assertEquals(Integer.valueOf(k), atomicCache.get(k)); + assertEquals(Integer.valueOf(k), txCache.get(k)); + } + + latch.countDown(); + } + finally { + System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY); + } + } + + /** * */ public static class TestDiscoverySpi extends TcpDiscoverySpi {
