Repository: ignite Updated Branches: refs/heads/ignite-5075 6eed51a2e -> 0e49e9150
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e49e915 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e49e915 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e49e915 Branch: refs/heads/ignite-5075 Commit: 0e49e915092f1b61e0085c13afb64296e06c24c2 Parents: 6eed51a Author: sboikov <[email protected]> Authored: Tue May 23 12:08:22 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 23 12:08:22 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 25 +++++++++++- .../affinity/GridAffinityAssignmentCache.java | 2 +- .../internal/processors/cache/CacheData.java | 2 +- .../processors/cache/CacheGroupData.java | 2 +- .../processors/cache/CacheGroupDescriptor.java | 2 +- .../cache/CacheGroupInfrastructure.java | 14 +++---- .../cache/CacheNodeCommonDiscoveryData.java | 14 ------- .../processors/cache/ClusterCachesInfo.java | 22 +++++------ .../cache/ClusterCachesReconnectResult.java | 16 +------- .../processors/cache/GridCacheIoManager.java | 40 -------------------- .../processors/cache/GridCacheProcessor.java | 12 +++--- .../processors/cache/IgniteCacheGroupsTest.java | 4 -- 12 files changed, 48 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b1eb3de..a2e62a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -318,12 +318,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { */ public void addCacheGroup(CacheGroupDescriptor grpDesc, IgnitePredicate<ClusterNode> filter, CacheMode cacheMode) { CacheGroupAffinity old = registeredCacheGrps.put(grpDesc.groupId(), - new CacheGroupAffinity(filter, cacheMode)); + new CacheGroupAffinity(grpDesc.cacheOrGroupName(), filter, cacheMode)); assert old == null : old; } /** + * @param grpDesc Cache group descriptor. + */ + public void removeCacheGroup(CacheGroupDescriptor grpDesc) { + CacheGroupAffinity rmvd = registeredCacheGrps.remove(grpDesc.groupId()); + + assert rmvd != null : grpDesc.cacheOrGroupName(); + } + + /** * Adds dynamic cache filter. * * @param grpId Cache group ID. @@ -2704,6 +2713,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * */ private static class CacheGroupAffinity { + /** */ + private final String name; + /** Nodes filter. */ private final IgnitePredicate<ClusterNode> cacheFilter; @@ -2711,14 +2723,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final CacheMode cacheMode; /** + * @param name Name. * @param cacheFilter Node filter. * @param cacheMode Cache mode. */ - CacheGroupAffinity(IgnitePredicate<ClusterNode> cacheFilter, + CacheGroupAffinity( + String name, + IgnitePredicate<ClusterNode> cacheFilter, CacheMode cacheMode) { + this.name = name; this.cacheFilter = cacheFilter; this.cacheMode = cacheMode; } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheGroupAffinity [name=" + name + ']'; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 88f8b7a..b478462 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -134,7 +134,7 @@ public class GridAffinityAssignmentCache { assert ctx != null; assert aff != null; assert nodeFilter != null; - assert grpId > 0; + assert grpId != 0; this.ctx = ctx; this.aff = aff; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java index a2a1ee1..84e616d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java @@ -87,7 +87,7 @@ public class CacheData implements Serializable { assert rcvdFrom != null : cacheCfg.getName(); assert deploymentId != null : cacheCfg.getName(); assert template || cacheId != 0 : cacheCfg.getName(); - assert template || grpId > 0 : cacheCfg.getName(); + assert template || grpId != 0 : cacheCfg.getName(); this.cacheCfg = cacheCfg; this.cacheId = cacheId; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java index 9c1afff..0b94782 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java @@ -74,7 +74,7 @@ public class CacheGroupData implements Serializable { IgniteUuid deploymentId, Map<String, Integer> caches) { assert cacheCfg != null; - assert grpId > 0 : grpId; + assert grpId != 0; assert deploymentId != null; this.cacheCfg = cacheCfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java index 7876792..c4976e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java @@ -76,7 +76,7 @@ public class CacheGroupDescriptor { IgniteUuid deploymentId, Map<String, Integer> caches) { assert cacheCfg != null; - assert grpId > 0 : grpId; + assert grpId != 0; this.grpName = grpName; this.grpId = grpId; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 81c0ae0..6d61ab1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -63,8 +63,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF * */ public class CacheGroupInfrastructure { - /** Group ID (can be changed after client reconnect). */ - private int grpId; + /** Group ID. */ + private final int grpId; /** Node ID cache group was received from. */ private final UUID rcvdFrom; @@ -151,9 +151,9 @@ public class CacheGroupInfrastructure { FreeList freeList, ReuseList reuseList, AffinityTopologyVersion locStartVer) { - assert grpId > 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; assert ccfg != null; assert memPlc != null || !affNode; + assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; this.grpId = grpId; this.rcvdFrom = rcvdFrom; @@ -766,13 +766,9 @@ public class CacheGroupInfrastructure { } /** - * @param grpId New group ID. + * */ - public void onReconnected(int grpId) { - assert grpId > 0 : grpId; - - this.grpId = grpId; - + public void onReconnected() { aff.onReconnected(); if (top != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java index da6fc76..4c70cb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java @@ -31,9 +31,6 @@ class CacheNodeCommonDiscoveryData implements Serializable { private static final long serialVersionUID = 0L; /** */ - private final int cacheGrpIdGen; - - /** */ @GridToStringInclude private final Map<String, CacheData> caches; @@ -52,35 +49,24 @@ class CacheNodeCommonDiscoveryData implements Serializable { * @param caches Started caches. * @param templates Configured templates. * @param cacheGrps Started cache groups. - * @param cacheGrpIdGen Current counter for group ID assignment. * @param clientNodesMap Information about cache client nodes. */ CacheNodeCommonDiscoveryData(Map<String, CacheData> caches, Map<String, CacheData> templates, Map<Integer, CacheGroupData> cacheGrps, - int cacheGrpIdGen, Map<String, Map<UUID, Boolean>> clientNodesMap) { assert caches != null; assert templates != null; assert cacheGrps != null; - assert cacheGrpIdGen > 0 : cacheGrpIdGen; assert clientNodesMap != null; this.caches = caches; this.templates = templates; - this.cacheGrpIdGen = cacheGrpIdGen; this.cacheGrps = cacheGrps; this.clientNodesMap = clientNodesMap; } /** - * @return Current counter for group ID assignment. - */ - int currentCacheGroupId() { - return cacheGrpIdGen; - } - - /** * @return Started cache groups. */ Map<Integer, CacheGroupData> cacheGroups() { http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 708b8f6..bd30fbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -66,9 +66,6 @@ class ClusterCachesInfo { /** */ private final ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGrps = new ConcurrentHashMap<>(); - /** */ - private int cacheGrpIdGen = 1; - /** Cache templates. */ private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>(); @@ -420,6 +417,8 @@ class ClusterCachesInfo { if (!grpDesc.hasCaches()) { registeredCacheGrps.remove(grpDesc.groupId()); + ctx.discovery().removeCacheGroup(grpDesc); + exchangeActions.addCacheGroupToStop(grpDesc); } } @@ -683,7 +682,6 @@ class ClusterCachesInfo { return new CacheNodeCommonDiscoveryData(caches, templates, cacheGrps, - cacheGrpIdGen, ctx.discovery().clientNodesMap()); } @@ -699,10 +697,6 @@ class ClusterCachesInfo { CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); - cacheGrpIdGen = cachesData.currentCacheGroupId(); - - assert cacheGrpIdGen > 0 : cacheGrpIdGen; - // Replace locally registered data with actual data received from cluster. registeredCaches.clear(); registeredCacheGrps.clear(); @@ -999,7 +993,12 @@ class ClusterCachesInfo { } } - int grpId = cacheGrpIdGen++; + int grpId; + + if (startedCacheCfg.getGroupName() == null) + grpId = CU.cacheId(startedCacheCfg.getName()); + else + grpId = CU.cacheId("grp#" + startedCacheCfg.getGroupName()); Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId); @@ -1127,7 +1126,6 @@ class ClusterCachesInfo { Set<String> stoppedCaches = new HashSet<>(); Set<Integer> stoppedCacheGrps = new HashSet<>(); - Map<Integer, Integer> newCacheGrpIds = new HashMap<>(); for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) { CacheGroupDescriptor locDesc = e.getValue(); @@ -1152,7 +1150,7 @@ class ClusterCachesInfo { if (stopped) stoppedCacheGrps.add(locDesc.groupId()); else - newCacheGrpIds.put(locDesc.groupId(), desc.groupId()); + assert locDesc.groupId() == desc.groupId(); } for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) { @@ -1183,7 +1181,7 @@ class ClusterCachesInfo { cachesOnDisconnect = null; - return new ClusterCachesReconnectResult(stoppedCacheGrps, stoppedCaches, newCacheGrpIds); + return new ClusterCachesReconnectResult(stoppedCacheGrps, stoppedCaches); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java index 42f203d..23854c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Map; import java.util.Set; import org.apache.ignite.internal.util.typedef.internal.S; @@ -31,27 +30,14 @@ class ClusterCachesReconnectResult { /** */ private final Set<String> stoppedCaches; - /** */ - private final Map<Integer, Integer> newCacheGrpIds; - /** * @param stoppedCacheGrps Stopped cache groups. * @param stoppedCaches Stopped caches. - * @param newCacheGrpIds New cache group IDs. */ ClusterCachesReconnectResult(Set<Integer> stoppedCacheGrps, - Set<String> stoppedCaches, - Map<Integer, Integer> newCacheGrpIds) { + Set<String> stoppedCaches) { this.stoppedCacheGrps = stoppedCacheGrps; this.stoppedCaches = stoppedCaches; - this.newCacheGrpIds = newCacheGrpIds; - } - - /** - * @return New cache group IDs. - */ - Map<Integer, Integer> newCacheGroupIds() { - return newCacheGrpIds; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index f63f18c..114a1e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1284,46 +1284,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** - * @param newGrpIds New cache group IDs. - */ - void onReconnected(Map<Integer, Integer> newGrpIds) { - assert grpHandlers.orderedHandlers.isEmpty() : grpHandlers.orderedHandlers; - - Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); - Map<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> clsHandlers = new HashMap<>(); - - for (Map.Entry<Integer, Integer> idEntry : newGrpIds.entrySet()) { - Integer oldId = idEntry.getKey(); - Integer newId = idEntry.getValue(); - - if (!oldId.equals(newId)) { - IgniteBiInClosure[] c = grpHandlers.idxClsHandlers.remove(oldId); - - if (c != null) { - Object old = idxClsHandlers.put(newId, c); - - assert old == null; - } - - for (Map.Entry<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> e : grpHandlers.clsHandlers.entrySet()) { - if (e.getKey().hndId == oldId) { - IgniteBiInClosure<UUID, GridCacheMessage> c0 = grpHandlers.clsHandlers.remove(e.getKey()); - - assert c0 != null; - - Object old = clsHandlers.put(new ListenerKey(newId, e.getKey().msgCls), c0); - - assert old == null; - } - } - } - } - - grpHandlers.idxClsHandlers.putAll(idxClsHandlers); - grpHandlers.clsHandlers.putAll(clsHandlers); - } - - /** * @param msgHandlers Handlers. * @param hndId ID to remove handlers for. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e00ba5f..a5f6f1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1137,19 +1137,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - for (CacheGroupInfrastructure grp : cacheGrps.values()) { - Integer grpId = reconnectRes.newCacheGroupIds().get(grp.groupId()); + final Set<Integer> stoppedGrps = reconnectRes.stoppedCacheGroups(); - if (grpId != null) - grp.onReconnected(grpId); - else + for (CacheGroupInfrastructure grp : cacheGrps.values()) { + if (stoppedGrps.contains(grp.groupId())) cacheGrps.remove(grp.groupId()); + else + grp.onReconnected(); } sharedCtx.onReconnected(); - sharedCtx.io().onReconnected(reconnectRes.newCacheGroupIds()); - for (GridCacheAdapter cache : reconnected) cache.context().gate().reconnected(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e49e915/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 1a133dc..4529f77 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -124,12 +124,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - //cfg.setLateAffinityAssignment(false); - cfg.setClientMode(client); - cfg.setMarshaller(null); - if (ccfgs != null) { cfg.setCacheConfiguration(ccfgs);
