Fixed update sequence.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4907f7d8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4907f7d8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4907f7d8 Branch: refs/heads/ignite-6181-1 Commit: 4907f7d87a08fb5ca5d4d74ef8b61db0fd207855 Parents: a01837b Author: Ilya Lantukh <ilant...@gridgain.com> Authored: Mon Aug 28 14:58:50 2017 +0300 Committer: Ilya Lantukh <ilant...@gridgain.com> Committed: Mon Aug 28 14:58:50 2017 +0300 ---------------------------------------------------------------------- .../dht/GridDhtPartitionTopologyImpl.java | 59 +++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4907f7d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index e0f54b3..f7f71a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -521,7 +521,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { DiscoveryEvent evt = evts0.get(i); if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) - updateSeq = removeNode(evt.eventNode().id(), updateSeq); + removeNode(evt.eventNode().id()); } } @@ -1243,6 +1243,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']'); } + + if (newPart.nodeId().equals(ctx.localNodeId())) + updateSeq.setIfGreater(newPart.updateSequence()); } else { // If for some nodes current partition has a newer map, @@ -1272,6 +1275,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } } + else { + GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId()); + + if (locNodeMap != null) + updateSeq.setIfGreater(locNodeMap.updateSequence()); + } if (!fullMapUpdated) { if (log.isDebugEnabled()) { @@ -1379,8 +1388,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); - node2part.newUpdateSequence(updateSeq); - if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); @@ -1537,11 +1544,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionMap cur = node2part.get(parts.nodeId()); if (force) { - if (cur != null && cur.topologyVersion().initialized()) { + if (cur != null && cur.topologyVersion().initialized()) parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); - - this.updateSeq.setIfGreater(cur.updateSequence()); - } } else if (isStaleUpdate(cur, parts)) { U.warn(log, "Stale update for single partition map update (will ignore) [exchId=" + exchId + @@ -1551,6 +1555,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return false; } + long updateSeq = this.updateSeq.incrementAndGet(); + + node2part.newUpdateSequence(updateSeq); + boolean changed = false; if (cur == null || !cur.equals(parts)) @@ -1558,10 +1566,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part.put(parts.nodeId(), parts); - this.updateSeq.setIfGreater(parts.updateSequence()); - - long updateSeq = this.updateSeq.incrementAndGet(); - // During exchange diff is calculated after all messages are received and affinity initialized. if (exchId == null && !grp.isReplicated()) { if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { @@ -1915,14 +1919,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - long seqVal = 0; - - if (updateSeq) { - seqVal = this.updateSeq.incrementAndGet(); - - node2part = new GridDhtPartitionFullMap(node2part, seqVal); - } - for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { GridDhtPartitionMap partMap = e.getValue(); @@ -1938,14 +1934,19 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { result.add(e.getKey()); } - if (updateSeq) - partMap.updateSequence(seqVal, partMap.topologyVersion()); + partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); + + if (partMap.nodeId().equals(ctx.localNodeId())) + this.updateSeq.setIfGreater(partMap.updateSequence()); U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + "[nodeId=" + e.getKey() + ", cacheOrGroupName=" + grp.cacheOrGroupName() + ", partId=" + p + ", haveHistory=" + haveHistory + "]"); } } + + if (updateSeq) + node2part = new GridDhtPartitionFullMap(node2part, this.updateSeq.incrementAndGet()); } finally { lock.writeLock().unlock(); @@ -2115,11 +2116,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param nodeId Node to remove. - * @param updateSeq Update sequence. - * - * @return Update sequence. */ - private long removeNode(UUID nodeId, long updateSeq) { + private void removeNode(UUID nodeId) { assert nodeId != null; assert lock.isWriteLockedByCurrentThread(); @@ -2130,14 +2128,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterNode loc = ctx.localNode(); if (node2part != null) { - assert updateSeq >= node2part.updateSequence(); - - if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) { - updateSeq++; - - node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq, + if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) + node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.get(), node2part, false); - } else node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); @@ -2156,8 +2149,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { consistencyCheck(); } - - return updateSeq; } /** {@inheritDoc} */