Repository: ignite
Updated Branches:
  refs/heads/master 86a788f31 -> d9be16f23


IGNITE-10589 Correctly handle exchanges merge when calculating last affinity change topology version - Fixes #5609.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d9be16f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d9be16f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d9be16f2

Branch: refs/heads/master
Commit: d9be16f23f4df6405ea7e523c255dbe44e21c853
Parents: 86a788f
Author: Ilya Lantukh <[email protected]>
Authored: Mon Dec 10 10:23:20 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Dec 10 10:23:20 2018 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 13 ++-
 .../affinity/AffinityTopologyVersion.java       |  9 +++
 .../GridCachePartitionExchangeManager.java      | 14 +++-
 .../cache/CacheNoAffinityExchangeTest.java      | 84 ++++++++++++++++++++
 4 files changed, 113 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 5abe63c..bbe0c78 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2050,10 +2050,17 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             AffinityTopologyVersion lastAffChangedTopVer =
                 
ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(topVer);
 
-            DiscoCache lastAffChangedDiscoCache = 
discoCacheHist.get(lastAffChangedTopVer);
+            if (!lastAffChangedTopVer.equals(topVer)) {
+                assert lastAffChangedTopVer.compareTo(topVer) < 0;
 
-            if (lastAffChangedDiscoCache != null)
-                return lastAffChangedDiscoCache;
+                for (Map.Entry<AffinityTopologyVersion, DiscoCache> e : 
discoCacheHist.descendingEntrySet()) {
+                    if (e.getKey().isBetween(lastAffChangedTopVer, topVer))
+                        return e.getValue();
+
+                    if (e.getKey().compareTo(lastAffChangedTopVer) < 0)
+                        break;
+                }
+            }
 
             CacheGroupDescriptor desc = 
ctx.cache().cacheGroupDescriptors().get(grpId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
index 333841d..2c02f26 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java
@@ -112,6 +112,15 @@ public class AffinityTopologyVersion implements 
Comparable<AffinityTopologyVersi
         return cmp;
     }
 
+    /**
+     * @param lower Lower bound.
+     * @param upper Upper bound.
+     * @return {@code True} if this topology version is within provided bounds (inclusive).
+     */
+    public boolean isBetween(AffinityTopologyVersion lower, 
AffinityTopologyVersion upper) {
+        return compareTo(lower) >= 0 && compareTo(upper) <= 0;
+    }
+
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0a0e709..88f6f0c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -2854,12 +2854,18 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                                 if (lastFut != null) {
                                     if (!lastFut.changedAffinity()) {
-                                        AffinityTopologyVersion lastAffVer = 
cctx.exchange().lastAffinityChangedTopologyVersion(lastFut.initialVersion());
-
-                                        
cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), 
lastAffVer);
+                                        // If lastFut corresponds to merged exchange, it is essential to use
+                                        // topologyVersion() instead of initialVersion() - nodes joined in this PME
+                                        // will have DiscoCache only for the last version.
+                                        AffinityTopologyVersion lastAffVer = 
cctx.exchange()
+                                            
.lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
+
+                                        
cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
+                                            lastAffVer);
                                     }
                                     else
-                                        
cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(), 
lastFut.initialVersion());
+                                        
cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
+                                            lastFut.topologyVersion());
                                 }
                             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d9be16f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
index 15cd5ee..7eada30 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheNoAffinityExchangeTest.java
@@ -23,10 +23,19 @@ import java.util.concurrent.locks.Lock;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -63,6 +72,8 @@ public class CacheNoAffinityExchangeTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
         cfg.setDiscoverySpi(new TestDiscoverySpi().setIpFinder(IP_FINDER));
 
         if (startClient) {
@@ -199,6 +210,79 @@ public class CacheNoAffinityExchangeTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNoAffinityChangeOnClientLeftWithMergedExchanges() throws 
Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY, 
"1000");
+
+        try {
+            Ignite ig = startGridsMultiThreaded(4);
+
+            IgniteCache<Integer, Integer> atomicCache = ig.createCache(new 
CacheConfiguration<Integer, Integer>()
+                
.setName("atomic").setAtomicityMode(CacheAtomicityMode.ATOMIC).setCacheMode(CacheMode.REPLICATED));
+
+            IgniteCache<Integer, Integer> txCache = ig.createCache(new 
CacheConfiguration<Integer, Integer>()
+                
.setName("tx").setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setCacheMode(CacheMode.REPLICATED));
+
+            startClient = true;
+
+            Ignite client = startGrid("client");
+
+            startClient = false;
+
+            stopGrid(1);
+            stopGrid(2);
+            stopGrid(3);
+
+            awaitPartitionMapExchange();
+
+            atomicCache.put(-1, -1);
+            txCache.put(-1, -1);
+
+            TestRecordingCommunicationSpi.spi(ig).blockMessages(new 
IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message 
message) {
+                    return message instanceof GridDhtPartitionSupplyMessageV2;
+                }
+            });
+
+            startGridsMultiThreaded(1, 3);
+
+            CountDownLatch latch = new CountDownLatch(1);
+            for (Ignite ignite : G.allGrids()) {
+                if (ignite.cluster().localNode().order() == 9) {
+                    TestDiscoverySpi discoSpi =
+                        
(TestDiscoverySpi)((IgniteEx)ignite).context().discovery().getInjectedDiscoverySpi();
+
+                    discoSpi.latch = latch;
+
+                    break;
+                }
+            }
+
+            client.close();
+
+            for (int k = 0; k < 100; k++) {
+                atomicCache.put(k, k);
+                txCache.put(k, k);
+
+                Lock lock = txCache.lock(k);
+                lock.lock();
+                lock.unlock();
+            }
+
+            for (int k = 0; k < 100; k++) {
+                assertEquals(Integer.valueOf(k), atomicCache.get(k));
+                assertEquals(Integer.valueOf(k), txCache.get(k));
+            }
+
+            latch.countDown();
+        }
+        finally {
+            
System.clearProperty(IgniteSystemProperties.IGNITE_EXCHANGE_MERGE_DELAY);
+        }
+    }
+
+    /**
      *
      */
     public static class TestDiscoverySpi extends TcpDiscoverySpi {

Reply via email to