Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 837b790b9 -> e519adf88


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e519adf8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e519adf8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e519adf8

Branch: refs/heads/ignite-5075
Commit: e519adf88db1d2fd29068a957dfc29ab0a700dc1
Parents: 837b790
Author: sboikov <[email protected]>
Authored: Tue May 16 23:24:09 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue May 16 23:24:09 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   1 +
 .../cache/CacheAffinitySharedManager.java       |   4 +-
 .../CacheClientReconnectDiscoveryData.java      |  64 +++++++-
 .../processors/cache/CacheGroupData.java        |  15 +-
 .../processors/cache/CacheGroupDescriptor.java  |  17 +-
 .../cache/CacheGroupInfrastructure.java         |  10 +-
 .../cache/CacheNodeCommonDiscoveryData.java     |   6 +-
 .../processors/cache/ClusterCachesInfo.java     | 159 +++++++++++++++----
 .../cache/ClusterCachesReconnectResult.java     |  66 ++++++++
 .../processors/cache/GridCacheIoManager.java    |  40 +++++
 .../GridCachePartitionExchangeManager.java      |   9 +-
 .../processors/cache/GridCacheProcessor.java    |  84 ++++++----
 12 files changed, 388 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index fb6637a..c3d12c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -633,6 +633,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     locJoin = new GridFutureAdapter<>();
 
                     registeredCaches.clear();
+                    registeredCacheGrps.clear();
 
                     for (AffinityTopologyVersion histVer : 
discoCacheHist.keySet()) {
                         Object rmvd = discoCacheHist.remove(histVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 0dab5ab..a20719d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -853,7 +853,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     }
 
     /**
-     * @param desc Cache descriptor.
+     * @param grpDesc Cache group descriptor.
      * @param aff Affinity.
      * @param fut Exchange future.
      * @param fetch Force fetch flag.
@@ -1002,6 +1002,8 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             else {
                 CacheGroupDescriptor grpDesc = 
registeredGrps.get(grp.groupId());
 
+                assert grpDesc != null : grp.nameForLog();
+
                 GridDhtAssignmentFetchFuture fetchFut = new 
GridDhtAssignmentFetchFuture(cctx,
                     grpDesc,
                     topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
index f970469..4505728 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
@@ -32,18 +32,31 @@ public class CacheClientReconnectDiscoveryData implements 
Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private final Map<Integer, CacheGroupInfo> clientCacheGrps;
+
+    /** */
     private final Map<String, CacheInfo> clientCaches;
 
     /**
      * @param clientCaches Information about caches started on re-joining 
client node.
+     * @param clientCacheGrps Information about cache groups started on 
re-joining client node.
      */
-    public CacheClientReconnectDiscoveryData(Map<String, CacheInfo> 
clientCaches) {
+    CacheClientReconnectDiscoveryData(Map<Integer, CacheGroupInfo> 
clientCacheGrps,
+        Map<String, CacheInfo> clientCaches) {
+        this.clientCacheGrps = clientCacheGrps;
         this.clientCaches = clientCaches;
     }
 
     /**
      * @return Information about caches started on re-joining client node.
      */
+    Map<Integer, CacheGroupInfo> clientCacheGroups() {
+        return clientCacheGrps;
+    }
+
+    /**
+     * @return Information about caches started on re-joining client node.
+     */
     Map<String, CacheInfo> clientCaches() {
         return clientCaches;
     }
@@ -51,6 +64,53 @@ public class CacheClientReconnectDiscoveryData implements 
Serializable {
     /**
      *
      */
+    static class CacheGroupInfo implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final CacheConfiguration ccfg;
+
+        /** */
+        private final IgniteUuid deploymentId;
+
+        /** Flags added for future usage. */
+        private final byte flags;
+
+        /**
+         * @param ccfg Cache group configuration.
+         * @param deploymentId Cache group deployment ID.
+         * @param flags Flags (for future usage).
+         */
+        CacheGroupInfo(CacheConfiguration ccfg,
+            IgniteUuid deploymentId,
+            byte flags) {
+            assert ccfg != null;
+            assert deploymentId != null;
+
+            this.ccfg = ccfg;
+            this.deploymentId = deploymentId;
+            this.flags = flags;
+        }
+
+        /**
+         * @return Cache group configuration.
+         */
+        CacheConfiguration config() {
+            return ccfg;
+        }
+
+        /**
+         * @return Cache group deployment ID.
+         */
+        IgniteUuid deploymentId() {
+            return deploymentId;
+        }
+    }
+
+    /**
+     *
+     */
     static class CacheInfo implements Serializable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -94,7 +154,7 @@ public class CacheClientReconnectDiscoveryData implements 
Serializable {
         }
 
         /**
-         * @return Cache configuraiton.
+         * @return Cache configuration.
          */
         CacheConfiguration config() {
             return ccfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index ea2c256..7cf7349 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -53,17 +54,21 @@ public class CacheGroupData implements Serializable {
 
     /**
      * @param cacheCfg Cache configuration.
-     * @param grpId
+     * @param grpName Group name.
+     * @param grpId  Group ID.
+     * @param rcvdFrom Node ID cache group received from.
+     * @param deploymentId Deployment ID.
+     * @param caches Cache group caches.
      */
-    public CacheGroupData(CacheConfiguration cacheCfg,
-        String grpName,
+    CacheGroupData(
+        CacheConfiguration cacheCfg,
+        @Nullable String grpName,
         int grpId,
         UUID rcvdFrom,
         IgniteUuid deploymentId,
         Map<String, Integer> caches) {
         assert cacheCfg != null;
-        assert grpName != null;
-        assert grpId != 0;
+        assert grpId > 0 : grpId;
         assert deploymentId != null;
 
         this.cacheCfg = cacheCfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
index 0e85e7f..e503f4c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -48,15 +49,23 @@ public class CacheGroupDescriptor {
     /** */
     private final UUID rcvdFrom;
 
-    CacheGroupDescriptor(String grpName,
+    /**
+     * @param cacheCfg Cache configuration.
+     * @param grpName Group name.
+     * @param grpId  Group ID.
+     * @param rcvdFrom Node ID cache group received from.
+     * @param deploymentId Deployment ID.
+     * @param caches Cache group caches.
+     */
+    CacheGroupDescriptor(
+        CacheConfiguration cacheCfg,
+        @Nullable String grpName,
         int grpId,
         UUID rcvdFrom,
         IgniteUuid deploymentId,
-        CacheConfiguration cacheCfg,
         Map<String, Integer> caches) {
         assert cacheCfg != null;
-        assert grpName != null;
-        assert grpId != 0;
+        assert grpId > 0 : grpId;
 
         this.grpName = grpName;
         this.grpId = grpId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index e06b627..3f6b549 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -65,7 +65,7 @@ public class CacheGroupInfrastructure {
     private GridAffinityAssignmentCache aff;
 
     /** */
-    private final int grpId;
+    private int grpId;
 
     /** */
     private UUID rcvdFrom;
@@ -699,9 +699,13 @@ public class CacheGroupInfrastructure {
     }
 
     /**
-     *
+     * @param grpId New group ID.
      */
-    public void onReconnected() {
+    public void onReconnected(int grpId) {
+        assert grpId > 0 : grpId;
+
+        this.grpId = grpId;
+
         aff.onReconnected();
 
         if (top != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
index 55fb087..c799871 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java
@@ -43,7 +43,7 @@ class CacheNodeCommonDiscoveryData implements Serializable {
 
     /** */
     @GridToStringInclude
-    private final Map<String, CacheGroupData> cacheGrps;
+    private final Map<Integer, CacheGroupData> cacheGrps;
 
     /** */
     private final Map<String, Map<UUID, Boolean>> clientNodesMap;
@@ -55,7 +55,7 @@ class CacheNodeCommonDiscoveryData implements Serializable {
      */
     CacheNodeCommonDiscoveryData(Map<String, CacheData> caches,
         Map<String, CacheData> templates,
-        Map<String, CacheGroupData> cacheGrps,
+        Map<Integer, CacheGroupData> cacheGrps,
         int cacheGrpIdGen,
         Map<String, Map<UUID, Boolean>> clientNodesMap) {
         assert caches != null;
@@ -75,7 +75,7 @@ class CacheNodeCommonDiscoveryData implements Serializable {
         return cacheGrpIdGen;
     }
 
-    Map<String, CacheGroupData> cacheGroups() {
+    Map<Integer, CacheGroupData> cacheGroups() {
         return cacheGrps;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 23a37b0..d90cf7f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -63,7 +64,7 @@ class ClusterCachesInfo {
     private final ConcurrentMap<String, DynamicCacheDescriptor> 
registeredCaches = new ConcurrentHashMap<>();
 
     /** */
-    private final ConcurrentMap<String, CacheGroupDescriptor> 
registeredCacheGrps = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, CacheGroupDescriptor> 
registeredCacheGrps = new ConcurrentHashMap<>();
 
     /** */
     private int cacheGrpIdGen = 1;
@@ -75,7 +76,7 @@ class ClusterCachesInfo {
     private final IgniteLogger log;
 
     /** */
-    private Map<String, DynamicCacheDescriptor> cachesOnDisconnect;
+    private CachesOnDisconnect cachesOnDisconnect;
 
     /** */
     private CacheJoinNodeDiscoveryData joinDiscoData;
@@ -387,14 +388,14 @@ class ClusterCachesInfo {
 
                     exchangeActions.addCacheToStop(req, desc);
 
-                    CacheGroupDescriptor grpDesc = 
registeredCacheGrps.get(desc.groupDescriptor().groupName());
+                    CacheGroupDescriptor grpDesc = 
registeredCacheGrps.get(desc.groupDescriptor().groupId());
 
                     assert grpDesc != null && grpDesc.groupId() == 
desc.groupDescriptor().groupId() : desc;
 
                     grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
 
                     if (!grpDesc.hasCaches()) {
-                        registeredCacheGrps.remove(grpDesc.groupName());
+                        registeredCacheGrps.remove(grpDesc.groupId());
 
                         exchangeActions.addCacheGroupToStop(grpDesc);
                     }
@@ -477,10 +478,24 @@ class ClusterCachesInfo {
      */
     private Serializable joinDiscoveryData() {
         if (cachesOnDisconnect != null) {
+            Map<Integer, CacheClientReconnectDiscoveryData.CacheGroupInfo> 
cacheGrpsInfo = new HashMap<>();
             Map<String, CacheClientReconnectDiscoveryData.CacheInfo> 
cachesInfo = new HashMap<>();
 
+            Map<Integer, CacheGroupDescriptor> grps = 
cachesOnDisconnect.cacheGrps;
+            Map<String, DynamicCacheDescriptor> caches = 
cachesOnDisconnect.caches;
+
+            for (CacheGroupInfrastructure grp : ctx.cache().cacheGroups()) {
+                CacheGroupDescriptor desc = grps.get(grp.groupId());
+
+                assert desc != null : grp.nameForLog();
+
+                cacheGrpsInfo.put(grp.groupId(), new 
CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(),
+                    desc.deploymentId(),
+                    (byte)0));
+            }
+
             for (IgniteInternalCache cache : ctx.cache().caches()) {
-                DynamicCacheDescriptor desc = 
cachesOnDisconnect.get(cache.name());
+                DynamicCacheDescriptor desc = caches.get(cache.name());
 
                 assert desc != null : cache.name();
 
@@ -491,7 +506,7 @@ class ClusterCachesInfo {
                     (byte)0));
             }
 
-            return new CacheClientReconnectDiscoveryData(cachesInfo);
+            return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, 
cachesInfo);
         }
         else {
             assert ctx.config().isDaemon() || joinDiscoData != null || 
!ctx.state().active();
@@ -645,7 +660,7 @@ class ClusterCachesInfo {
             caches.put(desc.cacheName(), cacheData);
         }
 
-        Map<String, CacheGroupData> cacheGrps = new HashMap<>();
+        Map<Integer, CacheGroupData> cacheGrps = new HashMap<>();
 
         for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
             CacheGroupData grpData = new CacheGroupData(grpDesc.config(),
@@ -655,7 +670,7 @@ class ClusterCachesInfo {
                 grpDesc.deploymentId(),
                 grpDesc.caches());
 
-            cacheGrps.put(grpDesc.groupName(), grpData);
+            cacheGrps.put(grpDesc.groupId(), grpData);
         }
 
         Map<String, CacheData> templates = new HashMap<>();
@@ -696,15 +711,18 @@ class ClusterCachesInfo {
 
         cacheGrpIdGen = cachesData.currentCacheGroupId();
 
+        assert cacheGrpIdGen > 0 : cacheGrpIdGen;
+
         for (CacheGroupData grpData : cachesData.cacheGroups().values()) {
-            CacheGroupDescriptor grpDesc = new 
CacheGroupDescriptor(grpData.groupName(),
+            CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
+                grpData.config(),
+                grpData.groupName(),
                 grpData.groupId(),
                 grpData.receivedFrom(),
                 grpData.deploymentId(),
-                grpData.config(),
                 grpData.caches());
 
-            CacheGroupDescriptor old = 
registeredCacheGrps.put(grpDesc.groupName(), grpDesc);
+            CacheGroupDescriptor old = 
registeredCacheGrps.put(grpDesc.groupId(), grpDesc);
 
             assert old == null : old;
 
@@ -732,7 +750,7 @@ class ClusterCachesInfo {
         }
 
         for (CacheData cacheData : cachesData.caches().values()) {
-            CacheGroupDescriptor grpDesc = 
groupDescriptor(cacheData.groupId());
+            CacheGroupDescriptor grpDesc = 
registeredCacheGrps.get(cacheData.groupId());
 
             assert grpDesc != null : cacheData.cacheConfiguration().getName();
 
@@ -794,15 +812,6 @@ class ClusterCachesInfo {
         }
     }
 
-    private CacheGroupDescriptor groupDescriptor(int grpId) {
-        for (CacheGroupDescriptor desc : registeredCacheGrps.values()) {
-            if (desc.groupId() == grpId)
-                return desc;
-        }
-
-        return null;
-    }
-
     /**
      * @param clientData Discovery data.
      * @param clientNodeId Client node ID.
@@ -893,6 +902,36 @@ class ClusterCachesInfo {
         }
     }
 
+    /**
+     * @param grpName Group name.
+     * @return Group descriptor.
+     */
+    @Nullable private CacheGroupDescriptor cacheGroupByName(String grpName) {
+        assert grpName != null;
+
+        for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
+            if (grpName.equals(grpDesc.groupName()))
+                return grpDesc;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Group descriptor.
+     */
+    @Nullable private CacheGroupDescriptor 
nonSharedCacheGroupByCacheName(String cacheName) {
+        assert cacheName != null;
+
+        for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) {
+            if (!grpDesc.sharedGroup() && 
grpDesc.caches().containsKey(cacheName))
+                return grpDesc;
+        }
+
+        return null;
+    }
+
     private CacheGroupDescriptor registerCacheGroup(
         ExchangeActions exchActions,
         CacheConfiguration startedCacheCfg,
@@ -900,7 +939,7 @@ class ClusterCachesInfo {
         UUID rcvdFrom,
         IgniteUuid deploymentId) {
         if (startedCacheCfg.getGroupName() != null) {
-            CacheGroupDescriptor desc = 
registeredCacheGrps.get(startedCacheCfg.getGroupName());
+            CacheGroupDescriptor desc = 
cacheGroupByName(startedCacheCfg.getGroupName());
 
             if (desc != null) {
                 desc.onCacheAdded(startedCacheCfg.getName(), cacheId);
@@ -913,18 +952,15 @@ class ClusterCachesInfo {
 
         Map<String, Integer> caches = 
Collections.singletonMap(startedCacheCfg.getName(), cacheId);
 
-        String grpName = startedCacheCfg.getGroupName() != null ?
-            startedCacheCfg.getGroupName() : startedCacheCfg.getName();
-
         CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(
-            grpName,
+            startedCacheCfg,
+            startedCacheCfg.getGroupName(),
             grpId,
             rcvdFrom,
             deploymentId,
-            startedCacheCfg,
             caches);
 
-        CacheGroupDescriptor old = registeredCacheGrps.put(grpName, grpDesc);
+        CacheGroupDescriptor old = registeredCacheGrps.put(grpId, grpDesc);
 
         assert old == null : old;
 
@@ -939,7 +975,7 @@ class ClusterCachesInfo {
     /**
      * @return Registered cache groups.
      */
-    ConcurrentMap<String, CacheGroupDescriptor> registeredCacheGroups() {
+    ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() {
         return registeredCacheGrps;
     }
 
@@ -949,7 +985,7 @@ class ClusterCachesInfo {
      */
     void validateStartCacheConfiguration(CacheConfiguration ccfg) throws 
IgniteCheckedException {
         if (ccfg.getGroupName() != null) {
-            CacheGroupDescriptor grpDesc = 
registeredCacheGrps.get(ccfg.getGroupName());
+            CacheGroupDescriptor grpDesc = 
cacheGroupByName(ccfg.getGroupName());
 
             if (grpDesc != null) {
                 assert ccfg.getGroupName().equals(grpDesc.groupName());
@@ -1005,8 +1041,11 @@ class ClusterCachesInfo {
      *
      */
     void onDisconnect() {
-        cachesOnDisconnect = new HashMap<>(registeredCaches);
+        cachesOnDisconnect = new CachesOnDisconnect(
+            new HashMap<>(registeredCacheGrps),
+            new HashMap<>(registeredCaches));
 
+        registeredCacheGrps.clear();
         registeredCaches.clear();
         registeredTemplates.clear();
 
@@ -1014,14 +1053,42 @@ class ClusterCachesInfo {
     }
 
     /**
-     * @return Stopped caches names.
+     * @return Information about stopped caches and cache groups.
      */
-    Set<String> onReconnected() {
+    ClusterCachesReconnectResult onReconnected() {
         assert disconnectedState();
 
         Set<String> stoppedCaches = new HashSet<>();
+        Set<Integer> stoppedCacheGrps = new HashSet<>();
+        Map<Integer, Integer> newCacheGrpIds = new HashMap<>();
+
+        for (Map.Entry<Integer, CacheGroupDescriptor> e : 
cachesOnDisconnect.cacheGrps.entrySet()) {
+            CacheGroupDescriptor locDesc = e.getValue();
+
+            CacheGroupDescriptor desc;
+            boolean stopped = true;
+
+            if (locDesc.sharedGroup()) {
+                desc = cacheGroupByName(locDesc.groupName());
+
+                if (desc != null && 
desc.deploymentId().equals(locDesc.deploymentId()))
+                    stopped = false;
+            }
+            else {
+                desc = 
nonSharedCacheGroupByCacheName(locDesc.config().getName());
+
+                if (desc != null &&
+                    (surviveReconnect(locDesc.config().getName()) || 
desc.deploymentId().equals(locDesc.deploymentId())))
+                    stopped = false;
+            }
+
+            if (stopped)
+                stoppedCacheGrps.add(locDesc.groupId());
+            else
+                newCacheGrpIds.put(locDesc.groupId(), desc.groupId());
+        }
 
-        for(Map.Entry<String, DynamicCacheDescriptor> e : 
cachesOnDisconnect.entrySet()) {
+        for (Map.Entry<String, DynamicCacheDescriptor> e : 
cachesOnDisconnect.caches.entrySet()) {
             DynamicCacheDescriptor desc = e.getValue();
 
             String cacheName = e.getKey();
@@ -1049,7 +1116,7 @@ class ClusterCachesInfo {
 
         cachesOnDisconnect = null;
 
-        return stoppedCaches;
+        return new ClusterCachesReconnectResult(stoppedCacheGrps, 
stoppedCaches, newCacheGrpIds);
     }
 
     /**
@@ -1071,6 +1138,28 @@ class ClusterCachesInfo {
      *
      */
     void clearCaches() {
+        registeredCacheGrps.clear();
+
         registeredCaches.clear();
     }
+
+    /**
+     *
+     */
+    private static class CachesOnDisconnect {
+        /** */
+        final Map<Integer, CacheGroupDescriptor> cacheGrps;
+
+        /** */
+        final Map<String, DynamicCacheDescriptor> caches;
+
+        /**
+         * @param cacheGrps Cache groups.
+         * @param caches Caches.
+         */
+        CachesOnDisconnect(Map<Integer, CacheGroupDescriptor> cacheGrps, 
Map<String, DynamicCacheDescriptor> caches) {
+            this.cacheGrps = cacheGrps;
+            this.caches = caches;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
new file mode 100644
index 0000000..e204cac
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *
+ */
+class ClusterCachesReconnectResult {
+    /** */
+    private final Set<Integer> stoppedCacheGrps;
+
+    /** */
+    private final Set<String> stoppedCaches;
+
+    /** */
+    private final Map<Integer, Integer> newCacheGrpIds;
+
+    /**
+     * @param stoppedCacheGrps Stopped cache groups.
+     * @param stoppedCaches Stopped caches.
+     */
+    ClusterCachesReconnectResult(Set<Integer> stoppedCacheGrps,
+        Set<String> stoppedCaches,
+        Map<Integer, Integer> newCacheGrpIds) {
+        this.stoppedCacheGrps = stoppedCacheGrps;
+        this.stoppedCaches = stoppedCaches;
+        this.newCacheGrpIds = newCacheGrpIds;
+    }
+
+    Map<Integer, Integer> newCacheGroupIds() {
+        return newCacheGrpIds;
+    }
+
+    Set<Integer> stoppedCacheGroups() {
+        return stoppedCacheGrps;
+    }
+
+    Set<String> stoppedCaches() {
+        return stoppedCaches;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClusterCachesReconnectResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index bb76541..3c1ab93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1279,6 +1279,46 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param newGrpIds New cache group IDs.
+     */
+    void onReconnected(Map<Integer, Integer> newGrpIds) {
+        assert grpHandlers.orderedHandlers.isEmpty() : 
grpHandlers.orderedHandlers;
+
+        Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>();
+        Map<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> 
clsHandlers = new HashMap<>();
+
+        for (Map.Entry<Integer, Integer> idEntry : newGrpIds.entrySet()) {
+            Integer oldId = idEntry.getKey();
+            Integer newId = idEntry.getValue();
+
+            if (!oldId.equals(newId)) {
+                IgniteBiInClosure[] c = 
grpHandlers.idxClsHandlers.remove(oldId);
+
+                if (c != null) {
+                    Object old = idxClsHandlers.put(newId, c);
+
+                    assert old == null;
+                }
+
+                for (Map.Entry<ListenerKey, IgniteBiInClosure<UUID, 
GridCacheMessage>> e : grpHandlers.clsHandlers.entrySet()) {
+                    if (e.getKey().hndId == oldId) {
+                        IgniteBiInClosure<UUID, GridCacheMessage> c0 = 
grpHandlers.clsHandlers.remove(e.getKey());
+
+                        assert c0 != null;
+
+                        Object old = clsHandlers.put(new ListenerKey(newId, 
e.getKey().msgCls), c0);
+
+                        assert old == null;
+                    }
+                }
+            }
+        }
+
+        grpHandlers.idxClsHandlers.putAll(idxClsHandlers);
+        grpHandlers.clsHandlers.putAll(clsHandlers);
+    }
+
+    /**
      * @param msgHandlers Handlers.
      * @param hndId ID to remove handlers for.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 7eb6824..fcc2bdc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -899,12 +899,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
             if (!grp.isLocal()) {
+                if (exchId != null) {
+                    AffinityTopologyVersion startTopVer = grp.localStartVersion();
+
+                    if (startTopVer.compareTo(exchId.topologyVersion()) > 0)
+                        continue;
+                }
+
                 GridAffinityAssignmentCache affCache = grp.affinity();
 
                 GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
 
-                assert locMap != null || exchId == null : grp.nameForLog();
-
                 if (locMap != null) {
                     addFullPartitionsMap(m,
                         dupData,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b77b05d..8c55cc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1082,41 +1082,35 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         cachesInfo.onDisconnect();
     }
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
-        List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
-
-        GridCompoundFuture<?, ?> stopFut = null;
-
-        Set<String> stoppedCaches = cachesInfo.onReconnected();
+    /**
+     * @param cctx Cache context.
+     * @param stoppedCaches List to which the stopped cache is added.
+     */
+    private void stopCacheOnReconnect(GridCacheContext cctx, List<GridCacheAdapter> stoppedCaches) {
+        cctx.gate().reconnected(true);
 
-        // TODO IGNITE-5075.
-        for (CacheGroupInfrastructure grp : cacheGrps.values())
-            grp.onReconnected();
+        sharedCtx.removeCacheContext(cctx);
 
-        for (final GridCacheAdapter cache : caches.values()) {
-            boolean stopped = stoppedCaches.contains(cache.name());
+        caches.remove(cctx.name());
+        jCacheProxies.remove(cctx.name());
 
-            if (stopped) {
-                cache.context().gate().reconnected(true);
+        stoppedCaches.add(cctx.cache());
+    }
 
-                sharedCtx.removeCacheContext(cache.ctx);
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+        List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size());
 
-                caches.remove(cache.name());
-                jCacheProxies.remove(cache.name());
+        ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected();
 
-                IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        onKernalStop(cache, true);
-                        stopCache(cache, true, false);
-                    }
-                });
+        final List<GridCacheAdapter> stoppedCaches = new ArrayList<>();
 
-                if (stopFut == null)
-                    stopFut = new GridCompoundFuture<>();
+        for (final GridCacheAdapter cache : caches.values()) {
+            boolean stopped = reconnectRes.stoppedCacheGroups().contains(cache.context().groupId())
+                || reconnectRes.stoppedCaches().contains(cache.name());
 
-                stopFut.add((IgniteInternalFuture)fut);
-            }
+            if (stopped)
+                stopCacheOnReconnect(cache.context(), stoppedCaches);
             else {
                 cache.onReconnected();
 
@@ -1128,7 +1122,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     DynamicCacheDescriptor desc = cacheDescriptor(cctx.name());
 
-                    assert desc != null;
+                    assert desc != null : cctx.name();
 
                     ctx.query().onCacheStop0(cctx.name());
                     ctx.query().onCacheStart0(cctx, desc.schema());
@@ -1136,13 +1130,39 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
+        for (CacheGroupInfrastructure grp : cacheGrps.values()) {
+            Integer grpId = reconnectRes.newCacheGroupIds().get(grp.groupId());
+
+            if (grpId != null)
+                grp.onReconnected(grpId);
+            else
+                cacheGrps.remove(grp.groupId());
+        }
+
         sharedCtx.onReconnected();
 
+        sharedCtx.io().onReconnected(reconnectRes.newCacheGroupIds());
+
         for (GridCacheAdapter cache : reconnected)
             cache.context().gate().reconnected(false);
 
-        if (stopFut != null)
-            stopFut.markInitialized();
+        IgniteInternalFuture<?> stopFut = null;
+
+        if (!stoppedCaches.isEmpty()) {
+            stopFut = ctx.closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    for (GridCacheAdapter cache : stoppedCaches) {
+                        CacheGroupInfrastructure grp = cache.context().group();
+
+                        onKernalStop(cache, true);
+                        stopCache(cache, true, false);
+
+                        if (!grp.hasCaches())
+                            grp.stopGroup();
+                    }
+                }
+            });
+        }
 
         return stopFut;
     }
@@ -1256,12 +1276,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 mgr.stop(cancel, destroy);
         }
 
-        ctx.group().stopCache(ctx, destroy);
-
         ctx.kernalContext().continuous().onCacheStop(ctx);
 
         ctx.kernalContext().cache().context().database().onCacheStop(ctx);
 
+        ctx.group().stopCache(ctx, destroy);
+
         U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore()));
 
         if (log.isInfoEnabled())

Reply via email to