Repository: ignite Updated Branches: refs/heads/ignite-5075 aee103166 -> 19a4c4d58
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19a4c4d5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19a4c4d5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19a4c4d5 Branch: refs/heads/ignite-5075 Commit: 19a4c4d58d3434f497a031abc3880a3b3d69a4a5 Parents: aee1031 Author: sboikov <[email protected]> Authored: Thu May 11 18:45:01 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 11 19:13:51 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 97 ++++++++++----- .../processors/cache/CacheGroupDescriptor.java | 4 + .../cache/CacheGroupInfrastructure.java | 118 ++++++++++++++----- .../processors/cache/ClusterCachesInfo.java | 2 - .../processors/cache/ExchangeActions.java | 8 +- .../processors/cache/GridCacheProcessor.java | 44 +++++-- .../cache/IgniteCacheOffheapManager.java | 19 ++- .../cache/IgniteCacheOffheapManagerImpl.java | 12 +- 8 files changed, 222 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index f85d110..90e6c8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -189,7 +189,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param topVer Expected topology version. */ - private void onCacheStopped(AffinityTopologyVersion topVer) { + private void onCacheGroupStopped(AffinityTopologyVersion topVer) { CacheAffinityChangeMessage msg = null; synchronized (mux) { @@ -402,26 +402,52 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap initStartedGroupOnCoordinator(fut, grpDesc); } - for (DynamicCacheChangeRequest req : exchActions.closeRequests(cctx.localNodeId())) { - cctx.cache().blockGateway(req); -// TODO IGNITE-5075. -// if (crd) { -// GridCacheContext cacheCtx = cctx.cacheContext(cacheId); -// -// // Client cache was stopped, need create 'client' CacheHolder. -// if (cacheCtx != null && !cacheCtx.affinityNode()) { -// CacheHolder cache = caches.remove(cacheId); -// -// assert !cache.client() : cache; -// -// cache = CacheHolder2.create(cctx, -// cctx.cache().cacheDescriptor(cacheId), -// fut, -// cache.affinity()); -// -// caches.put(cacheId, cache); -// } -// } + List<ExchangeActions.ActionData> closeReqs = exchActions.closeRequests(cctx.localNodeId()); + + for (ExchangeActions.ActionData req : closeReqs) { + cctx.cache().blockGateway(req.request()); + + if (crd) { + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(req.descriptor().groupDescriptor().groupId()); + + assert grp != null; + + if (grp.affinityNode()) + continue; + + boolean grpClosed = false; + + if (grp.sharedGroup()) { + boolean cacheRemaining = false; + + for (GridCacheContext ctx : cctx.cacheContexts()) { + if (ctx.group() == grp && !cacheClosed(ctx.cacheId(), closeReqs)) { + cacheRemaining = true; + + break; + } + } + + if (!cacheRemaining) + grpClosed = true; + } + else + grpClosed = true; + + // All client caches were stopped, need create 'client' CacheGroupHolder. + if (grpClosed) { + CacheGroupHolder grpHolder = grpHolders.remove(grp.groupId()); + + assert !grpHolder.client() : grpHolder; + + grpHolder = CacheGroupHolder2.create(cctx, + registeredGrps.get(grp.groupId()), + fut, + grp.affinity()); + + grpHolders.put(grp.groupId(), grpHolder); + } + } } for (ExchangeActions.ActionData action : exchActions.stopRequests()) @@ -429,18 +455,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap Set<Integer> stoppedGrps = null; - for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) { - if (crd && grpDesc.config().getCacheMode() != LOCAL) { - CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId()); + if (crd) { + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) { + if (grpDesc.config().getCacheMode() != LOCAL) { + CacheGroupHolder cacheGrp = grpHolders.remove(grpDesc.groupId()); - assert cacheGrp != null : grpDesc; + assert cacheGrp != null : grpDesc; - if (stoppedGrps == null) - stoppedGrps = new HashSet<>(); + if (stoppedGrps == null) + stoppedGrps = new HashSet<>(); - stoppedGrps.add(cacheGrp.groupId()); + stoppedGrps.add(cacheGrp.groupId()); - cctx.io().removeHandler(cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); + cctx.io().removeHandler(cacheGrp.groupId(), GridDhtAffinityAssignmentResponse.class); + } } } @@ -466,7 +494,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - onCacheStopped(topVer); + onCacheGroupStopped(topVer); } }); } @@ -475,6 +503,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap return exchActions.clientOnlyExchange(); } + private boolean cacheClosed(int cacheId, List<ExchangeActions.ActionData> closeReqs) { + for (ExchangeActions.ActionData req : closeReqs) { + if (req.descriptor().cacheId() == cacheId) + return true; + } + + return false; + } + /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java index c418002..10acc36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java @@ -102,6 +102,10 @@ public class CacheGroupDescriptor { return caches != null; } + public boolean sharedGroup() { + return grpName != null; + } + public String groupName() { return grpName; } http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 04b45a0..8c9c4c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -52,25 +52,28 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF */ public class CacheGroupInfrastructure { /** */ + private final IgniteLogger log; + + /** */ private GridAffinityAssignmentCache aff; /** */ private final int grpId; /** */ - private final CacheConfiguration ccfg; + private UUID rcvdFrom; /** */ - private final GridCacheSharedContext ctx; + private final AffinityTopologyVersion locStartVer; /** */ - private final IgniteLogger log; + private final CacheConfiguration ccfg; /** */ - private GridDhtPartitionTopologyImpl top; + private final GridCacheSharedContext ctx; /** */ - private final AffinityTopologyVersion locStartVer; + private GridDhtPartitionTopologyImpl top; /** */ private IgniteCacheOffheapManager offheapMgr; @@ -93,9 +96,6 @@ public class CacheGroupInfrastructure { /** ReuseList instance this group is associated with */ private final ReuseList reuseList; - /** */ - private final CacheType cacheType; - /** IO policy. */ private final byte ioPlc; @@ -105,8 +105,13 @@ public class CacheGroupInfrastructure { /** */ private boolean storeCacheId; + /** Flag indicating that this cache is in a recovery mode. */ + // TODO IGNITE-5075 see GridCacheContext#needsRecovery + private boolean needsRecovery; + /** */ - private UUID rcvdFrom; + private GridCacheContext singleCacheCtx; + /** * @param grpId Group ID. @@ -129,7 +134,6 @@ public class CacheGroupInfrastructure { this.grpId = grpId; this.rcvdFrom = rcvdFrom; - this.cacheType = cacheType; this.ctx = ctx; this.ccfg = ccfg; this.affNode = affNode; @@ -148,6 +152,9 @@ public class CacheGroupInfrastructure { log = ctx.kernalContext().log(getClass()); } + /** + * @return Node ID initiated cache group start. + */ public UUID receivedFrom() { return rcvdFrom; } @@ -163,6 +170,9 @@ public class CacheGroupInfrastructure { return depEnabled; } + /** + * @return Preloader. + */ public GridCachePreloader preloader() { return preldr; } @@ -174,15 +184,18 @@ public class CacheGroupInfrastructure { return ioPlc; } - /** */ - private GridCacheContext singleCacheCtx; - + /** + * @param singleCacheCtx Cache context if group contains single cache. + */ public void cacheContext(GridCacheContext singleCacheCtx) { assert !sharedGroup(); this.singleCacheCtx = singleCacheCtx; } + /** + * @return Cache context if group contains single cache. + */ public GridCacheContext cacheContext() { assert !sharedGroup(); @@ -214,11 +227,15 @@ public class CacheGroupInfrastructure { /** * TODO IGNITE-5075: get rid of CacheObjectContext? + * @return Cache object context. */ public CacheObjectContext cacheObjectContext() { return cacheObjCtx; } + /** + * @return Cache shared context. + */ public GridCacheSharedContext shared() { return ctx; } @@ -230,18 +247,30 @@ public class CacheGroupInfrastructure { return memPlc; } + /** + * @return {@code True} if local node is affinity node. + */ public boolean affinityNode() { return affNode; } + /** + * @return Topology. + */ + public GridDhtPartitionTopology topology() { + if (top == null) + throw new IllegalStateException("Topology is not initialized: " + name()); + + return top; + } + + /** + * @return Offheap manager. + */ public IgniteCacheOffheapManager offheap() { return offheapMgr; } - /** Flag indicating that this cache is in a recovery mode. */ - // TODO IGNITE-5075 see GridCacheContext#needsRecovery - private boolean needsRecovery; - /** * @return Current cache state. Must only be modified during exchange. */ @@ -261,6 +290,9 @@ public class CacheGroupInfrastructure { return false; } + /** + * @return Topology version when group was started on local node. + */ public AffinityTopologyVersion localStartVersion() { return locStartVer; } @@ -272,22 +304,37 @@ public class CacheGroupInfrastructure { return ccfg.getCacheMode() == LOCAL; } + /** + * @return Cache configuration. + */ public CacheConfiguration config() { return ccfg; } + /** + * @return Affinity. + */ public GridAffinityAssignmentCache affinity() { return aff; } + /** + * @return Group name. + */ @Nullable public String name() { return ccfg.getGroupName(); } + /** + * @return Group ID. + */ public int groupId() { return grpId; } + /** + * @return {@code True} if group can contain multiple caches. + */ public boolean sharedGroup() { return ccfg.getGroupName() != null; } @@ -297,11 +344,33 @@ public class CacheGroupInfrastructure { return false; } + /** + * + */ public void onKernalStop() { - if (preldr != null) - preldr.onKernalStop(); + preldr.onKernalStop(); + + offheapMgr.onKernalStop(); } + /** + * @param cacheId Cache ID. + * @param destroy Destroy flag. + */ + void stopCache(int cacheId, boolean destroy) { + offheapMgr.stopCache(cacheId, destroy); + } + + /** + * + */ + void stopGroup() { + offheapMgr.stop(); + } + + /** + * @throws IgniteCheckedException If failed. + */ public void start() throws IgniteCheckedException { aff = new GridAffinityAssignmentCache(ctx.kernalContext(), name(), @@ -404,10 +473,9 @@ public class CacheGroupInfrastructure { } /** - * @param reconnectFut + * @param reconnectFut Reconnect future. */ public void onDisconnected(IgniteFuture reconnectFut) { - // TODO IGNITE-5075. IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, "Failed to wait for topology update, client disconnected."); @@ -426,7 +494,6 @@ public class CacheGroupInfrastructure { * */ public void onReconnected() { - // TODO IGNITE-5075. aff.onReconnected(); if (top != null) @@ -435,11 +502,4 @@ public class CacheGroupInfrastructure { if (preldr != null) preldr.onReconnected(); } - - public GridDhtPartitionTopology topology() { - if (top == null) - throw new IllegalStateException("Topology is not initialized: " + name()); - - return top; - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 8b23e5b..b62a3cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -374,8 +374,6 @@ class ClusterCachesInfo { } } else if (req.stop()) { - assert req.stop() ^ req.close() : req; - if (desc != null) { DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index 977f544..acfd487 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -71,8 +71,8 @@ public class ExchangeActions { * @param nodeId Local node ID. * @return Close cache requests. */ - public List<DynamicCacheChangeRequest> closeRequests(UUID nodeId) { - List<DynamicCacheChangeRequest> res = null; + public List<ActionData> closeRequests(UUID nodeId) { + List<ActionData> res = null; if (cachesToClose != null) { for (ActionData req : cachesToClose.values()) { @@ -80,12 +80,12 @@ public class ExchangeActions { if (res == null) res = new ArrayList<>(cachesToClose.size()); - res.add(req.req); + res.add(req); } } } - return res != null ? res : Collections.<DynamicCacheChangeRequest>emptyList(); + return res != null ? res : Collections.<ActionData>emptyList(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index be3b4fd..a242649 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -971,6 +971,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { stopCache(cache, cancel, false); } + for (CacheGroupInfrastructure grp :cacheGrps.values()) + stopCacheGroup(grp.groupId()); + cachesInfo.clearCaches(); } @@ -993,6 +996,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No new caches should be added after this point. exch.onKernalStop(cancel); + for (CacheGroupInfrastructure grp : cacheGrps.values()) + grp.onKernalStop(); + onKernalStopCaches(cancel); cancelFutures(); @@ -1052,6 +1058,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (IgniteInternalFuture fut : pendingTemplateFuts.values()) ((GridFutureAdapter)fut).onDone(err); + for (CacheGroupInfrastructure grp : cacheGrps.values()) + grp.onDisconnected(reconnectFut); + for (GridCacheAdapter cache : caches.values()) { GridCacheContext cctx = cache.context(); @@ -1079,6 +1088,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { Set<String> stoppedCaches = cachesInfo.onReconnected(); + // TODO IGNITE-5075. + for (CacheGroupInfrastructure grp : cacheGrps.values()) + grp.onReconnected(); + for (final GridCacheAdapter cache : caches.values()) { boolean stopped = stoppedCaches.contains(cache.name()); @@ -1120,13 +1133,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } } -// TODO -// if (clientReconnectReqs != null) { -// for (Map.Entry<UUID, DynamicCacheChangeBatch> e : clientReconnectReqs.entrySet()) -// processClientReconnectData(e.getKey(), e.getValue()); -// -// clientReconnectReqs = null; -// } sharedCtx.onReconnected(); @@ -1248,6 +1254,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel, destroy); } + ctx.group().stopCache(ctx.cacheId(), destroy); + ctx.kernalContext().continuous().onCacheStop(ctx); ctx.kernalContext().cache().context().database().onCacheStop(ctx); @@ -1979,9 +1987,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx.removeCacheContext(ctx); -// assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req + -// ", ctxDepId=" + ctx.dynamicDeploymentId() + ']'; - onKernalStop(cache, req.destroy()); stopCache(cache, true, req.destroy()); @@ -2019,8 +2024,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { prepareCacheStop(action.request()); } - for (DynamicCacheChangeRequest req : exchActions.closeRequests(ctx.localNodeId())) { - String cacheName = req.cacheName(); + for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop()) + stopCacheGroup(grpDesc.groupId()); + + for (ExchangeActions.ActionData req : exchActions.closeRequests(ctx.localNodeId())) { + String cacheName = req.request().cacheName(); IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); @@ -2037,7 +2045,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy.context().gate().onStopped(); - prepareCacheStop(req); + prepareCacheStop(req.request()); } } } @@ -2045,6 +2053,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param grpId Group ID. + */ + private void stopCacheGroup(int grpId) { + CacheGroupInfrastructure grp = cacheGrps.remove(grpId); + + if (grp != null) + grp.stopGroup(); + } + + /** * @param cacheName Cache name. * @param deploymentId Future deployment ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 5a3e0c0..4afaa23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -39,11 +39,28 @@ import org.jetbrains.annotations.Nullable; */ @SuppressWarnings("WeakerAccess") public interface IgniteCacheOffheapManager { + /** + * @param ctx Context. + * @param grp Cache group. + * @throws IgniteCheckedException If failed. + */ public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException;; + /** + * + */ public void onKernalStop(); - public void stop(boolean destroy); + /** + * @param cacheId Cache ID. + * @param destroy Destroy data flag. + */ + public void stopCache(int cacheId, boolean destroy); + + /** + * + */ + public void stop(); /** * Partition counter update callback. May be overridden by plugin-provided subclasses. http://git-wip-us.apache.org/repos/asf/ignite/blob/19a4c4d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index cfbec12..8321af3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -168,9 +168,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void stop(final boolean destroy) { + @Override public void stopCache(int cacheId, final boolean destroy) { if (destroy && grp.affinityNode()) - destroyCacheDataStructures(destroy); + destroyCacheDataStructures(cacheId, destroy); + } + + /** {@inheritDoc} */ + @Override public void stop() { + // TODO IGNITE-5075. } /** {@inheritDoc} */ @@ -181,7 +186,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** * */ - protected void destroyCacheDataStructures(boolean destroy) { + protected void destroyCacheDataStructures(int cacheId, boolean destroy) { + // TODO IGNITE-5075. assert grp.affinityNode(); try {
