Fixed updateSequence comparison; partitions need to be resent if state was changed in detectLostPartitions.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c988cb7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c988cb7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c988cb7 Branch: refs/heads/ignite-5267-1 Commit: 5c988cb7b9a8e9563f7b3007dae133130f3d6681 Parents: 77aa61b Author: Pavel Kovalenko <[email protected]> Authored: Thu Jun 15 18:51:07 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 15 18:51:07 2017 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 32 ++++++++++++++------ .../GridDhtPartitionsExchangeFuture.java | 12 ++++++-- .../IgnitePdsCacheRebalancingAbstractTest.java | 9 ++++-- 3 files changed, 39 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5c988cb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index e8fcef9..3c626f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1037,6 +1037,20 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + /** + * Checks should current partition map overwritten by new partition map + * Method returns true if topology version or update sequence of new map are greater than of current map + * + * 
@param currentMap Current partition map + * @param newMap New partition map + * @return True if current partition map should be overwritten by new partition map, false in other case + */ + private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { + return newMap != null && + (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || + newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Override public GridDhtPartitionMap update( @@ -1087,7 +1101,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return null; } - if (node2part != null && node2part.compareTo(partMap) >= 0) { + if (node2part != null && node2part.compareTo(partMap) > 0) { if (log.isDebugEnabled()) log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" + lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']'); @@ -1102,16 +1116,14 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (GridDhtPartitionMap part : node2part.values()) { GridDhtPartitionMap newPart = partMap.get(part.nodeId()); - // If for some nodes current partition has a newer map, - // then we keep the newer value. 
- if (newPart != null && - (newPart.updateSequence() < part.updateSequence() || - (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0)) - ) { + if (shouldOverridePartitionMap(part, newPart)) { if (log.isDebugEnabled()) - log.debug("Overriding partition map in full update map [exch=" + exchangeVer + - ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); - + log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" + + mapString(part) + ", newPart=" + mapString(newPart) + ']'); + } + else { + // If for some nodes current partition has a newer map, + // then we keep the newer value. partMap.put(part.nodeId(), part); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5c988cb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 67d5622..8c56b711 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1636,15 +1636,23 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Detect lost partitions. 
*/ private void detectLostPartitions() { + boolean detected = false; + synchronized (cctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) return; for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (!grp.isLocal()) - grp.topology().detectLostPartitions(discoEvt); + if (!grp.isLocal()) { + boolean detectedOnGrp = grp.topology().detectLostPartitions(discoEvt); + + detected |= detectedOnGrp; + } } } + + if (detected) + cctx.exchange().scheduleResendPartitions(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5c988cb7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java index 037a1b1..e4ec085 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheRebalancingAbstractTest.java @@ -129,6 +129,11 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb return 20 * 60 * 1000; } + /** {@inheritDoc} */ + @Override protected long getPartitionMapExchangeTimeout() { + return 60 * 1000; + } + /** * @param cacheName Cache name. * @return Cache configuration. 
@@ -459,8 +464,8 @@ public abstract class IgnitePdsCacheRebalancingAbstractTest extends GridCommonAb add = true; else if (nodesCnt.get() > maxNodesCount) add = false; - else - add = ThreadLocalRandom.current().nextBoolean(); + else // More chance that node will be added + add = ThreadLocalRandom.current().nextInt(3 ) <= 1; if (add) startGrid(nodesCnt.incrementAndGet());
