Fixed update sequence.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4907f7d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4907f7d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4907f7d8

Branch: refs/heads/ignite-6181-1
Commit: 4907f7d87a08fb5ca5d4d74ef8b61db0fd207855
Parents: a01837b
Author: Ilya Lantukh <ilant...@gridgain.com>
Authored: Mon Aug 28 14:58:50 2017 +0300
Committer: Ilya Lantukh <ilant...@gridgain.com>
Committed: Mon Aug 28 14:58:50 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 59 +++++++++-----------
 1 file changed, 25 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4907f7d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index e0f54b3..f7f71a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -521,7 +521,7 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                             DiscoveryEvent evt = evts0.get(i);
 
                             if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
-                                updateSeq = removeNode(evt.eventNode().id(), 
updateSeq);
+                                removeNode(evt.eventNode().id());
                         }
                     }
 
@@ -1243,6 +1243,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 ", curPart=" + mapString(part) +
                                 ", newPart=" + mapString(newPart) + ']');
                         }
+
+                        if (newPart.nodeId().equals(ctx.localNodeId()))
+                            updateSeq.setIfGreater(newPart.updateSequence());
                     }
                     else {
                         // If for some nodes current partition has a newer map,
@@ -1272,6 +1275,12 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     }
                 }
             }
+            else {
+                GridDhtPartitionMap locNodeMap = 
partMap.get(ctx.localNodeId());
+
+                if (locNodeMap != null)
+                    updateSeq.setIfGreater(locNodeMap.updateSequence());
+            }
 
             if (!fullMapUpdated) {
                 if (log.isDebugEnabled()) {
@@ -1379,8 +1388,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            node2part.newUpdateSequence(updateSeq);
-
             if (readyTopVer.initialized() && 
readyTopVer.equals(lastTopChangeVer)) {
                 AffinityAssignment  aff = 
grp.affinity().readyAffinity(readyTopVer);
 
@@ -1537,11 +1544,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             GridDhtPartitionMap cur = node2part.get(parts.nodeId());
 
             if (force) {
-                if (cur != null && cur.topologyVersion().initialized()) {
+                if (cur != null && cur.topologyVersion().initialized())
                     parts.updateSequence(cur.updateSequence(), 
cur.topologyVersion());
-
-                    this.updateSeq.setIfGreater(cur.updateSequence());
-                }
             }
             else  if (isStaleUpdate(cur, parts)) {
                 U.warn(log, "Stale update for single partition map update 
(will ignore) [exchId=" + exchId +
@@ -1551,6 +1555,10 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 return false;
             }
 
+            long updateSeq = this.updateSeq.incrementAndGet();
+
+            node2part.newUpdateSequence(updateSeq);
+
             boolean changed = false;
 
             if (cur == null || !cur.equals(parts))
@@ -1558,10 +1566,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             node2part.put(parts.nodeId(), parts);
 
-            this.updateSeq.setIfGreater(parts.updateSequence());
-
-            long updateSeq = this.updateSeq.incrementAndGet();
-
             // During exchange diff is calculated after all messages are 
received and affinity initialized.
             if (exchId == null && !grp.isReplicated()) {
                 if (readyTopVer.initialized() && 
readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
@@ -1915,14 +1919,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 }
             }
 
-            long seqVal = 0;
-
-            if (updateSeq) {
-                seqVal = this.updateSeq.incrementAndGet();
-
-                node2part = new GridDhtPartitionFullMap(node2part, seqVal);
-            }
-
             for (Map.Entry<UUID, GridDhtPartitionMap> e : 
node2part.entrySet()) {
                 GridDhtPartitionMap partMap = e.getValue();
 
@@ -1938,14 +1934,19 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         result.add(e.getKey());
                     }
 
-                    if (updateSeq)
-                        partMap.updateSequence(seqVal, 
partMap.topologyVersion());
+                    partMap.updateSequence(partMap.updateSequence() + 1, 
partMap.topologyVersion());
+
+                    if (partMap.nodeId().equals(ctx.localNodeId()))
+                        this.updateSeq.setIfGreater(partMap.updateSequence());
 
                     U.warn(log, "Partition has been scheduled for rebalancing 
due to outdated update counter " +
                         "[nodeId=" + e.getKey() + ", cacheOrGroupName=" + 
grp.cacheOrGroupName() +
                         ", partId=" + p + ", haveHistory=" + haveHistory + 
"]");
                 }
             }
+
+            if (updateSeq)
+                node2part = new GridDhtPartitionFullMap(node2part, 
this.updateSeq.incrementAndGet());
         }
         finally {
             lock.writeLock().unlock();
@@ -2115,11 +2116,8 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
     /**
      * @param nodeId Node to remove.
-     * @param updateSeq Update sequence.
-     *
-     * @return Update sequence.
      */
-    private long removeNode(UUID nodeId, long updateSeq) {
+    private void removeNode(UUID nodeId) {
         assert nodeId != null;
         assert lock.isWriteLockedByCurrentThread();
 
@@ -2130,14 +2128,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         ClusterNode loc = ctx.localNode();
 
         if (node2part != null) {
-            assert updateSeq >= node2part.updateSequence();
-
-            if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
-                updateSeq++;
-
-                node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), 
updateSeq,
+            if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id()))
+                node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), 
updateSeq.get(),
                     node2part, false);
-            }
             else
                 node2part = new GridDhtPartitionFullMap(node2part, 
node2part.updateSequence());
 
@@ -2156,8 +2149,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             consistencyCheck();
         }
-
-        return updateSeq;
     }
 
     /** {@inheritDoc} */

Reply via email to