IGNITE-6512 Add an option to start caches in inactive state - Fixes #2772. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fcabfcad Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fcabfcad Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fcabfcad Branch: refs/heads/ignite-5937 Commit: fcabfcade8840375aad6b37e7ee2cd52cf1f6066 Parents: 7a61c15 Author: Ivan Rakov <[email protected]> Authored: Mon Oct 23 17:11:05 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Oct 23 17:11:05 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 2 + .../cache/CacheAffinitySharedManager.java | 6 +- .../cache/DynamicCacheChangeRequest.java | 17 ++++ .../processors/cache/GridCacheAdapter.java | 17 ++++ .../processors/cache/GridCacheProcessor.java | 98 +++++++++++++++----- 5 files changed, 117 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8a71e1a..ba42a95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2863,6 +2863,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ctx.cache().dynamicStartCaches(cacheCfgs, true, + true, true).get(); List<IgniteCache> createdCaches = new ArrayList<>(cacheCfgs.size()); @@ -2953,6 +2954,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ctx.cache().dynamicStartCaches(cacheCfgs, false, + true, true).get(); List<IgniteCache> createdCaches = new 
ArrayList<>(cacheCfgs.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index eaaa24d..7266f99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -428,7 +428,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().prepareCacheStart(desc.cacheConfiguration(), desc, startReq.nearCacheConfiguration(), - topVer); + topVer, + startReq.activeAfterStart()); startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null); @@ -751,7 +752,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().prepareCacheStart(req.startCacheConfiguration(), cacheDesc, nearCfg, - evts.topologyVersion()); + evts.topologyVersion(), + req.activeAfterStart()); if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) { if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 2fd8780..cfc2d07 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -68,6 +68,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Restart flag. */ private boolean restart; + /** Whether the cache is active after start. */ + private boolean activeAfterStart = true; + /** Destroy. */ private boolean destroy; @@ -404,6 +407,20 @@ public class DynamicCacheChangeRequest implements Serializable { this.locallyConfigured = locallyConfigured; } + /** + * @return {@code true} if the cache is active after start. + */ + public boolean activeAfterStart() { + return activeAfterStart; + } + + /** + * @param activeAfterStart {@code true} if the cache should be active after start. + */ + public void activeAfterStart(boolean activeAfterStart) { + this.activeAfterStart = activeAfterStart; + } + /** {@inheritDoc} */ @Override public String toString() { return "DynamicCacheChangeRequest [cacheName=" + cacheName() + http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9bdce35..e9c86b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -286,6 +286,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Asynchronous operations limit semaphore. */ private Semaphore asyncOpsSem; + /** Active. 
*/ + private volatile boolean active; + /** {@inheritDoc} */ @Override public String name() { return cacheCfg.getName(); @@ -448,6 +451,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * + */ + public boolean active() { + return active; + } + + /** + * @param active Active. + */ + public void active(boolean active) { + this.active = active; + } + + /** * @return Preloader. */ public abstract GridCachePreloader preloader(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fcabfcad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ad8f74a..d4d65dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1292,6 +1292,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheObjCtx Cache object context. * @param affNode {@code True} if local node affinity node. * @param updatesAllowed Updates allowed flag. + * @param activeOnStart If true, then we will discard restarting state from proxies. If false then we will change + * state of proxies to restarting * @return Cache context. * @throws IgniteCheckedException If failed to create cache. 
*/ @@ -1302,7 +1304,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { AffinityTopologyVersion locStartTopVer, CacheObjectContext cacheObjCtx, boolean affNode, - boolean updatesAllowed) + boolean updatesAllowed, + boolean activeOnStart) throws IgniteCheckedException { assert cfg != null; @@ -1461,6 +1464,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + cache.active(activeOnStart); + cacheCtx.cache(cache); GridCacheContext<?, ?> ret = cacheCtx; @@ -1682,8 +1687,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { desc.cacheConfiguration(), desc, t.get2(), - exchTopVer - ); + exchTopVer, + true); } } } @@ -1716,8 +1721,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { desc.cacheConfiguration(), desc, null, - exchTopVer - ); + exchTopVer, + true); } } @@ -1729,22 +1734,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param desc Cache descriptor. * @param reqNearCfg Near configuration if specified for client cache start request. * @param exchTopVer Current exchange version. + * @param activeOnStart If true, then we will discard restarting state from proxies. If false then we will change + * state of proxies to restarting * @throws IgniteCheckedException If failed. 
*/ void prepareCacheStart( CacheConfiguration startCfg, DynamicCacheDescriptor desc, @Nullable NearCacheConfiguration reqNearCfg, - AffinityTopologyVersion exchTopVer + AffinityTopologyVersion exchTopVer, + boolean activeOnStart ) throws IgniteCheckedException { assert !caches.containsKey(startCfg.getName()) : startCfg.getName(); CacheConfiguration ccfg = new CacheConfiguration(startCfg); - IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName()); - - boolean proxyRestart = proxy != null && proxy.isRestarting() && !caches.containsKey(ccfg.getName()); - CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); boolean affNode; @@ -1803,7 +1807,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { exchTopVer, cacheObjCtx, affNode, - true); + true, + activeOnStart + ); cacheCtx.dynamicDeploymentId(desc.deploymentId()); @@ -1819,11 +1825,35 @@ public class GridCacheProcessor extends GridProcessorAdapter { onKernalStart(cache); - if (proxyRestart) + IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName()); + + if (activeOnStart && proxy != null && proxy.isRestarting()) proxy.onRestarted(cacheCtx, cache); } /** + * Restarts proxies of caches if they were marked as restarting. + * Requires external synchronization - shouldn't be called concurrently with another cache restart. + */ + public void restartProxies() { + for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) { + if (proxy == null) + continue; + + GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName())); + + if (cacheCtx == null) + continue; + + if (proxy.isRestarting()) { + caches.get(proxy.getName()).active(true); + + proxy.onRestarted(cacheCtx, cacheCtx.cache()); + } + } + } + + /** * @param desc Group descriptor. * @param cacheType Cache type. * @param affNode Affinity node flag. @@ -1882,6 +1912,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Break the proxy before exchange future is done. 
IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cacheName); + if (restart) { + GridCacheAdapter<?, ?> cache = caches.get(cacheName); + + if (cache != null) + cache.active(false); + } + if (proxy != null) { if (stop) { if (restart) @@ -1905,6 +1942,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Break the proxy before exchange future is done. if (req.restart()) { + GridCacheAdapter<?, ?> cache = caches.get(req.cacheName()); + + if (cache != null) + cache.active(false); + proxy = jCacheProxies.get(req.cacheName()); if (proxy != null) @@ -1949,8 +1991,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheContext<?, ?> cacheCtx = cache.context(); if (cacheCtx.startTopologyVersion().equals(startTopVer) ) { - if (!jCacheProxies.containsKey(cacheCtx.name())) - jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false)); + if (!jCacheProxies.containsKey(cacheCtx.name())) { + IgniteCacheProxyImpl newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false); + + if (!cache.active()) + newProxy.restart(); + + jCacheProxies.putIfAbsent(cacheCtx.name(), newProxy); + } if (cacheCtx.preloader() != null) cacheCtx.preloader().onInitialExchangeComplete(err); @@ -2497,7 +2545,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheType, sql, failIfExists, - failIfNotStarted); + failIfNotStarted, + true); if (req != null) { if (req.clientStartOnly()) @@ -2544,11 +2593,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param ccfgList Collection of cache configuration. * @param failIfExists Fail if exists flag. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. + * @param activeAfterStart If {@code false}, cache proxies will be activated only after {@link #restartProxies()}. * @return Future that will be completed when all caches are deployed. 
*/ public IgniteInternalFuture<?> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, boolean failIfExists, - boolean checkThreadTx) { - return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx); + boolean checkThreadTx, boolean activeAfterStart) { + return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx, activeAfterStart); } /** @@ -2558,13 +2608,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheType Cache type. * @param failIfExists Fail if exists flag. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. + * @param activeAfterStart If {@code false}, cache proxies will be activated only after {@link #restartProxies()}. * @return Future that will be completed when all caches are deployed. */ private IgniteInternalFuture<?> dynamicStartCaches( Collection<CacheConfiguration> ccfgList, CacheType cacheType, boolean failIfExists, - boolean checkThreadTx + boolean checkThreadTx, + boolean activeAfterStart ) { if (checkThreadTx) checkEmptyTransactions(); @@ -2592,8 +2644,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { ct, false, failIfExists, - true - ); + true, + activeAfterStart); if (req != null) { if (req.clientStartOnly()) { @@ -3755,6 +3807,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command. * @param failIfExists Fail if exists flag. * @param failIfNotStarted If {@code true} fails if cache is not started. + * @param activeAfterStart If {@code false}, cache proxies will be activated only after {@link #restartProxies()}. * @return Request or {@code null} if cache already exists. 
* @throws IgniteCheckedException if some of pre-checks failed * @throws CacheExistsException if cache exists and failIfExists flag is {@code true} @@ -3766,7 +3819,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheType cacheType, boolean sql, boolean failIfExists, - boolean failIfNotStarted + boolean failIfNotStarted, + boolean activeAfterStart ) throws IgniteCheckedException { DynamicCacheDescriptor desc = cacheDescriptor(cacheName); @@ -3776,6 +3830,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.failIfExists(failIfExists); + req.activeAfterStart(activeAfterStart); + if (ccfg != null) { cloneCheckSerializable(ccfg);
