IGNITE-6173: SQL: Create schema for client caches without their start. This closes #4663.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d19123d4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d19123d4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d19123d4 Branch: refs/heads/ignite-10044 Commit: d19123d46f367daa783674172485ffc438b6b96a Parents: 1502494 Author: Yuriy Gerzhedovich <[email protected]> Authored: Mon Dec 3 13:07:00 2018 +0300 Committer: devozerov <[email protected]> Committed: Mon Dec 3 13:07:00 2018 +0300 ---------------------------------------------------------------------- .../JdbcThinAbstractDmlStatementSelfTest.java | 5 + .../thin/JdbcThinBulkLoadAbstractSelfTest.java | 49 ++- .../apache/ignite/internal/IgniteKernal.java | 10 +- .../cache/CacheAffinitySharedManager.java | 22 +- .../processors/cache/ClusterCachesInfo.java | 4 + .../processors/cache/GridCacheContextInfo.java | 179 +++++++++++ .../processors/cache/GridCacheProcessor.java | 149 ++++----- .../cache/LocalJoinCachesContext.java | 30 +- .../processors/cache/StartCacheInfo.java | 25 ++ .../processors/query/GridQueryIndexing.java | 33 +- .../processors/query/GridQueryProcessor.java | 89 ++++-- .../internal/processors/query/QueryUtils.java | 30 +- .../schema/SchemaIndexCacheVisitorImpl.java | 3 +- ...IgniteClientCacheInitializationFailTest.java | 25 +- .../query/h2/DmlStatementsProcessor.java | 12 +- .../processors/query/h2/H2RowCache.java | 19 +- .../processors/query/h2/H2RowCacheRegistry.java | 46 +-- .../internal/processors/query/h2/H2Schema.java | 6 +- .../processors/query/h2/H2TableDescriptor.java | 35 ++- .../processors/query/h2/H2TableEngine.java | 2 +- .../internal/processors/query/h2/H2Utils.java | 27 ++ .../processors/query/h2/IgniteH2Indexing.java | 102 ++++--- .../query/h2/database/H2TreeClientIndex.java | 114 +++++++ .../query/h2/database/H2TreeIndex.java | 131 +++----- .../query/h2/database/H2TreeIndexBase.java | 53 ++++ .../query/h2/ddl/DdlStatementsProcessor.java | 71 +---- .../processors/query/h2/dml/UpdatePlan.java | 2 +- .../query/h2/dml/UpdatePlanBuilder.java | 89 ++++-- .../query/h2/opt/GridH2RowDescriptor.java | 14 +- .../query/h2/opt/GridH2SystemIndexFactory.java | 10 +- .../processors/query/h2/opt/GridH2Table.java | 41 ++- .../query/h2/sql/GridSqlQueryParser.java | 10 +- .../visor/verify/ValidateIndexesClosure.java | 6 +- ...dCacheDynamicLoadOnClientPersistentTest.java | 44 +++ .../cache/GridCacheDynamicLoadOnClientTest.java | 304 +++++++++++++++++++ .../cache/index/H2DynamicTableSelfTest.java | 22 ++ .../cache/index/SchemaExchangeSelfTest.java | 149 ++++++--- .../IgniteBinaryCacheQueryTestSuite.java | 5 + 38 files changed, 1478 insertions(+), 489 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java index 405f21a..862e841 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAbstractDmlStatementSelfTest.java @@ -57,6 +57,11 @@ public abstract class JdbcThinAbstractDmlStatementSelfTest extends JdbcThinAbstr } /** {@inheritDoc} */ + @Override protected void afterTestsStopped() { + stopAllGrids(); + } + + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { ignite(0).getOrCreateCache(cacheConfig()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java index 2a4c799..8eeac18 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java @@ -17,22 +17,12 @@ package org.apache.ignite.jdbc.thin; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.QueryEntity; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat; -import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvParser; -import org.apache.ignite.internal.processors.query.QueryUtils; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.testframework.GridTestUtils; - import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.CodingErrorAction; import java.nio.charset.UnsupportedCharsetException; import java.sql.BatchUpdateException; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -41,6 +31,17 @@ import java.util.Collection; import java.util.Collections; import java.util.Objects; import java.util.concurrent.Callable; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat; +import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvParser; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -410,6 +411,32 @@ public abstract class JdbcThinBulkLoadAbstractSelfTest extends JdbcThinAbstractD } /** + * Test imports CSV file into a table on not affinity node and checks the created entries using SELECT statement. + * + * @throws SQLException If failed. + */ + public void testBulkLoadToNonAffinityNode() throws Exception { + IgniteEx client = startGrid(getConfiguration("client").setClientMode(true)); + + try (Connection con = connect(client, null)) { + con.setSchema('"' + DEFAULT_CACHE_NAME + '"'); + + try (Statement stmt = con.createStatement()) { + int updatesCnt = stmt.executeUpdate( + "copy from '" + BULKLOAD_UTF8_CSV_FILE + "' into " + TBL_NAME + + " (_key, age, firstName, lastName)" + + " format csv"); + + assertEquals(2, updatesCnt); + + checkNationalCacheContents(TBL_NAME); + } + } + + stopGrid(client.name()); + } + + /** * Imports two-entry CSV file with UTF-8 characters into a table using packet size of one byte * (thus splitting each two-byte UTF-8 character into two packets) * and checks the created entries using SELECT statement. http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 4d8a555..f777980 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -3010,7 +3010,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @Override public <K, V> IgniteBiTuple<IgniteCache<K, V>, Boolean> getOrCreateCache0( CacheConfiguration<K, V> cacheCfg, boolean sql) { A.notNull(cacheCfg, "cacheCfg"); - CU.validateNewCacheName(cacheCfg.getName()); + String cacheName = cacheCfg.getName(); + + CU.validateNewCacheName(cacheName); guard(); @@ -3019,18 +3021,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { Boolean res = false; - if (ctx.cache().cache(cacheCfg.getName()) == null) { + if (ctx.cache().cache(cacheName) == null) { res = sql ? ctx.cache().dynamicStartSqlCache(cacheCfg).get() : ctx.cache().dynamicStartCache(cacheCfg, - cacheCfg.getName(), + cacheName, null, false, true, true).get(); } - return new IgniteBiTuple<>((IgniteCache<K, V>)ctx.cache().publicJCache(cacheCfg.getName()), res); + return new IgniteBiTuple<>(ctx.cache().publicJCache(cacheName), res); } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index ce87fa9..07fbef1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -361,7 +361,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap void onCacheGroupCreated(CacheGroupContext grp) { if (!grpHolders.containsKey(grp.groupId())) { cctx.io().addCacheGroupHandler(grp.groupId(), GridDhtAffinityAssignmentResponse.class, - (IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>) this::processAffinityAssignmentResponse); + (IgniteBiInClosure<UUID, GridDhtAffinityAssignmentResponse>)this::processAffinityAssignmentResponse); } } @@ -430,10 +430,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap DynamicCacheChangeRequest changeReq = startReqs.get(desc.cacheName()); return new StartCacheInfo( + desc.cacheConfiguration(), desc, changeReq.nearCacheConfiguration(), topVer, - changeReq.disabledAfterStart() + changeReq.disabledAfterStart(), + true ); }) .collect(Collectors.toList()); @@ -869,7 +871,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final ExchangeDiscoveryEvents evts = fut.context().events(); - long time = System.currentTimeMillis(); + long time = U.currentTimeMillis(); Map<StartCacheInfo, DynamicCacheChangeRequest> startCacheInfos = new LinkedHashMap<>(); @@ -920,6 +922,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap req ); } + else + cctx.kernalContext().cache().initQueryStructuresForNotStartedCache(cacheDesc); } Map<StartCacheInfo, IgniteCheckedException> failedCaches = cctx.cache().prepareStartCachesIfPossible(startCacheInfos.keySet()); @@ -951,14 +955,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (log.isInfoEnabled()) - log.info("Caches starting performed in " + (System.currentTimeMillis() - time) + " ms."); + log.info("Caches starting performed in " + (U.currentTimeMillis() - time) + " ms."); - time = System.currentTimeMillis(); + time = U.currentTimeMillis(); initAffinityOnCacheGroupsStart(fut, exchActions, crd); if (log.isInfoEnabled()) - log.info("Affinity initialization for started caches performed in " + (System.currentTimeMillis() - time) + " ms."); + log.info("Affinity initialization for started caches performed in " + (U.currentTimeMillis() - time) + " ms."); } /** @@ -1522,9 +1526,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert F.isEmpty(affReq) || (!F.isEmpty(receivedAff) && receivedAff.size() >= affReq.size()) : ("Requested and received affinity are different " + - "[requestedCnt=" + (affReq != null ? affReq.size() : "none") + - ", receivedCnt=" + (receivedAff != null ? receivedAff.size() : "none") + - ", msg=" + msg + "]"); + "[requestedCnt=" + (affReq != null ? affReq.size() : "none") + + ", receivedCnt=" + (receivedAff != null ? receivedAff.size() : "none") + + ", msg=" + msg + "]"); final Map<Long, ClusterNode> nodesByOrder = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 6718cf3..8ca14a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -1281,6 +1281,7 @@ class ClusterCachesInfo { if (joinDiscoData != null) { List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = new ArrayList<>(); + List<DynamicCacheDescriptor> locJoinInitCaches = new ArrayList<>(); locCfgsForActivation = new HashMap<>(); boolean active = ctx.state().clusterState().active(); @@ -1329,10 +1330,13 @@ class ClusterCachesInfo { else locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg)); } + else + locJoinInitCaches.add(desc); } locJoinCachesCtx = new LocalJoinCachesContext( locJoinStartCaches, + locJoinInitCaches, new HashMap<>(registeredCacheGrps), new HashMap<>(registeredCaches)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java new file mode 100644 index 0000000..e72d183 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContextInfo.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Cache context information. Required to support query infrastructure for not started caches on non affinity nodes. + */ +@GridToStringExclude +public class GridCacheContextInfo<K, V> { + /** Full cache context. Can be {@code null} in case a cache is not started. */ + @Nullable private volatile GridCacheContext gridCacheContext; + + /** Cache is client or not. */ + private final boolean clientCache; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Dynamic cache deployment ID. */ + private final IgniteUuid dynamicDeploymentId; + + /** Cache configuration. */ + private final CacheConfiguration config; + + /** Cache group ID. */ + private final int groupId; + + /** Cache ID. */ + private final int cacheId; + + /** + * Constructor of full cache context. + * + * @param gridCacheContext Cache context. + * @param clientCache Client cache or not. + */ + public GridCacheContextInfo(GridCacheContext<K, V> gridCacheContext, boolean clientCache) { + this.gridCacheContext = gridCacheContext; + this.ctx = gridCacheContext.kernalContext(); + this.config = gridCacheContext.config(); + this.dynamicDeploymentId = gridCacheContext.dynamicDeploymentId(); + this.groupId = gridCacheContext.groupId(); + this.cacheId = gridCacheContext.cacheId(); + this.clientCache = clientCache; + } + + /** + * Constructor of not started cache context. + * + * @param cacheDesc Cache descriptor. + * @param ctx Kernal context. + */ + public GridCacheContextInfo(DynamicCacheDescriptor cacheDesc, GridKernalContext ctx) { + this.config = cacheDesc.cacheConfiguration(); + this.dynamicDeploymentId = cacheDesc.deploymentId(); + this.groupId = cacheDesc.groupId(); + this.ctx = ctx; + this.clientCache = true; + + this.cacheId = CU.cacheId(config.getName()); + + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration config() { + return isCacheContextInited() ? gridCacheContext.config() : config; + } + + /** + * @return Cache name. + */ + public String name() { + return isCacheContextInited() ? gridCacheContext.name() : config.getName(); + } + + /** + * @return {@code true} in case cache use custom affinity mapper. + */ + public boolean customAffinityMapper() { + return isCacheContextInited() && gridCacheContext.customAffinityMapper(); + } + + /** + * @return Cache group id. + */ + public int groupId() { + return isCacheContextInited() ? gridCacheContext.groupId() : groupId; + } + + /** + * @return Cache id. + */ + public int cacheId() { + return isCacheContextInited() ? gridCacheContext.cacheId() : cacheId; + } + + /** + * @return {@code true} in case affinity node. + */ + public boolean affinityNode() { + return isCacheContextInited() && gridCacheContext.affinityNode(); + } + + /** + * @return Cache context. {@code null} for not started cache. + */ + @Nullable public GridCacheContext gridCacheContext() { + return gridCacheContext; + } + + /** + * @return Dynamic deployment ID. + */ + public IgniteUuid dynamicDeploymentId() { + return dynamicDeploymentId; + } + + /** + * Set real cache context in case cache has been fully initted and start. + * + * @param gridCacheCtx Initted cache context. + */ + public void initCacheContext(GridCacheContext<?, ?> gridCacheCtx) { + assert this.gridCacheContext == null : this.gridCacheContext; + assert gridCacheCtx != null; + + this.gridCacheContext = gridCacheCtx; + } + + /** + * @return {@code true} For client cache. + */ + public boolean isClientCache() { + return clientCache; + } + + /** + * @return Kernal context. + */ + public GridKernalContext context() { + return ctx; + } + + /** + * @return {@code true} If Cache context is initted. + */ + public boolean isCacheContextInited() { + return gridCacheContext != null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "GridCacheContextInfo: " + name() + " " + (isCacheContextInited() ? "started" : "not started"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 8a54852..0870d1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -756,10 +756,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { addCacheOnJoinFromConfig(caches, templates); CacheJoinNodeDiscoveryData discoData = new CacheJoinNodeDiscoveryData( - IgniteUuid.randomUuid(), - caches, - templates, - startAllCachesOnClientStart() + IgniteUuid.randomUuid(), + caches, + templates, + startAllCachesOnClientStart() ); cachesInfo.onStart(discoData); @@ -1250,16 +1250,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache.context().userCache()) { // Re-create cache structures inside indexing in order to apply recent schema changes. - GridCacheContext cctx = cache.context(); + GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cache.context(), false); - DynamicCacheDescriptor desc = cacheDescriptor(cctx.name()); + DynamicCacheDescriptor desc = cacheDescriptor(cacheInfo.name()); - assert desc != null : cctx.name(); + assert desc != null : cacheInfo.name(); boolean rmvIdx = !cache.context().group().persistenceEnabled(); - ctx.query().onCacheStop0(cctx, rmvIdx); - ctx.query().onCacheStart0(cctx, desc.schema(), desc.sql()); + ctx.query().onCacheStop0(cacheInfo, rmvIdx); + ctx.query().onCacheStart0(cacheInfo, desc.schema(), desc.sql()); } } } @@ -1285,6 +1285,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * Initialize query infrastructure for not started cache. + * + * @param cacheDesc Cache descriptor. + * @throws IgniteCheckedException If failed. + */ + public void initQueryStructuresForNotStartedCache(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException { + QuerySchema schema = cacheDesc.schema() != null ? cacheDesc.schema() : new QuerySchema(); + + GridCacheContextInfo cacheInfo = new GridCacheContextInfo(cacheDesc, ctx); + + ctx.query().onCacheStart(cacheInfo, schema, cacheDesc.sql()); + } + + /** * @param cache Cache to stop. * @param cancel Cancel flag. * @param destroy Destroy data flag. Setting to <code>true</code> will remove all cache data. @@ -1308,7 +1322,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { cache.stop(); - ctx.kernalContext().query().onCacheStop(ctx, !cache.context().group().persistenceEnabled() || destroy); + GridCacheContextInfo cacheInfo = new GridCacheContextInfo(ctx, false); + + ctx.kernalContext().query().onCacheStop(cacheInfo, !cache.context().group().persistenceEnabled() || destroy); if (isNearEnabled(ctx)) { GridDhtCacheAdapter dht = ctx.near().dht(); @@ -1770,7 +1786,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * * @param reqs Cache requests to start. * @param fut Completable future. */ @@ -1785,7 +1800,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * * @param reqs Cache requests to start. * @param initVer Init exchange version. * @param doneVer Finish excahnge vertison. @@ -1956,7 +1970,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { AffinityTopologyVersion exchTopVer, LocalJoinCachesContext locJoinCtx ) throws IgniteCheckedException { - long time = System.currentTimeMillis(); + long time = U.currentTimeMillis(); if (locJoinCtx == null) return new GridFinishedFuture<>(); @@ -1968,12 +1982,22 @@ public class GridCacheProcessor extends GridProcessorAdapter { .map(cacheInfo -> new StartCacheInfo(cacheInfo.get1(), cacheInfo.get2(), exchTopVer, false)) .collect(Collectors.toList()); + locJoinCtx.initCaches() + .forEach(cacheDesc -> { + try { + initQueryStructuresForNotStartedCache(cacheDesc); + } + catch (Exception e) { + log.error("Can't initialize query structures for not started cache [cacheName=" + cacheDesc.cacheName() + "]"); + } + }); + prepareStartCaches(startCacheInfos); context().exchange().exchangerUpdateHeartbeat(); if (log.isInfoEnabled()) - log.info("Starting caches on local join performed in " + (System.currentTimeMillis() - time) + " ms."); + log.info("Starting caches on local join performed in " + (U.currentTimeMillis() - time) + " ms."); return res; } @@ -2034,7 +2058,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Caches which was failed. * @throws IgniteCheckedException if failed. */ - Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible(Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException { + Map<StartCacheInfo, IgniteCheckedException> prepareStartCachesIfPossible( + Collection<StartCacheInfo> startCacheInfos) throws IgniteCheckedException { HashMap<StartCacheInfo, IgniteCheckedException> failedCaches = new HashMap<>(); prepareStartCaches(startCacheInfos, (data, operation) -> { @@ -2074,7 +2099,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheInfo.getCacheDescriptor(), cacheInfo.getReqNearCfg(), cacheInfo.getExchangeTopVer(), - cacheInfo.isDisabledAfterStart() + cacheInfo.isDisabledAfterStart(), + cacheInfo.isClientCache() ); return null; @@ -2136,11 +2162,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { cacheStartFailHandler.handle( startCacheInfo, cacheInfo -> { - GridCacheContext<?, ?> cctx = cacheContexts.get(cacheInfo); + GridCacheContext cctx = cacheContexts.get(cacheInfo); if (!cctx.isRecoveryMode()) { ctx.query().onCacheStart( - cctx, + new GridCacheContextInfo(cctx, cacheInfo.isClientCache()), cacheInfo.getCacheDescriptor().schema() != null ? cacheInfo.getCacheDescriptor().schema() : new QuerySchema(), @@ -2196,14 +2222,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheDescriptor desc, @Nullable NearCacheConfiguration reqNearCfg, AffinityTopologyVersion exchTopVer, - boolean disabledAfterStart + boolean disabledAfterStart, + boolean clientCache ) throws IgniteCheckedException { GridCacheContext cacheCtx = prepareCacheContext(startCfg, desc, reqNearCfg, exchTopVer, disabledAfterStart); if (cacheCtx.isRecoveryMode()) finishRecovery(exchTopVer, cacheCtx); else { - ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema(), desc.sql()); + ctx.query().onCacheStart(new GridCacheContextInfo(cacheCtx, clientCache), desc.schema() != null ? desc.schema() : new QuerySchema(), desc.sql()); onCacheStarted(cacheCtx); } @@ -2275,6 +2302,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Stops cache under checkpoint lock. + * * @param cctx Cache context. */ private void stopCacheSafely(GridCacheContext<?, ?> cctx) { @@ -2299,7 +2327,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cacheContext Cache context. * @throws IgniteCheckedException If failed. */ - private void finishRecovery(AffinityTopologyVersion cacheStartVer, GridCacheContext<?, ?> cacheContext) throws IgniteCheckedException { + private void finishRecovery(AffinityTopologyVersion cacheStartVer, + GridCacheContext<?, ?> cacheContext) throws IgniteCheckedException { + CacheGroupContext groupContext = cacheContext.group(); // Take cluster-wide cache descriptor and try to update local cache and cache group parameters. @@ -2324,9 +2354,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Stops all caches and groups, that was recovered, but not activated on node join. - * Such caches can remain only if it was filtered by node filter on current node. - * It's impossible to check whether current node is affinity node for given cache before join to topology. + * Stops all caches and groups, that was recovered, but not activated on node join. Such caches can remain only if + * it was filtered by node filter on current node. It's impossible to check whether current node is affinity node + * for given cache before join to topology. */ public void shutdownNotFinishedRecoveryCaches() { for (GridCacheAdapter cacheAdapter : caches.values()) { @@ -2562,17 +2592,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { grp.onCacheStarted(cacheCtx); - ctx.query().onCacheStart(cacheCtx, desc.schema() != null ? desc.schema() : new QuerySchema(), desc.sql()); + ctx.query().onCacheStart(new GridCacheContextInfo(cacheCtx, false), + desc.schema() != null ? desc.schema() : new QuerySchema(), desc.sql()); if (log.isInfoEnabled()) { log.info("Started cache in recovery mode [name=" + cfg.getName() + - ", id=" + cacheCtx.cacheId() + - (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") + - ", dataRegionName=" + dataRegion + - ", mode=" + cfg.getCacheMode() + - ", atomicity=" + cfg.getAtomicityMode() + - ", backups=" + cfg.getBackups() + - ", mvcc=" + cacheCtx.mvccEnabled() + ']'); + ", id=" + cacheCtx.cacheId() + + (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") + + ", dataRegionName=" + dataRegion + + ", mode=" + cfg.getCacheMode() + + ", atomicity=" + cfg.getAtomicityMode() + + ", backups=" + cfg.getBackups() + + ", mvcc=" + cacheCtx.mvccEnabled() + ']'); } return cacheCtx; @@ -2764,6 +2795,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { return ctx; } + else + //Try to unregister query structures for not started caches. + ctx.query().onCacheStop(cacheName); return null; } @@ -3728,7 +3762,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}. * @return Future that will be completed when all caches are deployed. */ - public IgniteInternalFuture<Boolean> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, boolean failIfExists, + public IgniteInternalFuture<Boolean> dynamicStartCaches(Collection<CacheConfiguration> ccfgList, + boolean failIfExists, boolean checkThreadTx, boolean disabledAfterStart) { return dynamicStartCachesByStoredConf( ccfgList.stream().map(StoredCacheData::new).collect(Collectors.toList()), @@ -4737,9 +4772,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public Collection<DynamicCacheDescriptor> persistentCaches() { return cachesInfo.registeredCaches().values() - .stream() - .filter(desc -> isPersistentCache(desc.cacheConfiguration(), ctx.config().getDataStorageConfiguration())) - .collect(Collectors.toList()); + .stream() + .filter(desc -> isPersistentCache(desc.cacheConfiguration(), ctx.config().getDataStorageConfiguration())) + .collect(Collectors.toList()); } /** @@ -4747,9 +4782,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public Collection<CacheGroupDescriptor> persistentGroups() { return cachesInfo.registeredCacheGroups().values() - .stream() - .filter(CacheGroupDescriptor::persistenceEnabled) - .collect(Collectors.toList()); + .stream() + .filter(CacheGroupDescriptor::persistenceEnabled) + .collect(Collectors.toList()); } /** @@ -4990,32 +5025,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Starts client caches that do not exist yet. - * - * @throws IgniteCheckedException In case of error. - */ - public void createMissingQueryCaches() throws IgniteCheckedException { - for (Map.Entry<String, DynamicCacheDescriptor> e : cachesInfo.registeredCaches().entrySet()) { - DynamicCacheDescriptor desc = e.getValue(); - - if (isMissingQueryCache(desc)) - dynamicStartCache(null, desc.cacheConfiguration().getName(), null, false, true, true).get(); - } - } - - /** - * Whether cache defined by provided descriptor is not yet started and has queries enabled. - * - * @param desc Descriptor. - * @return {@code True} if this is missing query cache. - */ - private boolean isMissingQueryCache(DynamicCacheDescriptor desc) { - CacheConfiguration ccfg = desc.cacheConfiguration(); - - return !caches.containsKey(ccfg.getName()) && QueryUtils.isEnabled(ccfg); - } - - /** * Registers MBean for cache components. * * @param obj Cache component. @@ -5439,7 +5448,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Recovery lifecycle for caches. */ private class CacheRecoveryLifecycle implements MetastorageLifecycleListener, DatabaseLifecycleListener { - /** Set of QuerySchema's saved on recovery. It's needed if cache query schema has changed after node joined to topology.*/ + /** + * Set of QuerySchema's saved on recovery. It's needed if cache query schema has changed after node joined to + * topology. + */ private final Map<Integer, QuerySchema> querySchemas = new ConcurrentHashMap<>(); /** {@inheritDoc} */ @@ -5457,7 +5469,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void beforeBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { + @Override public void beforeBinaryMemoryRestore( + IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { for (DynamicCacheDescriptor cacheDescriptor : persistentCaches()) preparePageStore(cacheDescriptor, true); } @@ -5508,7 +5521,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param <T> Type of started data. */ - private static interface StartCacheFailHandler<T, R> { + private interface StartCacheFailHandler<T, R> { /** * Handle of fail. * http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LocalJoinCachesContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LocalJoinCachesContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LocalJoinCachesContext.java index f41df60..650d398 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LocalJoinCachesContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LocalJoinCachesContext.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -36,7 +35,13 @@ import org.apache.ignite.internal.util.typedef.internal.S; public class LocalJoinCachesContext { /** */ @GridToStringInclude - private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches = Collections.emptyList(); + private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches; + + /** + * + */ + @GridToStringInclude + private List<DynamicCacheDescriptor> locJoinInitCaches; /** */ @GridToStringInclude @@ -48,15 +53,18 @@ public class LocalJoinCachesContext { /** * @param locJoinStartCaches Local caches to start on join. + * @param locJoinInitCaches Local caches to initialize query infrastructure without start of caches. * @param cacheGrpDescs Cache group descriptors captured during join. * @param cacheDescs Cache descriptors captured during join. */ public LocalJoinCachesContext( List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches, + List<DynamicCacheDescriptor> locJoinInitCaches, Map<Integer, CacheGroupDescriptor> cacheGrpDescs, Map<String, DynamicCacheDescriptor> cacheDescs ) { this.locJoinStartCaches = locJoinStartCaches; + this.locJoinInitCaches = locJoinInitCaches; this.cacheGrpDescs = cacheGrpDescs; this.cacheDescs = cacheDescs; } @@ -69,6 +77,13 @@ public class LocalJoinCachesContext { } /** + * @return Cache descriptors to initialize query infrastructure without start of caches. + */ + public List<DynamicCacheDescriptor> initCaches() { + return locJoinInitCaches; + } + + /** * @return Group descriptors. */ public Map<Integer, CacheGroupDescriptor> cacheGroupDescriptors() { @@ -96,13 +111,22 @@ public class LocalJoinCachesContext { if (cacheNames.contains(desc.cacheName())) it.remove(); } + + Iterator<DynamicCacheDescriptor> iter = locJoinInitCaches.iterator(); + + for (; iter.hasNext(); ) { + DynamicCacheDescriptor desc = iter.next(); + + if (cacheNames.contains(desc.cacheName())) + iter.remove(); + } } /** * @return {@code True} if the context is empty. */ public boolean isEmpty() { - return F.isEmpty(locJoinStartCaches) && F.isEmpty(cacheGrpDescs) && F.isEmpty(cacheDescs); + return F.isEmpty(locJoinStartCaches) && F.isEmpty(locJoinInitCaches) && F.isEmpty(cacheGrpDescs) && F.isEmpty(cacheDescs); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java index a5aea26..7a73b3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StartCacheInfo.java @@ -42,6 +42,9 @@ public class StartCacheInfo { /** Disable started cache after start or not. */ private final boolean disabledAfterStart; + /** Cache is client or not. */ + private final boolean clientCache; + /** * @param desc Cache configuration for start. * @param reqNearCfg Near cache configuration for start. @@ -64,11 +67,26 @@ public class StartCacheInfo { public StartCacheInfo(CacheConfiguration conf, DynamicCacheDescriptor desc, NearCacheConfiguration reqNearCfg, AffinityTopologyVersion exchTopVer, boolean disabledAfterStart) { + this(conf, desc, reqNearCfg, exchTopVer, disabledAfterStart, false); + } + + /** + * @param conf Cache configuration for start. + * @param desc Cache descriptor for start. + * @param reqNearCfg Near cache configuration for start. + * @param exchTopVer Exchange topology version in which starting happened. + * @param disabledAfterStart Disable started cache after start or not. + * @param clientCache {@code true} in case starting cache on client node. + */ + public StartCacheInfo(CacheConfiguration conf, DynamicCacheDescriptor desc, + NearCacheConfiguration reqNearCfg, + AffinityTopologyVersion exchTopVer, boolean disabledAfterStart, boolean clientCache) { startedConf = conf; this.desc = desc; this.reqNearCfg = reqNearCfg; this.exchTopVer = exchTopVer; this.disabledAfterStart = disabledAfterStart; + this.clientCache = clientCache; } /** @@ -106,6 +124,13 @@ public class StartCacheInfo { return disabledAfterStart; } + /** + * @return Start cache on client or not. + */ + public boolean isClientCache() { + return clientCache; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(StartCacheInfo.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index dab2516..c8c9502 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -29,6 +29,7 @@ import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; @@ -197,20 +198,20 @@ public interface GridQueryIndexing { * * @param cacheName Cache name. * @param schemaName Schema name. - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @throws IgniteCheckedException If failed. */ - public void registerCache(String cacheName, String schemaName, GridCacheContext<?,?> cctx) + public void registerCache(String cacheName, String schemaName, GridCacheContextInfo<?, ?> cacheInfo) throws IgniteCheckedException; /** * Unregisters cache. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param rmvIdx If {@code true}, will remove index. * @throws IgniteCheckedException If failed to drop cache schema. */ - public void unregisterCache(GridCacheContext cctx, boolean rmvIdx) throws IgniteCheckedException; + public void unregisterCache(GridCacheContextInfo cacheInfo, boolean rmvIdx) throws IgniteCheckedException; /** * @@ -237,12 +238,14 @@ public interface GridQueryIndexing { /** * Registers type if it was not known before or updates it otherwise. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param desc Type descriptor. + * @param isSql {@code true} in case table has been created from SQL. * @throws IgniteCheckedException If failed. * @return {@code True} if type was registered, {@code false} if for some reason it was rejected. */ - public boolean registerType(GridCacheContext cctx, GridQueryTypeDescriptor desc, boolean isSql) throws IgniteCheckedException; + public boolean registerType(GridCacheContextInfo cacheInfo, GridQueryTypeDescriptor desc, + boolean isSql) throws IgniteCheckedException; /** * Updates index. Note that key is unique for cache, so if cache contains multiple indexes @@ -354,4 +357,22 @@ public interface GridQueryIndexing { * @return Row cache cleaner. */ public GridQueryRowCacheCleaner rowCacheCleaner(int cacheGroupId); + + /** + * Return context for registered cache info. + * + * @param cacheName Cache name. + * @return Cache context for registered cache or {@code null} in case the cache has not been registered. + */ + @Nullable public GridCacheContextInfo registeredCacheInfo(String cacheName); + + /** + * Initialize table's cache context created for not started cache. + * + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + * + * @return {@code true} If context has been initialized. + */ + public boolean initCacheContext(GridCacheContext ctx) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index d3f2930..db52f40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -63,8 +63,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; @@ -673,23 +673,26 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * Create type descriptors from schema and initialize indexing for given cache.<p> * Use with {@link #busyLock} where appropriate. - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param schema Initial schema. * @param isSql {@code true} in case create cache initialized from SQL. * @throws IgniteCheckedException If failed. */ - public void onCacheStart0(GridCacheContext<?, ?> cctx, QuerySchema schema, boolean isSql) + public void onCacheStart0(GridCacheContextInfo<?, ?> cacheInfo, QuerySchema schema, boolean isSql) throws IgniteCheckedException { - cctx.shared().database().checkpointReadLock(); + ctx.cache().context().database().checkpointReadLock(); try { + if (cacheInfo.isClientCache() && cacheInfo.isCacheContextInited() && idx.initCacheContext(cacheInfo.gridCacheContext())) + return; + synchronized (stateMux) { - boolean escape = cctx.config().isSqlEscapeAll(); + boolean escape = cacheInfo.config().isSqlEscapeAll(); - String cacheName = cctx.name(); + String cacheName = cacheInfo.name(); - String schemaName = QueryUtils.normalizeSchemaName(cacheName, cctx.config().getSqlSchema()); + String schemaName = QueryUtils.normalizeSchemaName(cacheName, cacheInfo.config().getSqlSchema()); // Prepare candidates. List<Class<?>> mustDeserializeClss = new ArrayList<>(); @@ -700,7 +703,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (!F.isEmpty(qryEntities)) { for (QueryEntity qryEntity : qryEntities) { - QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(cacheName, schemaName, cctx, qryEntity, + QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(cacheName, schemaName, cacheInfo, qryEntity, mustDeserializeClss, escape); cands.add(cand); @@ -735,7 +738,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { // Apply pending operation which could have been completed as no-op at this point. // There could be only one in-flight operation for a cache. for (SchemaOperation op : schemaOps.values()) { - if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) { + if (F.eq(op.proposeMessage().deploymentId(), cacheInfo.dynamicDeploymentId())) { if (op.started()) { SchemaOperationWorker worker = op.manager().worker(); @@ -799,7 +802,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { } // Ready to register at this point. - registerCache0(cacheName, schemaName, cctx, cands, isSql); + registerCache0(cacheName, schemaName, cacheInfo, cands, isSql); // Warn about possible implicit deserialization. if (!mustDeserializeClss.isEmpty()) { @@ -813,7 +816,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { } } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.cache().context().database().checkpointReadUnlock(); } } @@ -848,12 +851,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { * When called for the first time, we initialize topology thus understanding whether current node is coordinator * or not. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param schema Index states. * @param isSql {@code true} in case create cache initialized from SQL. * @throws IgniteCheckedException If failed. */ - public void onCacheStart(GridCacheContext cctx, QuerySchema schema, boolean isSql) throws IgniteCheckedException { + public void onCacheStart(GridCacheContextInfo cacheInfo, QuerySchema schema, + boolean isSql) throws IgniteCheckedException { if (idx == null) return; @@ -861,7 +865,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return; try { - onCacheStart0(cctx, schema, isSql); + onCacheStart0(cacheInfo, schema, isSql); } finally { busyLock.leaveBusy(); @@ -869,10 +873,26 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param cctx Cache context. + * Destroy H2 structures for not started caches. + * + * @param cacheName Cache name. + */ + public void onCacheStop(String cacheName) { + if (idx == null) + return; + + GridCacheContextInfo cacheInfo = idx.registeredCacheInfo(cacheName); + + if (cacheInfo != null) + onCacheStop(cacheInfo, true); + } + + + /** + * @param cacheInfo Cache context info. * @param removeIdx If {@code true}, will remove index. */ - public void onCacheStop(GridCacheContext cctx, boolean removeIdx) { + public void onCacheStop(GridCacheContextInfo cacheInfo, boolean removeIdx) { if (idx == null) return; @@ -880,7 +900,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { return; try { - onCacheStop0(cctx, removeIdx); + onCacheStop0(cacheInfo, removeIdx); } finally { busyLock.leaveBusy(); @@ -1416,9 +1436,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { String cacheName = op.cacheName(); - GridCacheAdapter cache = ctx.cache().internalCache(cacheName); + GridCacheContextInfo cacheInfo = idx.registeredCacheInfo(cacheName); - if (cache == null || !F.eq(depId, cache.context().dynamicDeploymentId())) + if (cacheInfo == null || !F.eq(depId, cacheInfo.dynamicDeploymentId())) throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, cacheName); try { @@ -1427,11 +1447,18 @@ public class GridQueryProcessor extends GridProcessorAdapter { QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index()); - GridCacheContext cctx = cache.context(); + SchemaIndexCacheVisitor visitor; - SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName()); + if (cacheInfo.isCacheContextInited()) { + GridCacheContext cctx = cacheInfo.gridCacheContext(); - SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx, filter, cancelTok, op0.parallel()); + SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName()); + + visitor = new SchemaIndexCacheVisitorImpl(cctx, filter, cancelTok, op0.parallel()); + } + else + //For not started caches we shouldn't add any data to index. + visitor = clo -> {}; idx.dynamicIndexCreate(op0.schemaName(), op0.tableName(), idxDesc, op0.ifNotExists(), visitor); } @@ -1582,7 +1609,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * * @param cacheName Cache name. * @param schemaName Schema name. - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param cands Candidates. * @param isSql {@code true} in case create cache initialized from SQL. * @throws IgniteCheckedException If failed. @@ -1590,13 +1617,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { private void registerCache0( String cacheName, String schemaName, - GridCacheContext<?, ?> cctx, + GridCacheContextInfo<?, ?> cacheInfo, Collection<QueryTypeCandidate> cands, boolean isSql ) throws IgniteCheckedException { synchronized (stateMux) { if (idx != null) - idx.registerCache(cacheName, schemaName, cctx); + idx.registerCache(cacheName, schemaName, cacheInfo); try { for (QueryTypeCandidate cand : cands) { @@ -1627,13 +1654,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { } if (idx != null) - idx.registerType(cctx, desc, isSql); + idx.registerType(cacheInfo, desc, isSql); } cacheNames.add(CU.mask(cacheName)); } catch (IgniteCheckedException | RuntimeException e) { - onCacheStop0(cctx, true); + onCacheStop0(cacheInfo, true); throw e; } @@ -1644,14 +1671,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { * Unregister cache.<p> * Use with {@link #busyLock} where appropriate. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param destroy Destroy flag. */ - public void onCacheStop0(GridCacheContext cctx, boolean destroy) { + public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy) { if (idx == null) return; - String cacheName = cctx.name(); + String cacheName = cacheInfo.name(); synchronized (stateMux) { // Clear types. @@ -1687,7 +1714,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { // Notify indexing. try { - idx.unregisterCache(cctx, destroy); + idx.unregisterCache(cacheInfo, destroy); } catch (Exception e) { U.error(log, "Failed to clear indexing on cache unregister (will ignore): " + cacheName, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java index 6a2c22a..08d44bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java @@ -47,7 +47,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; @@ -397,17 +397,19 @@ public class QueryUtils { * * @param cacheName Cache name. * @param schemaName Schema name. - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param qryEntity Query entity. * @param mustDeserializeClss Classes which must be deserialized. * @param escape Escape flag. * @return Type candidate. * @throws IgniteCheckedException If failed. */ - public static QueryTypeCandidate typeForQueryEntity(String cacheName, String schemaName, GridCacheContext cctx, - QueryEntity qryEntity, List<Class<?>> mustDeserializeClss, boolean escape) throws IgniteCheckedException { - GridKernalContext ctx = cctx.kernalContext(); - CacheConfiguration<?,?> ccfg = cctx.config(); + public static QueryTypeCandidate typeForQueryEntity(String cacheName, String schemaName, + GridCacheContextInfo cacheInfo, + QueryEntity qryEntity, List<Class<?>> mustDeserializeClss, boolean escape) + throws IgniteCheckedException { + GridKernalContext ctx = cacheInfo.context(); + CacheConfiguration<?, ?> ccfg = cacheInfo.config(); boolean binaryEnabled = ctx.cacheObjects().isBinaryEnabled(ccfg); @@ -494,7 +496,7 @@ public class QueryUtils { // Need to setup affinity key for distributed joins. String keyType = qryEntity.getKeyType(); - if (!cctx.customAffinityMapper() && keyType != null) { + if (!cacheInfo.customAffinityMapper() && keyType != null) { if (coCtx != null) { CacheDefaultBinaryAffinityKeyMapper mapper = (CacheDefaultBinaryAffinityKeyMapper)coCtx.defaultAffMapper(); @@ -516,7 +518,7 @@ public class QueryUtils { else { processClassMeta(qryEntity, desc, coCtx); - AffinityKeyMapper keyMapper = cctx.config().getAffinityMapper(); + AffinityKeyMapper keyMapper = cacheInfo.config().getAffinityMapper(); if (keyMapper instanceof GridCacheDefaultAffinityKeyMapper) { String affField = @@ -619,14 +621,14 @@ public class QueryUtils { /** * Add validate property to QueryTypeDescriptor. - * + * * @param ctx Kernel context. * @param qryEntity Query entity. * @param d Descriptor. * @param name Field name. * @throws IgniteCheckedException */ - private static void addKeyValueValidationProperty(GridKernalContext ctx, QueryEntity qryEntity, QueryTypeDescriptorImpl d, + private static void addKeyValueValidationProperty(GridKernalContext ctx, QueryEntity qryEntity, QueryTypeDescriptorImpl d, String name, boolean isKey) throws IgniteCheckedException { Map<String, Object> dfltVals = qryEntity.getDefaultFieldValues(); @@ -638,12 +640,12 @@ public class QueryUtils { Object dfltVal = dfltVals.get(name); QueryBinaryProperty prop = buildBinaryProperty( - ctx, + ctx, name, U.classForName(typeName, Object.class, true), - d.aliases(), - isKey, - true, + d.aliases(), + isKey, + true, dfltVal, precision == null ? -1 : precision.getOrDefault(name, -1), scale == null ? -1 : scale.getOrDefault(name, -1)); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java index c037895..cb78ff6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.schema; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -37,8 +38,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; -import java.util.List; - import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index 5432257..ce7f4d7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -300,13 +300,14 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public void registerCache(String cacheName, String schemaName, - GridCacheContext<?, ?> cctx) throws IgniteCheckedException { - if (FAILED_CACHES.contains(cctx.name()) && cctx.kernalContext().clientNode()) - throw new IgniteCheckedException("Test query exception " + cctx.name() + " " + new Random().nextInt()); + GridCacheContextInfo<?, ?> cacheInfo) throws IgniteCheckedException { + if (FAILED_CACHES.contains(cacheInfo.name()) && cacheInfo.gridCacheContext().kernalContext().clientNode()) + throw new IgniteCheckedException("Test query exception " + cacheInfo.name() + " " + new Random().nextInt()); } /** {@inheritDoc} */ - @Override public void unregisterCache(GridCacheContext cctx, boolean rmvIdx) throws IgniteCheckedException { + @Override public void unregisterCache(GridCacheContextInfo cacheInfo, + boolean rmvIdx) throws IgniteCheckedException { // No-op } @@ -318,8 +319,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT return null; } - /** {@inheritDoc} */ - @Override public boolean registerType(GridCacheContext cctx, + @Override public boolean registerType(GridCacheContextInfo cacheInfo, GridQueryTypeDescriptor desc, boolean isSql) throws IgniteCheckedException { return false; } @@ -389,5 +389,18 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT @Override public GridQueryRowCacheCleaner rowCacheCleaner(int cacheGroupId) { return null; } + + /** {@inheritDoc} */ + @Override public GridCacheContextInfo registeredCacheInfo(String cacheName) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean initCacheContext(GridCacheContext ctx) throws IgniteCheckedException { + if (FAILED_CACHES.contains(ctx.name()) && ctx.kernalContext().clientNode()) + throw new IgniteCheckedException("Test query exception " + ctx.name() + " " + new Random().nextInt()); + + return true; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 9b31b02..1e32f57 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -1219,23 +1219,17 @@ public class DmlStatementsProcessor { GridH2Table tbl = idx.dataTable(cmd.schemaName(), cmd.tableName()); if (tbl == null) { - idx.kernalContext().cache().createMissingQueryCaches(); - - tbl = idx.dataTable(cmd.schemaName(), cmd.tableName()); - } - - if (tbl == null) { throw new IgniteSQLException("Table does not exist: " + cmd.tableName(), IgniteQueryErrorCode.TABLE_NOT_FOUND); } + H2Utils.checkAndStartNotStartedCache(tbl); + UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl); IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new BulkLoadDataConverter(plan); - GridCacheContext cache = tbl.cache(); - - IgniteDataStreamer<Object, Object> streamer = cache.grid().dataStreamer(cache.name()); + IgniteDataStreamer<Object, Object> streamer = idx.kernalContext().grid().dataStreamer(tbl.cacheName()); BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCache.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCache.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCache.java index 06a3251..15f5501 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCache.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCache.java @@ -17,19 +17,18 @@ package org.apache.ignite.internal.processors.query.h2; +import java.util.Iterator; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap; import org.apache.ignite.internal.util.typedef.F; import org.jsr166.ConcurrentLinkedHashMap; -import java.util.Iterator; -import java.util.Map; - import static org.jsr166.ConcurrentLinkedHashMap.DFLT_INIT_CAP; import static org.jsr166.ConcurrentLinkedHashMap.DFLT_LOAD_FACTOR; @@ -100,13 +99,13 @@ public class H2RowCache implements GridQueryRowCacheCleaner { /** * Cache un-registration callback. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @return {@code True} if there are no more usages for the given cache group. */ - public boolean onCacheUnregistered(GridCacheContext cctx) { + public boolean onCacheUnregistered(GridCacheContextInfo cacheInfo) { boolean res = --usageCnt == 0; - clearForCache(cctx); + clearForCache(cacheInfo); return res; } @@ -121,10 +120,10 @@ public class H2RowCache implements GridQueryRowCacheCleaner { /** * Clear entries belonging to the given cache. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. */ - private void clearForCache(GridCacheContext cctx) { - int cacheId = cctx.cacheId(); + private void clearForCache(GridCacheContextInfo cacheInfo) { + int cacheId = cacheInfo.cacheId(); Iterator<Map.Entry<Long, GridH2KeyValueRowOnheap>> iter = rows.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java index 39f3329..ca135f1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2RowCacheRegistry.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.query.h2; +import java.util.HashMap; +import java.util.Map; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.jetbrains.annotations.Nullable; -import java.util.HashMap; -import java.util.Map; - /** * H2 row cache registry. */ @@ -47,14 +47,14 @@ public class H2RowCacheRegistry { /** * Callback invoked on cache registration within indexing. * - * @param cctx Cache context. + * @param cacheInfo Cache info context. */ - public void onCacheRegistered(GridCacheContext cctx) { - if (!cctx.config().isSqlOnheapCacheEnabled()) + public void onCacheRegistered(GridCacheContextInfo cacheInfo) { + if (!cacheInfo.config().isSqlOnheapCacheEnabled()) return; synchronized (mux) { - int grpId = cctx.groupId(); + int grpId = cacheInfo.groupId(); if (caches != null) { H2RowCache cache = caches.get(grpId); @@ -68,31 +68,37 @@ public class H2RowCacheRegistry { HashMap<Integer, H2RowCache> caches0 = copy(); - H2RowCache rowCache = new H2RowCache(cctx.group(), cctx.config().getSqlOnheapCacheMaxSize()); + if (cacheInfo.affinityNode()) { + GridCacheContext cacheCtx = cacheInfo.gridCacheContext(); - caches0.put(grpId, rowCache); + assert cacheCtx != null; - caches = caches0; + H2RowCache rowCache = new H2RowCache(cacheCtx.group(), cacheInfo.config().getSqlOnheapCacheMaxSize()); - // Inject row cache cleaner into store on cache creation. - // Used in case the cache with enabled SqlOnheapCache is created in exists cache group - // and SqlOnheapCache is disbaled for the caches have been created before. - for (IgniteCacheOffheapManager.CacheDataStore ds : cctx.offheap().cacheDataStores()) - ds.setRowCacheCleaner(rowCache); + caches0.put(grpId, rowCache); + + caches = caches0; + + // Inject row cache cleaner into store on cache creation. + // Used in case the cache with enabled SqlOnheapCache is created in exists cache group + // and SqlOnheapCache is disbaled for the caches have been created before. + for (IgniteCacheOffheapManager.CacheDataStore ds : cacheCtx.offheap().cacheDataStores()) + ds.setRowCacheCleaner(rowCache); + } } } /** * Callback invoked when cache gets unregistered. * - * @param cctx Cache context. + * @param cacheInfo Cache context info. */ - public void onCacheUnregistered(GridCacheContext cctx) { - if (!cctx.config().isSqlOnheapCacheEnabled()) + public void onCacheUnregistered(GridCacheContextInfo cacheInfo) { + if (!cacheInfo.config().isSqlOnheapCacheEnabled()) return; synchronized (mux) { - int grpId = cctx.groupId(); + int grpId = cacheInfo.groupId(); assert caches != null; @@ -100,7 +106,7 @@ public class H2RowCacheRegistry { assert cache != null; - if (cache.onCacheUnregistered(cctx)) { + if (cache.onCacheUnregistered(cacheInfo)) { HashMap<Integer, H2RowCache> caches0 = copy(); caches0.remove(grpId); http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java index ab7cb4b..ed3a9c5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Schema.java @@ -105,7 +105,7 @@ public class H2Schema { if (tbls.putIfAbsent(tbl.tableName(), tbl) != null) throw new IllegalStateException("Table already registered: " + tbl.fullTableName()); - if (typeToTbl.putIfAbsent(new H2TypeKey(tbl.cache().name(), tbl.typeName()), tbl) != null) + if (typeToTbl.putIfAbsent(new H2TypeKey(tbl.cacheName(), tbl.typeName()), tbl) != null) throw new IllegalStateException("Table already registered: " + tbl.fullTableName()); } @@ -115,7 +115,7 @@ public class H2Schema { public void remove(H2TableDescriptor tbl) { tbls.remove(tbl.tableName()); - typeToTbl.remove(new H2TypeKey(tbl.cache().name(), tbl.typeName())); + typeToTbl.remove(new H2TypeKey(tbl.cacheName(), tbl.typeName())); } /** @@ -128,7 +128,7 @@ public class H2Schema { tbls.remove(tbl.tableName()); - typeToTbl.remove(new H2TypeKey(tbl.cache().name(), tbl.typeName())); + typeToTbl.remove(new H2TypeKey(tbl.cacheName(), tbl.typeName())); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java index 6c20727..920f03d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.QueryIndexType; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheContextInfo; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; @@ -71,8 +72,8 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { /** */ private final H2Schema schema; - /** Cache context. */ - private final GridCacheContext cctx; + /** Cache context info. */ + private final GridCacheContextInfo cacheInfo; /** */ private GridH2Table tbl; @@ -92,15 +93,15 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { * @param idx Indexing. * @param schema Schema. * @param type Type descriptor. - * @param cctx Cache context. + * @param cacheInfo Cache context info. * @param isSql {@code true} in case table has been created from SQL. */ public H2TableDescriptor(IgniteH2Indexing idx, H2Schema schema, GridQueryTypeDescriptor type, - GridCacheContext cctx, boolean isSql) { + GridCacheContextInfo cacheInfo, boolean isSql) { this.idx = idx; this.type = type; this.schema = schema; - this.cctx = cctx; + this.cacheInfo = cacheInfo; this.isSql = isSql; fullTblName = H2Utils.withQuotes(schema.schemaName()) + "." + H2Utils.withQuotes(type.tableName()); @@ -156,10 +157,24 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { } /** + * @return Cache name. + */ + public String cacheName(){ + return cacheInfo.name(); + } + + /** + * @return Cache context info. + */ + public GridCacheContextInfo cacheInfo() { + return cacheInfo; + } + + /** * @return Cache context. */ public GridCacheContext cache() { - return cctx; + return cacheInfo.gridCacheContext(); } /** @@ -188,8 +203,8 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { * @return H2 row factory. */ H2RowFactory rowFactory(GridH2RowDescriptor rowDesc) { - if (cctx.affinityNode()) - return new H2RowFactory(rowDesc, cctx); + if (cacheInfo.affinityNode()) + return new H2RowFactory(rowDesc, cacheInfo.gridCacheContext()); return null; } @@ -421,10 +436,10 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory { * @return Index. */ private Index createHashIndex(GridH2Table tbl, String idxName, List<IndexColumn> cols) { - if (cctx.affinityNode()) { + if (cacheInfo.affinityNode()) { assert pkHashIdx == null : pkHashIdx; - pkHashIdx = new H2PkHashIndex(cctx, tbl, idxName, cols); + pkHashIdx = new H2PkHashIndex(cacheInfo.gridCacheContext(), tbl, idxName, cols); return pkHashIdx; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableEngine.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableEngine.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableEngine.java index c05aaf6..9e79fba 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableEngine.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableEngine.java @@ -80,7 +80,7 @@ public class H2TableEngine implements TableEngine { /** {@inheritDoc} */ @Override public TableBase createTable(CreateTableData createTblData) { - resTbl0 = new GridH2Table(createTblData, rowDesc0, rowFactory0, tblDesc0, tblDesc0.cache()); + resTbl0 = new GridH2Table(createTblData, rowDesc0, rowFactory0, tblDesc0, tblDesc0.cacheInfo()); return resTbl0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d19123d4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index 865754f..d45a60f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.SB; +import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.engine.Session; import org.h2.jdbc.JdbcConnection; import org.h2.result.SortOrder; @@ -298,4 +300,29 @@ public class H2Utils { dest.add(col); } } + + /** + * Check that given table has not started cache and start it for such case. + * + * @param tbl Table to check on not started cache. + * @return {@code true} in case not started and has been started. + */ + public static boolean checkAndStartNotStartedCache(GridH2Table tbl) { + if (tbl != null && tbl.isCacheLazy()) { + String cacheName = tbl.cacheInfo().config().getName(); + + GridKernalContext ctx = tbl.cacheInfo().context(); + + try { + Boolean res = ctx.cache().dynamicStartCache(null, cacheName, null, false, true, true).get(); + + return U.firstNotNull(res, Boolean.FALSE); + } + catch (IgniteCheckedException ex) { + throw U.convertException(ex); + } + } + + return false; + } }
