This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-17765-2
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-17765-2 by this push:
new 00180150ee Cache query plan for original query string.
00180150ee is described below
commit 00180150eecd5c4709e35517d0f04a97490d31fe
Author: amashenkov <[email protected]>
AuthorDate: Tue Jun 20 00:57:19 2023 +0300
Cache query plan for original query string.
---
.../internal/sql/engine/SqlQueryProcessor.java | 235 ++++++++-------------
.../sql/engine/prepare/PrepareService.java | 4 +-
.../sql/engine/prepare/PrepareServiceImpl.java | 151 +++++++++++--
.../sql/engine/prepare/QueryPlanFactory.java | 32 ---
.../sql/engine/exec/ExecutionServiceImplTest.java | 10 +-
.../internal/sql/engine/framework/TestNode.java | 17 +-
.../sql/engine/planner/PrepareServiceSelfTest.java | 81 +++----
7 files changed, 270 insertions(+), 260 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 580d6ee12a..9676cddf48 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -17,32 +17,28 @@
package org.apache.ignite.internal.sql.engine;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML;
import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.lang.ErrorGroups.Sql.OPERATION_INTERRUPTED_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_EXPIRED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_NOT_FOUND_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
-import static
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_SQL_OPERATION_KIND_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.catalog.CatalogManager;
@@ -67,7 +63,6 @@ import
org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
-import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -81,12 +76,7 @@ import
org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionInfo;
import org.apache.ignite.internal.sql.engine.session.SessionManager;
import org.apache.ignite.internal.sql.engine.session.SessionProperty;
-import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
-import org.apache.ignite.internal.sql.engine.sql.ParseResult;
-import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
@@ -247,6 +237,7 @@ public class SqlQueryProcessor implements QueryProcessor {
busyLock
);
+ // TODO IGNITE-19497 No need to clear the cache, as the cache is already aware of
catalog versioning.
sqlSchemaManager.registerListener(prepareSvc::resetCache);
this.prepareSvc = prepareSvc;
@@ -383,121 +374,116 @@ public class SqlQueryProcessor implements
QueryProcessor {
private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
SessionId sessionId,
- QueryContext context,
+ QueryContext queryContext,
String sql,
Object... params
) {
Session session = sessionManager.session(sessionId);
if (session == null) {
- return CompletableFuture.failedFuture(
- new SqlException(SESSION_NOT_FOUND_ERR, format("Session
not found [{}]", sessionId)));
+ return failedFuture(new SqlException(SESSION_NOT_FOUND_ERR,
format("Session not found [{}]", sessionId)));
}
- String schemaName =
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
-
- InternalTransaction outerTx =
context.unwrap(InternalTransaction.class);
-
QueryCancel queryCancel = new QueryCancel();
- AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
- queryCancel::cancel,
- taskExecutor
- );
-
- queryCancel.add(() -> session.unregisterResource(closeableResource));
-
try {
- session.registerResource(closeableResource);
+ registerQueryCancel(session, queryCancel);
} catch (IllegalStateException ex) {
- return CompletableFuture.failedFuture(new
IgniteInternalException(SESSION_EXPIRED_ERR,
+ return failedFuture(new
IgniteInternalException(SESSION_EXPIRED_ERR,
format("Session has been expired [{}]",
session.sessionId()), ex));
}
- CompletableFuture<Void> start = new CompletableFuture<>();
+ String schemaName =
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
- AtomicReference<InternalTransaction> tx = new AtomicReference<>();
+ InternalTransaction outerTx =
queryContext.unwrap(InternalTransaction.class);
+ boolean implicitTx = outerTx == null;
- CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
- .thenApply(v -> {
- StatementParseResult parseResult =
IgniteSqlParser.parse(sql, StatementParseResult.MODE);
- SqlNode sqlNode = parseResult.statement();
+ // TODO: IGNITE-19497 Use timestamp to get schema.
+ SchemaPlus schema = sqlSchemaManager.schema(schemaName);
- validateParsedStatement(context, outerTx, parseResult,
sqlNode, params);
+ if (schema == null) {
+ return failedFuture(new SchemaNotFoundException(schemaName));
+ }
- return sqlNode;
- })
- .thenCompose(sqlNode -> {
- boolean rwOp = dataModificationOp(sqlNode);
+ BaseQueryContext plannerContext = BaseQueryContext.builder()
+
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+ .logger(LOG)
+ .cancel(queryCancel)
+ .parameters(params)
+ .plannerTimeout(PLANNER_TIMEOUT)
+ .build();
+
+ return prepareSvc.prepareAsync(sql, queryContext, plannerContext)
+ .thenComposeAsync(plan -> {
+ // Ensure plan can be executed.
+ if (SqlQueryType.DDL == plan.type() && outerTx != null) {
+ return failedFuture(new
SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support
transactions."));
+ }
- boolean implicitTxRequired = outerTx == null;
+ InternalTransaction tx = implicitTx ?
txManager.begin(plan.type() != DML) : outerTx;
- tx.set(implicitTxRequired ? txManager.begin(!rwOp) :
outerTx);
+ try {
+ // TODO IGNITE-19497 Ensure implicit tx uses the same
schema that the query was planned for; otherwise, retry planning.
+ var dataCursor = executionSrvc.executePlan(tx, plan,
plannerContext);
- SchemaPlus schema = sqlSchemaManager.schema(schemaName);
+ SqlQueryType queryType = plan.type();
+ assert queryType != null : "Expected a full plan but
got a fragment: " + plan;
- if (schema == null) {
- return CompletableFuture.failedFuture(new
SchemaNotFoundException(schemaName));
- }
+ AsyncSqlCursorImpl<List<Object>> cursor = new
AsyncSqlCursorImpl<>(
+ queryType,
+ plan.metadata(),
+ implicitTx ? tx : null,
+ new AsyncCursor<>() {
+ @Override
+ public
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+ session.touch();
- BaseQueryContext ctx = BaseQueryContext.builder()
- .frameworkConfig(
-
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .build()
- )
- .logger(LOG)
- .cancel(queryCancel)
- .parameters(params)
- .plannerTimeout(PLANNER_TIMEOUT)
- .build();
-
- return prepareSvc.prepareAsync(sqlNode, ctx)
- .thenApply(plan -> {
- var dataCursor =
executionSrvc.executePlan(tx.get(), plan, ctx);
-
- SqlQueryType queryType = plan.type();
- assert queryType != null : "Expected a full
plan but got a fragment: " + plan;
-
- return new AsyncSqlCursorImpl<>(
- queryType,
- plan.metadata(),
- implicitTxRequired ? tx.get() : null,
- new AsyncCursor<List<Object>>() {
- @Override
- public
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
- session.touch();
-
- return
dataCursor.requestNextAsync(rows);
- }
-
- @Override
- public CompletableFuture<Void>
closeAsync() {
- session.touch();
-
- return dataCursor.closeAsync();
- }
- }
- );
- });
- });
+ return
dataCursor.requestNextAsync(rows);
+ }
- stage.whenComplete((cur, ex) -> {
- if (ex instanceof CancellationException) {
- queryCancel.cancel();
- }
+ @Override
+ public CompletableFuture<Void>
closeAsync() {
+ session.touch();
- if (ex != null && outerTx == null) {
- InternalTransaction tx0 = tx.get();
- if (tx0 != null) {
- tx0.rollback();
- }
- }
- });
+ return dataCursor.closeAsync();
+ }
+ }
+ );
- start.completeAsync(() -> null, taskExecutor);
+ return CompletableFuture.completedFuture(cursor);
+ } catch (Throwable th) {
+ handleQueryException(th, queryCancel, implicitTx ? tx
: null);
- return stage;
+ return failedFuture(th);
+ }
+ }, taskExecutor);
+ }
+
+ /**
+ * Registers QueryCancel object in session.
+ *
+ * @throws IllegalStateException If session is invalid
+ */
+ private void registerQueryCancel(Session session, QueryCancel queryCancel)
throws IllegalStateException {
+ AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
+ queryCancel::cancel,
+ taskExecutor
+ );
+
+ queryCancel.add(() -> session.unregisterResource(closeableResource));
+ session.registerResource(closeableResource);
+ }
+
+ private static void handleQueryException(Throwable th, QueryCancel
queryCancel, @Nullable InternalTransaction tx) {
+ if (th instanceof CancellationException) {
+ queryCancel.cancel();
+ }
+
+ if (th != null) {
+ if (tx != null) {
+ tx.rollback();
+ }
+ }
}
private abstract static class AbstractTableEventListener implements
EventListener<TableEventParameters> {
@@ -606,55 +592,4 @@ public class SqlQueryProcessor implements QueryProcessor {
.thenApply(v -> false);
}
}
-
- /** Returns {@code true} if this is data modification operation. */
- private static boolean dataModificationOp(SqlNode sqlNode) {
- return SqlKind.DML.contains(sqlNode.getKind());
- }
-
- /** Performs additional validation of a parsed statement. **/
- public static void validateParsedStatement(
- QueryContext context,
- InternalTransaction outerTx,
- ParseResult parseResult,
- SqlNode node,
- Object[] params
- ) {
- Set<SqlQueryType> allowedTypes = context.allowedQueryTypes();
- SqlQueryType queryType = Commons.getQueryType(node);
-
- if (queryType == null) {
- throw new
IgniteInternalException(UNSUPPORTED_SQL_OPERATION_KIND_ERR, "Unsupported
operation ["
- + "sqlNodeKind=" + node.getKind() + "; "
- + "querySql=\"" + node + "\"]");
- }
-
- if (!allowedTypes.contains(queryType)) {
- String message = format("Invalid SQL statement type in the batch.
Expected {} but got {}.", allowedTypes, queryType);
-
- throw new QueryValidationException(message);
- }
-
- if (SqlQueryType.DDL == queryType && outerTx != null) {
- throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't
support transactions.");
- }
-
- if (parseResult.dynamicParamsCount() != params.length) {
- String message = format(
- "Unexpected number of query parameters. Provided {} but
there is only {} dynamic parameter(s).",
- params.length, parseResult.dynamicParamsCount()
- );
-
- throw new SqlException(QUERY_INVALID_ERR, message);
- }
-
- for (Object param : params) {
- if (!TypeUtils.supportParamInstance(param)) {
- String message = format(
- "Unsupported dynamic parameter defined. Provided '{}'
is not supported.", param.getClass().getName());
-
- throw new SqlException(QUERY_INVALID_ERR, message);
- }
- }
- }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
index 7a42727412..71657400bb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.sql.engine.prepare;
import java.util.concurrent.CompletableFuture;
-import org.apache.calcite.sql.SqlNode;
+import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -29,5 +29,5 @@ public interface PrepareService extends LifecycleAware {
/**
* Prepare query plan.
*/
- CompletableFuture<QueryPlan> prepareAsync(SqlNode sqlNode,
BaseQueryContext ctx);
+ CompletableFuture<QueryPlan> prepareAsync(String query, QueryContext
queryContext, BaseQueryContext ctx);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 6bebcbf8f7..3e01239b85 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -18,13 +18,18 @@
package org.apache.ignite.internal.sql.engine.prepare;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_SQL_OPERATION_KIND_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -38,15 +43,21 @@ import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlDdl;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.api.ColumnMetadataImpl;
import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
+import org.apache.ignite.internal.sql.engine.sql.ParseResult;
+import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -55,6 +66,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -125,6 +137,59 @@ public class PrepareServiceImpl implements PrepareService {
.asMap();
}
+ private static boolean skipCache(SqlNode sqlNode) {
+ SqlKind kind = sqlNode.getKind();
+
+ switch (kind) {
+ case SELECT:
+ case INSERT:
+ return false;
+ default:
+ return true;
+ }
+ }
+
+ /** Performs additional validation of a parsed statement. **/
+ public static void validateParsedStatement(
+ QueryContext context,
+ ParseResult parseResult,
+ SqlNode node,
+ Object[] params
+ ) {
+ Set<SqlQueryType> allowedTypes = context.allowedQueryTypes();
+ SqlQueryType queryType = Commons.getQueryType(node);
+
+ if (queryType == null) {
+ throw new
IgniteInternalException(UNSUPPORTED_SQL_OPERATION_KIND_ERR, "Unsupported
operation ["
+ + "sqlNodeKind=" + node.getKind() + "; "
+ + "querySql=\"" + node + "\"]");
+ }
+
+ if (!allowedTypes.contains(queryType)) {
+ String message = format("Invalid SQL statement type in the batch.
Expected {} but got {}.", allowedTypes, queryType);
+
+ throw new QueryValidationException(message);
+ }
+
+ if (parseResult.dynamicParamsCount() != params.length) {
+ String message = format(
+ "Unexpected number of query parameters. Provided {} but
there is only {} dynamic parameter(s).",
+ params.length, parseResult.dynamicParamsCount()
+ );
+
+ throw new SqlException(QUERY_INVALID_ERR, message);
+ }
+
+ for (Object param : params) {
+ if (!TypeUtils.supportParamInstance(param)) {
+ String message = format(
+ "Unsupported dynamic parameter defined. Provided '{}'
is not supported.", param.getClass().getName());
+
+ throw new SqlException(QUERY_INVALID_ERR, message);
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override
public void start() {
@@ -146,9 +211,71 @@ public class PrepareServiceImpl implements PrepareService {
planningPool.shutdownNow();
}
+ /** Drop cached query plans. */
+ public void resetCache() {
+ cache.clear();
+ }
+
+ @TestOnly
+ public int cacheSize() {
+ return cache.size();
+ }
+
/** {@inheritDoc} */
@Override
+ public CompletableFuture<QueryPlan> prepareAsync(String query,
QueryContext queryContext, BaseQueryContext ctx) {
+ CacheKey cacheKey = createCacheKey(query, ctx);
+
+ CompletableFuture<QueryPlan> opFut = new CompletableFuture<>();
+
+ CompletableFuture<QueryPlan> cached = cache.computeIfAbsent(cacheKey,
key -> {
+ SqlNode sqlNode;
+
+ try {
+ StatementParseResult parseResult =
IgniteSqlParser.parse(query, StatementParseResult.MODE);
+
+ sqlNode = parseResult.statement();
+
+ validateParsedStatement(queryContext, parseResult, sqlNode,
ctx.parameters());
+ } catch (Exception ex) {
+ opFut.completeExceptionally(ex);
+
+ return null;
+ }
+
+ if (skipCache(sqlNode)) {
+ prepareAsync0(sqlNode, ctx)
+ .handle((r, e) -> e != null ?
opFut.completeExceptionally(e) : opFut.complete(r));
+
+ return null;
+ }
+
+ if (query.equals(sqlNode.toString())) {
+ prepareAsync0(sqlNode, ctx).handle((r, e) -> e != null ?
opFut.completeExceptionally(e) : opFut.complete(r));
+ } else {
+ runAsync(() -> prepareAsync(sqlNode, ctx).handle((r, e) -> e
!= null ? opFut.completeExceptionally(e) : opFut.complete(r)),
+ planningPool);
+ }
+
+ return opFut;
+ });
+
+ return cached != null ? cached : opFut;
+ }
+
+ /**
+ * Prepares and caches normalized query plan.
+ */
public CompletableFuture<QueryPlan> prepareAsync(SqlNode sqlNode,
BaseQueryContext ctx) {
+ CacheKey cacheKey = createCacheKey(sqlNode.toString(),
ctx.unwrap(BaseQueryContext.class));
+
+ return cache.computeIfAbsent(cacheKey, k -> prepareAsync0(sqlNode,
ctx));
+ }
+
+ /**
+ * Prepares query plan bypassing cache.
+ */
+ public CompletableFuture<QueryPlan> prepareAsync0(SqlNode sqlNode,
BaseQueryContext ctx) {
try {
assert single(sqlNode);
@@ -186,16 +313,6 @@ public class PrepareServiceImpl implements PrepareService {
}
}
- /** Drop cached query plans. */
- public void resetCache() {
- cache.clear();
- }
-
- @TestOnly
- public Map<CacheKey, CompletableFuture<QueryPlan>> cache() {
- return cache;
- }
-
private CompletableFuture<QueryPlan> prepareDdl(SqlNode sqlNode,
PlanningContext ctx) {
assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" :
sqlNode.getClass().getName();
@@ -213,7 +330,7 @@ public class PrepareServiceImpl implements PrepareService {
return cachedPlan.thenApply(plan -> new
ExplainPlan(plan.explain(SqlExplainLevel.ALL_ATTRIBUTES)));
}
- return CompletableFuture.supplyAsync(() -> {
+ return supplyAsync(() -> {
IgnitePlanner planner = ctx.planner();
// Validate
@@ -233,9 +350,7 @@ public class PrepareServiceImpl implements PrepareService {
}
private CompletableFuture<QueryPlan> prepareQuery(SqlNode sqlNode,
PlanningContext ctx) {
- CacheKey cacheKey = createCacheKey(sqlNode.toString(),
ctx.unwrap(BaseQueryContext.class));
-
- return cache.computeIfAbsent(cacheKey, k ->
CompletableFuture.supplyAsync(() -> {
+ return supplyAsync(() -> {
IgnitePlanner planner = ctx.planner();
// Validate
@@ -252,13 +367,11 @@ public class PrepareServiceImpl implements PrepareService
{
ResultSetMetadata metadata =
resultSetMetadata(validated.dataType(), validated.origins());
return new MultiStepQueryPlan(template, metadata, igniteRel);
- }, planningPool)).thenApply(QueryPlan::copy);
+ }, planningPool);
}
private CompletableFuture<QueryPlan> prepareDml(SqlNode sqlNode,
PlanningContext ctx) {
- CacheKey cacheKey = createCacheKey(sqlNode.toString(),
ctx.unwrap(BaseQueryContext.class));
-
- return cache.computeIfAbsent(cacheKey, k ->
CompletableFuture.supplyAsync(() -> {
+ return supplyAsync(() -> {
IgnitePlanner planner = ctx.planner();
// Validate
@@ -273,7 +386,7 @@ public class PrepareServiceImpl implements PrepareService {
QueryTemplate template = new QueryTemplate(fragments);
return new MultiStepDmlPlan(template, igniteRel);
- }, planningPool)).thenApply(QueryPlan::copy);
+ }, planningPool);
}
private static CacheKey createCacheKey(String query, BaseQueryContext ctx)
{
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java
deleted file mode 100644
index 2d5ecb82af..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.prepare;
-
-/**
- * QueryPlan factory interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
-public interface QueryPlanFactory {
- /**
- * Create plans from context.
- *
- * @param ctx Planning context.
- * @return Query plan.
- */
- QueryPlan create(PlanningContext ctx);
-}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 0ef105e7c5..bce3a40968 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -66,6 +66,8 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
import
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
@@ -93,8 +95,6 @@ import
org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
-import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
-import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -538,11 +538,7 @@ public class ExecutionServiceImplTest {
}
private QueryPlan prepare(String query, BaseQueryContext ctx) {
- StatementParseResult parseResult = IgniteSqlParser.parse(query,
StatementParseResult.MODE);
-
- assertEquals(ctx.parameters().length,
parseResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
-
- return await(prepareService.prepareAsync(parseResult.statement(),
ctx));
+ return await(prepareService.prepareAsync(query,
QueryContext.create(SqlQueryType.ALL), ctx));
}
static class TestCluster {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 17d095634e..a4d2bafede 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -22,7 +22,6 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
@@ -35,6 +34,8 @@ import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
@@ -58,14 +59,11 @@ import
org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PlannerHelper;
-import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
-import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -82,7 +80,7 @@ import org.apache.ignite.network.TopologyService;
public class TestNode implements LifecycleAware {
private final String nodeName;
private final SchemaPlus schema;
- private final PrepareService prepareService;
+ private final PrepareServiceImpl prepareService;
private final ExecutionService executionService;
private final List<LifecycleAware> services = new ArrayList<>();
@@ -188,12 +186,7 @@ public class TestNode implements LifecycleAware {
* @return A plan to execute.
*/
public QueryPlan prepare(String query) {
- StatementParseResult parseResult = IgniteSqlParser.parse(query,
StatementParseResult.MODE);
- BaseQueryContext ctx = createContext();
-
- assertEquals(ctx.parameters().length,
parseResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
-
- return await(prepareService.prepareAsync(parseResult.statement(),
ctx));
+ return await(prepareService.prepareAsync(query,
QueryContext.create(SqlQueryType.ALL), createContext()));
}
/**
@@ -206,7 +199,7 @@ public class TestNode implements LifecycleAware {
public QueryPlan prepare(SqlNode queryAst) {
assertThat(queryAst, not(instanceOf(SqlNodeList.class)));
- return await(prepareService.prepareAsync(queryAst, createContext()));
+ return await(prepareService.prepareAsync0(queryAst, createContext()));
}
private BaseQueryContext createContext() {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
index 933943c003..48da1e6774 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.sql.engine.planner;
-import static
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.validateParsedStatement;
+import static
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.validateParsedStatement;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
import java.util.Map;
@@ -43,7 +43,7 @@ import
org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.hamcrest.Matchers;
+import org.apache.ignite.sql.SqlException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -100,25 +100,25 @@ public class PrepareServiceSelfTest extends
AbstractPlannerTest {
public void normalizedQueryCache(String query1, String query2, Object[]
params) {
// Parse query and check nothing cached.
SqlNode query1Ast = parse(query1, params);
- assertThat(service.cache(), Matchers.anEmptyMap());
+ assertEquals(0, service.cacheSize());
Mockito.verifyNoInteractions(queryPlannerSpy);
// Preparing AST caches a normalized query plan.
assertThat(service.prepareAsync(query1Ast, createContext(params)),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertEquals(1, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(),
Mockito.any());
// Preparing same AST returns plan from cache.
query1Ast = parse(query1, params);
assertThat(service.prepareAsync(query1Ast, createContext(params)),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertEquals(1, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(),
Mockito.any());
// Prepare similar query returns plan from cache.
SqlNode query2Ast = parse(query2, params);
assertThat(service.prepareAsync(query2Ast, createContext(params)),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertEquals(1, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(),
Mockito.any());
}
@@ -128,7 +128,7 @@ public class PrepareServiceSelfTest extends
AbstractPlannerTest {
SqlNode sqlNode = parseResult.statement();
// Validate statement
- validateParsedStatement(queryCtx, null, parseResult, sqlNode, params);
+ validateParsedStatement(queryCtx, parseResult, sqlNode, params);
return sqlNode;
}
@@ -137,8 +137,8 @@ public class PrepareServiceSelfTest extends
AbstractPlannerTest {
public void ddlBypassCache() {
String query = "CREATE TABLE tbl0(id INTEGER PRIMARY KEY, val
VARCHAR);";
- assertThat(service.prepareAsync(parse(query), createContext()),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(0));
+ assertThat(service.prepareAsync(query, queryCtx, createContext()),
willBe(notNullValue()));
+ assertEquals(0, service.cacheSize());
Mockito.verify(ddlPlannerSpy, Mockito.times(1)).convert(Mockito.any(),
Mockito.any());
// DDL goes a separate flow via ddl converter.
Mockito.verifyNoInteractions(queryPlannerSpy);
@@ -146,11 +146,16 @@ public class PrepareServiceSelfTest extends
AbstractPlannerTest {
@Test
public void errors() {
- String query = "SELECT * FROM tbl2 WHERE id > 0"; // Invalid table
name.
+ String query = "invalid query"; // Not a valid SQL statement.
- assertThat(service.prepareAsync(parse(query), createContext()),
willThrow(CalciteContextException.class));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
-
assertTrue(service.cache().values().iterator().next().isCompletedExceptionally());
+ assertThat(service.prepareAsync(query, queryCtx, createContext()),
willThrow(SqlException.class));
+ assertEquals(0, service.cacheSize());
+ Mockito.verifyNoInteractions(queryPlannerSpy);
+
+ query = "SELECT * FROM tbl2 WHERE id > 0"; // Invalid table name.
+
+ assertThat(service.prepareAsync(query, queryCtx, createContext()),
willThrow(CalciteContextException.class));
+ assertEquals(2, service.cacheSize());
Mockito.verifyNoInteractions(queryPlannerSpy);
}
@@ -162,23 +167,23 @@ public class PrepareServiceSelfTest extends
AbstractPlannerTest {
String query = "SELECT * FROM tbl WHERE id > 0";
- assertThat(service.prepareAsync(parse(query), createContext()),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(0));
+ assertThat(service.prepareAsync(query, queryCtx, createContext()),
willBe(notNullValue()));
+ assertEquals(0, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(),
Mockito.any());
- assertThat(service.prepareAsync(parse(query), createContext()),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(0));
+ assertThat(service.prepareAsync(query, queryCtx, createContext()),
willBe(notNullValue()));
+ assertEquals(0, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(),
Mockito.any());
}
@Test
- public void normalizedQueryCaching() {
+ public void normalizedQuery() {
SqlNode queryAst = parse("SELECT NULL");
String normalizedQuery = queryAst.toString();
- assertThat(service.prepareAsync(parse(normalizedQuery),
createContext()), willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertThat(service.prepareAsync(normalizedQuery, queryCtx,
createContext()), willBe(notNullValue()));
+ assertEquals(1, service.cacheSize());
}
@Test
@@ -188,42 +193,42 @@ public class PrepareServiceSelfTest extends
AbstractPlannerTest {
String explainQuery2 = "EXPLAIN PLAN FOR SELECT * /* comment */ FROM
tbl WHERE id > 0";
// Ensure explain don't cache anything.
- assertThat(service.prepareAsync(parse(explainQuery), createContext()),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.anEmptyMap());
+ assertThat(service.prepareAsync(explainQuery, queryCtx,
createContext()), willBe(notNullValue()));
+ assertEquals(0, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(),
Mockito.any());
// Cache query plan.
- assertThat(service.prepareAsync(parse(query), createContext()),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertThat(service.prepareAsync(query, queryCtx, createContext()),
willBe(notNullValue()));
+ assertEquals(2, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(),
Mockito.any());
// Check explain gets plan from cache.
- assertThat(service.prepareAsync(parse(explainQuery), createContext()),
willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertThat(service.prepareAsync(explainQuery, queryCtx,
createContext()), willBe(notNullValue()));
+ assertEquals(2, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(),
Mockito.any());
// Check explain gets plan from cache for similar query.
- assertThat(service.prepareAsync(parse(explainQuery2),
createContext()), willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertThat(service.prepareAsync(explainQuery2, queryCtx,
createContext()), willBe(notNullValue()));
+ assertEquals(2, service.cacheSize());
Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(),
Mockito.any());
}
@Test
public void resetCache() {
- assertThat(service.cache(), Matchers.anEmptyMap());
+ assertEquals(0, service.cacheSize());
- assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id >
0"), createContext()), willBe(notNullValue()));
- assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id >
1"), createContext()), willBe(notNullValue()));
- assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id >
2"), createContext()), willBe(notNullValue()));
- assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id >
3"), createContext()), willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(4));
+ assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0",
queryCtx, createContext()), willBe(notNullValue()));
+ assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 1",
queryCtx, createContext()), willBe(notNullValue()));
+ assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 2",
queryCtx, createContext()), willBe(notNullValue()));
+ assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 3",
queryCtx, createContext()), willBe(notNullValue()));
+ assertEquals(8, service.cacheSize());
service.resetCache();
- assertThat(service.cache(), Matchers.anEmptyMap());
+ assertEquals(0, service.cacheSize());
- assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id >
0"), createContext()), willBe(notNullValue()));
- assertThat(service.cache(), Matchers.aMapWithSize(1));
+ assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0",
queryCtx, createContext()), willBe(notNullValue()));
+ assertEquals(2, service.cacheSize());
}
private BaseQueryContext createContext(Object... params) {