This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new ab01d51 IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200. new 1948ba8 Merge remote-tracking branch 'origin/master' ab01d51 is described below commit ab01d51f51d93841f27c162e6a54e85ae911dfe1 Author: devozerov <voze...@gridgain.com> AuthorDate: Fri Mar 1 13:46:04 2019 +0300 IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200. --- .../processors/query/h2/IgniteH2Indexing.java | 107 +++++---------------- .../internal/processors/query/h2/QueryParser.java | 36 ++++++- .../processors/query/h2/QueryParserResultDml.java | 16 ++- 3 files changed, 72 insertions(+), 87 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index acf1136..65a85ba 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -32,7 +32,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; @@ -109,7 +108,6 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIt import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; -import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.opt.QueryContext; @@ -127,7 +125,6 @@ import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand; import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand; import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult; -import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -198,17 +195,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */ private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4; - /** Default size for update plan cache. */ - private static final int UPDATE_PLAN_CACHE_SIZE = 1024; - /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */ private final boolean updateInTxAllowed = Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION); - /** Update plans cache. */ - private volatile ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache = - new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE); - /** Logger. 
*/ @LoggerResource private IgniteLogger log; @@ -475,7 +465,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { QueryParserResult parseRes = parser.parse(schemaName, fieldsQry, false); if (parseRes.isDml()) { - UpdateResult updRes = executeUpdate(schemaName, parseRes.dml(), fieldsQry, true, filter, cancel); + QueryParserResultDml dml = parseRes.dml(); + + assert dml != null; + + UpdateResult updRes = executeUpdate(schemaName, dml, fieldsQry, true, filter, cancel); List<?> updResRow = Collections.singletonList(updRes.counter()); @@ -754,7 +748,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { boolean fail = false; try { - UpdatePlan plan = updatePlan(schemaName, dml, null); + UpdatePlan plan = dml.plan(); List<List<?>> planRows = plan.createRows(args != null ? args : X.EMPTY_OBJECT_ARRAY); @@ -1285,12 +1279,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { // Execute. if (parseRes.isCommand()) { + QueryParserResultCommand cmd = parseRes.command(); + + assert cmd != null; + // Execute command. 
FieldsQueryCursor<List<?>> cmdRes = executeCommand( schemaName, newQry, cliCtx, - parseRes.command() + cmd ); res.add(cmdRes); @@ -1365,6 +1363,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { boolean fail = false; try { + if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) { + throw new IgniteSQLException("DML statements are not allowed inside a transaction over " + + "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " + + "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " + + "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")"); + } + if (!loc) return executeUpdateDistributed(schemaName, dml , qry, cancel); else { @@ -1537,7 +1542,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { IndexingQueryFilter filter = backupFilter(topVer, parts); - UpdatePlan plan = updatePlan(schema, dml, fldsQry); + UpdatePlan plan = dml.plan(); GridCacheContext planCctx = plan.cacheContext(); @@ -1585,6 +1590,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { ); cur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @SuppressWarnings("NullableProblems") @Override public Iterator<List<?>> iterator() { try { return res.iterator(); @@ -2125,16 +2131,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { partReservationMgr.onCacheStop(cacheName); - // Remove cached DML plans. - Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = updatePlanCache.entrySet().iterator(); - - while (iter.hasNext()) { - UpdatePlan plan = iter.next().getValue(); - - if (F.eq(cacheName, plan.cacheContext().name())) - iter.remove(); - } - // Drop schema (needs to be called after callback to DML processor because the latter depends on schema). 
schemaMgr.onCacheDestroyed(cacheName, rmvIdx); @@ -2150,8 +2146,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { */ private void clearPlanCache() { parser.clearCache(); - - updatePlanCache = new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE); } /** {@inheritDoc} */ @@ -2188,10 +2182,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @throws IgniteCheckedException If failed. */ public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException { - IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer); - - if (fut != null) - fut.get(); + ctx.cache().context().exchange().affinityReadyFuture(topVer).get(); } /** {@inheritDoc} */ @@ -2290,7 +2281,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { List<Object[]> argss = fieldsQry0.batchedArguments(); - UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry0); + UpdatePlan plan = dml.plan(); GridCacheContext<?, ?> cctx = plan.cacheContext(); @@ -2402,7 +2393,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { long items = 0; - UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry); + UpdatePlan plan = dml.plan(); GridCacheContext<?, ?> cctx = plan.cacheContext(); @@ -2651,6 +2642,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { ); cur = new QueryCursorImpl<>(new Iterable<List<?>>() { + @SuppressWarnings("NullableProblems") @Override public Iterator<List<?>> iterator() { try { return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true); @@ -2666,55 +2658,4 @@ public class IgniteH2Indexing implements GridQueryIndexing { return DmlUtils.processSelectResult(plan, cur, pageSize); } - - /** - * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args, - * if available. - * - * @param schemaName Schema. - * @param dml Command. - * @param fieldsQry Original fields query. - * @return Update plan. 
- */ - @SuppressWarnings("IfMayBeConditional") - private UpdatePlan updatePlan( - String schemaName, - QueryParserResultDml dml, - SqlFieldsQuery fieldsQry - ) throws IgniteCheckedException { - // Disallow updates on SYSTEM schema. - if (F.eq(QueryUtils.SCHEMA_SYS, schemaName)) - throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema", - IgniteQueryErrorCode.UNSUPPORTED_OPERATION); - - // Disallow updates inside transaction if this is not MVCC mode. - boolean inTx = ctx.cache().context().tm().inUserTx(); - - if (!dml.mvccEnabled() && !updateInTxAllowed && inTx) { - throw new IgniteSQLException("DML statements are not allowed inside a transaction over " + - "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " + - "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " + - "\"IGNITE_ALLOW_DML_INSIDE_TRANSACTION\""); - } - - H2CachedStatementKey planKey = new H2CachedStatementKey(schemaName, dml.statement().getSQL(), fieldsQry); - - UpdatePlan res = updatePlanCache.get(planKey); - - if (res != null) - return res; - - res = UpdatePlanBuilder.planForStatement( - schemaName, - dml.statement(), - dml.mvccEnabled(), - this, - fieldsQry - ); - - // Don't cache re-runs - UpdatePlan oldRes = updatePlanCache.putIfAbsent(planKey, res); - - return oldRes != null ? 
oldRes : res; - } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java index 52d52bc..6ddea41 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java @@ -30,8 +30,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils; import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst; @@ -333,7 +336,7 @@ public class QueryParser { return new QueryParserResult(newQry, remainingQry, null, null, cmd); } else if (GridSqlQueryParser.isDml(prepared)) { - QueryParserResultDml dml = prepareDmlStatement(prepared); + QueryParserResultDml dml = prepareDmlStatement(schemaName, qry, prepared); return new QueryParserResult(newQry, remainingQry, null, dml, null); } @@ -475,10 +478,16 @@ public class QueryParser { /** * Prepare DML statement. * + * @param schemaName Schema name. + * @param qry Query. * @param prepared Prepared. * @return Statement. 
*/ - private QueryParserResultDml prepareDmlStatement(Prepared prepared) { + private QueryParserResultDml prepareDmlStatement(String schemaName, SqlFieldsQuery qry, Prepared prepared) { + if (F.eq(QueryUtils.SCHEMA_SYS, schemaName)) + throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + // Prepare AST. GridSqlQueryParser parser = new GridSqlQueryParser(false); @@ -517,11 +526,32 @@ public class QueryParser { streamTbl = DmlAstUtils.gridTableForElement(insert.into()).dataTable(); } + + // Create update plan. + UpdatePlan plan; + + try { + plan = UpdatePlanBuilder.planForStatement( + schemaName, + stmt, + mvccEnabled, + idx, + qry + ); + } + catch (Exception e) { + if (e instanceof IgniteSQLException) + throw (IgniteSQLException)e; + else + throw new IgniteSQLException("Failed to prepare update plan.", e); + } + return new QueryParserResultDml( stmt, prepared.getParameters().size(), mvccEnabled, - streamTbl + streamTbl, + plan ); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java index cf48d8f..2244a3f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2; +import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.jetbrains.annotations.Nullable; @@ -37,6 +38,9 @@ public class QueryParserResultDml { /** Streamer table. */ private final GridH2Table streamTbl; + /** Update plan. 
*/ + private final UpdatePlan plan; + /** * Constructor. * @@ -44,17 +48,20 @@ public class QueryParserResultDml { * @param paramsCnt Number of parameters. * @param mvccEnabled Whether MVCC is enabled. * @param streamTbl Streamer table. + * @param plan Update plan. */ public QueryParserResultDml( GridSqlStatement stmt, int paramsCnt, boolean mvccEnabled, - @Nullable GridH2Table streamTbl + @Nullable GridH2Table streamTbl, + UpdatePlan plan ) { this.stmt = stmt; this.paramsCnt = paramsCnt; this.mvccEnabled = mvccEnabled; this.streamTbl = streamTbl; + this.plan = plan; } /** @@ -91,4 +98,11 @@ public class QueryParserResultDml { public int parametersCount() { return paramsCnt; } + + /** + * @return Update plan. + */ + public UpdatePlan plan() { + return plan; + } }