Fixed the updateSequence comparison; partitions now need to be resent if 
partition state was changed in detectLostPartitions.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c988cb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c988cb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c988cb7

Branch: refs/heads/ignite-5267-1
Commit: 5c988cb7b9a8e9563f7b3007dae133130f3d6681
Parents: 77aa61b
Author: Pavel Kovalenko <[email protected]>
Authored: Thu Jun 15 18:51:07 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jun 15 18:51:07 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 32 ++++++++++++++------
 .../GridDhtPartitionsExchangeFuture.java        | 12 ++++++--
 .../IgnitePdsCacheRebalancingAbstractTest.java  |  9 ++++--
 3 files changed, 39 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5c988cb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index e8fcef9..3c626f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1037,6 +1037,20 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * Checks should current partition map overwritten by new partition map
+     * Method returns true if topology version or update sequence of new map 
are greater than of current map
+     *
+     * @param currentMap Current partition map
+     * @param newMap New partition map
+     * @return True if current partition map should be overwritten by new 
partition map, false in other case
+     */
+    private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, 
GridDhtPartitionMap newMap) {
+        return newMap != null &&
+                
(newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
+                 
newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && 
newMap.updateSequence() > currentMap.updateSequence());
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Override public GridDhtPartitionMap update(
@@ -1087,7 +1101,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 return null;
             }
 
-            if (node2part != null && node2part.compareTo(partMap) >= 0) {
+            if (node2part != null && node2part.compareTo(partMap) > 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale partition map for full partition map 
update (will ignore) [lastExch=" +
                         lastExchangeVer + ", exch=" + exchangeVer + ", 
curMap=" + node2part + ", newMap=" + partMap + ']');
@@ -1102,16 +1116,14 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 for (GridDhtPartitionMap part : node2part.values()) {
                     GridDhtPartitionMap newPart = partMap.get(part.nodeId());
 
-                    // If for some nodes current partition has a newer map,
-                    // then we keep the newer value.
-                    if (newPart != null &&
-                        (newPart.updateSequence() < part.updateSequence() ||
-                        
(grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
-                        ) {
+                    if (shouldOverridePartitionMap(part, newPart)) {
                         if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update 
map [exch=" + exchangeVer +
-                                ", curPart=" + mapString(part) + ", newPart=" 
+ mapString(newPart) + ']');
-
+                            log.debug("Overriding partition map in full update 
map [exchId=" + exchangeVer + ", curPart=" +
+                                mapString(part) + ", newPart=" + 
mapString(newPart) + ']');
+                    }
+                    else {
+                        // If for some nodes current partition has a newer map,
+                        // then we keep the newer value.
                         partMap.put(part.nodeId(), part);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c988cb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 67d5622..8c56b711 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1636,15 +1636,23 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * Detect lost partitions.
      */
     private void detectLostPartitions() {
+        boolean detected = false;
+
         synchronized (cctx.exchange().interruptLock()) {
             if (Thread.currentThread().isInterrupted())
                 return;
 
             for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                if (!grp.isLocal())
-                    grp.topology().detectLostPartitions(discoEvt);
+                if (!grp.isLocal()) {
+                    boolean detectedOnGrp = 
grp.topology().detectLostPartitions(discoEvt);
+
+                    detected |= detectedOnGrp;
+                }
             }
         }
+
+        if (detected)
+            cctx.exchange().scheduleResendPartitions();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c988cb7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
index 037a1b1..e4ec085 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java
@@ -129,6 +129,11 @@ public abstract class 
IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb
         return 20 * 60 * 1000;
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getPartitionMapExchangeTimeout() {
+        return 60 * 1000;
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Cache configuration.
@@ -459,8 +464,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest 
extends GridCommonAb
                 add = true;
             else if (nodesCnt.get() > maxNodesCount)
                 add = false;
-            else
-                add = ThreadLocalRandom.current().nextBoolean();
+            else // More chance that node will be added
+                add = ThreadLocalRandom.current().nextInt(3 ) <= 1;
 
             if (add)
                 startGrid(nodesCnt.incrementAndGet());

Reply via email to