Repository: ignite
Updated Branches:
  refs/heads/master 90c0af887 -> 4ee181ce4


IGNITE-7869 Dynamic start cache by stored cache data


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ee181ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ee181ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ee181ce

Branch: refs/heads/master
Commit: 4ee181ce42972cb5197d780a79e0a96c26fe77e4
Parents: 90c0af8
Author: Anton Kalashnikov <[email protected]>
Authored: Mon Mar 12 12:53:36 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Mon Mar 12 12:53:36 2018 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheDescriptor.java           |  17 ++-
 .../processors/cache/GridCacheProcessor.java    | 139 +++++++++----------
 .../processors/cache/StoredCacheData.java       |   9 +-
 3 files changed, 86 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee181ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 4771c3e..cad8414 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -284,7 +284,6 @@ public class DynamicCacheDescriptor {
         this.rcvdFromVer = rcvdFromVer;
     }
 
-
     /**
      * @return Start topology version or {@code null} if cache configured statically.
      */
@@ -346,6 +345,22 @@ public class DynamicCacheDescriptor {
         }
     }
 
+    /**
+     * Form a {@link StoredCacheData} with all data to correctly restore cache params when its configuration is read
+     * from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's needed to start
+     * cache correctly, leaving out everything else.
+     */
+    public StoredCacheData toStoredData() {
+        assert schema != null;
+
+        StoredCacheData res = new StoredCacheData(cacheConfiguration());
+
+        res.queryEntities(schema().entities());
+        res.sql(sql());
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee181ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4648f25..eec964a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -34,6 +34,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -99,8 +100,8 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaS
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FsyncModeFileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.query.GridCacheDistributedQueryManager;
 import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -508,7 +509,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             assertParameter(cc.getTransactionManagerLookupClassName() == null,
                 "transaction manager can not be used with ATOMIC cache");
 
-        if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null)&& !cc.isOnheapCacheEnabled())
+        if ((cc.getEvictionPolicyFactory() != null || cc.getEvictionPolicy() != null) && !cc.isOnheapCacheEnabled())
             throw new IgniteCheckedException("Onheap cache must be enabled if eviction policy is configured [cacheName="
                 + U.maskName(cc.getName()) + "]");
 
@@ -1167,12 +1168,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         if (log.isInfoEnabled()) {
             log.info("Started cache [name=" + cfg.getName() +
-                ", id="+cacheCtx.cacheId() +
+                ", id=" + cacheCtx.cacheId() +
                 (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
                 ", memoryPolicyName=" + memPlcName +
                 ", mode=" + cfg.getCacheMode() +
                 ", atomicity=" + cfg.getAtomicityMode() +
-                ", backups=" + cfg.getBackups() +']');
+                ", backups=" + cfg.getBackups() + ']');
         }
     }
 
@@ -1189,9 +1190,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (!cache.isNear() && ctx.shared().wal() != null) {
                 try {
                     ctx.shared().wal().fsync(null);
-                } catch (IgniteCheckedException e) {
+                }
+                catch (IgniteCheckedException e) {
                     U.error(log, "Failed to flush write-ahead log on cache stop " +
-                            "[cache=" + ctx.name() + "]", e);
+                        "[cache=" + ctx.name() + "]", e);
                 }
             }
 
@@ -1343,8 +1345,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheObjCtx Cache object context.
      * @param affNode {@code True} if local node affinity node.
      * @param updatesAllowed Updates allowed flag.
-     * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change
-     *  state of proxies to restarting
+     * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will
+     * change state of proxies to restarting
      * @return Cache context.
      * @throws IgniteCheckedException If failed to create cache.
      */
@@ -1845,10 +1847,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ccfg.setNearConfiguration(reqNearCfg);
         }
 
-        StoredCacheData cacheData = toStoredData(desc);
-
         if (sharedCtx.pageStore() != null && affNode)
-            sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), cacheData);
+            sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), desc.toStoredData());
 
         String grpName = startCfg.getGroupName();
 
@@ -1923,7 +1923,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (proxy == null)
                 continue;
 
-             GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName()));
+            GridCacheContext<?, ?> cacheCtx = sharedCtx.cacheContext(CU.cacheId(proxy.getName()));
 
             if (cacheCtx == null)
                 continue;
@@ -2091,7 +2091,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheAdapter<?, ?> cache : caches.values()) {
             GridCacheContext<?, ?> cacheCtx = cache.context();
 
-            if (cacheCtx.startTopologyVersion().equals(startTopVer) ) {
+            if (cacheCtx.startTopologyVersion().equals(startTopVer)) {
                 if (!jCacheProxies.containsKey(cacheCtx.name())) {
                     IgniteCacheProxyImpl newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
 
@@ -2444,8 +2444,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param msg Message.
      * @param topVer Current topology version.
      * @param curState Current cluster state.
-     * @throws IgniteCheckedException If configuration validation failed.
      * @return Exchange actions.
+     * @throws IgniteCheckedException If configuration validation failed.
      */
     public ExchangeActions onStateChangeRequest(
         ChangeGlobalStateMessage msg,
@@ -2595,8 +2595,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 return new GridFinishedFuture<>();
 
             CacheConfiguration ccfg = F.isEmpty(templateName)
-                ?  getOrCreateConfigFromTemplate(cacheName)
-                :  getOrCreateConfigFromTemplate(templateName);
+                ? getOrCreateConfigFromTemplate(cacheName)
+                : getOrCreateConfigFromTemplate(templateName);
 
             ccfg.setName(cacheName);
 
@@ -2785,7 +2785,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 sql,
                 failIfExists,
                 failIfNotStarted,
-                false);
+                false,
+                null);
 
             if (req != null) {
                 if (req.clientStartOnly())
@@ -2837,26 +2838,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     public IgniteInternalFuture<?> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, boolean failIfExists,
         boolean checkThreadTx, boolean disabledAfterStart) {
-        return dynamicStartCaches(ccfgList, null, failIfExists, checkThreadTx, disabledAfterStart);
+        return dynamicStartCachesByStoredConf(
+            ccfgList.stream().map(StoredCacheData::new).collect(Collectors.toList()),
+            failIfExists,
+            checkThreadTx,
+            disabledAfterStart
+        );
     }
 
     /**
      * Dynamically starts multiple caches.
      *
-     * @param ccfgList Collection of cache configuration.
-     * @param cacheType Cache type.
+     * @param storedCacheDataList Collection of stored cache data.
      * @param failIfExists Fail if exists flag.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
      * @return Future that will be completed when all caches are deployed.
      */
-    private IgniteInternalFuture<?> dynamicStartCaches(
-        Collection<CacheConfiguration> ccfgList,
-        CacheType cacheType,
+    public IgniteInternalFuture<?> dynamicStartCachesByStoredConf(
+        Collection<StoredCacheData> storedCacheDataList,
         boolean failIfExists,
         boolean checkThreadTx,
-        boolean disabledAfterStart
-    ) {
+        boolean disabledAfterStart) {
         if (checkThreadTx)
             checkEmptyTransactions();
 
@@ -2864,40 +2867,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         Map<String, DynamicCacheChangeRequest> clientReqs = null;
 
         try {
-            for (CacheConfiguration ccfg : ccfgList) {
-                CacheType ct = cacheType;
-
-                if (ct == null) {
-                    if (CU.isUtilityCache(ccfg.getName()))
-                        ct = CacheType.UTILITY;
-                    else if (internalCaches.contains(ccfg.getName()))
-                        ct = CacheType.INTERNAL;
-                    else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName()))
-                        ct = CacheType.DATA_STRUCTURES;
-                    else
-                        ct = CacheType.USER;
-                }
-
+            for (StoredCacheData ccfg : storedCacheDataList) {
                 DynamicCacheChangeRequest req = prepareCacheChangeRequest(
-                    ccfg,
-                    ccfg.getName(),
+                    ccfg.config(),
+                    ccfg.config().getName(),
                     null,
-                    ct,
-                    false,
+                    resolveCacheType(ccfg.config()),
+                    ccfg.sql(),
                     failIfExists,
                     true,
-                    disabledAfterStart);
+                    disabledAfterStart,
+                    ccfg.queryEntities());
 
                 if (req != null) {
                     if (req.clientStartOnly()) {
                         if (clientReqs == null)
-                            clientReqs = U.newLinkedHashMap(ccfgList.size());
+                            clientReqs = U.newLinkedHashMap(storedCacheDataList.size());
 
                         clientReqs.put(req.cacheName(), req);
                     }
                     else {
                         if (srvReqs == null)
-                            srvReqs = new ArrayList<>(ccfgList.size());
+                            srvReqs = new ArrayList<>(storedCacheDataList.size());
 
                         srvReqs.add(req);
                     }
@@ -2931,6 +2922,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             return new GridFinishedFuture<>();
     }
 
+    /** Resolve cache type for input cacheType */
+    @NotNull private CacheType resolveCacheType(CacheConfiguration ccfg) {
+        if (CU.isUtilityCache(ccfg.getName()))
+            return CacheType.UTILITY;
+        else if (internalCaches.contains(ccfg.getName()))
+            return CacheType.INTERNAL;
+        else if (DataStructuresProcessor.isDataStructureCache(ccfg.getName()))
+            return CacheType.DATA_STRUCTURES;
+        else
+            return CacheType.USER;
+    }
+
     /**
      * @param cacheName Cache name to destroy.
      * @param sql If the cache needs to be destroyed only if it was created as the result of SQL {@code CREATE TABLE}
@@ -2989,6 +2992,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * Prepares cache stop request.
+     *
      * @param cacheName Cache names to destroy.
      * @param restart Restart flag.
      * @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store.
@@ -3006,6 +3010,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * Starts cache stop request as cache change batch.
+     *
      * @param reqs cache stop requests.
      * @return compound future.
      */
@@ -3108,7 +3113,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheName Cache name.
      * @return Cache type.
      */
-    public CacheType cacheType(String cacheName ) {
+    public CacheType cacheType(String cacheName) {
         if (CU.isUtilityCache(cacheName))
             return CacheType.UTILITY;
         else if (internalCaches.contains(cacheName))
@@ -3128,30 +3133,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         assert desc != null;
 
         if (sharedCtx.pageStore() != null && !sharedCtx.kernalContext().clientNode() &&
-            CU.isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration())) {
-            StoredCacheData data = toStoredData(desc);
-
-            sharedCtx.pageStore().storeCacheData(data, true);
-        }
-    }
-
-    /**
-     * Form a {@link StoredCacheData} with all data to correctly restore cache params when its configuration
-     * is read from page store. Essentially, this method takes from {@link DynamicCacheDescriptor} all that's
-     * needed to start cache correctly, leaving out everything else.
-     *
-     * @param desc Cache descriptor to process.
-     * @return {@link StoredCacheData} based on {@code desc}.
-     */
-    private static StoredCacheData toStoredData(DynamicCacheDescriptor desc) {
-        A.notNull(desc, "desc");
-
-        StoredCacheData res = new StoredCacheData(desc.cacheConfiguration());
-
-        res.queryEntities(desc.schema() == null ? Collections.<QueryEntity>emptyList() : desc.schema().entities());
-        res.sql(desc.sql());
-
-        return res;
+            CU.isPersistentCache(desc.cacheConfiguration(), sharedCtx.gridConfig().getDataStorageConfiguration()))
+            sharedCtx.pageStore().storeCacheData(desc.toStoredData(), true);
     }
 
     /**
@@ -3355,7 +3338,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Reset restarting caches.
      */
-    public void resetRestartingCaches(){
+    public void resetRestartingCaches() {
         cachesInfo.restartingCaches().clear();
     }
 
@@ -3677,7 +3660,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cache = jCacheProxies.get(cacheName);
         }
 
-        return cache != null ? (IgniteCacheProxy<K, V>) cache.gatewayWrapper() : null;
+        return cache != null ? (IgniteCacheProxy<K, V>)cache.gatewayWrapper() : null;
     }
 
     /**
@@ -3955,7 +3938,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     @SuppressWarnings("unchecked")
     private void registerMbean(Object obj, @Nullable String cacheName, boolean near)
         throws IgniteCheckedException {
-        if(U.IGNITE_MBEANS_DISABLED)
+        if (U.IGNITE_MBEANS_DISABLED)
             return;
 
         assert obj != null;
@@ -3993,7 +3976,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param near Near flag.
      */
     private void unregisterMbean(Object o, @Nullable String cacheName, boolean near) {
-        if(U.IGNITE_MBEANS_DISABLED)
+        if (U.IGNITE_MBEANS_DISABLED)
             return;
 
         assert o != null;
@@ -4141,6 +4124,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param failIfExists Fail if exists flag.
      * @param failIfNotStarted If {@code true} fails if cache is not started.
      * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
+     * @param qryEntities Query entities.
      * @return Request or {@code null} if cache already exists.
      * @throws IgniteCheckedException if some of pre-checks failed
      * @throws CacheExistsException if cache exists and failIfExists flag is {@code true}
@@ -4153,7 +4137,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         boolean sql,
         boolean failIfExists,
         boolean failIfNotStarted,
-        boolean disabledAfterStart
+        boolean disabledAfterStart,
+        @Nullable Collection<QueryEntity> qryEntities
     ) throws IgniteCheckedException {
         DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
@@ -4208,7 +4193,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 initialize(cfg, cacheObjCtx);
 
                 req.startCacheConfiguration(cfg);
-                req.schema(new QuerySchema(cfg.getQueryEntities()));
+                req.schema(new QuerySchema(qryEntities != null ? qryEntities : cfg.getQueryEntities()));
             }
         }
         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee181ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
index 5a88036..26b4f9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
@@ -39,7 +39,7 @@ public class StoredCacheData implements Serializable {
 
     /** Cache configuration. */
     @GridToStringInclude
-    private final CacheConfiguration<?, ?> ccfg;
+    private CacheConfiguration<?, ?> ccfg;
 
     /** Query entities. */
     @GridToStringInclude
@@ -61,6 +61,13 @@ public class StoredCacheData implements Serializable {
     }
 
     /**
+     * @param ccfg Cache configuration.
+     */
+    public void config(CacheConfiguration<?, ?> ccfg) {
+        this.ccfg = ccfg;
+    }
+
+    /**
      * @return Cache configuration.
      */
     public CacheConfiguration<?, ?> config() {

Reply via email to