Repository: ignite Updated Branches: refs/heads/ignite-5075-pds 848e8eeaa -> a3ef1cc3c
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a3ef1cc3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a3ef1cc3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a3ef1cc3 Branch: refs/heads/ignite-5075-pds Commit: a3ef1cc3ce70b69e928074569a92bf77203b5dda Parents: 848e8ee Author: sboikov <[email protected]> Authored: Thu Jun 1 10:29:04 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 1 10:29:04 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 13 ++++++++++++ .../pagemem/snapshot/SnapshotOperation.java | 18 ++++++++++++++-- .../processors/cache/ExchangeActions.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 22 +++++++++++++++++++- .../GridCacheDatabaseSharedManager.java | 14 +++++++------ .../database/file/FilePageStoreManager.java | 11 +++++++--- 6 files changed, 67 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a3ef1cc3/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index a2e62a0..4bf36ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1746,6 +1746,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Checks if node is a data node for the given cache group. + * + * @param node Node to check. + * @param grpId Cache group ID. + * @return {@code True} if node is a cache data node. + */ + public boolean cacheGroupAffinityNode(ClusterNode node, int grpId) { + CacheGroupAffinity aff = registeredCacheGrps.get(grpId); + + return CU.affinityNode(node, aff.cacheFilter); + } + + /** * @param node Node to check. * @param cacheName Cache name. * @return {@code True} if node has near cache enabled. http://git-wip-us.apache.org/repos/asf/ignite/blob/a3ef1cc3/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java index 39a76dd..bdcc05a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java @@ -37,7 +37,10 @@ public class SnapshotOperation implements Serializable { */ private final long snapshotId; - /** */ + /** Cache group ids. */ + private final Set<Integer> cacheGrpIds; + + /** Cache names. */ private final Set<String> cacheNames; /** Message. */ @@ -49,6 +52,7 @@ public class SnapshotOperation implements Serializable { /** * @param type Type. * @param snapshotId Snapshot id. + * @param cacheGrpIds Cache group ids. * @param cacheNames Cache names. * @param msg * @param extraParam Additional parameter. @@ -56,12 +60,14 @@ public class SnapshotOperation implements Serializable { public SnapshotOperation( SnapshotOperationType type, long snapshotId, + Set<Integer> cacheGrpIds, Set<String> cacheNames, String msg, Object extraParam ) { this.type = type; this.snapshotId = snapshotId; + this.cacheGrpIds = cacheGrpIds; this.cacheNames = cacheNames; this.msg = msg; this.extraParam = extraParam; @@ -84,10 +90,17 @@ public class SnapshotOperation implements Serializable { } /** - * Cache names included to this snapshot. + * Cache group ids included to this snapshot. * * @return Cache names. */ + public Set<Integer> cacheGroupIds() { + return cacheGrpIds; + } + + /** + * Cache names included to this snapshot. + */ public Set<String> cacheNames() { return cacheNames; } @@ -170,6 +183,7 @@ public class SnapshotOperation implements Serializable { "type=" + type + ", snapshotId=" + snapshotId + ", cacheNames=" + cacheNames + + ", cacheGroupIds=" + cacheGrpIds + ", msg='" + msg + '\'' + ", extraParam=" + extraParam + '}'; http://git-wip-us.apache.org/repos/asf/ignite/blob/a3ef1cc3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index 8baf70e..0a34314 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -333,7 +333,7 @@ public class ExchangeActions { /** * @param grpDesc Group descriptor. */ - void addCacheGroupToStop(CacheGroupDescriptor grpDesc) { + public void addCacheGroupToStop(CacheGroupDescriptor grpDesc) { assert grpDesc != null; if (cacheGrpsToStop == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/a3ef1cc3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 7c5c3b4..0e4fc30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDisc import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; @@ -570,8 +571,24 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT exchActions = new ExchangeActions(); + Set<String> cacheNames = new HashSet<>(); + + Collection<CacheGroupDescriptor> grpDescs = new ArrayList<>(); + + for (Integer grpId : op.cacheGroupIds()) { + CacheGroupInfrastructure cacheGrp = cctx.cache().cacheGroup(grpId); + + if (cacheGrp == null) + continue; + + grpDescs.add(cctx.cache().cacheGroupDescriptors().get(grpId)); + + for (Integer cacheId : cacheGrp.cacheIds()) + cacheNames.add(cctx.cacheContext(cacheId).name()); + } + List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests( - cctx.cache(), op.cacheNames(), cctx.localNodeId()); + cctx.cache(), cacheNames, cctx.localNodeId()); if (!F.isEmpty(destroyRequests)) { //Emulate destroy cache request for (DynamicCacheChangeRequest req : destroyRequests) { @@ -579,6 +596,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT .cacheDescriptor(CU.cacheId(req.cacheName()))); } + for (CacheGroupDescriptor grpDesc : grpDescs) + exchActions.addCacheGroupToStop(grpDesc); + if (op.type() == SnapshotOperationType.RESTORE) cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(destroyRequests), topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/a3ef1cc3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index 042d44c..9825f17 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -769,12 +769,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan for (IgniteBiTuple<CacheGroupInfrastructure, Boolean> tup : stoppedGrps) { CacheGroupInfrastructure grp = tup.get1(); - try { - cctx.pageStore().shutdownForCacheGroup(grp, tup.get2()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully clean page store resources for destroyed cache " + - "[cache=" + grp.cacheOrGroupName() + "]", e); + if (grp.affinityNode()) { + try { + cctx.pageStore().shutdownForCacheGroup(grp, tup.get2()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to gracefully clean page store resources for destroyed cache " + + "[cache=" + grp.cacheOrGroupName() + "]", e); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a3ef1cc3/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java index 9c8f4f3..fc13b7c 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java @@ -217,6 +217,8 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg); File file; + assert cacheWorkDir.exists() : "Work directory does not exist: " + cacheWorkDir; + if (grpDesc.sharedGroup()) file = new File(cacheWorkDir, ccfg.getName() + CACHE_CONF_FILENAME); else @@ -549,11 +551,14 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen } /** - * @param cacheName Cache name. + * @param ccfg Cache configuration. * @return Store dir for given cache. */ - public File cacheWorkDir(String cacheName) { - return new File(storeWorkDir, "cache-" + cacheName); + public File cacheWorkDir(CacheConfiguration ccfg) { + String dirName = ccfg.getGroupName() == null ? + CACHE_DIR_PREFIX + ccfg.getName() : CACHE_GRP_DIR_PREFIX + ccfg.getGroupName(); + + return new File(storeWorkDir, dirName); } /**
