Repository: ignite Updated Branches: refs/heads/ignite-5578 8a6583195 -> 3e22eac26
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e22eac2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e22eac2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e22eac2 Branch: refs/heads/ignite-5578 Commit: 3e22eac26873de9da3f10b80afcc7987fed8a180 Parents: 8a65831 Author: sboikov <[email protected]> Authored: Mon Jul 17 11:48:51 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Jul 17 18:17:20 2017 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityAssignmentCache.java | 29 ++ .../cache/CacheAffinitySharedManager.java | 6 +- .../cache/ExchangeDiscoveryEvents.java | 24 +- .../GridCachePartitionExchangeManager.java | 107 ++++++- .../dht/GridDhtPartitionTopologyImpl.java | 51 ++-- .../GridDhtPartitionsAbstractMessage.java | 4 + .../GridDhtPartitionsExchangeFuture.java | 290 +++++++++++++------ .../preloader/GridDhtPartitionsFullMessage.java | 12 +- .../dht/preloader/GridDhtPreloader.java | 6 +- 9 files changed, 377 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index a8ac825..1142c8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -360,6 +360,15 @@ public class GridAffinityAssignmentCache { return aff.assignment(); } + /** + * @param topVer Topology version. + * @return Affinity assignment. + */ + public List<List<ClusterNode>> readyAssignments(AffinityTopologyVersion topVer) { + AffinityAssignment aff = readyAffinity(topVer); + + return aff.assignment(); + } /** * Gets future that will be completed after topology with version {@code topVer} is calculated. @@ -463,6 +472,26 @@ public class GridAffinityAssignmentCache { return false; } + public AffinityAssignment readyAffinity(AffinityTopologyVersion topVer) { + AffinityAssignment cache = head.get(); + + if (!cache.topologyVersion().equals(topVer)) { + cache = affCache.get(topVer); + + if (cache == null) { + throw new IllegalStateException("Affinity for topology version is " + + "not initialized [locNode=" + ctx.discovery().localNode().id() + + ", grp=" + cacheOrGrpName + + ", topVer=" + topVer + + ", head=" + head.get().topologyVersion() + + ", history=" + affCache.keySet() + + ']'); + } + } + + return cache; + } + /** * Get cached affinity for specified topology version. * http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 4ea61a9..0ef2999 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1259,7 +1259,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap CacheGroupHolder cache = groupHolder(topVer, desc); - cache.affinity().calculate(topVer, evts.event(), evts.discoveryCache()); + cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache()); } }); @@ -1726,7 +1726,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (addedOnExchnage) { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) { List<List<ClusterNode>> newAff = aff.calculate(evts.topologyVersion(), - evts.event(), + evts.lastEvent(), evts.discoveryCache()); aff.initialize(evts.topologyVersion(), newAff); @@ -1744,7 +1744,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert aff.idealAssignment() != null : "Previous assignment is not available."; - List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.event(), evts.discoveryCache()); + List<List<ClusterNode>> idealAssignment = aff.calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache()); List<List<ClusterNode>> newAssignment = null; if (latePrimary) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java index 7d3e256..f1c4bea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java @@ -19,17 +19,15 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.UUID; -import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; -import static com.sun.corba.se.impl.util.RepositoryId.cache; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -45,7 +43,7 @@ public class ExchangeDiscoveryEvents { private DiscoCache discoCache; /** */ - private DiscoveryEvent evt; + private DiscoveryEvent lastEvt; /** */ private List<DiscoveryEvent> evts = new ArrayList<>(); @@ -76,7 +74,7 @@ public class ExchangeDiscoveryEvents { evts.add(evt); this.topVer = topVer; - this.evt = evt; + this.lastEvt = evt; this.discoCache = cache; ClusterNode node = evt.eventNode(); @@ -92,23 +90,27 @@ public class ExchangeDiscoveryEvents { } } - DiscoCache discoveryCache() { + public List<DiscoveryEvent> events() { + return evts; + } + + public DiscoCache discoveryCache() { return discoCache; } - DiscoveryEvent event() { - return evt; + public DiscoveryEvent lastEvent() { + return lastEvt; } - AffinityTopologyVersion topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return topVer; } - boolean serverJoin() { + public boolean serverJoin() { return srvJoin; } - boolean serverLeft() { + public boolean serverLeft() { return srvLeft; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 90ea1fe..1db4b09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -1236,14 +1237,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param exchFut Exchange. + * @param topVer Topology version. * @param err Error. */ - public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) { - AffinityTopologyVersion topVer = exchFut.topologyVersion(); - + public void onExchangeDone(AffinityTopologyVersion topVer, @Nullable Throwable err) { if (log.isDebugEnabled()) - log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']'); + log.debug("Exchange done [topVer=" + topVer + ", err=" + err + ']'); if (err == null) { while (true) { @@ -1284,7 +1283,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana int skipped = 0; for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) { - if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion()) < 0) + if (topVer.compareTo(fut.exchangeId().topologyVersion()) < 0) continue; skipped++; @@ -1755,11 +1754,43 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana this.exchMergeTestWaitVer = exchMergeTestWaitVer; } - public boolean mergeExchanges(GridDhtPartitionsExchangeFuture curFut) { + public void mergeExchanges(GridDhtPartitionsExchangeFuture curFut, AffinityTopologyVersion resVer) + throws IgniteInterruptedCheckedException { + exchWorker.waitForExchangeFuture(resVer); + + for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) { + if (task instanceof GridDhtPartitionsExchangeFuture) { + GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture) task; + + if (fut.topologyVersion().compareTo(resVer) > 0) + break; + + log.info("Merge exchange future on finish [curFut=" + curFut.topologyVersion() + + ", mergedFut=" + fut.topologyVersion() + ']'); + + curFut.context().events().addEvent(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); + + exchWorker.futQ.remove(fut); + } + } + + ExchangeDiscoveryEvents evts = curFut.context().events(); + + assert evts.topologyVersion().equals(resVer) : "Invalid exchange merge result [ver=" + evts.topologyVersion() + + ", expVer=" + resVer + ']'; + } + + /** + * @param curFut Current active exchange future. + * @return {@code False} if need wait messages for merged exchanges. + */ + public boolean mergeExchangesOnCoordinator(GridDhtPartitionsExchangeFuture curFut) { AffinityTopologyVersion exchMergeTestWaitVer = this.exchMergeTestWaitVer; if (exchMergeTestWaitVer != null) { - log.info("Coalesce test, waiting for version [exch=" + curFut.topologyVersion() + + log.info("Exchange merge test, waiting for version [exch=" + curFut.topologyVersion() + ", waitVer=" + exchMergeTestWaitVer + ']'); long end = U.currentTimeMillis() + 10_000; @@ -1772,7 +1803,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task; if (exchMergeTestWaitVer.equals(fut.topologyVersion())) { - log.info("Coalesce test, found awaited version: " + exchMergeTestWaitVer); + log.info("Exchange merge test, found awaited version: " + exchMergeTestWaitVer); found = true; @@ -1808,8 +1839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!supportsMergeExchanges(node)) break; - if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node)) - fut.mergeServerJoinExchange(curFut); + log.info("Merge exchange future [curFut=" + curFut.topologyVersion() + + ", mergedFut=" + fut.topologyVersion() + ']'); + + curFut.context().events().addEvent(fut.topologyVersion(), + fut.discoveryEvent(), + fut.discoCache()); + + if (evt.type() == EVT_NODE_JOINED && !CU.clientNode(node)) { + if (fut.mergeServerJoinExchange(curFut)) + awaited++; + } exchWorker.futQ.remove(fut); } @@ -1830,6 +1870,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ = new LinkedBlockingDeque<>(); + /** */ + private AffinityTopologyVersion lastFutVer; + /** Busy flag used as performance optimization to stop current preloading. */ private volatile boolean busy; @@ -1868,10 +1911,39 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana futQ.offer(exchFut); + synchronized (this) { + lastFutVer = exchFut.topologyVersion(); + + notifyAll(); + } + if (log.isDebugEnabled()) log.debug("Added exchange future to exchange worker: " + exchFut); } + private void waitForExchangeFuture(AffinityTopologyVersion resVer) throws IgniteInterruptedCheckedException { + synchronized (this) { + while (lastFutVer.compareTo(resVer) < 0) + U.wait(this); + } + } + + private void onExchangeDone(AffinityTopologyVersion resVer, GridDhtPartitionsExchangeFuture exchFut) + throws IgniteInterruptedCheckedException { + if (resVer.compareTo(exchFut.exchangeId().topologyVersion()) != 0) { + waitForExchangeFuture(resVer); + + for (CachePartitionExchangeWorkerTask task : futQ) { + if (task instanceof GridDhtPartitionsExchangeFuture) { + GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task; + + if (resVer.compareTo(fut0.topologyVersion()) >= 0) + futQ.remove(fut0); + } + } + } + } + /** {@inheritDoc} */ @Override public void cancel() { synchronized (interruptLock) { @@ -2005,6 +2077,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture exchFut = null; + AffinityTopologyVersion resVer = null; + try { if (isCancelled()) break; @@ -2038,7 +2112,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana while (true) { try { - exchFut.get(futTimeout, TimeUnit.MILLISECONDS); + resVer = exchFut.get(futTimeout, TimeUnit.MILLISECONDS); break; } @@ -2067,6 +2141,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + onExchangeDone(resVer, exchFut); if (log.isDebugEnabled()) log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" + @@ -2175,7 +2250,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (assignsCancelled) { // Pending exchange. U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + ", node=" + exchId.nodeId() + ']'); } else if (r != null) { @@ -2185,19 +2260,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!hasPendingExchange()) { U.log(log, "Rebalancing started " + - "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + ", node=" + exchId.nodeId() + ']'); r.run(); // Starts rebalancing routine. } else U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + ", node=" + exchId.nodeId() + ']'); } else U.log(log, "Skipping rebalancing (nothing scheduled) " + - "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + "[top=" + resVer + ", evt=" + exchId.discoveryEventName() + ", node=" + exchId.nodeId() + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 5ef499c..e477a82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -60,6 +61,8 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; @@ -297,7 +300,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); + AffinityTopologyVersion resTopVer = exchFut.context().events().topologyVersion(); + + List<List<ClusterNode>> aff = grp.affinity().readyAssignments(resTopVer); if (grp.affinityNode()) { ClusterNode loc = ctx.localNode(); @@ -306,15 +311,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - assert topVer.equals(exchFut.topologyVersion()) : - "Invalid topology [topVer=" + topVer + - ", grp=" + grp.cacheOrGroupName() + - ", futVer=" + exchFut.topologyVersion() + - ", fut=" + exchFut + ']'; - assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : + assert grp.affinity().lastVersion().equals(resTopVer) : "Invalid affinity [topVer=" + grp.affinity().lastVersion() + ", grp=" + grp.cacheOrGroupName() + - ", futVer=" + exchFut.topologyVersion() + + ", futVer=" + resTopVer + ", fut=" + exchFut + ']'; int num = grp.affinity().partitions(); @@ -433,18 +433,22 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return; - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + ExchangeDiscoveryEvents evts = exchFut.context().events(); - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + - ", exchId=" + exchId + ']'; + assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + topVer + + ", exchId=" + exchFut.exchangeId() + ']'; - if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) - removeNode(exchId.nodeId()); + topVer = evts.topologyVersion(); + + for (DiscoveryEvent evt : evts.events()) { + if ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode())) + removeNode(evt.eventNode().id()); + } ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) { - log.debug("Partition map beforeExchange [exchId=" + exchId + + log.debug("Partition map beforeExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); } @@ -461,7 +465,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); } else if (!node2part.valid()) { node2part = new GridDhtPartitionFullMap(oldest.id(), @@ -471,7 +475,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { false); if (log.isDebugEnabled()) { - log.debug("Created new full topology map on oldest node [exchId=" + exchId + + log.debug("Created new full topology map on oldest node [exchId=" + exchFut.exchangeId() + ", fullMap=" + node2part + ']'); } } @@ -484,7 +488,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) { log.debug("Copied old map into new map on oldest node (previous oldest node left) [" + - "exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); } } } @@ -504,7 +508,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { consistencyCheck(); if (log.isDebugEnabled()) { - log.debug("Partition map after beforeExchange [exchId=" + exchId + + log.debug("Partition map after beforeExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); } } @@ -533,7 +537,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { int num = grp.affinity().partitions(); - AffinityTopologyVersion topVer = exchFut.topologyVersion(); + AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion(); assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + "[topVer=" + topVer + @@ -546,9 +550,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return false; - assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchFut.exchangeId() + ']'; - if (log.isDebugEnabled()) log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); @@ -1113,6 +1114,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return false; + if (exchangeVer != null) { + assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer; + + topVer = exchangeVer; + } + if (cntrMap != null) { // update local map partition counters for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 20b33e7..466ec03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -100,6 +100,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage return exchId; } + public void exchangeId(GridDhtPartitionExchangeId exchId) { + this.exchId = exchId; + } + /** * @param grpId Cache group ID. * @return Parition update counters. http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 9b2a82b..190a417 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -37,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.CacheEvent; @@ -298,6 +298,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte this.affChangeMsg = affChangeMsg; } + private AffinityTopologyVersion initTopologyVersion() { + return exchId.topologyVersion(); + } + /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { return exchId.topologyVersion(); @@ -520,8 +524,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchCtx.canMergeExchanges()) { if (cctx.kernalContext().clientNode() || CU.clientNode(discoEvt.eventNode())) exchange = onClientNodeEvent(crdNode); - else + else { + if (localJoinExchange()) + onServerNodeEvent(crdNode); + exchange = ExchangeType.ALL_2; + } } else { exchange = CU.clientNode(discoEvt.eventNode()) ? onClientNodeEvent(crdNode) : @@ -1235,6 +1243,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + if (exchCtx.canMergeExchanges()) + msg.resultTopologyVersion(exchCtx.events().topologyVersion()); + GridDhtPartitionsFullMessage msgWithAff = null; assert !nodes.contains(cctx.localNode()); @@ -1256,8 +1267,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id()); if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) { - if (msgWithAff == null) - msgWithAff = msg.copyWithAffinity(cachesAff); + if (msgWithAff == null) { + msgWithAff = msg.copy(); + + msgWithAff.cachesAffinity(cachesAff); + } sndMsg = msgWithAff; } @@ -1265,6 +1279,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } try { + GridDhtPartitionsSingleMessage mergedMsg = mergedJoinExchMsgs.get(node.id()); + + if (mergedMsg != null) { + sndMsg = sndMsg.copy(); + + sndMsg.exchangeId(mergedMsg.exchangeId()); + } + cctx.io().send(node, sndMsg, SYSTEM_POOL); } catch (IgniteCheckedException e) { @@ -1310,6 +1332,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!done.compareAndSet(false, true)) return false; + log.info("Finish exchange future [startVer=" + topologyVersion() + ", resVer=" + res + ']'); + + assert res != null || err != null; + if (err == null && !cctx.kernalContext().clientNode() && (serverNodeDiscoveryEvent() || affChangeMsg != null)) { @@ -1317,7 +1343,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!cacheCtx.affinityNode() || cacheCtx.isLocal()) continue; - cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion()); + cacheCtx.continuousQueries().flushBackupQueue(res); } } @@ -1341,7 +1367,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (drCacheCtx.isDrEnabled()) { try { - drCacheCtx.dr().onExchange(topologyVersion(), exchId.isLeft()); + drCacheCtx.dr().onExchange(res, exchId.isLeft()); } catch (IgniteCheckedException e) { U.error(log, "Failed to notify DR: " + e, e); @@ -1365,9 +1391,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte tryToPerformLocalSnapshotOperation(); - cctx.cache().onExchangeDone(exchId.topologyVersion(), exchActions, err); + cctx.cache().onExchangeDone(res, exchActions, err); - cctx.exchange().onExchangeDone(this, err); + cctx.exchange().onExchangeDone(res, err); if (exchActions != null && err == null) exchActions.completeRequestFutures(cctx); @@ -1394,7 +1420,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (err == null) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) - grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false); + grp.topology().onExchangeDone(grp.affinity().readyAffinity(res), false); } } @@ -1459,31 +1485,49 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private GridDhtPartitionsExchangeFuture mergedWith; /** */ - private GridDhtPartitionsSingleMessage pendingSingleMsg; + private GridDhtPartitionsSingleMessage pendingSrvJoinMsg; /** */ private Map<ClusterNode, GridDhtPartitionsSingleMessage> pendingClientMsgs; - private void addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + private boolean addMergedJoinExchange(UUID nodeId, GridDhtPartitionsSingleMessage msg) { if (mergedJoinExchMsgs == null) mergedJoinExchMsgs = new LinkedHashMap<>(); - if (msg != null) + boolean wait = false; + + if (msg != null) { + log.info("Merge server join exchange, message received [curFut=" + topologyVersion() + + ", node=" + nodeId + ']'); + mergedJoinExchMsgs.put(nodeId, msg); + } else { - if (cctx.discovery().alive(nodeId)) + if (cctx.discovery().alive(nodeId)) { + log.info("Merge server join exchange, wait for message [curFut=" + topologyVersion() + + ", node=" + nodeId + ']'); + + wait = true; + awaitMergedMsgs++; - else + } + else { + log.info("Merge server join exchange, awaited node left [curFut=" + topologyVersion() + + ", node=" + nodeId + ']'); + mergedJoinExchMsgs.put(nodeId, null); + } } + + return wait; } /** * @param fut Current exchange to merge with. */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - public void mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) { - log.info("Merge exchange future [fut=" + topologyVersion() + ", mergeWith=" + fut.topologyVersion() + ']'); + public boolean mergeServerJoinExchange(final GridDhtPartitionsExchangeFuture fut) { + boolean wait; synchronized (this) { assert !isDone(); @@ -1493,17 +1537,31 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte mergedWith = fut; - fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSingleMsg); + wait = fut.addMergedJoinExchange(discoEvt.eventNode().id(), pendingSrvJoinMsg); // TODO 5578 client messages. } + + return wait; } + /** + * @param node + * @param msg + */ void onReceiveMerged(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { boolean done = false; synchronized (this) { - if (mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id())) { + boolean process = mergedJoinExchMsgs != null && !mergedJoinExchMsgs.containsKey(node.id()); + + log.info("Merge server join exchange, received message [curFut=" + topologyVersion() + + ", node=" + node.id() + + ", msgVer=" + msg.exchangeId().topologyVersion() + + ", process=" + process + + ", awaited=" + awaitMergedMsgs + ']'); + + if (process) { mergedJoinExchMsgs.put(node.id(), msg); assert awaitMergedMsgs > 0 : awaitMergedMsgs; @@ -1514,9 +1572,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (done) { - - } + if (done) + finishExchangeOnCoordinator(); } /** @@ -1549,9 +1606,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else { if (msg.client()) { assert false; + + // TODO IGNITE-5578 + } + else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) { + assert !CU.clientNode(node) : node; + + pendingSrvJoinMsg = msg; } - else if (exchangeId().isJoined() && node.id().equals(exchId.nodeId())) - pendingSingleMsg = msg; } } @@ -1869,8 +1931,30 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (exchCtx.canMergeExchanges()) { - cctx.exchange().mergeExchanges(this); + boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this); + + if (!finish) + return; + } + + finishExchangeOnCoordinator(); + } + catch (IgniteCheckedException e) { + if (reconnectOnError(e)) + onDone(new IgniteNeedReconnectException(cctx.localNode(), e)); + else + onDone(e); + } + } + + /** + * + */ + private void finishExchangeOnCoordinator() { + try { + log.info("finishExchangeOnCoordinator [topVer=" + topologyVersion() + ", resVer=" + exchCtx.events().topologyVersion() + ']'); + if (exchCtx.canMergeExchanges()) { cctx.affinity().onTopologyChange(this, true); for (CacheGroupContext grp : cctx.cache().cacheGroups()) { @@ -1881,6 +1965,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + synchronized (this) { + if (mergedJoinExchMsgs != null) { + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) { + if (e.getValue() != null) + msgs.put(e.getKey(), e.getValue()); + } + } + } + + AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion(); + Map<Integer, CacheGroupAffinityMessage> cachesAff = null; for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { @@ -1905,7 +2000,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (affReq != null) { cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, - topologyVersion(), + resTopVer, affReq, cachesAff); @@ -1920,12 +2015,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (partMap == null) { partMap = new GridDhtPartitionMap(nodeId, 1L, - topologyVersion(), + resTopVer, new GridPartitionStateMap(), false); } - AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(topologyVersion()); + AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(resTopVer); for (int p = 0; p < aff.assignment().size(); p++) { if (aff.getIds(p).contains(nodeId)) @@ -1987,6 +2082,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte srvNodes.remove(cctx.localNode()); nodes = new ArrayList<>(srvNodes); + + if (mergedJoinExchMsgs != null) { + for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) { + if (e.getValue() != null) { + ClusterNode node = cctx.discovery().node(e.getKey()); + + if (node != null) + nodes.add(node); + } + } + } } IgniteCheckedException err = null; @@ -2016,7 +2122,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (!nodes.isEmpty()) sendAllPartitions(nodes, cachesAff != null ? cachesAff.values() : null); - onDone(exchangeId().topologyVersion(), err); + onDone(exchCtx.events().topologyVersion(), err); } } catch (IgniteCheckedException e) { @@ -2166,98 +2272,100 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param msg Message. */ private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { - assert exchId.equals(msg.exchangeId()) : msg; - assert msg.lastVersion() != null : msg; + try { + assert exchId.equals(msg.exchangeId()) : msg; + assert msg.lastVersion() != null : msg; - synchronized (this) { - if (crd == null || finishState != null) - return; + synchronized (this) { + if (crd == null || finishState != null) + return; - if (!crd.equals(node)) { - if (log.isDebugEnabled()) - log.debug("Received full partition map from unexpected node [oldest=" + crd.id() + - ", nodeId=" + node.id() + ']'); + if (!crd.equals(node)) { + if (log.isDebugEnabled()) + log.debug("Received full partition map from unexpected node [oldest=" + crd.id() + + ", nodeId=" + node.id() + ']'); - if (node.order() > crd.order()) - fullMsgs.put(node, msg); + if (node.order() > crd.order()) + fullMsgs.put(node, msg); - return; + return; + } + + finishState = new FinishState(crd.id(), msg.resultTopologyVersion()); } - finishState = new FinishState(crd.id(), msg.resultTopologyVersion()); - } + if (exchCtx.canMergeExchanges()) { + if (msg.resultTopologyVersion() != null && !initTopologyVersion().equals(msg.resultTopologyVersion())) { + log.info("Received full message, need merge [curFut=" + topologyVersion() + + ", resVer=" + msg.resultTopologyVersion() + ']'); - if (exchCtx.canMergeExchanges()) { - try { - onServerNodeEvent(true); + cctx.exchange().mergeExchanges(this, msg.resultTopologyVersion()); + } - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal() || cacheGroupStopping(grp.groupId())) - continue; + if (localJoinExchange()) { + Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin(); - grp.topology().beforeExchange(this, true); - } - } - catch (IgniteCheckedException e) { - // TODO 5578. - U.error(log, "Failed: " + e, e); - } - } + ExchangeDiscoveryEvents evts = exchCtx.events(); - Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin(); + Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); - if (localJoinExchange() && affReq != null) { - Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); + Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity(); - Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity(); + assert !F.isEmpty(cachesAff) : msg; + assert cachesAff.size() >= affReq.size(); - assert !F.isEmpty(cachesAff) : msg; - assert cachesAff.size() >= affReq.size(); + int cnt = 0; - int cnt = 0; + for (CacheGroupAffinityMessage aff : cachesAff) { + if (affReq.contains(aff.groupId())) { + CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); - for (CacheGroupAffinityMessage aff : cachesAff) { - if (affReq.contains(aff.groupId())) { - CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); + assert grp != null : aff.groupId(); + assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion()); - assert grp != null : aff.groupId(); - assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion()); + List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, evts.discoveryCache()); - List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache); + // Calculate ideal assignments. + if (!grp.affinity().centralizedAffinityFunction()) + grp.affinity().calculate(evts.topologyVersion(), evts.lastEvent(), evts.discoveryCache()); - // Calculate ideal assignments. - if (!grp.affinity().centralizedAffinityFunction()) - grp.affinity().calculate(topologyVersion(), discoEvt, discoCache); + grp.affinity().initialize(evts.topologyVersion(), assignments); - grp.affinity().initialize(topologyVersion(), assignments); + grp.topology().initPartitions(this); - try { - grp.topology().initPartitions(this); + cnt++; + } } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Interrupted when initialize local partitions."); - return; - } + assert affReq.size() == cnt : cnt; + } + else { + cctx.affinity().onTopologyChange(this, false); + + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal() || cacheGroupStopping(grp.groupId())) + continue; - cnt++; + grp.topology().beforeExchange(this, true); + } } } - assert affReq.size() == cnt : cnt; - } + updatePartitionFullMap(msg); - updatePartitionFullMap(msg); + IgniteCheckedException err = null; - IgniteCheckedException err = null; + if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) { + err = new IgniteCheckedException("Cluster state change failed"); - if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap())) { - err = new IgniteCheckedException("Cluster state change failed"); + cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest()); + } - cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest()); + onDone(exchCtx.events().topologyVersion(), err); + } + catch (IgniteCheckedException e) { + onDone(e); } - - onDone(exchId.topologyVersion(), err); } /** @@ -2280,7 +2388,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte CacheGroupContext grp = cctx.cache().cacheGroup(grpId); if (grp != null) { - grp.topology().update(topologyVersion(), + grp.topology().update(exchCtx.events().topologyVersion(), entry.getValue(), cntrMap, msg.partsToReload(cctx.localNodeId(), grpId)); @@ -2289,7 +2397,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) { - cctx.exchange().clientTopology(grpId, this).update(topologyVersion(), + cctx.exchange().clientTopology(grpId, this).update(exchCtx.events().topologyVersion(), entry.getValue(), cntrMap, Collections.<Integer>emptySet()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 8a5dbbb..8c11ff2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -152,6 +152,11 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa cp.partsToReloadBytes = partsToReloadBytes; cp.topVer = topVer; cp.cachesAff = cachesAff; + cp.resTopVer = resTopVer; + } + + public void resultTopologyVersion(AffinityTopologyVersion resTopVer) { + this.resTopVer = resTopVer; } AffinityTopologyVersion resultTopologyVersion() { @@ -159,18 +164,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** - * @param cachesAff Affinity. * @return Message copy. */ - GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) { - assert !F.isEmpty(cachesAff) : cachesAff; - + GridDhtPartitionsFullMessage copy() { GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage(); copyStateTo(cp); - cp.cachesAff = cachesAff; - return cp; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3e22eac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 7efd4aa..37de068 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -188,7 +188,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { int partCnt = grp.affinity().partitions(); - assert exchFut == null || exchFut.topologyVersion().equals(top.topologyVersion()) : + assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.topologyVersion()) : "Topology version mismatch [exchId=" + exchId + ", grp=" + grp.name() + ", topVer=" + top.topologyVersion() + ']'; @@ -242,7 +242,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (msg == null) { assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), - exchId.topologyVersion(), + assigns.topologyVersion(), grp.groupId())); } @@ -309,7 +309,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (msg == null) { assigns.put(n, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), - exchId.topologyVersion(), + assigns.topologyVersion(), grp.groupId())); }
