Repository: ignite Updated Branches: refs/heads/master 90c0af887 -> 4ee181ce4
IGNITE-7869 Dynamic start cache by stored cache data Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ee181ce Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ee181ce Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ee181ce Branch: refs/heads/master Commit: 4ee181ce42972cb5197d780a79e0a96c26fe77e4 Parents: 90c0af8 Author: Anton Kalashnikov <[email protected]> Authored: Mon Mar 12 12:53:36 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Mon Mar 12 12:53:36 2018 +0300 ---------------------------------------------------------------------- .../cache/DynamicCacheDescriptor.java | 17 ++- .../processors/cache/GridCacheProcessor.java | 139 +++++++++---------- .../processors/cache/StoredCacheData.java | 9 +- 3 files changed, 86 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee181ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 4771c3e..cad8414 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -284,7 +284,6 @@ public class DynamicCacheDescriptor { this.rcvdFromVer = rcvdFromVer; } - /** * @return Start topology version or {@code null} if cache configured statically. */ @@ -346,6 +345,22 @@ public class DynamicCacheDescriptor { } } + /** + * Form a {@link StoredCacheData} with all data to correctly restore cache params when its configuration is read + * from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's needed to start + * cache correctly, leaving out everything else. + */ + public StoredCacheData toStoredData() { + assert schema != null; + + StoredCacheData res = new StoredCacheData(cacheConfiguration()); + + res.queryEntities(schema().entities()); + res.sql(sql()); + + return res; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee181ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 4648f25..eec964a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import javax.management.MBeanServer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -99,8 +100,8 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaS import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; -import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager; import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; @@ -508,7 +509,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { assertParameter(cc.getTransactionManagerLookupClassName() == null, "transaction manager can not be used with ATOMIC cache"); - if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null)&& !cc.isOnheapCacheEnabled()) + if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null) && !cc.isOnheapCacheEnabled()) throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName=" + U.maskName(cc.getName()) + "]"); @@ -1167,12 +1168,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isInfoEnabled()) { log.info("Started cache [name=" + cfg.getName() + - ", id="+cacheCtx.cacheId() + + ", id=" + cacheCtx.cacheId() + (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") + ", memoryPolicyName=" + memPlcName + ", mode=" + cfg.getCacheMode() + ", atomicity=" + cfg.getAtomicityMode() + - ", backups=" + cfg.getBackups() +']'); + ", backups=" + cfg.getBackups() + ']'); } } @@ -1189,9 +1190,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!cache.isNear() && ctx.shared().wal() != null) { try { ctx.shared().wal().fsync(null); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { U.error(log, "Failed to flush write-ahead log on cache stop " + - "[cache=" + ctx.name() + "]", e); + "[cache=" + ctx.name() + "]", e); } } @@ -1343,8 +1345,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheObjCtx Cache object context. * @param affNode {@code True} if local node affinity node. * @param updatesAllowed Updates allowed flag. - * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change - * state of proxies to restarting + * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will + * change state of proxies to restarting * @return Cache context. * @throws IgniteCheckedException If failed to create cache. */ @@ -1845,10 +1847,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.setNearConfiguration(reqNearCfg); } - StoredCacheData cacheData = toStoredData(desc); - if (sharedCtx.pageStore() != null && affNode) - sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), cacheData); + sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData()); String grpName = startCfg.getGroupName(); @@ -1923,7 +1923,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (proxy == null) continue; - GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName())); + GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName())); if (cacheCtx == null) continue; @@ -2091,7 +2091,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter<?, ?> cache : caches.values()) { GridCacheContext<?, ?> cacheCtx = cache.context(); - if (cacheCtx.startTopologyVersion().equals(startTopVer) ) { + if (cacheCtx.startTopologyVersion().equals(startTopVer)) { if (!jCacheProxies.containsKey(cacheCtx.name())) { IgniteCacheProxyImpl newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false); @@ -2444,8 +2444,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param msg Message. * @param topVer Current topology version. * @param curState Current cluster state. - * @throws IgniteCheckedException If configuration validation failed. * @return Exchange actions. + * @throws IgniteCheckedException If configuration validation failed. */ public ExchangeActions onStateChangeRequest( ChangeGlobalStateMessage msg, @@ -2595,8 +2595,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(); CacheConfiguration ccfg = F.isEmpty(templateName) - ? getOrCreateConfigFromTemplate(cacheName) - : getOrCreateConfigFromTemplate(templateName); + ? getOrCreateConfigFromTemplate(cacheName) + : getOrCreateConfigFromTemplate(templateName); ccfg.setName(cacheName); @@ -2785,7 +2785,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { sql, failIfExists, failIfNotStarted, - false); + false, + null); if (req != null) { if (req.clientStartOnly()) @@ -2837,26 +2838,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public IgniteInternalFuture<?> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, boolean failIfExists, boolean checkThreadTx, boolean disabledAfterStart) { - return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx, disabledAfterStart); + return dynamicStartCachesByStoredConf( + ccfgList.stream().map(StoredCacheData::new).collect(Collectors.toList()), + failIfExists, + checkThreadTx, + disabledAfterStart + ); } /** * Dynamically starts multiple caches. * - * @param ccfgList Collection of cache configuration. - * @param cacheType Cache type. + * @param storedCacheDataList Collection of stored cache data. * @param failIfExists Fail if exists flag. * @param checkThreadTx If {@code true} checks that current thread does not have active transactions. * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}. * @return Future that will be completed when all caches are deployed. */ - private IgniteInternalFuture<?> dynamicStartCaches( - Collection<CacheConfiguration> ccfgList, - CacheType cacheType, + public IgniteInternalFuture<?> dynamicStartCachesByStoredConf( + Collection<StoredCacheData> storedCacheDataList, boolean failIfExists, boolean checkThreadTx, - boolean disabledAfterStart - ) { + boolean disabledAfterStart) { if (checkThreadTx) checkEmptyTransactions(); @@ -2864,40 +2867,28 @@ public class GridCacheProcessor extends GridProcessorAdapter { Map<String, DynamicCacheChangeRequest> clientReqs = null; try { - for (CacheConfiguration ccfg : ccfgList) { - CacheType ct = cacheType; - - if (ct == null) { - if (CU.isUtilityCache(ccfg.getName())) - ct = CacheType.UTILITY; - else if (internalCaches.contains(ccfg.getName())) - ct = CacheType.INTERNAL; - else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName())) - ct = CacheType.DATA_STRUCTURES; - else - ct = CacheType.USER; - } - + for (StoredCacheData ccfg : storedCacheDataList) { DynamicCacheChangeRequest req = prepareCacheChangeRequest( - ccfg, - ccfg.getName(), + ccfg.config(), + ccfg.config().getName(), null, - ct, - false, + resolveCacheType(ccfg.config()), + ccfg.sql(), failIfExists, true, - disabledAfterStart); + disabledAfterStart, + ccfg.queryEntities()); if (req != null) { if (req.clientStartOnly()) { if (clientReqs == null) - clientReqs = U.newLinkedHashMap(ccfgList.size()); + clientReqs = U.newLinkedHashMap(storedCacheDataList.size()); clientReqs.put(req.cacheName(), req); } else { if (srvReqs == null) - srvReqs = new ArrayList<>(ccfgList.size()); + srvReqs = new ArrayList<>(storedCacheDataList.size()); srvReqs.add(req); } @@ -2931,6 +2922,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(); } + /** Resolve cache type for input cacheType */ + @NotNull private CacheType resolveCacheType(CacheConfiguration ccfg) { + if (CU.isUtilityCache(ccfg.getName())) + return CacheType.UTILITY; + else if (internalCaches.contains(ccfg.getName())) + return CacheType.INTERNAL; + else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName())) + return CacheType.DATA_STRUCTURES; + else + return CacheType.USER; + } + /** * @param cacheName Cache name to destroy. * @param sql If the cache needs to be destroyed only if it was created as the result of SQL {@code CREATE TABLE} @@ -2989,6 +2992,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Prepares cache stop request. + * * @param cacheName Cache names to destroy. * @param restart Restart flag. * @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store. @@ -3006,6 +3010,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Starts cache stop request as cache change batch. + * * @param reqs cache stop requests. * @return compound future. */ @@ -3108,7 +3113,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheName Cache name. * @return Cache type. */ - public CacheType cacheType(String cacheName ) { + public CacheType cacheType(String cacheName) { if (CU.isUtilityCache(cacheName)) return CacheType.UTILITY; else if (internalCaches.contains(cacheName)) @@ -3128,30 +3133,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert desc != null; if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() && - CU.isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration())) { - StoredCacheData data = toStoredData(desc); - - sharedCtx.pageStore().storeCacheData(data, true); - } - } - - /** - * Form a {@link StoredCacheData} with all data to correctly restore cache params when its configuration - * is read from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's - * needed to start cache correctly, leaving out everything else. - * - * @param desc Cache descriptor to process. - * @return {@link StoredCacheData} based on {@code desc}. - */ - private static StoredCacheData toStoredData(DynamicCacheDescriptor desc) { - A.notNull(desc, "desc"); - - StoredCacheData res = new StoredCacheData(desc.cacheConfiguration()); - - res.queryEntities(desc.schema() == null ? Collections.<QueryEntity>emptyList() : desc.schema().entities()); - res.sql(desc.sql()); - - return res; + CU.isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration())) + sharedCtx.pageStore().storeCacheData(desc.toStoredData(), true); } /** @@ -3355,7 +3338,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Reset restarting caches. */ - public void resetRestartingCaches(){ + public void resetRestartingCaches() { cachesInfo.restartingCaches().clear(); } @@ -3677,7 +3660,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cache = jCacheProxies.get(cacheName); } - return cache != null ? (IgniteCacheProxy<K, V>) cache.gatewayWrapper() : null; + return cache != null ? (IgniteCacheProxy<K, V>)cache.gatewayWrapper() : null; } /** @@ -3955,7 +3938,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { @SuppressWarnings("unchecked") private void registerMbean(Object obj, @Nullable String cacheName, boolean near) throws IgniteCheckedException { - if(U.IGNITE_MBEANS_DISABLED) + if (U.IGNITE_MBEANS_DISABLED) return; assert obj != null; @@ -3993,7 +3976,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param near Near flag. */ private void unregisterMbean(Object o, @Nullable String cacheName, boolean near) { - if(U.IGNITE_MBEANS_DISABLED) + if (U.IGNITE_MBEANS_DISABLED) return; assert o != null; @@ -4141,6 +4124,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param failIfExists Fail if exists flag. * @param failIfNotStarted If {@code true} fails if cache is not started. * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}. + * @param qryEntities Query entities. * @return Request or {@code null} if cache already exists. * @throws IgniteCheckedException if some of pre-checks failed * @throws CacheExistsException if cache exists and failIfExists flag is {@code true} @@ -4153,7 +4137,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean sql, boolean failIfExists, boolean failIfNotStarted, - boolean disabledAfterStart + boolean disabledAfterStart, + @Nullable Collection<QueryEntity> qryEntities ) throws IgniteCheckedException { DynamicCacheDescriptor desc = cacheDescriptor(cacheName); @@ -4208,7 +4193,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { initialize(cfg, cacheObjCtx); req.startCacheConfiguration(cfg); - req.schema(new QuerySchema(cfg.getQueryEntities())); + req.schema(new QuerySchema(qryEntities != null ? qryEntities : cfg.getQueryEntities())); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee181ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java index 5a88036..26b4f9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java @@ -39,7 +39,7 @@ public class StoredCacheData implements Serializable { /** Cache configuration. */ @GridToStringInclude - private final CacheConfiguration<?, ?> ccfg; + private CacheConfiguration<?, ?> ccfg; /** Query entities. */ @GridToStringInclude @@ -61,6 +61,13 @@ public class StoredCacheData implements Serializable { } /** + * @param ccfg Cache configuration. + */ + public void config(CacheConfiguration<?, ?> ccfg) { + this.ccfg = ccfg; + } + + /** * @return Cache configuration. */ public CacheConfiguration<?, ?> config() {
