Repository: ignite Updated Branches: refs/heads/ignite-5075 837b790b9 -> e519adf88
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e519adf8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e519adf8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e519adf8 Branch: refs/heads/ignite-5075 Commit: e519adf88db1d2fd29068a957dfc29ab0a700dc1 Parents: 837b790 Author: sboikov <[email protected]> Authored: Tue May 16 23:24:09 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 16 23:24:09 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 1 + .../cache/CacheAffinitySharedManager.java | 4 +- .../CacheClientReconnectDiscoveryData.java | 64 +++++++- .../processors/cache/CacheGroupData.java | 15 +- .../processors/cache/CacheGroupDescriptor.java | 17 +- .../cache/CacheGroupInfrastructure.java | 10 +- .../cache/CacheNodeCommonDiscoveryData.java | 6 +- .../processors/cache/ClusterCachesInfo.java | 159 +++++++++++++++---- .../cache/ClusterCachesReconnectResult.java | 66 ++++++++ .../processors/cache/GridCacheIoManager.java | 40 +++++ .../GridCachePartitionExchangeManager.java | 9 +- .../processors/cache/GridCacheProcessor.java | 84 ++++++---- 12 files changed, 388 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index fb6637a..c3d12c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -633,6 +633,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { locJoin = new GridFutureAdapter<>(); registeredCaches.clear(); + registeredCacheGrps.clear(); for (AffinityTopologyVersion histVer : discoCacheHist.keySet()) { Object rmvd = discoCacheHist.remove(histVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 0dab5ab..a20719d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -853,7 +853,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** - * @param desc Cache descriptor. + * @param grpDesc Cache group descriptor. * @param aff Affinity. * @param fut Exchange future. * @param fetch Force fetch flag. @@ -1002,6 +1002,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId()); + assert grpDesc != null : grp.nameForLog(); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, grpDesc, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java index f970469..4505728 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java @@ -32,18 +32,31 @@ public class CacheClientReconnectDiscoveryData implements Serializable { private static final long serialVersionUID = 0L; /** */ + private final Map<Integer, CacheGroupInfo> clientCacheGrps; + + /** */ private final Map<String, CacheInfo> clientCaches; /** * @param clientCaches Information about caches started on re-joining client node. + * @param clientCacheGrps Information about cach groups started on re-joining client node. */ - public CacheClientReconnectDiscoveryData(Map<String, CacheInfo> clientCaches) { + CacheClientReconnectDiscoveryData(Map<Integer, CacheGroupInfo> clientCacheGrps, + Map<String, CacheInfo> clientCaches) { + this.clientCacheGrps = clientCacheGrps; this.clientCaches = clientCaches; } /** * @return Information about caches started on re-joining client node. */ + Map<Integer, CacheGroupInfo> clientCacheGroups() { + return clientCacheGrps; + } + + /** + * @return Information about caches started on re-joining client node. + */ Map<String, CacheInfo> clientCaches() { return clientCaches; } @@ -51,6 +64,53 @@ public class CacheClientReconnectDiscoveryData implements Serializable { /** * */ + static class CacheGroupInfo implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final CacheConfiguration ccfg; + + /** */ + private final IgniteUuid deploymentId; + + /** Flags added for future usage. */ + private final byte flags; + + /** + * @param ccfg Cache group configuration. + * @param deploymentId Cache group deployment ID. + * @param flags Flags (for future usage). + */ + CacheGroupInfo(CacheConfiguration ccfg, + IgniteUuid deploymentId, + byte flags) { + assert ccfg != null; + assert deploymentId != null; + + this.ccfg = ccfg; + this.deploymentId = deploymentId; + this.flags = flags; + } + + /** + * @return Cache group configuration. + */ + CacheConfiguration config() { + return ccfg; + } + + /** + * @return Cache group deployment ID. + */ + IgniteUuid deploymentId() { + return deploymentId; + } + } + + /** + * + */ static class CacheInfo implements Serializable { /** */ private static final long serialVersionUID = 0L; @@ -94,7 +154,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable { } /** - * @return Cache configuraiton. + * @return Cache configuration. */ CacheConfiguration config() { return ccfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java index ea2c256..7cf7349 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * @@ -53,17 +54,21 @@ public class CacheGroupData implements Serializable { /** * @param cacheCfg Cache configuration. - * @param grpId + * @param grpName Group name. + * @param grpId Group ID. + * @param rcvdFrom Node ID cache group received from. + * @param deploymentId Deployment ID. + * @param caches Cache group caches. */ - public CacheGroupData(CacheConfiguration cacheCfg, - String grpName, + CacheGroupData( + CacheConfiguration cacheCfg, + @Nullable String grpName, int grpId, UUID rcvdFrom, IgniteUuid deploymentId, Map<String, Integer> caches) { assert cacheCfg != null; - assert grpName != null; - assert grpId != 0; + assert grpId > 0 : grpId; assert deploymentId != null; this.cacheCfg = cacheCfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java index 0e85e7f..e503f4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * @@ -48,15 +49,23 @@ public class CacheGroupDescriptor { /** */ private final UUID rcvdFrom; - CacheGroupDescriptor(String grpName, + /** + * @param cacheCfg Cache configuration. + * @param grpName Group name. + * @param grpId Group ID. + * @param rcvdFrom Node ID cache group received from. + * @param deploymentId Deployment ID. + * @param caches Cache group caches. + */ + CacheGroupDescriptor( + CacheConfiguration cacheCfg, + @Nullable String grpName, int grpId, UUID rcvdFrom, IgniteUuid deploymentId, - CacheConfiguration cacheCfg, Map<String, Integer> caches) { assert cacheCfg != null; - assert grpName != null; - assert grpId != 0; + assert grpId > 0 : grpId; this.grpName = grpName; this.grpId = grpId; http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index e06b627..3f6b549 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -65,7 +65,7 @@ public class CacheGroupInfrastructure { private GridAffinityAssignmentCache aff; /** */ - private final int grpId; + private int grpId; /** */ private UUID rcvdFrom; @@ -699,9 +699,13 @@ public class CacheGroupInfrastructure { } /** - * + * @param grpId New group ID. */ - public void onReconnected() { + public void onReconnected(int grpId) { + assert grpId > 0 : grpId; + + this.grpId = grpId; + aff.onReconnected(); if (top != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java index 55fb087..c799871 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheNodeCommonDiscoveryData.java @@ -43,7 +43,7 @@ class CacheNodeCommonDiscoveryData implements Serializable { /** */ @GridToStringInclude - private final Map<String, CacheGroupData> cacheGrps; + private final Map<Integer, CacheGroupData> cacheGrps; /** */ private final Map<String, Map<UUID, Boolean>> clientNodesMap; @@ -55,7 +55,7 @@ class CacheNodeCommonDiscoveryData implements Serializable { */ CacheNodeCommonDiscoveryData(Map<String, CacheData> caches, Map<String, CacheData> templates, - Map<String, CacheGroupData> cacheGrps, + Map<Integer, CacheGroupData> cacheGrps, int cacheGrpIdGen, Map<String, Map<UUID, Boolean>> clientNodesMap) { assert caches != null; @@ -75,7 +75,7 @@ class CacheNodeCommonDiscoveryData implements Serializable { return cacheGrpIdGen; } - Map<String, CacheGroupData> cacheGroups() { + Map<Integer, CacheGroupData> cacheGroups() { return cacheGrps; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 23a37b0..d90cf7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; @@ -63,7 +64,7 @@ class ClusterCachesInfo { private final ConcurrentMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>(); /** */ - private final ConcurrentMap<String, CacheGroupDescriptor> registeredCacheGrps = new ConcurrentHashMap<>(); + private final ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGrps = new ConcurrentHashMap<>(); /** */ private int cacheGrpIdGen = 1; @@ -75,7 +76,7 @@ class ClusterCachesInfo { private final IgniteLogger log; /** */ - private Map<String, DynamicCacheDescriptor> cachesOnDisconnect; + private CachesOnDisconnect cachesOnDisconnect; /** */ private CacheJoinNodeDiscoveryData joinDiscoData; @@ -387,14 +388,14 @@ class ClusterCachesInfo { exchangeActions.addCacheToStop(req, desc); - CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupDescriptor().groupName()); + CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupDescriptor().groupId()); assert grpDesc != null && grpDesc.groupId() == desc.groupDescriptor().groupId() : desc; grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId()); if (!grpDesc.hasCaches()) { - registeredCacheGrps.remove(grpDesc.groupName()); + registeredCacheGrps.remove(grpDesc.groupId()); exchangeActions.addCacheGroupToStop(grpDesc); } @@ -477,10 +478,24 @@ class ClusterCachesInfo { */ private Serializable joinDiscoveryData() { if (cachesOnDisconnect != null) { + Map<Integer, CacheClientReconnectDiscoveryData.CacheGroupInfo> cacheGrpsInfo = new HashMap<>(); Map<String, CacheClientReconnectDiscoveryData.CacheInfo> cachesInfo = new HashMap<>(); + Map<Integer, CacheGroupDescriptor> grps = cachesOnDisconnect.cacheGrps; + Map<String, DynamicCacheDescriptor> caches = cachesOnDisconnect.caches; + + for (CacheGroupInfrastructure grp : ctx.cache().cacheGroups()) { + CacheGroupDescriptor desc = grps.get(grp.groupId()); + + assert desc != null : grp.nameForLog(); + + cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(), + desc.deploymentId(), + (byte)0)); + } + for (IgniteInternalCache cache : ctx.cache().caches()) { - DynamicCacheDescriptor desc = cachesOnDisconnect.get(cache.name()); + DynamicCacheDescriptor desc = caches.get(cache.name()); assert desc != null : cache.name(); @@ -491,7 +506,7 @@ class ClusterCachesInfo { (byte)0)); } - return new CacheClientReconnectDiscoveryData(cachesInfo); + return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo); } else { assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active(); @@ -645,7 +660,7 @@ class ClusterCachesInfo { caches.put(desc.cacheName(), cacheData); } - Map<String, CacheGroupData> cacheGrps = new HashMap<>(); + Map<Integer, CacheGroupData> cacheGrps = new HashMap<>(); for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) { CacheGroupData grpData = new CacheGroupData(grpDesc.config(), @@ -655,7 +670,7 @@ class ClusterCachesInfo { grpDesc.deploymentId(), grpDesc.caches()); - cacheGrps.put(grpDesc.groupName(), grpData); + cacheGrps.put(grpDesc.groupId(), grpData); } Map<String, CacheData> templates = new HashMap<>(); @@ -696,15 +711,18 @@ class ClusterCachesInfo { cacheGrpIdGen = cachesData.currentCacheGroupId(); + assert cacheGrpIdGen > 0 : cacheGrpIdGen; + for (CacheGroupData grpData : cachesData.cacheGroups().values()) { - CacheGroupDescriptor grpDesc = new CacheGroupDescriptor(grpData.groupName(), + CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( + grpData.config(), + grpData.groupName(), grpData.groupId(), grpData.receivedFrom(), grpData.deploymentId(), - grpData.config(), grpData.caches()); - CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupName(), grpDesc); + CacheGroupDescriptor old = registeredCacheGrps.put(grpDesc.groupId(), grpDesc); assert old == null : old; @@ -732,7 +750,7 @@ class ClusterCachesInfo { } for (CacheData cacheData : cachesData.caches().values()) { - CacheGroupDescriptor grpDesc = groupDescriptor(cacheData.groupId()); + CacheGroupDescriptor grpDesc = registeredCacheGrps.get(cacheData.groupId()); assert grpDesc != null : cacheData.cacheConfiguration().getName(); @@ -794,15 +812,6 @@ class ClusterCachesInfo { } } - private CacheGroupDescriptor groupDescriptor(int grpId) { - for (CacheGroupDescriptor desc : registeredCacheGrps.values()) { - if (desc.groupId() == grpId) - return desc; - } - - return null; - } - /** * @param clientData Discovery data. * @param clientNodeId Client node ID. @@ -893,6 +902,36 @@ class ClusterCachesInfo { } } + /** + * @param grpName Group name. + * @return Group descriptor. + */ + @Nullable private CacheGroupDescriptor cacheGroupByName(String grpName) { + assert grpName != null; + + for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) { + if (grpName.equals(grpDesc.groupName())) + return grpDesc; + } + + return null; + } + + /** + * @param cacheName Cache name. + * @return Group descriptor. + */ + @Nullable private CacheGroupDescriptor nonSharedCacheGroupByCacheName(String cacheName) { + assert cacheName != null; + + for (CacheGroupDescriptor grpDesc : registeredCacheGrps.values()) { + if (!grpDesc.sharedGroup() && grpDesc.caches().containsKey(cacheName)) + return grpDesc; + } + + return null; + } + private CacheGroupDescriptor registerCacheGroup( ExchangeActions exchActions, CacheConfiguration startedCacheCfg, @@ -900,7 +939,7 @@ class ClusterCachesInfo { UUID rcvdFrom, IgniteUuid deploymentId) { if (startedCacheCfg.getGroupName() != null) { - CacheGroupDescriptor desc = registeredCacheGrps.get(startedCacheCfg.getGroupName()); + CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName()); if (desc != null) { desc.onCacheAdded(startedCacheCfg.getName(), cacheId); @@ -913,18 +952,15 @@ class ClusterCachesInfo { Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId); - String grpName = startedCacheCfg.getGroupName() != null ? - startedCacheCfg.getGroupName() : startedCacheCfg.getName(); - CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( - grpName, + startedCacheCfg, + startedCacheCfg.getGroupName(), grpId, rcvdFrom, deploymentId, - startedCacheCfg, caches); - CacheGroupDescriptor old = registeredCacheGrps.put(grpName, grpDesc); + CacheGroupDescriptor old = registeredCacheGrps.put(grpId, grpDesc); assert old == null : old; @@ -939,7 +975,7 @@ class ClusterCachesInfo { /** * @return Registered cache groups. */ - ConcurrentMap<String, CacheGroupDescriptor> registeredCacheGroups() { + ConcurrentMap<Integer, CacheGroupDescriptor> registeredCacheGroups() { return registeredCacheGrps; } @@ -949,7 +985,7 @@ class ClusterCachesInfo { */ void validateStartCacheConfiguration(CacheConfiguration ccfg) throws IgniteCheckedException { if (ccfg.getGroupName() != null) { - CacheGroupDescriptor grpDesc = registeredCacheGrps.get(ccfg.getGroupName()); + CacheGroupDescriptor grpDesc = cacheGroupByName(ccfg.getGroupName()); if (grpDesc != null) { assert ccfg.getGroupName().equals(grpDesc.groupName()); @@ -1005,8 +1041,11 @@ class ClusterCachesInfo { * */ void onDisconnect() { - cachesOnDisconnect = new HashMap<>(registeredCaches); + cachesOnDisconnect = new CachesOnDisconnect( + new HashMap<>(registeredCacheGrps), + new HashMap<>(registeredCaches)); + registeredCacheGrps.clear(); registeredCaches.clear(); registeredTemplates.clear(); @@ -1014,14 +1053,42 @@ class ClusterCachesInfo { } /** - * @return Stopped caches names. + * @return Information about stopped caches and cache groups. */ - Set<String> onReconnected() { + ClusterCachesReconnectResult onReconnected() { assert disconnectedState(); Set<String> stoppedCaches = new HashSet<>(); + Set<Integer> stoppedCacheGrps = new HashSet<>(); + Map<Integer, Integer> newCacheGrpIds = new HashMap<>(); + + for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) { + CacheGroupDescriptor locDesc = e.getValue(); + + CacheGroupDescriptor desc; + boolean stopped = true; + + if (locDesc.sharedGroup()) { + desc = cacheGroupByName(locDesc.groupName()); + + if (desc != null && desc.deploymentId().equals(locDesc.deploymentId())) + stopped = false; + } + else { + desc = nonSharedCacheGroupByCacheName(locDesc.config().getName()); + + if (desc != null && + (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId()))) + stopped = false; + } + + if (stopped) + stoppedCacheGrps.add(locDesc.groupId()); + else + newCacheGrpIds.put(locDesc.groupId(), desc.groupId()); + } - for(Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.entrySet()) { + for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) { DynamicCacheDescriptor desc = e.getValue(); String cacheName = e.getKey(); @@ -1049,7 +1116,7 @@ class ClusterCachesInfo { cachesOnDisconnect = null; - return stoppedCaches; + return new ClusterCachesReconnectResult(stoppedCacheGrps, stoppedCaches, newCacheGrpIds); } /** @@ -1071,6 +1138,28 @@ class ClusterCachesInfo { * */ void clearCaches() { + registeredCacheGrps.clear(); + registeredCaches.clear(); } + + /** + * + */ + private static class CachesOnDisconnect { + /** */ + final Map<Integer, CacheGroupDescriptor> cacheGrps; + + /** */ + final Map<String, DynamicCacheDescriptor> caches; + + /** + * @param cacheGrps Cache groups. + * @param caches Caches. + */ + CachesOnDisconnect(Map<Integer, CacheGroupDescriptor> cacheGrps, Map<String, DynamicCacheDescriptor> caches) { + this.cacheGrps = cacheGrps; + this.caches = caches; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java new file mode 100644 index 0000000..e204cac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesReconnectResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.Map; +import java.util.Set; + +/** + * + */ +class ClusterCachesReconnectResult { + /** */ + private final Set<Integer> stoppedCacheGrps; + + /** */ + private final Set<String> stoppedCaches; + + /** */ + private final Map<Integer, Integer> newCacheGrpIds; + + /** + * @param stoppedCacheGrps Stopped cache groups. + * @param stoppedCaches Stopped caches. + */ + ClusterCachesReconnectResult(Set<Integer> stoppedCacheGrps, + Set<String> stoppedCaches, + Map<Integer, Integer> newCacheGrpIds) { + this.stoppedCacheGrps = stoppedCacheGrps; + this.stoppedCaches = stoppedCaches; + this.newCacheGrpIds = newCacheGrpIds; + } + + Map<Integer, Integer> newCacheGroupIds() { + return newCacheGrpIds; + } + + Set<Integer> stoppedCacheGroups() { + return stoppedCacheGrps; + } + + Set<String> stoppedCaches() { + return stoppedCaches; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClusterCachesReconnectResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index bb76541..3c1ab93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -1279,6 +1279,46 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param newGrpIds New cache group IDs. + */ + void onReconnected(Map<Integer, Integer> newGrpIds) { + assert grpHandlers.orderedHandlers.isEmpty() : grpHandlers.orderedHandlers; + + Map<Integer, IgniteBiInClosure[]> idxClsHandlers = new HashMap<>(); + Map<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> clsHandlers = new HashMap<>(); + + for (Map.Entry<Integer, Integer> idEntry : newGrpIds.entrySet()) { + Integer oldId = idEntry.getKey(); + Integer newId = idEntry.getValue(); + + if (!oldId.equals(newId)) { + IgniteBiInClosure[] c = grpHandlers.idxClsHandlers.remove(oldId); + + if (c != null) { + Object old = idxClsHandlers.put(newId, c); + + assert old == null; + } + + for (Map.Entry<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>> e : grpHandlers.clsHandlers.entrySet()) { + if (e.getKey().hndId == oldId) { + IgniteBiInClosure<UUID, GridCacheMessage> c0 = grpHandlers.clsHandlers.remove(e.getKey()); + + assert c0 != null; + + Object old = clsHandlers.put(new ListenerKey(newId, e.getKey().msgCls), c0); + + assert old == null; + } + } + } + } + + grpHandlers.idxClsHandlers.putAll(idxClsHandlers); + grpHandlers.clsHandlers.putAll(clsHandlers); + } + + /** * @param msgHandlers Handlers. * @param hndId ID to remove handlers for. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 7eb6824..fcc2bdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -899,12 +899,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) { + if (exchId != null) { + AffinityTopologyVersion startTopVer = grp.localStartVersion(); + + if (startTopVer.compareTo(exchId.topologyVersion()) > 0) + continue; + } + GridAffinityAssignmentCache affCache = grp.affinity(); GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); - assert locMap != null || exchId == null : grp.nameForLog(); - if (locMap != null) { addFullPartitionsMap(m, dupData, http://git-wip-us.apache.org/repos/asf/ignite/blob/e519adf8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index b77b05d..8c55cc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1082,41 +1082,35 @@ public class GridCacheProcessor extends GridProcessorAdapter { cachesInfo.onDisconnect(); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); - - GridCompoundFuture<?, ?> stopFut = null; - - Set<String> stoppedCaches = cachesInfo.onReconnected(); + /** + * @param cctx Cache context. + * @param stoppedCaches List where stopped cache should be added. + */ + private void stopCacheOnReconnect(GridCacheContext cctx, List<GridCacheAdapter> stoppedCaches) { + cctx.gate().reconnected(true); - // TODO IGNITE-5075. - for (CacheGroupInfrastructure grp : cacheGrps.values()) - grp.onReconnected(); + sharedCtx.removeCacheContext(cctx); - for (final GridCacheAdapter cache : caches.values()) { - boolean stopped = stoppedCaches.contains(cache.name()); + caches.remove(cctx.name()); + jCacheProxies.remove(cctx.name()); - if (stopped) { - cache.context().gate().reconnected(true); + stoppedCaches.add(cctx.cache()); + } - sharedCtx.removeCacheContext(cache.ctx); + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); - caches.remove(cache.name()); - jCacheProxies.remove(cache.name()); + ClusterCachesReconnectResult reconnectRes = cachesInfo.onReconnected(); - IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() { - @Override public void run() { - onKernalStop(cache, true); - stopCache(cache, true, false); - } - }); + final List<GridCacheAdapter> stoppedCaches = new ArrayList<>(); - if (stopFut == null) - stopFut = new GridCompoundFuture<>(); + for (final GridCacheAdapter cache : caches.values()) { + boolean stopped = reconnectRes.stoppedCacheGroups().contains(cache.context().groupId()) + || reconnectRes.stoppedCaches().contains(cache.name()); - stopFut.add((IgniteInternalFuture)fut); - } + if (stopped) + stopCacheOnReconnect(cache.context(), stoppedCaches); else { cache.onReconnected(); @@ -1128,7 +1122,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(cctx.name()); - assert desc != null; + assert desc != null : cctx.name(); ctx.query().onCacheStop0(cctx.name()); ctx.query().onCacheStart0(cctx, desc.schema()); @@ -1136,13 +1130,39 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + for (CacheGroupInfrastructure grp : cacheGrps.values()) { + Integer grpId = reconnectRes.newCacheGroupIds().get(grp.groupId()); + + if (grpId != null) + grp.onReconnected(grpId); + else + cacheGrps.remove(grp.groupId()); + } + sharedCtx.onReconnected(); + sharedCtx.io().onReconnected(reconnectRes.newCacheGroupIds()); + for (GridCacheAdapter cache : reconnected) cache.context().gate().reconnected(false); - if (stopFut != null) - stopFut.markInitialized(); + IgniteInternalFuture<?> stopFut = null; + + if (!stoppedCaches.isEmpty()) { + stopFut = ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + for (GridCacheAdapter cache : stoppedCaches) { + CacheGroupInfrastructure grp = cache.context().group(); + + onKernalStop(cache, true); + stopCache(cache, true, false); + + if (!grp.hasCaches()) + grp.stopGroup(); + } + } + }); + } return stopFut; } @@ -1256,12 +1276,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel, destroy); } - ctx.group().stopCache(ctx, destroy); - ctx.kernalContext().continuous().onCacheStop(ctx); ctx.kernalContext().cache().context().database().onCacheStop(ctx); + ctx.group().stopCache(ctx, destroy); + U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore())); if (log.isInfoEnabled())
