Repository: ignite
Updated Branches:
  refs/heads/ignite-4296 a9c5205d8 -> eb57fe16d


ignite-4296


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb57fe16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb57fe16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb57fe16

Branch: refs/heads/ignite-4296
Commit: eb57fe16de3f2d6222f4bb58936fd4846e462dc9
Parents: a9c5205
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Dec 1 15:09:17 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Dec 1 17:33:25 2016 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 125 ++++++++++---------
 1 file changed, 66 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eb57fe16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index b8d6b83..15c41e8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -44,7 +44,6 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -341,9 +340,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-            initPartitions0(oldest, exchFut, updateSeq);
+            initPartitions0(currentCoordinator(), exchFut, updateSeq);
 
             consistencyCheck();
         }
@@ -353,12 +350,21 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     }
 
     /**
+     * @return Coordinator (oldest alive cache server node) ID, {@code null} if there is no such node.
+     */
+    private UUID currentCoordinator() {
+        ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+        return oldest != null ? oldest.id() : null;
+    }
+
+    /**
      * @param oldest Oldest server node.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      */
-    private void initPartitions0(ClusterNode oldest, 
GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
-        ClusterNode loc = cctx.localNode();
+    private void initPartitions0(UUID oldest, GridDhtPartitionsExchangeFuture 
exchFut, long updateSeq) {
+        UUID locId = cctx.localNodeId();
 
         assert oldest != null || cctx.kernalContext().clientNode();
 
@@ -382,7 +388,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         if (cctx.rebalanceEnabled()) {
             boolean added = exchFut.isCacheAdded(cctx.cacheId(), 
exchId.topologyVersion());
 
-            boolean first = (loc.equals(oldest) && 
loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
+            boolean first = (locId.equals(oldest) && 
locId.equals(exchId.nodeId()) && exchId.isJoined()) || added;
 
             if (first) {
                 assert exchId.isJoined() || added;
@@ -399,7 +405,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         if (log.isDebugEnabled())
                             log.debug("Owned partition for oldest node: " + 
locPart);
 
-                        updateLocal(oldest, p, loc.id(), locPart.state(), 
updateSeq);
+                        updateSeq = updateLocal(oldest, p, locPart.state(), 
updateSeq);
                     }
                 }
             }
@@ -421,7 +427,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         if (state.active()) {
                             locPart.rent(false);
 
-                            updateLocal(oldest, p, loc.id(), locPart.state(), 
updateSeq);
+                            updateSeq = updateLocal(oldest, p, 
locPart.state(), updateSeq);
 
                             if (log.isDebugEnabled())
                                 log.debug("Evicting partition with rebalancing 
disabled " +
@@ -445,9 +451,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */
-    private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> 
aff, long updateSeq) {
-        ClusterNode loc = cctx.localNode();
-
+    private void createPartitions(UUID oldest, List<List<ClusterNode>> aff, 
long updateSeq) {
         int num = cctx.affinity().partitions();
 
         for (int p = 0; p < num; p++) {
@@ -457,7 +461,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateLocal(oldest, p, loc.id(), locPart.state(), 
updateSeq);
+                    updateSeq = updateLocal(oldest, p, locPart.state(), 
updateSeq);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -526,11 +530,11 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             }
 
             if (affReady)
-                initPartitions0(oldest, exchFut, updateSeq);
+                initPartitions0(oldest != null ? oldest.id() : null, exchFut, 
updateSeq);
             else {
                 List<List<ClusterNode>> aff = 
cctx.affinity().idealAssignment();
 
-                createPartitions(oldest, aff, updateSeq);
+                createPartitions(oldest != null ? oldest.id() : null, aff, 
updateSeq);
             }
 
             consistencyCheck();
@@ -551,8 +555,6 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture 
exchFut) throws IgniteCheckedException {
         boolean changed = waitForRent();
 
-        ClusterNode loc = cctx.localNode();
-
         int num = cctx.affinity().partitions();
 
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
@@ -577,7 +579,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
+            UUID oldest = currentCoordinator();
 
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, 
false, false);
@@ -605,7 +607,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 assert owned : "Failed to own partition 
[cacheName" + cctx.name() + ", locPart=" +
                                     locPart + ']';
 
-                                updateLocal(oldest, p, loc.id(), 
locPart.state(), updateSeq);
+                                updateSeq = updateLocal(oldest, p, 
locPart.state(), updateSeq);
 
                                 changed = true;
 
@@ -625,7 +627,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateLocal(oldest, p, loc.id(), locPart.state(), 
updateSeq);
+                            updateSeq = updateLocal(oldest, p, 
locPart.state(), updateSeq);
                     }
                 }
                 else {
@@ -635,7 +637,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateLocal(oldest, p, loc.id(), locPart.state(), 
updateSeq);
+                            updateSeq = updateLocal(oldest, p, 
locPart.state(), updateSeq);
 
                             changed = true;
 
@@ -808,8 +810,11 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 map.put(i, part.state());
             }
 
-            return new GridDhtPartitionMap2(cctx.nodeId(), updateSeq.get(), 
topVer,
-                Collections.unmodifiableMap(map), true);
+            return new GridDhtPartitionMap2(cctx.nodeId(),
+                updateSeq.get(),
+                topVer,
+                Collections.unmodifiableMap(map),
+                true);
         }
         finally {
             lock.readLock().unlock();
@@ -1041,23 +1046,24 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                 return false;
             }
 
-            final long updateSeq = this.updateSeq.incrementAndGet();
-
             if (exchId != null)
                 lastExchangeId = exchId;
 
             if (node2part != null) {
-                for (GridDhtPartitionMap2 part : node2part.values()) {
-                    GridDhtPartitionMap2 newPart = partMap.get(part.nodeId());
+                for (Map.Entry<UUID, GridDhtPartitionMap2> e : 
node2part.entrySet()) {
+                    GridDhtPartitionMap2 newPart = partMap.get(e.getKey());
+
+                    if (newPart == null)
+                        continue;
 
-                    // If for some nodes current partition has a newer map,
-                    // then we keep the newer value.
-                    if (newPart != null &&
-                        (newPart.updateSequence() < part.updateSequence() || (
+                    GridDhtPartitionMap2 part = e.getValue();
+
+                    assert part.nodeId().equals(e.getKey());
+
+                    if (newPart.updateSequence() < part.updateSequence() || (
                             cctx.startTopologyVersion() != null &&
                                 newPart.topologyVersion() != null && // 
Backward compatibility.
-                                
cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0))
-                        ) {
+                                
cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) {
                         if (log.isDebugEnabled())
                             log.debug("Overriding partition map in full update 
map [exchId=" + exchId + ", curPart=" +
                                 mapString(part) + ", newPart=" + 
mapString(newPart) + ']');
@@ -1098,6 +1104,8 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             part2node = p2n;
 
+            final long updateSeq = this.updateSeq.incrementAndGet();
+
             boolean changed = false;
 
             AffinityTopologyVersion affVer = 
cctx.affinity().affinityTopologyVersion();
@@ -1105,9 +1113,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             if (!affVer.equals(AffinityTopologyVersion.NONE) && 
affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = 
cctx.affinity().assignments(topVer);
 
-                ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-                changed = checkEvictions(oldest, updateSeq, aff);
+                changed = checkEvictions(currentCoordinator(), updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1251,9 +1257,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         if (!affVer.equals(AffinityTopologyVersion.NONE) && 
affVer.compareTo(topVer) >= 0) {
             List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-            ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-            changed = checkEvictions(oldest, updateSeq, aff);
+            changed = checkEvictions(currentCoordinator(), updateSeq, aff);
 
             updateRebalanceVersion(aff);
         }
@@ -1283,7 +1287,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(ClusterNode oldest, final long updateSeq, 
List<List<ClusterNode>> aff) {
+    private boolean checkEvictions(final UUID oldest, long updateSeq, 
List<List<ClusterNode>> aff) {
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1306,7 +1310,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                         part.rent(false);
 
-                        updateLocal(oldest, part.id(), locId, part.state(), 
updateSeq);
+                        updateSeq = updateLocal(oldest, part.id(), 
part.state(), updateSeq);
 
                         changed = true;
 
@@ -1331,7 +1335,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
 
-                                    updateLocal(oldest, part.id(), locId, 
part.state(), updateSeq);
+                                    updateSeq = updateLocal(oldest, part.id(), 
part.state(), updateSeq);
 
                                     changed = true;
 
@@ -1356,17 +1360,16 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
      *
      * @param oldest Oldest server node.
      * @param p Partition.
-     * @param nodeId Node ID.
      * @param state State.
      * @param updateSeq Update sequence.
+     * @return Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(ClusterNode oldest, int p, UUID nodeId, 
GridDhtPartitionState state, long updateSeq) {
-        assert nodeId.equals(cctx.nodeId());
+    private long updateLocal(final UUID oldest, int p, GridDhtPartitionState 
state, long updateSeq) {
         assert oldest != null || cctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
-        if (cctx.localNode().equals(oldest)) {
+        if (cctx.localNodeId().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {
@@ -1388,11 +1391,19 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
             }
         }
 
-        GridDhtPartitionMap2 map = node2part.get(nodeId);
+        UUID locNodeId = cctx.localNodeId();
+
+        GridDhtPartitionMap2 map = node2part.get(locNodeId);
 
-        if (map == null)
-            node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, 
updateSeq, topVer,
-                Collections.<Integer, GridDhtPartitionState>emptyMap(), 
false));
+        if (map == null) {
+            map = new GridDhtPartitionMap2(locNodeId,
+                updateSeq,
+                topVer,
+                Collections.<Integer, GridDhtPartitionState>emptyMap(),
+                false);
+
+            node2part.put(locNodeId, map);
+        }
 
         map.updateSequence(updateSeq, topVer);
 
@@ -1403,7 +1414,9 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
         if (ids == null)
             part2node.put(p, ids = U.newHashSet(3));
 
-        ids.add(nodeId);
+        ids.add(locNodeId);
+
+        return updateSeq;
     }
 
     /**
@@ -1449,15 +1462,11 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean own(GridDhtLocalPartition part) {
-        ClusterNode loc = cctx.localNode();
-
         lock.writeLock().lock();
 
         try {
             if (part.own()) {
-                ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-                updateLocal(oldest, part.id(), loc.id(), part.state(), 
updateSeq.incrementAndGet());
+                updateLocal(currentCoordinator(), part.id(), part.state(), 
updateSeq.incrementAndGet());
 
                 consistencyCheck();
 
@@ -1485,9 +1494,7 @@ class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : 
this.updateSeq.get();
 
-            ClusterNode oldest = 
cctx.discovery().oldestAliveCacheServerNode(topVer);
-
-            updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), 
seq);
+            updateLocal(currentCoordinator(), part.id(), part.state(), seq);
 
             consistencyCheck();
         }

Reply via email to