This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-17765-3 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 264a15847f2eaaa548ac4be8b6c10486627a8d35 Author: amashenkov <[email protected]> AuthorDate: Tue Jun 20 15:04:52 2023 +0300 Indirect cache for query plans. --- .../internal/sql/engine/SqlQueryProcessor.java | 237 ++++++++------------- .../sql/engine/prepare/PrepareService.java | 4 +- .../sql/engine/prepare/PrepareServiceImpl.java | 157 ++++++++++++-- .../sql/engine/prepare/QueryPlanFactory.java | 32 --- .../sql/engine/sql/IgniteSqlCreateTable.java | 7 + .../sql/engine/exec/ExecutionServiceImplTest.java | 10 +- .../internal/sql/engine/framework/TestNode.java | 15 +- .../sql/engine/planner/PrepareServiceSelfTest.java | 124 ++++++----- 8 files changed, 310 insertions(+), 276 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 580d6ee12a..b706ba84da 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -17,32 +17,28 @@ package org.apache.ignite.internal.sql.engine; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.lang.ErrorGroups.Sql.OPERATION_INTERRUPTED_ERR; -import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_EXPIRED_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_NOT_FOUND_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR; -import static org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_SQL_OPERATION_KIND_ERR; import static org.apache.ignite.lang.IgniteStringFormatter.format; import java.util.ArrayList; 
import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.util.Pair; import org.apache.ignite.internal.catalog.CatalogManager; @@ -67,7 +63,6 @@ import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; -import org.apache.ignite.internal.sql.engine.exec.QueryValidationException; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper; import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.PrepareService; @@ -81,12 +76,7 @@ import org.apache.ignite.internal.sql.engine.session.SessionId; import org.apache.ignite.internal.sql.engine.session.SessionInfo; import org.apache.ignite.internal.sql.engine.session.SessionManager; import org.apache.ignite.internal.sql.engine.session.SessionProperty; -import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; -import org.apache.ignite.internal.sql.engine.sql.ParseResult; -import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; -import org.apache.ignite.internal.sql.engine.util.Commons; -import 
org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.event.TableEvent; @@ -247,7 +237,8 @@ public class SqlQueryProcessor implements QueryProcessor { busyLock ); - sqlSchemaManager.registerListener(prepareSvc::resetCache); + //TODO IGNITE-19497 No need cache clear as cache already aware of catalog versioning. + sqlSchemaManager.registerListener(prepareSvc::invalidateCachedPlans); this.prepareSvc = prepareSvc; @@ -383,121 +374,116 @@ public class SqlQueryProcessor implements QueryProcessor { private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0( SessionId sessionId, - QueryContext context, + QueryContext queryContext, String sql, Object... params ) { Session session = sessionManager.session(sessionId); if (session == null) { - return CompletableFuture.failedFuture( - new SqlException(SESSION_NOT_FOUND_ERR, format("Session not found [{}]", sessionId))); + return failedFuture(new SqlException(SESSION_NOT_FOUND_ERR, format("Session not found [{}]", sessionId))); } - String schemaName = session.properties().get(QueryProperty.DEFAULT_SCHEMA); - - InternalTransaction outerTx = context.unwrap(InternalTransaction.class); - QueryCancel queryCancel = new QueryCancel(); - AsyncCloseable closeableResource = () -> CompletableFuture.runAsync( - queryCancel::cancel, - taskExecutor - ); - - queryCancel.add(() -> session.unregisterResource(closeableResource)); - try { - session.registerResource(closeableResource); + registerQueryCancel(session, queryCancel); } catch (IllegalStateException ex) { - return CompletableFuture.failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR, + return failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR, format("Session has been expired [{}]", session.sessionId()), ex)); } - CompletableFuture<Void> start = new CompletableFuture<>(); + String schemaName = 
session.properties().get(QueryProperty.DEFAULT_SCHEMA); - AtomicReference<InternalTransaction> tx = new AtomicReference<>(); + InternalTransaction outerTx = queryContext.unwrap(InternalTransaction.class); + boolean implicitTx = outerTx == null; - CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start - .thenApply(v -> { - StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE); - SqlNode sqlNode = parseResult.statement(); + // TODO: IGNITE-19497 Use timestamp to get schema. + SchemaPlus schema = sqlSchemaManager.schema(schemaName); - validateParsedStatement(context, outerTx, parseResult, sqlNode, params); + if (schema == null) { + return failedFuture(new SchemaNotFoundException(schemaName)); + } - return sqlNode; - }) - .thenCompose(sqlNode -> { - boolean rwOp = dataModificationOp(sqlNode); + BaseQueryContext plannerContext = BaseQueryContext.builder() + .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) + .logger(LOG) + .cancel(queryCancel) + .parameters(params) + .plannerTimeout(PLANNER_TIMEOUT) + .build(); + + return prepareSvc.prepareAsync(sql, queryContext, plannerContext) + .thenComposeAsync(plan -> { + // Ensure plan can be executed. + if (SqlQueryType.DDL == plan.type() && outerTx != null) { + return failedFuture(new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support transactions.")); + } - boolean implicitTxRequired = outerTx == null; + InternalTransaction tx = implicitTx ? txManager.begin(plan.type() != DML) : outerTx; - tx.set(implicitTxRequired ? txManager.begin(!rwOp) : outerTx); + try { + // TODO IGNITE-19497 Ensure implicit tx uses the same schema, which query was planned for, otherwise retry planning. 
+ var dataCursor = executionSrvc.executePlan(tx, plan, plannerContext); - SchemaPlus schema = sqlSchemaManager.schema(schemaName); + SqlQueryType queryType = plan.type(); + assert queryType != null : "Expected a full plan but got a fragment: " + plan; - if (schema == null) { - return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName)); - } + AsyncSqlCursorImpl<List<Object>> cursor = new AsyncSqlCursorImpl<>( + queryType, + plan.metadata(), + implicitTx ? tx : null, + new AsyncCursor<>() { + @Override + public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { + session.touch(); - BaseQueryContext ctx = BaseQueryContext.builder() - .frameworkConfig( - Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(schema) - .build() - ) - .logger(LOG) - .cancel(queryCancel) - .parameters(params) - .plannerTimeout(PLANNER_TIMEOUT) - .build(); - - return prepareSvc.prepareAsync(sqlNode, ctx) - .thenApply(plan -> { - var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx); - - SqlQueryType queryType = plan.type(); - assert queryType != null : "Expected a full plan but got a fragment: " + plan; - - return new AsyncSqlCursorImpl<>( - queryType, - plan.metadata(), - implicitTxRequired ? 
tx.get() : null, - new AsyncCursor<List<Object>>() { - @Override - public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { - session.touch(); - - return dataCursor.requestNextAsync(rows); - } - - @Override - public CompletableFuture<Void> closeAsync() { - session.touch(); - - return dataCursor.closeAsync(); - } - } - ); - }); - }); + return dataCursor.requestNextAsync(rows); + } - stage.whenComplete((cur, ex) -> { - if (ex instanceof CancellationException) { - queryCancel.cancel(); - } + @Override + public CompletableFuture<Void> closeAsync() { + session.touch(); - if (ex != null && outerTx == null) { - InternalTransaction tx0 = tx.get(); - if (tx0 != null) { - tx0.rollback(); - } - } - }); + return dataCursor.closeAsync(); + } + } + ); - start.completeAsync(() -> null, taskExecutor); + return CompletableFuture.completedFuture(cursor); + } catch (Throwable th) { + handleQueryException(th, queryCancel, implicitTx ? tx : null); - return stage; + return failedFuture(th); + } + }, taskExecutor); + } + + /** + * Registers QueryCancel object in session. 
+ * + * @throws IllegalStateException If session is invalid + */ + private void registerQueryCancel(Session session, QueryCancel queryCancel) throws IllegalStateException { + AsyncCloseable closeableResource = () -> CompletableFuture.runAsync( + queryCancel::cancel, + taskExecutor + ); + + queryCancel.add(() -> session.unregisterResource(closeableResource)); + session.registerResource(closeableResource); + } + + private static void handleQueryException(Throwable th, QueryCancel queryCancel, @Nullable InternalTransaction tx) { + if (th instanceof CancellationException) { + queryCancel.cancel(); + } + + if (th != null) { + if (tx != null) { + tx.rollback(); + } + } } private abstract static class AbstractTableEventListener implements EventListener<TableEventParameters> { @@ -606,55 +592,4 @@ public class SqlQueryProcessor implements QueryProcessor { .thenApply(v -> false); } } - - /** Returns {@code true} if this is data modification operation. */ - private static boolean dataModificationOp(SqlNode sqlNode) { - return SqlKind.DML.contains(sqlNode.getKind()); - } - - /** Performs additional validation of a parsed statement. **/ - public static void validateParsedStatement( - QueryContext context, - InternalTransaction outerTx, - ParseResult parseResult, - SqlNode node, - Object[] params - ) { - Set<SqlQueryType> allowedTypes = context.allowedQueryTypes(); - SqlQueryType queryType = Commons.getQueryType(node); - - if (queryType == null) { - throw new IgniteInternalException(UNSUPPORTED_SQL_OPERATION_KIND_ERR, "Unsupported operation [" - + "sqlNodeKind=" + node.getKind() + "; " - + "querySql=\"" + node + "\"]"); - } - - if (!allowedTypes.contains(queryType)) { - String message = format("Invalid SQL statement type in the batch. 
Expected {} but got {}.", allowedTypes, queryType); - - throw new QueryValidationException(message); - } - - if (SqlQueryType.DDL == queryType && outerTx != null) { - throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support transactions."); - } - - if (parseResult.dynamicParamsCount() != params.length) { - String message = format( - "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).", - params.length, parseResult.dynamicParamsCount() - ); - - throw new SqlException(QUERY_INVALID_ERR, message); - } - - for (Object param : params) { - if (!TypeUtils.supportParamInstance(param)) { - String message = format( - "Unsupported dynamic parameter defined. Provided '{}' is not supported.", param.getClass().getName()); - - throw new SqlException(QUERY_INVALID_ERR, message); - } - } - } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java index 7a42727412..71657400bb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.sql.engine.prepare; import java.util.concurrent.CompletableFuture; -import org.apache.calcite.sql.SqlNode; +import org.apache.ignite.internal.sql.engine.QueryContext; import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; @@ -29,5 +29,5 @@ public interface PrepareService extends LifecycleAware { /** * Prepare query plan. 
*/ - CompletableFuture<QueryPlan> prepareAsync(SqlNode sqlNode, BaseQueryContext ctx); + CompletableFuture<QueryPlan> prepareAsync(String query, QueryContext queryContext, BaseQueryContext ctx); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java index 6bebcbf8f7..0d3aeac2c3 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java @@ -17,14 +17,19 @@ package org.apache.ignite.internal.sql.engine.prepare; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_SQL_OPERATION_KIND_ERR; +import static org.apache.ignite.lang.IgniteStringFormatter.format; import com.github.benmanes.caffeine.cache.Caffeine; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; @@ -38,15 +43,21 @@ import org.apache.calcite.runtime.CalciteContextException; import org.apache.calcite.sql.SqlDdl; import org.apache.calcite.sql.SqlExplain; import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import 
org.apache.ignite.internal.sql.api.ColumnMetadataImpl; import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl; +import org.apache.ignite.internal.sql.engine.QueryContext; import org.apache.ignite.internal.sql.engine.SqlQueryType; +import org.apache.ignite.internal.sql.engine.exec.QueryValidationException; import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; +import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; +import org.apache.ignite.internal.sql.engine.sql.ParseResult; +import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; @@ -55,6 +66,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.sql.ColumnMetadata; import org.apache.ignite.sql.ResultSetMetadata; +import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -72,6 +84,7 @@ public class PrepareServiceImpl implements PrepareService { private final DdlSqlToCommandConverter ddlConverter; private final ConcurrentMap<CacheKey, CompletableFuture<QueryPlan>> cache; + private final ConcurrentMap<CacheKey, String> parseCache; private final String nodeName; @@ -123,6 +136,10 @@ public class PrepareServiceImpl implements PrepareService { .maximumSize(cacheSize) .<CacheKey, CompletableFuture<QueryPlan>>build() .asMap(); + parseCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .<CacheKey, String>build() + .asMap(); } /** {@inheritDoc} */ @@ -146,8 +163,59 @@ public class PrepareServiceImpl implements PrepareService { planningPool.shutdownNow(); } + /** Drop cached query plans. 
*/ + public void invalidateCachedPlans() { + cache.clear(); + } + + @TestOnly + public void invalidateParserCache() { + parseCache.clear(); + } + + @TestOnly + public int planCacheSize() { + return cache.size(); + } + + @TestOnly + public int parseCacheSize() { + return parseCache.size(); + } + /** {@inheritDoc} */ @Override + public CompletableFuture<QueryPlan> prepareAsync(String query, QueryContext queryContext, BaseQueryContext ctx) { + CacheKey cacheKey = createCacheKey(query, ctx); + + String normalizedQuery = parseCache.get(cacheKey); + + if (normalizedQuery == null) { + SqlNode sqlNode; + try { + sqlNode = parse(query, queryContext, ctx); + } catch (Exception ex) { + return failedFuture(ex); + } + + if (skipCache(sqlNode)) { + return prepareAsync(sqlNode, ctx); + } + + parseCache.putIfAbsent(cacheKey, sqlNode.toString()); + + return cache.computeIfAbsent(createCacheKey(sqlNode.toString(), ctx), k -> prepareAsync(sqlNode, ctx)) + .thenApply(QueryPlan::copy); + } + + return cache.computeIfAbsent(createCacheKey(normalizedQuery, ctx), k -> + supplyAsync(() -> parse(query, queryContext, ctx)).thenCompose(sqlNode -> prepareAsync(sqlNode, ctx))) + .thenApply(QueryPlan::copy); + } + + /** + * Prepares and caches normalized query plan. + */ public CompletableFuture<QueryPlan> prepareAsync(SqlNode sqlNode, BaseQueryContext ctx) { try { assert single(sqlNode); @@ -186,20 +254,10 @@ public class PrepareServiceImpl implements PrepareService { } } - /** Drop cached query plans. */ - public void resetCache() { - cache.clear(); - } - - @TestOnly - public Map<CacheKey, CompletableFuture<QueryPlan>> cache() { - return cache; - } - private CompletableFuture<QueryPlan> prepareDdl(SqlNode sqlNode, PlanningContext ctx) { assert sqlNode instanceof SqlDdl : sqlNode == null ? 
"null" : sqlNode.getClass().getName(); - return CompletableFuture.completedFuture(new DdlPlan(ddlConverter.convert((SqlDdl) sqlNode, ctx))); + return completedFuture(new DdlPlan(ddlConverter.convert((SqlDdl) sqlNode, ctx))); } private CompletableFuture<QueryPlan> prepareExplain(SqlNode sqlNode, PlanningContext ctx) { @@ -213,7 +271,7 @@ public class PrepareServiceImpl implements PrepareService { return cachedPlan.thenApply(plan -> new ExplainPlan(plan.explain(SqlExplainLevel.ALL_ATTRIBUTES))); } - return CompletableFuture.supplyAsync(() -> { + return supplyAsync(() -> { IgnitePlanner planner = ctx.planner(); // Validate @@ -233,9 +291,7 @@ public class PrepareServiceImpl implements PrepareService { } private CompletableFuture<QueryPlan> prepareQuery(SqlNode sqlNode, PlanningContext ctx) { - CacheKey cacheKey = createCacheKey(sqlNode.toString(), ctx.unwrap(BaseQueryContext.class)); - - return cache.computeIfAbsent(cacheKey, k -> CompletableFuture.supplyAsync(() -> { + return supplyAsync(() -> { IgnitePlanner planner = ctx.planner(); // Validate @@ -252,13 +308,11 @@ public class PrepareServiceImpl implements PrepareService { ResultSetMetadata metadata = resultSetMetadata(validated.dataType(), validated.origins()); return new MultiStepQueryPlan(template, metadata, igniteRel); - }, planningPool)).thenApply(QueryPlan::copy); + }, planningPool); } private CompletableFuture<QueryPlan> prepareDml(SqlNode sqlNode, PlanningContext ctx) { - CacheKey cacheKey = createCacheKey(sqlNode.toString(), ctx.unwrap(BaseQueryContext.class)); - - return cache.computeIfAbsent(cacheKey, k -> CompletableFuture.supplyAsync(() -> { + return supplyAsync(() -> { IgnitePlanner planner = ctx.planner(); // Validate @@ -273,7 +327,7 @@ public class PrepareServiceImpl implements PrepareService { QueryTemplate template = new QueryTemplate(fragments); return new MultiStepDmlPlan(template, igniteRel); - }, planningPool)).thenApply(QueryPlan::copy); + }, planningPool); } private static CacheKey 
createCacheKey(String query, BaseQueryContext ctx) { @@ -309,4 +363,67 @@ public class PrepareServiceImpl implements PrepareService { } ); } + + private static SqlNode parse(String query, QueryContext queryContext, BaseQueryContext ctx) { + StatementParseResult parseResult = IgniteSqlParser.parse(query, StatementParseResult.MODE); + + SqlNode sqlNode = parseResult.statement(); + + validateParsedStatement(queryContext, parseResult, sqlNode, ctx.parameters()); + + return sqlNode; + } + + private static boolean skipCache(SqlNode sqlNode) { + SqlKind kind = sqlNode.getKind(); + + switch (kind) { + case SELECT: + case INSERT: + return false; + default: + return true; + } + } + + /** Performs additional validation of a parsed statement. **/ + public static void validateParsedStatement( + QueryContext context, + ParseResult parseResult, + SqlNode node, + Object[] params + ) { + Set<SqlQueryType> allowedTypes = context.allowedQueryTypes(); + SqlQueryType queryType = Commons.getQueryType(node); + + if (queryType == null) { + throw new IgniteInternalException(UNSUPPORTED_SQL_OPERATION_KIND_ERR, "Unsupported operation [" + + "sqlNodeKind=" + node.getKind() + "; " + + "querySql=\"" + node + "\"]"); + } + + if (!allowedTypes.contains(queryType)) { + String message = format("Invalid SQL statement type in the batch. Expected {} but got {}.", allowedTypes, queryType); + + throw new QueryValidationException(message); + } + + if (parseResult.dynamicParamsCount() != params.length) { + String message = format( + "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).", + params.length, parseResult.dynamicParamsCount() + ); + + throw new SqlException(QUERY_INVALID_ERR, message); + } + + for (Object param : params) { + if (!TypeUtils.supportParamInstance(param)) { + String message = format( + "Unsupported dynamic parameter defined. 
Provided '{}' is not supported.", param.getClass().getName()); + + throw new SqlException(QUERY_INVALID_ERR, message); + } + } + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java deleted file mode 100644 index 2d5ecb82af..0000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.engine.prepare; - -/** - * QueryPlan factory interface. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ -public interface QueryPlanFactory { - /** - * Create plans from context. - * - * @param ctx Planning context. - * @return Query plan. 
- */ - QueryPlan create(PlanningContext ctx); -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java index a907b0b90b..dd7336818c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java @@ -69,6 +69,13 @@ public class IgniteSqlCreateTable extends SqlCreate { return ImmutableNullableList.of(name, columnList, createOptionList); } + /** {@inheritDoc} */ + @Override + public SqlNode clone(SqlParserPos pos) { + //TODO: add clone for all DDL nodes. + return new IgniteSqlCreateTable(pos, ifNotExists, name, columnList, colocationColumns, createOptionList); + } + /** {@inheritDoc} */ @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index 0ef105e7c5..bce3a40968 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -66,6 +66,8 @@ import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.AsyncCursor; import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult; import org.apache.ignite.internal.sql.engine.QueryCancel; +import org.apache.ignite.internal.sql.engine.QueryContext; +import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode; import 
org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; import org.apache.ignite.internal.sql.engine.exec.rel.Inbox; @@ -93,8 +95,6 @@ import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; -import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; -import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; @@ -538,11 +538,7 @@ public class ExecutionServiceImplTest { } private QueryPlan prepare(String query, BaseQueryContext ctx) { - StatementParseResult parseResult = IgniteSqlParser.parse(query, StatementParseResult.MODE); - - assertEquals(ctx.parameters().length, parseResult.dynamicParamsCount(), "Invalid number of dynamic parameters"); - - return await(prepareService.prepareAsync(parseResult.statement(), ctx)); + return await(prepareService.prepareAsync(query, QueryContext.create(SqlQueryType.ALL), ctx)); } static class TestCluster { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index 17d095634e..475ec37fcb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -22,7 +22,6 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static 
org.hamcrest.Matchers.instanceOf; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import java.util.ArrayList; @@ -35,6 +34,8 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.sql.engine.AsyncCursor; import org.apache.ignite.internal.sql.engine.QueryCancel; +import org.apache.ignite.internal.sql.engine.QueryContext; +import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.exec.ExchangeService; import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl; @@ -58,14 +59,11 @@ import org.apache.ignite.internal.sql.engine.message.MessageService; import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.PlannerHelper; -import org.apache.ignite.internal.sql.engine.prepare.PrepareService; import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl; import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter; import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; -import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; -import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -82,7 +80,7 @@ import org.apache.ignite.network.TopologyService; public class TestNode implements LifecycleAware { private final String nodeName; private final SchemaPlus schema; - private final PrepareService 
prepareService; + private final PrepareServiceImpl prepareService; private final ExecutionService executionService; private final List<LifecycleAware> services = new ArrayList<>(); @@ -188,12 +186,7 @@ public class TestNode implements LifecycleAware { * @return A plan to execute. */ public QueryPlan prepare(String query) { - StatementParseResult parseResult = IgniteSqlParser.parse(query, StatementParseResult.MODE); - BaseQueryContext ctx = createContext(); - - assertEquals(ctx.parameters().length, parseResult.dynamicParamsCount(), "Invalid number of dynamic parameters"); - - return await(prepareService.prepareAsync(parseResult.statement(), ctx)); + return await(prepareService.prepareAsync(query, QueryContext.create(SqlQueryType.ALL), createContext())); } /** diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java index 933943c003..4834220ae1 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java @@ -17,12 +17,12 @@ package org.apache.ignite.internal.sql.engine.planner; -import static org.apache.ignite.internal.sql.engine.SqlQueryProcessor.validateParsedStatement; +import static org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.validateParsedStatement; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.List; import 
java.util.Map; @@ -43,7 +43,7 @@ import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; -import org.hamcrest.Matchers; +import org.apache.ignite.sql.SqlException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -97,38 +97,34 @@ public class PrepareServiceSelfTest extends AbstractPlannerTest { @ParameterizedTest(name = "{0}") @MethodSource("queries") - public void normalizedQueryCache(String query1, String query2, Object[] params) { - // Parse query and check nothing cached. - SqlNode query1Ast = parse(query1, params); - assertThat(service.cache(), Matchers.anEmptyMap()); - Mockito.verifyNoInteractions(queryPlannerSpy); - - // Preparing AST caches a normalized query plan. - assertThat(service.prepareAsync(query1Ast, createContext(params)), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + public void queryCache(String query1, String query2, Object[] params) { + // Preparing query caches plan for both query and normalized query. + assertThat(service.prepareAsync(query1, queryCtx, createContext(params)), willBe(notNullValue())); + assertEquals(1, service.parseCacheSize()); + assertEquals(1, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), Mockito.any()); - // Preparing same AST returns plan from cache. - query1Ast = parse(query1, params); - assertThat(service.prepareAsync(query1Ast, createContext(params)), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + // Preparing same query returns plan from cache. 
+ assertThat(service.prepareAsync(query1, queryCtx, createContext(params)), willBe(notNullValue())); + assertEquals(1, service.parseCacheSize()); + assertEquals(1, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), Mockito.any()); - // Prepare similar query returns plan from cache. - SqlNode query2Ast = parse(query2, params); - - assertThat(service.prepareAsync(query2Ast, createContext(params)), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + // Preparing similar query returns cached plan and also caches the plan for the query. + assertThat(service.prepareAsync(query2, queryCtx, createContext(params)), willBe(notNullValue())); + assertEquals(2, service.parseCacheSize()); + assertEquals(1, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), Mockito.any()); } + private SqlNode parse(String sql, Object... params) { // Parse query StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE); SqlNode sqlNode = parseResult.statement(); // Validate statement - validateParsedStatement(queryCtx, null, parseResult, sqlNode, params); + validateParsedStatement(queryCtx, parseResult, sqlNode, params); return sqlNode; } @@ -137,8 +133,8 @@ public class PrepareServiceSelfTest extends AbstractPlannerTest { public void ddlBypassCache() { String query = "CREATE TABLE tbl0(id INTEGER PRIMARY KEY, val VARCHAR);"; - assertThat(service.prepareAsync(parse(query), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(0)); + assertThat(service.prepareAsync(query, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(0, service.planCacheSize()); Mockito.verify(ddlPlannerSpy, Mockito.times(1)).convert(Mockito.any(), Mockito.any()); // DDL goes a separate flow via ddl converter. 
Mockito.verifyNoInteractions(queryPlannerSpy); @@ -146,11 +142,16 @@ public class PrepareServiceSelfTest extends AbstractPlannerTest { @Test public void errors() { - String query = "SELECT * FROM tbl2 WHERE id > 0"; // Invalid table name. + String query = "invalid query"; // Not a valid SQL statement. + + assertThat(service.prepareAsync(query, queryCtx, createContext()), willThrow(SqlException.class)); + assertEquals(0, service.planCacheSize()); + Mockito.verifyNoInteractions(queryPlannerSpy); + + query = "SELECT * FROM tbl2 WHERE id > 0"; // Invalid table name. - assertThat(service.prepareAsync(parse(query), createContext()), willThrow(CalciteContextException.class)); - assertThat(service.cache(), Matchers.aMapWithSize(1)); - assertTrue(service.cache().values().iterator().next().isCompletedExceptionally()); + assertThat(service.prepareAsync(query, queryCtx, createContext()), willThrow(CalciteContextException.class)); + assertEquals(1, service.planCacheSize()); Mockito.verifyNoInteractions(queryPlannerSpy); } @@ -162,23 +163,23 @@ public class PrepareServiceSelfTest extends AbstractPlannerTest { String query = "SELECT * FROM tbl WHERE id > 0"; - assertThat(service.prepareAsync(parse(query), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(0)); + assertThat(service.prepareAsync(query, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(0, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), Mockito.any()); - assertThat(service.prepareAsync(parse(query), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(0)); + assertThat(service.prepareAsync(query, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(0, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), Mockito.any()); } @Test - public void normalizedQueryCaching() { + public void normalizedQuery() { SqlNode 
queryAst = parse("SELECT NULL"); String normalizedQuery = queryAst.toString(); - assertThat(service.prepareAsync(parse(normalizedQuery), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + assertThat(service.prepareAsync(normalizedQuery, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(1, service.planCacheSize()); } @Test @@ -188,42 +189,59 @@ public class PrepareServiceSelfTest extends AbstractPlannerTest { String explainQuery2 = "EXPLAIN PLAN FOR SELECT * /* comment */ FROM tbl WHERE id > 0"; // Ensure explain don't cache anything. - assertThat(service.prepareAsync(parse(explainQuery), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.anEmptyMap()); + assertThat(service.prepareAsync(explainQuery, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(0, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), Mockito.any()); // Cache query plan. - assertThat(service.prepareAsync(parse(query), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + assertThat(service.prepareAsync(query, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(1, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), Mockito.any()); // Check explain gets plan from cache. - assertThat(service.prepareAsync(parse(explainQuery), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + assertThat(service.prepareAsync(explainQuery, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(1, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), Mockito.any()); // Check explain gets plan from cache for similar query. 
- assertThat(service.prepareAsync(parse(explainQuery2), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + assertThat(service.prepareAsync(explainQuery2, queryCtx, createContext()), willBe(notNullValue())); + assertEquals(1, service.planCacheSize()); Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), Mockito.any()); } @Test public void resetCache() { - assertThat(service.cache(), Matchers.anEmptyMap()); + assertEquals(0, service.planCacheSize()); + + // Fill caches. + assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0", queryCtx, createContext()), willBe(notNullValue())); + assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 1", queryCtx, createContext()), willBe(notNullValue())); + + assertEquals(2, service.parseCacheSize()); + assertEquals(2, service.planCacheSize()); + Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), Mockito.any()); + + // Drop cached plans. + service.invalidateCachedPlans(); + + assertEquals(2, service.parseCacheSize()); + assertEquals(0, service.planCacheSize()); + + assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0", queryCtx, createContext()), willBe(notNullValue())); + assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 1", queryCtx, createContext()), willBe(notNullValue())); - assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 0"), createContext()), willBe(notNullValue())); - assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 1"), createContext()), willBe(notNullValue())); - assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 2"), createContext()), willBe(notNullValue())); - assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 3"), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(4)); + assertEquals(2, service.parseCacheSize()); + assertEquals(2, service.planCacheSize()); + 
Mockito.verify(queryPlannerSpy, Mockito.times(4)).apply(Mockito.any(), Mockito.any()); - service.resetCache(); + // Invalidate parser cache and check reusing cached plans. + service.invalidateParserCache(); - assertThat(service.cache(), Matchers.anEmptyMap()); + assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0", queryCtx, createContext()), willBe(notNullValue())); - assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 0"), createContext()), willBe(notNullValue())); - assertThat(service.cache(), Matchers.aMapWithSize(1)); + assertEquals(1, service.parseCacheSize()); + assertEquals(2, service.planCacheSize()); + Mockito.verify(queryPlannerSpy, Mockito.times(4)).apply(Mockito.any(), Mockito.any()); } private BaseQueryContext createContext(Object... params) {
