Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 93724584a -> f1128f859


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1128f85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1128f85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1128f85

Branch: refs/heads/ignite-5075
Commit: f1128f8594e14011234eaaa50c9b5415f5e63334
Parents: 9372458
Author: sboikov <[email protected]>
Authored: Wed May 10 16:06:53 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed May 10 16:13:31 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 33 ++++-----
 .../GridDhtPartitionsExchangeFuture.java        | 76 +++++++++++---------
 .../GridDhtPartitionsSingleMessage.java         | 12 ++--
 3 files changed, 63 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f1128f85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c5401e0..e426426 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1256,19 +1256,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
-                    Integer cacheId = entry.getKey();
+                    Integer grpId = entry.getKey();
 
-                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-                    if (cacheCtx != null && !cacheCtx.started())
-                        continue; // Can safely ignore background exchange.
+                    CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
 
                     GridDhtPartitionTopology top = null;
 
-                    if (cacheCtx == null)
-                        top = clientTops.get(cacheId);
-                    else if (!cacheCtx.isLocal())
-                        top = cacheCtx.topology();
+                    if (grp == null)
+                        top = clientTops.get(grpId);
+                    else if (!grp.isLocal())
+                        top = grp.topology();
 
                     if (top != null)
                         updated |= top.update(null, entry.getValue(), null) != null;
@@ -1302,25 +1299,25 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 boolean updated = false;
 
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
-                    Integer cacheId = entry.getKey();
+                    Integer grpId = entry.getKey();
 
-                    GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+                    CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId);
 
-                    if (cacheCtx != null &&
-                        cacheCtx.cacheStartTopologyVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+                    if (grp != null &&
+                        grp.groupStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
                         continue;
 
                     GridDhtPartitionTopology top = null;
 
-                    if (cacheCtx == null)
-                        top = clientTops.get(cacheId);
-                    else if (!cacheCtx.isLocal())
-                        top = cacheCtx.topology();
+                    if (grp == null)
+                        top = clientTops.get(grpId);
+                    else if (!grp.isLocal())
+                        top = grp.topology();
 
                     if (top != null) {
                         updated |= top.update(null, entry.getValue(), null) != null;
 
-                        cctx.affinity().checkRebalanceState(top, cacheId);
+                        cctx.affinity().checkRebalanceState(top, grpId);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1128f85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1e656b0..c7457c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -606,11 +606,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         try {
             if (crd != null) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (cacheCtx.isLocal())
+                for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                    if (grp.isLocal())
                         continue;
 
-                    cacheCtx.topology().beforeExchange(this, !centralizedAff);
+                    grp.topology().beforeExchange(this, !centralizedAff);
                 }
             }
         }
@@ -624,24 +624,24 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void updateTopologies(boolean crd) throws IgniteCheckedException {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(cacheCtx.cacheId());
+            GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
 
             long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
 
-            GridDhtPartitionTopology top = cacheCtx.topology();
+            GridDhtPartitionTopology top = grp.topology();
 
             if (crd) {
-                boolean updateTop = exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
+                boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
 
                 if (updateTop && clientTop != null)
                     top.update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
             }
 
-            top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
+            top.updateTopologyVersion(exchId, this, updSeq, cacheGroupStopping(grp.groupId()));
         }
 
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
@@ -786,11 +786,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         assert !cctx.kernalContext().clientNode();
 
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            cacheCtx.preloader().onTopologyChanged(this);
+            grp.preloader().onTopologyChanged(this);
         }
 
         waitPartitionRelease();
@@ -808,8 +808,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 // Partition release future is done so we can flush the write-behind store.
                 cacheCtx.store().forceFlush();
             }
+        }
 
-            cacheCtx.topology().beforeExchange(this, !centralizedAff);
+        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                continue;
+
+            grp.topology().beforeExchange(this, !centralizedAff);
         }
 
         cctx.database().beforeExchange(this);
@@ -922,11 +927,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      *
      */
     private void onLeft() {
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.isLocal())
+        for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+            if (grp.isLocal())
                 continue;
 
-            cacheCtx.preloader().unwindUndeploys();
+            grp.preloader().unwindUndeploys();
         }
 
         cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
@@ -1113,18 +1118,20 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         boolean realExchange = !dummy && !forcePreload;
 
         if (err == null && realExchange) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
                 try {
                     if (centralizedAff)
-                        cacheCtx.topology().initPartitions(this);
+                        grp.topology().initPartitions(this);
                 }
                 catch (IgniteInterruptedCheckedException e) {
                     U.error(log, "Failed to initialize partitions.", e);
                 }
+            }
 
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx;
 
                 if (drCacheCtx.isDrEnabled()) {
@@ -1177,8 +1184,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             initFut.onDone(err == null);
 
             if (exchId.isLeft()) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts())
-                    cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
+                for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups())
+                    grp.config().getAffinity().removeNode(exchId.nodeId());
             }
 
             exchActions = null;
@@ -1525,9 +1532,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (Thread.currentThread().isInterrupted())
                 return;
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (!cacheCtx.isLocal())
-                    cacheCtx.topology().detectLostPartitions(discoEvt);
+            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                if (!grp.isLocal())
+                    grp.topology().detectLostPartitions(discoEvt);
             }
         }
     }
@@ -1540,6 +1547,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (Thread.currentThread().isInterrupted())
                 return;
 
+            // TODO: IGNITE-5075.
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (!cacheCtx.isLocal() && cacheNames.contains(cacheCtx.name()))
                     cacheCtx.topology().resetLostPartitions();
@@ -1555,9 +1563,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             assert crd.isLocal();
 
             if (!crd.equals(discoCache.serverNodes().get(0))) {
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (!cacheCtx.isLocal())
-                        cacheCtx.topology().beforeExchange(this, !centralizedAff);
+                for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                    if (!grp.isLocal())
+                        grp.topology().beforeExchange(this, !centralizedAff);
                 }
             }
 
@@ -1631,11 +1639,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void assignPartitionsStates() {
         if (cctx.database().persistenceEnabled()) {
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (cacheCtx.isLocal())
+            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
                     continue;
 
-                assignPartitionStates(cacheCtx.topology());
+                assignPartitionStates(grp.topology());
             }
         }
     }
@@ -1942,13 +1950,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             List<ClusterNode> empty = Collections.emptyList();
 
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                List<List<ClusterNode>> affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
+                            for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+                                List<List<ClusterNode>> affAssignment = new ArrayList<>(grp.affinity().partitions());
 
-                                for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
+                                for (int i = 0; i < grp.affinity().partitions(); i++)
                                     affAssignment.add(empty);
 
-                                cacheCtx.affinity().affinityCache().initialize(topologyVersion(), affAssignment);
+                                grp.affinity().initialize(topologyVersion(), affAssignment);
                             }
 
                             onDone(topologyVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/f1128f85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 30d35a2..8d95a58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -139,23 +139,23 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void partitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
+    public void partitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
-        partCntrs.put(cacheId, cntrMap);
+        partCntrs.put(grpId, cntrMap);
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Partition update counters.
      */
-    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId) {
+    @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId) {
         if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
+            Map<Integer, T2<Long, Long>> res = partCntrs.get(grpId);
 
             return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
         }

Reply via email to