Optimizations from ignite-5068.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58b6e05e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58b6e05e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58b6e05e Branch: refs/heads/ignite-5398 Commit: 58b6e05e82978c68ba9e2e53a3b9b866e2c474ca Parents: 52556f4 Author: Ilya Lantukh <[email protected]> Authored: Fri Apr 28 19:57:32 2017 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Fri Apr 28 19:57:32 2017 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 5 + .../dht/GridDhtPartitionTopology.java | 6 + .../dht/GridDhtPartitionTopologyImpl.java | 473 +++++++++++-------- .../GridDhtPartitionsExchangeFuture.java | 9 + 4 files changed, 302 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5c5a3c4..e71c18f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -778,6 +778,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void onExchangeDone(AffinityAssignment assignment) { + // no-op + } + + /** {@inheritDoc} */ @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) { assert false : "detectLostPartitions should never be called on client topology"; http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 2bef267..5df8714 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -326,4 +326,10 @@ public interface GridDhtPartitionTopology { * @return Set of node IDs that should reload partitions. */ public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq); + + /** + * Callback on exchange done. + * @param assignment New affinity assignment. + */ + public void onExchangeDone(AffinityAssignment assignment); } http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index fb09b38..d1283c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -79,6 +79,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh private static final boolean FULL_MAP_DEBUG = false; /** */ + private static final boolean FAST_DIFF_REBUILD = true; + + /** */ private static final Long ZERO = 0L; /** Context. */ @@ -93,8 +96,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** Node to partition map. */ private GridDhtPartitionFullMap node2part; - /** Partition to node map. */ - private final Map<Integer, Set<UUID>> part2node; + /** */ + private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>(); + + /** */ + private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE; /** */ private GridDhtPartitionExchangeId lastExchangeId; @@ -142,8 +148,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh log = cctx.logger(getClass()); locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions()); - - part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f); } /** {@inheritDoc} */ @@ -160,7 +164,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh try { node2part = null; - part2node.clear(); + diffFromAffinity.clear(); lastExchangeId = null; @@ -168,6 +172,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh topReadyFut = null; + diffFromAffinityVer = AffinityTopologyVersion.NONE; + rebalancedTopVer = AffinityTopologyVersion.NONE; topVer = AffinityTopologyVersion.NONE; @@ -863,18 +869,42 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh List<ClusterNode> nodes = null; - Collection<UUID> nodeIds = part2node.get(p); + if (!topVer.equals(diffFromAffinityVer)) { + log.error("??? node2part [topVer=" + topVer + ", diffVer=" + diffFromAffinityVer + "]"); + + nodes = new ArrayList<>(); + + nodes.addAll(affNodes); - if (!F.isEmpty(nodeIds)) { - for (UUID nodeId : nodeIds) { - HashSet<UUID> affIds = affAssignment.getIds(p); + for (Map.Entry<UUID, GridDhtPartitionMap2> entry : node2part.entrySet()) { + GridDhtPartitionState state = entry.getValue().get(p); - if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) { + ClusterNode n = cctx.discovery().node(entry.getKey()); + + if (n != null && state != null && (state == MOVING || state == OWNING) && !nodes.contains(n) + && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { + nodes.add(n); + } + + } + + return nodes; + } + + Collection<UUID> diffIds = diffFromAffinity.get(p); + + if (!F.isEmpty(diffIds)) { + HashSet<UUID> affIds = affAssignment.getIds(p); + + for (UUID nodeId : diffIds) { + assert !affIds.contains(nodeId); + + if (hasState(p, nodeId, OWNING, MOVING)) { ClusterNode n = cctx.discovery().node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) { - nodes = new ArrayList<>(affNodes.size() + 2); + nodes = new ArrayList<>(affNodes.size() + diffIds.size()); nodes.addAll(affNodes); } @@ -903,7 +933,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null; + Collection<UUID> allIds = F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())); lock.readLock().lock(); @@ -913,20 +943,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh ", node2part=" + node2part + ", cache=" + cctx.name() + ']'; - Collection<UUID> nodeIds = part2node.get(p); - // Node IDs can be null if both, primary and backup, nodes disappear. - int size = nodeIds == null ? 0 : nodeIds.size(); - - if (size == 0) - return Collections.emptyList(); - - List<ClusterNode> nodes = new ArrayList<>(size); - - for (UUID id : nodeIds) { - if (topVer.topologyVersion() > 0 && !F.contains(allIds, id)) - continue; + List<ClusterNode> nodes = new ArrayList<>(); + for (UUID id : allIds) { if (hasState(p, id, state, states)) { ClusterNode n = cctx.discovery().node(id); @@ -1098,30 +1118,39 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh node2part = partMap; - part2node.clear(); + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + + if (diffFromAffinityVer.compareTo(affVer) <= 0) { + AffinityAssignment affAssignment = cctx.affinity().assignment(affVer); - for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) { - for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { - if (e0.getValue() != MOVING && e0.getValue() != OWNING) - continue; + for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) { + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + int p = e0.getKey(); - int p = e0.getKey(); + Set<UUID> diffIds = diffFromAffinity.get(p); - Set<UUID> ids = part2node.get(p); + if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) && + !affAssignment.getIds(p).contains(e.getKey())) { - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partitions. - part2node.put(p, ids = U.newHashSet(3)); + if (diffIds == null) + diffFromAffinity.put(p, diffIds = U.newHashSet(3)); - ids.add(e.getKey()); + diffIds.add(e.getKey()); + } + else { + if (diffIds != null && diffIds.remove(e.getKey())) { + if (diffIds.isEmpty()) + diffFromAffinity.remove(p); + } + } + } } + + diffFromAffinityVer = affVer; } boolean changed = false; - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); - GridDhtPartitionMap2 nodeMap = partMap.get(cctx.localNodeId()); if (nodeMap != null && cctx.shared().database().persistenceEnabled()) { @@ -1296,37 +1325,52 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh node2part.put(parts.nodeId(), parts); - // Add new mappings. - for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) { - int p = e.getKey(); + AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); - Set<UUID> ids = part2node.get(p); + if (affVer.compareTo(diffFromAffinityVer) >= 0) { + AffinityAssignment affAssignment = cctx.affinity().assignment(affVer); - if (e.getValue() == MOVING || e.getValue() == OWNING) { - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + // Add new mappings. + for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) { + int p = e.getKey(); - changed |= ids.add(parts.nodeId()); - } - else { - if (ids != null) - changed |= ids.remove(parts.nodeId()); + Set<UUID> diffIds = diffFromAffinity.get(p); + + if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING) + && !affAssignment.getIds(p).contains(parts.nodeId())) { + if (diffIds == null) + diffFromAffinity.put(p, diffIds = U.newHashSet(3)); + + if (diffIds.add(parts.nodeId())) + changed = true; + } + else { + if (diffIds != null && diffIds.remove(parts.nodeId())) { + changed = true; + + if (diffIds.isEmpty()) + diffFromAffinity.remove(p); + } + + } } - } - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set<UUID> ids = part2node.get(p); + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { + Set<UUID> ids = diffFromAffinity.get(p); - if (ids != null) - changed |= ids.remove(parts.nodeId()); + if (ids != null && ids.remove(parts.nodeId())) { + changed = true; + + if (ids.isEmpty()) + diffFromAffinity.remove(p); + } + } } - } - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + diffFromAffinityVer = affVer; + } if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer); @@ -1352,44 +1396,96 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } /** {@inheritDoc} */ - @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) { + @Override public void onExchangeDone(AffinityAssignment assignment) { lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) + rebuildDiff(assignment); + } + finally { + lock.writeLock().unlock(); + } + } + + /** + * @param affAssignment New affinity assignment. + */ + private void rebuildDiff(AffinityAssignment affAssignment) { + assert lock.isWriteLockedByCurrentThread(); - Collection<Integer> lost = null; + if (node2part == null) + return; - for (int p = 0; p < parts; p++) { - boolean foundOwner = false; + if (FAST_DIFF_REBUILD) { + Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.name(), affAssignment.topologyVersion())); - Set<UUID> nodeIds = part2node.get(p); + for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) { + int p = e.getKey(); + + Iterator<UUID> iter = e.getValue().iterator(); + + while (iter.hasNext()) { + UUID nodeId = iter.next(); - if (nodeIds != null) { - for (UUID nodeId : nodeIds) { - GridDhtPartitionMap2 partMap = node2part.get(nodeId); + if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId)) + iter.remove(); + } + } + } + else { + for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) { + UUID nodeId = e.getKey(); - GridDhtPartitionState state = partMap.get(p); + for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { + int p0 = e0.getKey(); - if (state == OWNING) { - foundOwner = true; + GridDhtPartitionState state = e0.getValue(); - break; - } + Set<UUID> ids = diffFromAffinity.get(p0); + + if ((state == MOVING || state == OWNING) && !affAssignment.getIds(p0).contains(nodeId)) { + if (ids == null) + diffFromAffinity.put(p0, ids = U.newHashSet(3)); + + ids.add(nodeId); + } + else { + if (ids != null) + ids.remove(nodeId); } } + } + } + + diffFromAffinityVer = affAssignment.topologyVersion(); + } + + /** {@inheritDoc} */ + @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) { + lock.writeLock().lock(); + + try { + if (node2part == null) + return false; + + int parts = cctx.affinity().partitions(); - if (!foundOwner) { - if (lost == null) - lost = new HashSet<>(parts - p, 1.0f); + Set<Integer> lost = new HashSet<>(parts); - lost.add(p); + for (int p = 0; p < parts; p++) + lost.add(p); + + for (GridDhtPartitionMap2 partMap : node2part.values()) { + for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { + if (e.getValue() == OWNING) + lost.remove(e.getKey()); } } boolean changed = false; - if (lost != null) { + if (!lost.isEmpty()) { PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); assert plc != null; @@ -1410,16 +1506,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh } // Update map for remote node. else if (plc != PartitionLossPolicy.IGNORE) { - Set<UUID> nodeIds = part2node.get(part); - - if (nodeIds != null) { - for (UUID nodeId : nodeIds) { - GridDhtPartitionMap2 nodeMap = node2part.get(nodeId); - - if (nodeMap.get(part) != EVICTED) - nodeMap.put(part, LOST); - } - } + // TODO +// Set<UUID> nodeIds = part2node.get(part); +// +// if (nodeIds != null) { +// for (UUID nodeId : nodeIds) { +// GridDhtPartitionMap nodeMap = node2part.get(nodeId); +// +// if (nodeMap.get(part) != EVICTED) +// nodeMap.put(part, LOST); +// } +// } } if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) @@ -1440,86 +1537,83 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** {@inheritDoc} */ @Override public void resetLostPartitions() { - lock.writeLock().lock(); - - try { - int parts = cctx.affinity().partitions(); - long updSeq = updateSeq.incrementAndGet(); - - for (int part = 0; part < parts; part++) { - Set<UUID> nodeIds = part2node.get(part); - - if (nodeIds != null) { - boolean lost = false; - - for (UUID node : nodeIds) { - GridDhtPartitionMap2 map = node2part.get(node); - - if (map.get(part) == LOST) { - lost = true; - - break; - } - } - - if (lost) { - GridDhtLocalPartition locPart = localPartition(part, topVer, false); - - if (locPart != null) { - boolean marked = locPart.own(); - - if (marked) - updateLocal(locPart.id(), locPart.state(), updSeq); - } - - for (UUID nodeId : nodeIds) { - GridDhtPartitionMap2 nodeMap = node2part.get(nodeId); - - if (nodeMap.get(part) == LOST) - nodeMap.put(part, OWNING); - } - } - } - } - - checkEvictions(updSeq, cctx.affinity().assignments(topVer)); - - cctx.needsRecovery(false); - } - finally { - lock.writeLock().unlock(); - } + // TODO + +// lock.writeLock().lock(); +// +// try { +// int parts = cctx.affinity().partitions(); +// long updSeq = updateSeq.incrementAndGet(); +// +// for (int part = 0; part < parts; part++) { +// Set<UUID> nodeIds = part2node.get(part); +// +// if (nodeIds != null) { +// boolean lost = false; +// +// for (UUID node : nodeIds) { +// GridDhtPartitionMap2 map = node2part.get(node); +// +// if (map.get(part) == LOST) { +// lost = true; +// +// break; +// } +// } +// +// if (lost) { +// GridDhtLocalPartition locPart = localPartition(part, topVer, false); +// +// if (locPart != null) { +// boolean marked = locPart.own(); +// +// if (marked) +// updateLocal(locPart.id(), locPart.state(), updSeq); +// } +// +// for (UUID nodeId : nodeIds) { +// GridDhtPartitionMap2 nodeMap = node2part.get(nodeId); +// +// if (nodeMap.get(part) == LOST) +// nodeMap.put(part, OWNING); +// } +// } +// } +// } +// +// checkEvictions(updSeq, cctx.affinity().assignments(topVer)); +// +// cctx.needsRecovery(false); +// } +// finally { +// lock.writeLock().unlock(); +// } } /** {@inheritDoc} */ @Override public Collection<Integer> lostPartitions() { + if (cctx.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE) + return Collections.emptySet(); + lock.readLock().lock(); try { - Collection<Integer> res = null; + Set<Integer> res = null; int parts = cctx.affinity().partitions(); - for (int part = 0; part < parts; part++) { - Set<UUID> nodeIds = part2node.get(part); - - if (nodeIds != null) { - for (UUID node : nodeIds) { - GridDhtPartitionMap2 map = node2part.get(node); - - if (map.get(part) == LOST) { - if (res == null) - res = new ArrayList<>(parts - part); + for (GridDhtPartitionMap2 partMap : node2part.values()) { + for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { + if (e.getValue() == LOST) { + if (res == null) + res = new HashSet<>(parts); - res.add(part); - - break; - } + res.add(e.getKey()); } } } - return res == null ? Collections.<Integer>emptyList() : res; + return res == null ? Collections.<Integer>emptySet() : res; } finally { lock.readLock().unlock(); @@ -1750,12 +1844,18 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh map.put(p, state); - Set<UUID> ids = part2node.get(p); + if (state == MOVING || state == OWNING) { + AffinityAssignment assignment = cctx.affinity().assignment(diffFromAffinityVer); - if (ids == null) - part2node.put(p, ids = U.newHashSet(3)); + if (!assignment.getIds(p).contains(cctx.localNodeId())) { + Set<UUID> diffIds = diffFromAffinity.get(p); - ids.add(locNodeId); + if (diffIds == null) + diffFromAffinity.put(p, diffIds = U.newHashSet(3)); + + diffIds.add(cctx.localNodeId()); + } + } } return updateSeq; @@ -1787,14 +1887,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (parts != null) { for (Integer p : parts.keySet()) { - Set<UUID> nodeIds = part2node.get(p); + Set<UUID> diffIds = diffFromAffinity.get(p); - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - part2node.remove(p); - } + if (diffIds != null) + diffIds.remove(nodeId); } } @@ -1986,7 +2082,25 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh if (affNodes.isEmpty()) continue; - List<ClusterNode> owners = owners(i); + Set<ClusterNode> owners = U.newHashSet(affNodes.size()); + + for (ClusterNode node : affNodes) { + if (hasState(i, node.id(), OWNING)) + owners.add(node); + } + + Set<UUID> diff = diffFromAffinity.get(i); + + if (diff != null) { + for (UUID nodeId : diff) { + if (hasState(i, nodeId, OWNING)) { + ClusterNode node = cctx.discovery().node(nodeId); + + if (node != null) + owners.add(node); + } + } + } if (affNodes.size() != owners.size() || !owners.containsAll(affNodes)) return; @@ -2035,30 +2149,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Checks consistency after all operations. */ private void consistencyCheck() { - if (CONSISTENCY_CHECK) { - if (node2part == null) - return; - - for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) { - for (Integer p : e.getValue().keySet()) { - Set<UUID> nodeIds = part2node.get(p); - - assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']'; - assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" + - e.getKey() + ", nodeIds=" + nodeIds + ']'; - } - } - - for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) { - for (UUID nodeId : e.getValue()) { - GridDhtPartitionMap2 map = node2part.get(nodeId); - - assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']'; - assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() + - ", nodeId=" + nodeId + ']'; - } - } - } + // no-op } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 078e67b5..28c3956 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1342,6 +1342,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT cctx.database().releaseHistoryForExchange(); + if (err == null && realExchange) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (cacheCtx.isLocal()) + continue; + + cacheCtx.topology().onExchangeDone(cacheCtx.affinity().assignment(topologyVersion())); + } + } + if (super.onDone(res, err) && realExchange) { exchLog.info("exchange finished [topVer=" + topologyVersion() + ", time1=" + duration() +
