IGNITE-8911 Fixed an issue where, while a cache is restarting, it was possible to start a new cache with the same name - Fixes #5717.
Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3197f9b4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3197f9b4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3197f9b4 Branch: refs/heads/master Commit: 3197f9b48d658a0ca19388612d87f4e5bde5e15c Parents: 6ffaba5 Author: EdShangGG <[email protected]> Authored: Sat Dec 29 16:58:23 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Sat Dec 29 16:58:23 2018 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcAbstractDmlStatementSelfTest.java | 2 +- .../ignite/IgniteCacheRestartingException.java | 25 +- .../apache/ignite/internal/IgniteKernal.java | 23 +- .../processors/cache/ClusterCachesInfo.java | 571 ++++++++++++------- .../cache/DynamicCacheChangeRequest.java | 22 +- .../cache/GatewayProtectedCacheProxy.java | 25 +- .../processors/cache/GridCacheAdapter.java | 1 + .../GridCachePartitionExchangeManager.java | 10 +- .../processors/cache/GridCacheProcessor.java | 150 +++-- .../processors/cache/IgniteCacheProxyImpl.java | 457 +++++++++++++-- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../GridNearAtomicSingleUpdateFuture.java | 8 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +- .../colocated/GridDhtColocatedLockFuture.java | 6 +- .../distributed/near/GridNearLockFuture.java | 6 +- .../near/GridNearTxAbstractEnlistFuture.java | 6 +- .../near/TxTopologyVersionFuture.java | 6 +- .../persistence/file/FilePageStoreManager.java | 109 ++-- .../cache/persistence/pagemem/PageMemoryEx.java | 2 +- .../IgniteTxImplicitSingleStateImpl.java | 6 +- .../cache/transactions/IgniteTxStateImpl.java | 6 +- .../AtomicDataStructureProxy.java | 2 +- .../datastructures/DataStructuresProcessor.java | 22 +- .../datastructures/GridCacheRemovable.java | 7 + .../datastructures/GridCacheSetProxy.java | 2 +- 
.../cache/IgniteDynamicCacheStartSelfTest.java | 66 ++- ...PdsCacheStartStopWithFreqCheckpointTest.java | 2 +- .../query/h2/H2IndexingAbstractGeoSelfTest.java | 29 +- .../cache/index/AbstractSchemaSelfTest.java | 2 +- .../DynamicIndexAbstractBasicSelfTest.java | 2 +- 30 files changed, 1171 insertions(+), 418 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java index 636a717..0c3a885 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java @@ -131,7 +131,7 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - ((IgniteEx)ignite(0)).context().cache().dynamicDestroyCache(DEFAULT_CACHE_NAME, true, true, false); + ((IgniteEx)ignite(0)).context().cache().dynamicDestroyCache(DEFAULT_CACHE_NAME, true, true, false, null); if (conn != null) { conn.close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/IgniteCacheRestartingException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCacheRestartingException.java b/modules/core/src/main/java/org/apache/ignite/IgniteCacheRestartingException.java index a3a7490..1dbfc67 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/IgniteCacheRestartingException.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCacheRestartingException.java @@ -18,6 +18,7 @@ package org.apache.ignite; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; @@ -29,26 +30,34 @@ public class IgniteCacheRestartingException extends IgniteException { private static final long serialVersionUID = 0L; /** */ - private final IgniteFuture<?> restartFut; + private final transient IgniteFuture<?> restartFut; + + /** + * @param cacheName Error message. + */ + public IgniteCacheRestartingException(String cacheName) { + this(null, cacheName, null); + } /** * @param restartFut Restart future. - * @param msg Error message. + * @param cacheName Error message. */ - public IgniteCacheRestartingException(IgniteFuture<?> restartFut, String msg) { - this(restartFut, msg, null); + public IgniteCacheRestartingException(IgniteFuture<?> restartFut, String cacheName) { + this(restartFut, cacheName, null); } /** * @param restartFut Restart future. - * @param msg Error message. + * @param cacheName Cache name what is restarting. * @param cause Optional nested exception (can be {@code null}). 
*/ public IgniteCacheRestartingException( IgniteFuture<?> restartFut, - String msg, - @Nullable Throwable cause) { - super(msg, cause); + String cacheName, + @Nullable Throwable cause + ) { + super("Cache is restarting:" + cacheName + ", you could wait restart completion with restartFuture", cause); this.restartFut = restartFut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b84771a..3a3af8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -3039,7 +3039,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { Boolean res = false; - if (ctx.cache().cache(cacheName) == null) { + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName, false, true); + + if (cache == null) { res = sql ? 
ctx.cache().dynamicStartSqlCache(cacheCfg).get() : ctx.cache().dynamicStartCache(cacheCfg, @@ -3048,9 +3050,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { false, true, true).get(); - } - return new IgniteBiTuple<>(ctx.cache().publicJCache(cacheName), res); + return new IgniteBiTuple<>(ctx.cache().publicJCache(cacheName), res); + } + else + return new IgniteBiTuple<>(cache, res); } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); @@ -3298,7 +3302,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { checkClusterState(); - return ctx.cache().dynamicDestroyCache(cacheName, sql, checkThreadTx, false); + return ctx.cache().dynamicDestroyCache(cacheName, sql, checkThreadTx, false, null); } finally { unguard(); @@ -3318,7 +3322,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { checkClusterState(); - return ctx.cache().dynamicDestroyCaches(cacheNames, checkThreadTx, false); + return ctx.cache().dynamicDestroyCaches(cacheNames, checkThreadTx); } finally { unguard(); @@ -3334,10 +3338,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { try { checkClusterState(); - if (ctx.cache().cache(cacheName) == null) + IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName, false, true); + + if (cache == null) { ctx.cache().getOrCreateFromTemplate(cacheName, true).get(); - return ctx.cache().publicJCache(cacheName); + return ctx.cache().publicJCache(cacheName); + } + + return cache; } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index b97d12f..e3b67f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.query.QuerySchema; import org.apache.ignite.internal.processors.query.QuerySchemaPatch; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -79,6 +78,9 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType * Logic related to cache discovery data processing. */ class ClusterCachesInfo { + /** Representation of null for restarting caches map */ + private static final IgniteUuid NULL_OBJECT = new IgniteUuid(); + /** Version since which merge of config is supports. */ private static final IgniteProductVersion V_MERGE_CONFIG_SINCE = IgniteProductVersion.fromString("2.5.0"); @@ -94,8 +96,8 @@ class ClusterCachesInfo { /** Cache templates. */ private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>(); - /** Caches currently being restarted. */ - private final Set<String> restartingCaches = new GridConcurrentHashSet<>(); + /** Caches currently being restarted (with restarter id). 
*/ + private final ConcurrentHashMap<String, IgniteUuid> restartingCaches = new ConcurrentHashMap<>(); /** */ private final IgniteLogger log; @@ -411,7 +413,7 @@ class ClusterCachesInfo { requests.add(DynamicCacheChangeRequest.stopRequest(ctx, cacheName, cacheDescr.sql(), true)); } - processCacheChangeRequests(exchangeActions, requests, topVer,false); + processCacheChangeRequests(exchangeActions, requests, topVer, false); failMsg.exchangeActions(exchangeActions); } @@ -468,296 +470,399 @@ class ClusterCachesInfo { ExchangeActions exchangeActions, Collection<DynamicCacheChangeRequest> reqs, AffinityTopologyVersion topVer, - boolean persistedCfgs) { + boolean persistedCfgs + ) { CacheChangeProcessResult res = new CacheChangeProcessResult(); final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>(); - for (DynamicCacheChangeRequest req : reqs) { - if (req.template()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); + for (DynamicCacheChangeRequest req : reqs) + processCacheChangeRequest0(req, exchangeActions, topVer, persistedCfgs, res, reqsToComplete); - assert ccfg != null : req; + if (!F.isEmpty(res.addedDescs)) { + AffinityTopologyVersion startTopVer = res.needExchange ? 
topVer.nextMinorVersion() : topVer; - DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName()); + for (DynamicCacheDescriptor desc : res.addedDescs) { + assert desc.template() || res.needExchange; - if (desc == null) { - DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, - ccfg, - req.cacheType(), - null, - true, - req.initiatingNodeId(), - false, - false, - req.deploymentId(), - req.schema()); + desc.startTopologyVersion(startTopVer); + } + } - DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc); + if (!F.isEmpty(reqsToComplete)) { + ctx.closure().callLocalSafe(new Callable<Void>() { + @Override public Void call() throws Exception { + for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t : reqsToComplete) { + final DynamicCacheChangeRequest req = t.get1(); + AffinityTopologyVersion waitTopVer = t.get2(); - assert old == null; + IgniteInternalFuture<?> fut = waitTopVer != null ? + ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null; - res.addedDescs.add(templateDesc); + if (fut == null || fut.isDone()) + ctx.cache().completeCacheStartFuture(req, false, null); + else { + fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + ctx.cache().completeCacheStartFuture(req, false, null); + } + }); + } + } + + return null; } + }); + } - if (!persistedCfgs) - ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); + return res; + } - continue; - } + /** + * @param req Cache change request. + * @param exchangeActions Exchange actions to update. + * @param topVer Topology version. + * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation. + * @param res Accumulator for cache change process results. 
+ * @param reqsToComplete Accumulator for cache change requests which should be completed after + * ({@link org.apache.ignite.internal.processors.cache.GridCacheProcessor#pendingFuts} + */ + private void processCacheChangeRequest0( + DynamicCacheChangeRequest req, + ExchangeActions exchangeActions, + AffinityTopologyVersion topVer, + boolean persistedCfgs, + CacheChangeProcessResult res, + List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete + ) { + String cacheName = req.cacheName(); - assert !req.clientStartOnly() : req; + if (req.template()) { + processTemplateAddRequest(persistedCfgs, res, req); - DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); + return; + } - boolean needExchange = false; + assert !req.clientStartOnly() : req; - boolean clientCacheStart = false; + DynamicCacheDescriptor desc = registeredCaches.get(cacheName); - AffinityTopologyVersion waitTopVer = null; + boolean needExchange = false; - if (req.start()) { - // Starting a new cache. - if (desc == null) { - String conflictErr = checkCacheConflict(req.startCacheConfiguration()); + boolean clientCacheStart = false; - if (conflictErr != null) { - U.warn(log, "Ignore cache start request. " + conflictErr); + AffinityTopologyVersion waitTopVer = null; - IgniteCheckedException err = new IgniteCheckedException("Failed to start " + - "cache. 
" + conflictErr); + if (req.start()) { + boolean proceedFuther = true; - if (persistedCfgs) - res.errs.add(err); - else - ctx.cache().completeCacheStartFuture(req, false, err); + if (restartingCaches.containsKey(cacheName) && + ((req.restartId() == null && restartingCaches.get(cacheName) != NULL_OBJECT) + || (req.restartId() != null &&!req.restartId().equals(restartingCaches.get(cacheName))))) { - continue; - } + if (req.failIfExists()) { + ctx.cache().completeCacheStartFuture(req, false, + new CacheExistsException("Failed to start cache (a cache is restarting): " + cacheName)); + } - if (req.clientStartOnly()) { - assert !persistedCfgs; + proceedFuther = false; + } - ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + - "client cache (a cache with the given name is not started): " + req.cacheName())); - } - else { - SchemaOperationException err = QueryUtils.checkQueryEntityConflicts( - req.startCacheConfiguration(), registeredCaches.values()); + if (proceedFuther) { + if (desc == null) { /* Starting a new cache.*/ + if (!processStartNewCacheRequest(exchangeActions, topVer, persistedCfgs, res, req, cacheName)) + return; - if (err != null) { - if (persistedCfgs) - res.errs.add(err); + needExchange = true; + } + else { + clientCacheStart = processStartAlreadyStartedCacheRequest(topVer, persistedCfgs, req, cacheName, desc); + + if (!clientCacheStart) { + if (desc.clientCacheStartVersion() != null) + waitTopVer = desc.clientCacheStartVersion(); + else { + AffinityTopologyVersion nodeStartVer = + new AffinityTopologyVersion(ctx.discovery().localNode().order(), 0); + + if (desc.startTopologyVersion() != null) + waitTopVer = desc.startTopologyVersion(); else - ctx.cache().completeCacheStartFuture(req, false, err); + waitTopVer = desc.receivedFromStartVersion(); - continue; + if (waitTopVer == null || nodeStartVer.compareTo(waitTopVer) > 0) + waitTopVer = nodeStartVer; } + } + } + } + } + else if (req.resetLostPartitions()) { + 
if (desc != null) { + needExchange = true; - CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration(); + exchangeActions.addCacheToResetLostPartitions(req, desc); + } + } + else if (req.stop()) { + if (desc != null) { + if (req.sql() && !desc.sql()) { + ctx.cache().completeCacheStartFuture(req, false, + new IgniteCheckedException("Only cache created with CREATE TABLE may be removed with " + + "DROP TABLE [cacheName=" + cacheName + ']')); - assert req.cacheType() != null : req; - assert F.eq(ccfg.getName(), req.cacheName()) : req; + return; + } - int cacheId = CU.cacheId(req.cacheName()); + processStopCacheRequest(exchangeActions, req, cacheName, desc); - CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions, - topVer, - ccfg, - cacheId, - req.initiatingNodeId(), - req.deploymentId(), - req.encryptionKey()); + needExchange = true; + } + } + else + assert false : req; - DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, - ccfg, - req.cacheType(), - grpDesc, - false, - req.initiatingNodeId(), - false, - req.sql(), - req.deploymentId(), - req.schema()); + if (!needExchange) { + if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId())) + reqsToComplete.add(new T2<>(req, waitTopVer)); + } + else + res.needExchange = true; + } - DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc); + /** + * @param req Cache change request. + * @param exchangeActions Exchange actions to update. + * @param cacheName Cache name. + * @param desc Dynamic cache descriptor. + */ + private void processStopCacheRequest( + ExchangeActions exchangeActions, + DynamicCacheChangeRequest req, + String cacheName, + DynamicCacheDescriptor desc + ) { + DynamicCacheDescriptor old = registeredCaches.remove(cacheName); - restartingCaches.remove(ccfg.getName()); + if (req.restart()) { + IgniteUuid restartId = req.restartId(); - assert old == null; + restartingCaches.put(cacheName, restartId == null ? 
NULL_OBJECT : restartId); + } - ctx.discovery().setCacheFilter( - startDesc.cacheId(), - grpDesc.groupId(), - ccfg.getName(), - ccfg.getNearConfiguration() != null); + assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']'; - if (!persistedCfgs) { - ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); - } + ctx.discovery().removeCacheFilter(cacheName); - res.addedDescs.add(startDesc); + exchangeActions.addCacheToStop(req, desc); - exchangeActions.addCacheToStart(req, startDesc); + CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId()); - needExchange = true; - } - } - else { - assert !persistedCfgs; - assert req.initiatingNodeId() != null : req; + assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc; - if (req.failIfExists()) { - ctx.cache().completeCacheStartFuture(req, false, - new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already started): " + req.cacheName())); - } - else { - // Cache already exists, it is possible client cache is needed. 
- ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); + grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId()); - boolean clientReq = node != null && - !ctx.discovery().cacheAffinityNode(node, req.cacheName()); + if (!grpDesc.hasCaches()) { + registeredCacheGrps.remove(grpDesc.groupId()); - if (clientReq) { - ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); + ctx.discovery().removeCacheGroup(grpDesc); - if (node.id().equals(req.initiatingNodeId())) { - desc.clientCacheStartVersion(topVer); + exchangeActions.addCacheGroupToStop(grpDesc, req.destroy()); - clientCacheStart = true; + assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId()); - ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(req.cacheName(), req), null); - } - } - } + // If all caches in group will be destroyed it is not necessary to destroy single cache + // because group will be stopped anyway. + if (req.destroy()) { + for (ExchangeActions.CacheActionData action : exchangeActions.cacheStopRequests()) { + if (action.descriptor().groupId() == grpDesc.groupId()) + action.request().destroy(false); } + } + } + } - if (!needExchange && !clientCacheStart && desc != null) { - if (desc.clientCacheStartVersion() != null) - waitTopVer = desc.clientCacheStartVersion(); - else { - AffinityTopologyVersion nodeStartVer = - new AffinityTopologyVersion(ctx.discovery().localNode().order(), 0); + /** + * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation. + * @param res Accumulator for cache change process results. + * @param req Dynamic cache change request. 
+ */ + private void processTemplateAddRequest( + boolean persistedCfgs, + CacheChangeProcessResult res, + DynamicCacheChangeRequest req + ) { + CacheConfiguration ccfg = req.startCacheConfiguration(); - if (desc.startTopologyVersion() != null) - waitTopVer = desc.startTopologyVersion(); - else - waitTopVer = desc.receivedFromStartVersion(); + assert ccfg != null : req; - if (waitTopVer == null || nodeStartVer.compareTo(waitTopVer) > 0) - waitTopVer = nodeStartVer; - } - } - } - else if (req.resetLostPartitions()) { - if (desc != null) { - needExchange = true; + DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName()); - exchangeActions.addCacheToResetLostPartitions(req, desc); - } - } - else if (req.stop()) { - if (desc != null) { - if (req.sql() && !desc.sql()) { - ctx.cache().completeCacheStartFuture(req, false, - new IgniteCheckedException("Only cache created with CREATE TABLE may be removed with " + - "DROP TABLE [cacheName=" + req.cacheName() + ']')); - - continue; - } + if (desc == null) { + DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx, + ccfg, + req.cacheType(), + null, + true, + req.initiatingNodeId(), + false, + false, + req.deploymentId(), + req.schema()); - DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName()); + DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc); - if (req.restart()) - restartingCaches.add(req.cacheName()); + assert old == null; - assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']'; + res.addedDescs.add(templateDesc); + } - ctx.discovery().removeCacheFilter(req.cacheName()); + if (!persistedCfgs) + ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); + } - needExchange = true; + /** + * @param topVer Topology version. + * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation. + * @param req Cache change request. 
+ * @param cacheName Cache name. + * @param desc Dynamic cache descriptor. + * @return True if it is needed to start client cache. + */ + private boolean processStartAlreadyStartedCacheRequest( + AffinityTopologyVersion topVer, + boolean persistedCfgs, + DynamicCacheChangeRequest req, + String cacheName, + DynamicCacheDescriptor desc + ) { + assert !persistedCfgs; + assert req.initiatingNodeId() != null : req; + + if (req.failIfExists()) { + ctx.cache().completeCacheStartFuture(req, false, + new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already started): " + cacheName)); + } + else { + // Cache already exists, it is possible client cache is needed. + ClusterNode node = ctx.discovery().node(req.initiatingNodeId()); - exchangeActions.addCacheToStop(req, desc); + boolean clientReq = node != null && + !ctx.discovery().cacheAffinityNode(node, cacheName); - CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId()); + if (clientReq) { + ctx.discovery().addClientNode(cacheName, + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); - assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc; + if (node.id().equals(req.initiatingNodeId())) { + desc.clientCacheStartVersion(topVer); - grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId()); + ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(cacheName, req), null); - if (!grpDesc.hasCaches()) { - registeredCacheGrps.remove(grpDesc.groupId()); + return true; + } + } + } - ctx.discovery().removeCacheGroup(grpDesc); + return false; + } - exchangeActions.addCacheGroupToStop(grpDesc, req.destroy()); + /** + * @param exchangeActions Exchange actions to update. + * @param topVer Topology version. + * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation. + * @param res Accumulator for cache change process results. + * @param req Cache change request. + * @param cacheName Cache name. 
+ * @return True if there was no errors. + */ + private boolean processStartNewCacheRequest( + ExchangeActions exchangeActions, + AffinityTopologyVersion topVer, + boolean persistedCfgs, + CacheChangeProcessResult res, + DynamicCacheChangeRequest req, + String cacheName + ) { + String conflictErr = checkCacheConflict(req.startCacheConfiguration()); - assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId()); + if (conflictErr != null) { + U.warn(log, "Ignore cache start request. " + conflictErr); - // If all caches in group will be destroyed it is not necessary to destroy single cache - // because group will be stopped anyway. - if (req.destroy()) { - for (ExchangeActions.CacheActionData action : exchangeActions.cacheStopRequests()) { - if (action.descriptor().groupId() == grpDesc.groupId()) - action.request().destroy(false); - } - } - } - } - } - else - assert false : req; + IgniteCheckedException err = new IgniteCheckedException("Failed to start " + + "cache. " + conflictErr); - if (!needExchange) { - if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId())) - reqsToComplete.add(new T2<>(req, waitTopVer)); - } + if (persistedCfgs) + res.errs.add(err); else - res.needExchange = true; + ctx.cache().completeCacheStartFuture(req, false, err); + + return false; } - if (!F.isEmpty(res.addedDescs)) { - AffinityTopologyVersion startTopVer = res.needExchange ? 
topVer.nextMinorVersion() : topVer; + SchemaOperationException err = QueryUtils.checkQueryEntityConflicts( + req.startCacheConfiguration(), registeredCaches.values()); - for (DynamicCacheDescriptor desc : res.addedDescs) { - assert desc.template() || res.needExchange; + if (err != null) { + if (persistedCfgs) + res.errs.add(err); + else + ctx.cache().completeCacheStartFuture(req, false, err); - desc.startTopologyVersion(startTopVer); - } + return false; } - if (!F.isEmpty(reqsToComplete)) { - ctx.closure().callLocalSafe(new Callable<Void>() { - @Override public Void call() throws Exception { - for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t : reqsToComplete) { - final DynamicCacheChangeRequest req = t.get1(); - AffinityTopologyVersion waitTopVer = t.get2(); + CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration(); - IgniteInternalFuture<?> fut = waitTopVer != null ? - ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null; + assert req.cacheType() != null : req; + assert F.eq(ccfg.getName(), cacheName) : req; - if (fut == null || fut.isDone()) - ctx.cache().completeCacheStartFuture(req, false, null); - else { - fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - ctx.cache().completeCacheStartFuture(req, false, null); - } - }); - } - } + int cacheId = CU.cacheId(cacheName); - return null; - } - }); + CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions, + topVer, + ccfg, + cacheId, + req.initiatingNodeId(), + req.deploymentId(), + req.encryptionKey()); + + DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx, + ccfg, + req.cacheType(), + grpDesc, + false, + req.initiatingNodeId(), + false, + req.sql(), + req.deploymentId(), + req.schema()); + + DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc); + + restartingCaches.remove(ccfg.getName()); + + assert old == null; + + ctx.discovery().setCacheFilter( + 
startDesc.cacheId(), + grpDesc.groupId(), + ccfg.getName(), + ccfg.getNearConfiguration() != null); + + if (!persistedCfgs) { + ctx.discovery().addClientNode(cacheName, + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); } - return res; + res.addedDescs.add(startDesc); + + exchangeActions.addCacheToStart(req, startDesc); + + return true; } /** @@ -779,7 +884,7 @@ class ClusterCachesInfo { * @return Collection of currently restarting caches. */ Collection<String> restartingCaches() { - return restartingCaches; + return restartingCaches.keySet(); } /** @@ -987,7 +1092,7 @@ class ClusterCachesInfo { templates.put(desc.cacheName(), cacheData); } - Collection<String> restarting = new HashSet<>(restartingCaches); + Collection<String> restarting = new HashSet<>(restartingCaches.keySet()); return new CacheNodeCommonDiscoveryData(caches, templates, @@ -1360,7 +1465,8 @@ class ClusterCachesInfo { * @return Exchange action. * @throws IgniteCheckedException If configuration validation failed. */ - public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer, DiscoveryDataClusterState curState) + public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer, + DiscoveryDataClusterState curState) throws IgniteCheckedException { ExchangeActions exchangeActions = new ExchangeActions(); @@ -1600,7 +1706,7 @@ class ClusterCachesInfo { //If conflict was detected we don't merge config and we leave existed config. 
if (!hasSchemaPatchConflict && !patchesToApply.isEmpty()) - for(Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry: patchesToApply.entrySet()){ + for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) { if (entry.getKey().applySchemaPatch(entry.getValue())) saveCacheConfiguration(entry.getKey()); } @@ -1761,7 +1867,8 @@ class ClusterCachesInfo { Integer cacheId, UUID rcvdFrom, IgniteUuid deploymentId, - @Nullable byte[] encKey) { + @Nullable byte[] encKey + ) { if (startedCacheCfg.getGroupName() != null) { CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName()); @@ -1814,7 +1921,8 @@ class ClusterCachesInfo { * @param exchActions Optional exchange actions to update if new group was added. * @param startedCacheCfg Started cache configuration. */ - private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions, CacheConfiguration<?, ?> startedCacheCfg) { + private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions, + CacheConfiguration<?, ?> startedCacheCfg) { if (!ctx.clientNode()) { // On server, we always can determine whether cache is persistent by local storage configuration. return CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration()); @@ -1961,6 +2069,7 @@ class ClusterCachesInfo { /** * Returns registered cache descriptors ordered by {@code comparator} + * * @param comparator Comparator (DIRECT, REVERSE or custom) to order cache descriptors. * @return Ordered by comparator cache descriptors. */ @@ -2106,6 +2215,28 @@ class ClusterCachesInfo { } /** + * @param cacheName Cache name. + * @return {@code True} if cache is restarting. + */ + public boolean isRestarting(String cacheName) { + return restartingCaches.containsKey(cacheName); + } + + /** + * @param cacheName Cache name which restart were cancelled. 
+ */ + public void removeRestartingCache(String cacheName) { + restartingCaches.remove(cacheName); + } + + /** + * Clear up information about restarting caches. + */ + public void removeRestartingCaches() { + restartingCaches.clear(); + } + + /** * Holds direct comparator (first system caches) and reverse comparator (first user caches). * Use DIRECT comparator for ordering cache start operations. * Use REVERSE comparator for ordering cache stop operations. http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 5b8a89c..8128230 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import java.io.Serializable; +import java.util.UUID; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridKernalContext; @@ -25,9 +27,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import java.io.Serializable; -import java.util.UUID; - /** * Cache start/stop request. */ @@ -68,6 +67,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Restart flag. */ private boolean restart; + /** Restart operation id. 
*/ + private IgniteUuid restartId; + /** Cache active on start or not*/ private boolean disabledAfterStart; @@ -265,6 +267,20 @@ public class DynamicCacheChangeRequest implements Serializable { } /** + * @return Id of restart to allow only initiator start the restarting cache. + */ + public IgniteUuid restartId() { + return restartId; + } + + /** + * @param restartId Id of cache restart requester. + */ + public void restartId(IgniteUuid restartId) { + this.restartId = restartId; + } + + /** * @return Cache name. */ public String cacheName() { http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index 0735a88..ef861b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -49,10 +49,8 @@ import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.internal.AsyncSupportAdapter; -import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.GridKernalState; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteClosure; @@ -1565,12 +1563,7 @@ public class 
GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite try { IgniteInternalCache<K, V> cache = context().kernalContext().cache().<K, V>publicJCache(context().name()).internalProxy(); - GridFutureAdapter<Void> fut = proxyImpl.opportunisticRestart(); - - if (fut == null) - proxyImpl.onRestarted(cache.context(), cache.context().cache()); - else - new IgniteFutureImpl<>(fut).get(); + proxyImpl.opportunisticRestart(cache); return gate(); } catch (IgniteCheckedException ice) { @@ -1587,8 +1580,18 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite private CacheOperationGate onEnter() { GridCacheGateway<K, V> gate = checkProxyIsValid(gate(), true); - return new CacheOperationGate(gate, - lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx)); + try { + return new CacheOperationGate(gate, + lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx)); + } + catch (IllegalStateException e) { + boolean isCacheProxy = delegate instanceof IgniteCacheProxyImpl; + + if (isCacheProxy) + ((IgniteCacheProxyImpl) delegate).checkRestart(true); + + throw e; // If we reached this line. 
+ } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a1c403b..f0e6cd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -49,6 +49,7 @@ import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index de1054b..d2304d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -134,8 +134,8 @@ import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static 
org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; -import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout; import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT; @@ -1894,8 +1894,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana dumpPendingObjects(exchTopVer, diagCtx); - for (CacheGroupContext grp : cctx.cache().cacheGroups()) - grp.preloader().dumpDebugInfo(); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + GridCachePreloader preloader = grp.preloader(); + + if (preloader != null) + preloader.dumpDebugInfo(); + } cctx.affinity().dumpDebugInfo(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3197f9b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 1c44eaf..dadd719 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1120,8 +1120,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter { * Blocks all available gateways */ public void blockGateways() { - for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) - proxy.context().gate().onStopped(); + for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) + proxy.context0().gate().onStopped(); } /** {@inheritDoc} */ @@ -1861,7 +1861,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy.onRestarted(cacheCtx, cache); if (cacheCtx.dataStructuresCache()) - ctx.dataStructures().restart(proxy.internalProxy()); + ctx.dataStructures().restart(cache.name(), proxy.internalProxy()); } } } @@ -2662,12 +2662,44 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy.onRestarted(cacheCtx, cacheCtx.cache()); if (cacheCtx.dataStructuresCache()) - ctx.dataStructures().restart(proxy.internalProxy()); + ctx.dataStructures().restart(proxy.getName(), proxy.internalProxy()); } } } /** + * Complete stopping of caches if they were marked as restarting but it failed. + * @return Cache names of proxies which were restarted. + */ + public List<String> resetRestartingProxies() { + List<String> res = new ArrayList<>(); + + for (Map.Entry<String, IgniteCacheProxyImpl<?, ?>> e : jCacheProxies.entrySet()) { + IgniteCacheProxyImpl<?, ?> proxy = e.getValue(); + + if (proxy == null) + continue; + + if (proxy.isRestarting()) { + String cacheName = e.getKey(); + + res.add(cacheName); + + jCacheProxies.remove(cacheName); + + proxy.onRestarted(null, null); + + if (DataStructuresProcessor.isDataStructureCache(cacheName)) + ctx.dataStructures().restart(cacheName, null); + } + } + + cachesInfo.removeRestartingCaches(); + + return res; + } + + /** * @param desc Group descriptor. * @param cacheType Cache type. * @param affNode Affinity node flag. 
@@ -2751,16 +2783,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { cache.active(false); } - if (proxy != null) { - if (stop) { - if (restart) - proxy.restart(); + if (stop) { + if (restart) { + GridCacheAdapter<?, ?> cache; - proxy.context().gate().stopped(); + if (proxy == null && (cache = caches.get(cacheName)) != null) { + proxy = new IgniteCacheProxyImpl(cache.context(), cache, false); + + IgniteCacheProxyImpl<?, ?> oldProxy = jCacheProxies.putIfAbsent(cacheName, proxy); + + if (oldProxy != null) + proxy = oldProxy; + } + + if (proxy != null) + proxy.suspend(); } - else - proxy.closeProxy(); + + if (proxy != null) + proxy.context0().gate().stopped(); } + else if (proxy != null) + proxy.closeProxy(); } /** @@ -2784,7 +2828,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { proxy = jCacheProxies.get(req.cacheName()); if (proxy != null) - proxy.restart(); + proxy.suspend(); } else { completeProxyInitialize(req.cacheName()); @@ -2793,7 +2837,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } if (proxy != null) - proxy.context().gate().onStopped(); + proxy.context0().gate().onStopped(); } /** @@ -2837,7 +2881,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxyImpl<?, ?> newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false); if (!cache.active()) - newProxy.restart(); + newProxy.suspend(); addjCacheProxy(cacheCtx.name(), newProxy); } @@ -3708,6 +3752,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sql, failIfExists, failIfNotStarted, + null, false, null, ccfg != null && ccfg.isEncryptionEnabled() ? grpKeys.iterator().next() : null); @@ -3812,15 +3857,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}. * @return Future that will be completed when all caches are deployed. 
*/ - public IgniteInternalFuture<Boolean> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, + public IgniteInternalFuture<Boolean> dynamicStartCaches( + Collection<CacheConfiguration> ccfgList, boolean failIfExists, - boolean checkThreadTx, boolean disabledAfterStart) { + boolean checkThreadTx, + boolean disabledAfterStart + ) { return dynamicStartCachesByStoredConf( ccfgList.stream().map(StoredCacheData::new).collect(Collectors.toList()), failIfExists, checkThreadTx, - disabledAfterStart - ); + disabledAfterStart, + null); } /** @@ -3830,13 +3878,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param failIfExists Fail if exists flag. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}. + * @param restartId Restart requester id (it'll allow to start this cache only him). * @return Future that will be completed when all caches are deployed. */ public IgniteInternalFuture<Boolean> dynamicStartCachesByStoredConf( Collection<StoredCacheData> storedCacheDataList, boolean failIfExists, boolean checkThreadTx, - boolean disabledAfterStart) { + boolean disabledAfterStart, + IgniteUuid restartId + ) { if (checkThreadTx) checkEmptyTransactions(); @@ -3857,6 +3908,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.sql(), failIfExists, true, + restartId, disabledAfterStart, ccfg.queryEntities(), ccfg.config().isEncryptionEnabled() ? grpKeysIter.next() : null); @@ -3927,10 +3979,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { * command. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @param restart Restart flag. + * @param restartId Restart requester id (it'll allow to start this cache only him). * @return Future that will be completed when cache is destroyed. 
*/ - public IgniteInternalFuture<Boolean> dynamicDestroyCache(String cacheName, boolean sql, boolean checkThreadTx, - boolean restart) { + public IgniteInternalFuture<Boolean> dynamicDestroyCache( + String cacheName, + boolean sql, + boolean checkThreadTx, + boolean restart, + IgniteUuid restartId + ) { assert cacheName != null; if (checkThreadTx) @@ -3941,6 +3999,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.stop(true); req.destroy(true); req.restart(restart); + req.restartId(restartId); return F.first(initiateCacheChanges(F.asList(req))); } @@ -3948,30 +4007,30 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * @param cacheNames Collection of cache names to destroy. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. - * @param restart Restart flag. * @return Future that will be completed when cache is destroyed. */ - public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx, - boolean restart) { - return dynamicDestroyCaches(cacheNames, checkThreadTx, restart, true); + public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx) { + return dynamicDestroyCaches(cacheNames, checkThreadTx, true); } /** * @param cacheNames Collection of cache names to destroy. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. - * @param restart Restart flag. * @param destroy Cache data destroy flag. Setting to <code>true</code> will cause removing all cache data * @return Future that will be completed when cache is destroyed. 
*/ - public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx, - boolean restart, boolean destroy) { + public IgniteInternalFuture<?> dynamicDestroyCaches( + Collection<String> cacheNames, + boolean checkThreadTx, + boolean destroy + ) { if (checkThreadTx) checkEmptyTransactions(); List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size()); for (String cacheName : cacheNames) { - reqs.add(createStopRequest(cacheName, restart, destroy)); + reqs.add(createStopRequest(cacheName, false, null, destroy)); } return dynamicChangeCaches(reqs); @@ -3982,15 +4041,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param cacheName Cache names to destroy. * @param restart Restart flag. + * @param restartId Restart requester id (it'll allow to start this cache only him). * @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store. * @return Future that will be completed when cache is destroyed. */ - @NotNull public DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, boolean destroy) { + @NotNull public DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) { DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false, true); req.stop(true); req.destroy(destroy); req.restart(restart); + req.restartId(restartId); return req; } @@ -4053,7 +4114,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { checkEmptyTransactions(); if (proxy.context().isLocal()) - return dynamicDestroyCache(cacheName, false, true, false); + return dynamicDestroyCache(cacheName, false, true, false, null); return startClientCacheChange(null, Collections.singleton(cacheName)); } @@ -4375,10 +4436,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Reset restarting caches. + * @param cacheName Cache to check. 
+ * @return Cache is under restarting. */ - public void resetRestartingCaches() { - cachesInfo.restartingCaches().clear(); + public boolean isCacheRestarting(String cacheName) { + return cachesInfo.isRestarting(cacheName); } /** @@ -4790,8 +4852,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc = cacheDescriptor(name); - if (desc == null) + if (desc == null) { + if (cachesInfo.isRestarting(name)) { + IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(name); + + assert proxy != null: name; + + proxy.internalProxy(); //should throw exception + + // we have procceed, try again + return cacheConfiguration(name); + } + throw new IllegalStateException("Cache doesn't exist: " + name); + } else return desc.cacheConfiguration(); } @@ -5264,6 +5338,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command. * @param failIfExists Fail if exists flag. * @param failIfNotStarted If {@code true} fails if cache is not started. + * @param restartId Restart requester id (it'll allow to start this cache only him). * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}. * @param qryEntities Query entities. * @param encKey Encryption key. 
@@ -5279,6 +5354,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean sql, boolean failIfExists, boolean failIfNotStarted, + IgniteUuid restartId, boolean disabledAfterStart, @Nullable Collection<QueryEntity> qryEntities, @Nullable byte[] encKey @@ -5295,6 +5371,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.encryptionKey(encKey); + req.restartId(restartId); + if (ccfg != null) { cloneCheckSerializable(ccfg); @@ -5340,7 +5418,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { initialize(cfg, cacheObjCtx); if (cachesInfo.restartingCaches().contains(req.cacheName())) - req.schema(new QuerySchema(qryEntities)); + req.schema(new QuerySchema(qryEntities == null? cfg.getQueryEntities() : qryEntities)); else req.schema(new QuerySchema(qryEntities != null ? QueryUtils.normalizeQueryEntities(qryEntities, cfg) : cfg.getQueryEntities()));
