This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-17765-3
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 264a15847f2eaaa548ac4be8b6c10486627a8d35
Author: amashenkov <[email protected]>
AuthorDate: Tue Jun 20 15:04:52 2023 +0300

    Indirect cache for query plans.
---
 .../internal/sql/engine/SqlQueryProcessor.java     | 237 ++++++++-------------
 .../sql/engine/prepare/PrepareService.java         |   4 +-
 .../sql/engine/prepare/PrepareServiceImpl.java     | 157 ++++++++++++--
 .../sql/engine/prepare/QueryPlanFactory.java       |  32 ---
 .../sql/engine/sql/IgniteSqlCreateTable.java       |   7 +
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  10 +-
 .../internal/sql/engine/framework/TestNode.java    |  15 +-
 .../sql/engine/planner/PrepareServiceSelfTest.java | 124 ++++++-----
 8 files changed, 310 insertions(+), 276 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 580d6ee12a..b706ba84da 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -17,32 +17,28 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.lang.ErrorGroups.Sql.OPERATION_INTERRUPTED_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_EXPIRED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_NOT_FOUND_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
-import static 
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_SQL_OPERATION_KIND_ERR;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.catalog.CatalogManager;
@@ -67,7 +63,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
-import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -81,12 +76,7 @@ import 
org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.session.SessionInfo;
 import org.apache.ignite.internal.sql.engine.session.SessionManager;
 import org.apache.ignite.internal.sql.engine.session.SessionProperty;
-import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
-import org.apache.ignite.internal.sql.engine.sql.ParseResult;
-import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
@@ -247,7 +237,8 @@ public class SqlQueryProcessor implements QueryProcessor {
                 busyLock
         );
 
-        sqlSchemaManager.registerListener(prepareSvc::resetCache);
+        // TODO IGNITE-19497 No need to clear the cache, as the cache is already aware of 
catalog versioning.
+        sqlSchemaManager.registerListener(prepareSvc::invalidateCachedPlans);
 
         this.prepareSvc = prepareSvc;
 
@@ -383,121 +374,116 @@ public class SqlQueryProcessor implements 
QueryProcessor {
 
     private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
             SessionId sessionId,
-            QueryContext context,
+            QueryContext queryContext,
             String sql,
             Object... params
     ) {
         Session session = sessionManager.session(sessionId);
 
         if (session == null) {
-            return CompletableFuture.failedFuture(
-                    new SqlException(SESSION_NOT_FOUND_ERR, format("Session 
not found [{}]", sessionId)));
+            return failedFuture(new SqlException(SESSION_NOT_FOUND_ERR, 
format("Session not found [{}]", sessionId)));
         }
 
-        String schemaName = 
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
-
-        InternalTransaction outerTx = 
context.unwrap(InternalTransaction.class);
-
         QueryCancel queryCancel = new QueryCancel();
 
-        AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
-                queryCancel::cancel,
-                taskExecutor
-        );
-
-        queryCancel.add(() -> session.unregisterResource(closeableResource));
-
         try {
-            session.registerResource(closeableResource);
+            registerQueryCancel(session, queryCancel);
         } catch (IllegalStateException ex) {
-            return CompletableFuture.failedFuture(new 
IgniteInternalException(SESSION_EXPIRED_ERR,
+            return failedFuture(new 
IgniteInternalException(SESSION_EXPIRED_ERR,
                     format("Session has been expired [{}]", 
session.sessionId()), ex));
         }
 
-        CompletableFuture<Void> start = new CompletableFuture<>();
+        String schemaName = 
session.properties().get(QueryProperty.DEFAULT_SCHEMA);
 
-        AtomicReference<InternalTransaction> tx = new AtomicReference<>();
+        InternalTransaction outerTx = 
queryContext.unwrap(InternalTransaction.class);
+        boolean implicitTx = outerTx == null;
 
-        CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
-                .thenApply(v -> {
-                    StatementParseResult parseResult = 
IgniteSqlParser.parse(sql, StatementParseResult.MODE);
-                    SqlNode sqlNode = parseResult.statement();
+        // TODO: IGNITE-19497 Use timestamp to get schema.
+        SchemaPlus schema = sqlSchemaManager.schema(schemaName);
 
-                    validateParsedStatement(context, outerTx, parseResult, 
sqlNode, params);
+        if (schema == null) {
+            return failedFuture(new SchemaNotFoundException(schemaName));
+        }
 
-                    return sqlNode;
-                })
-                .thenCompose(sqlNode -> {
-                    boolean rwOp = dataModificationOp(sqlNode);
+        BaseQueryContext plannerContext = BaseQueryContext.builder()
+                
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+                .logger(LOG)
+                .cancel(queryCancel)
+                .parameters(params)
+                .plannerTimeout(PLANNER_TIMEOUT)
+                .build();
+
+        return prepareSvc.prepareAsync(sql, queryContext, plannerContext)
+                .thenComposeAsync(plan -> {
+                    // Ensure plan can be executed.
+                    if (SqlQueryType.DDL == plan.type() && outerTx != null) {
+                        return failedFuture(new 
SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support 
transactions."));
+                    }
 
-                    boolean implicitTxRequired = outerTx == null;
+                    InternalTransaction tx = implicitTx ? 
txManager.begin(plan.type() != DML) : outerTx;
 
-                    tx.set(implicitTxRequired ? txManager.begin(!rwOp) : 
outerTx);
+                    try {
+                        // TODO IGNITE-19497 Ensure the implicit tx uses the same 
schema that the query was planned for; otherwise, retry planning.
+                        var dataCursor = executionSrvc.executePlan(tx, plan, 
plannerContext);
 
-                    SchemaPlus schema = sqlSchemaManager.schema(schemaName);
+                        SqlQueryType queryType = plan.type();
+                        assert queryType != null : "Expected a full plan but 
got a fragment: " + plan;
 
-                    if (schema == null) {
-                        return CompletableFuture.failedFuture(new 
SchemaNotFoundException(schemaName));
-                    }
+                        AsyncSqlCursorImpl<List<Object>> cursor = new 
AsyncSqlCursorImpl<>(
+                                queryType,
+                                plan.metadata(),
+                                implicitTx ? tx : null,
+                                new AsyncCursor<>() {
+                                    @Override
+                                    public 
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
+                                        session.touch();
 
-                    BaseQueryContext ctx = BaseQueryContext.builder()
-                            .frameworkConfig(
-                                    
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                            .defaultSchema(schema)
-                                            .build()
-                            )
-                            .logger(LOG)
-                            .cancel(queryCancel)
-                            .parameters(params)
-                            .plannerTimeout(PLANNER_TIMEOUT)
-                            .build();
-
-                    return prepareSvc.prepareAsync(sqlNode, ctx)
-                            .thenApply(plan -> {
-                                var dataCursor = 
executionSrvc.executePlan(tx.get(), plan, ctx);
-
-                                SqlQueryType queryType = plan.type();
-                                assert queryType != null : "Expected a full 
plan but got a fragment: " + plan;
-
-                                return new AsyncSqlCursorImpl<>(
-                                        queryType,
-                                        plan.metadata(),
-                                        implicitTxRequired ? tx.get() : null,
-                                        new AsyncCursor<List<Object>>() {
-                                            @Override
-                                            public 
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
-                                                session.touch();
-
-                                                return 
dataCursor.requestNextAsync(rows);
-                                            }
-
-                                            @Override
-                                            public CompletableFuture<Void> 
closeAsync() {
-                                                session.touch();
-
-                                                return dataCursor.closeAsync();
-                                            }
-                                        }
-                                );
-                            });
-                });
+                                        return 
dataCursor.requestNextAsync(rows);
+                                    }
 
-        stage.whenComplete((cur, ex) -> {
-            if (ex instanceof CancellationException) {
-                queryCancel.cancel();
-            }
+                                    @Override
+                                    public CompletableFuture<Void> 
closeAsync() {
+                                        session.touch();
 
-            if (ex != null && outerTx == null) {
-                InternalTransaction tx0 = tx.get();
-                if (tx0 != null) {
-                    tx0.rollback();
-                }
-            }
-        });
+                                        return dataCursor.closeAsync();
+                                    }
+                                }
+                        );
 
-        start.completeAsync(() -> null, taskExecutor);
+                        return CompletableFuture.completedFuture(cursor);
+                    } catch (Throwable th) {
+                        handleQueryException(th, queryCancel, implicitTx ? tx 
: null);
 
-        return stage;
+                        return failedFuture(th);
+                    }
+                }, taskExecutor);
+    }
+
+    /**
+     * Registers a session resource that cancels the given query when the resource is closed.
+     *
+     * @throws IllegalStateException If the session is invalid.
+     */
+    private void registerQueryCancel(Session session, QueryCancel queryCancel) 
throws IllegalStateException {
+        AsyncCloseable closeableResource = () -> CompletableFuture.runAsync(
+                queryCancel::cancel,
+                taskExecutor
+        );
+
+        queryCancel.add(() -> session.unregisterResource(closeableResource));
+        session.registerResource(closeableResource);
+    }
+
+    private static void handleQueryException(Throwable th, QueryCancel 
queryCancel, @Nullable InternalTransaction tx) {
+        if (th instanceof CancellationException) {
+            queryCancel.cancel();
+        }
+
+        if (th != null) {
+            if (tx != null) {
+                tx.rollback();
+            }
+        }
     }
 
     private abstract static class AbstractTableEventListener implements 
EventListener<TableEventParameters> {
@@ -606,55 +592,4 @@ public class SqlQueryProcessor implements QueryProcessor {
                     .thenApply(v -> false);
         }
     }
-
-    /** Returns {@code true} if this is data modification operation. */
-    private static boolean dataModificationOp(SqlNode sqlNode) {
-        return SqlKind.DML.contains(sqlNode.getKind());
-    }
-
-    /** Performs additional validation of a parsed statement. **/
-    public static void validateParsedStatement(
-            QueryContext context,
-            InternalTransaction outerTx,
-            ParseResult parseResult,
-            SqlNode node,
-            Object[] params
-    ) {
-        Set<SqlQueryType> allowedTypes = context.allowedQueryTypes();
-        SqlQueryType queryType = Commons.getQueryType(node);
-
-        if (queryType == null) {
-            throw new 
IgniteInternalException(UNSUPPORTED_SQL_OPERATION_KIND_ERR, "Unsupported 
operation ["
-                    + "sqlNodeKind=" + node.getKind() + "; "
-                    + "querySql=\"" + node + "\"]");
-        }
-
-        if (!allowedTypes.contains(queryType)) {
-            String message = format("Invalid SQL statement type in the batch. 
Expected {} but got {}.", allowedTypes, queryType);
-
-            throw new QueryValidationException(message);
-        }
-
-        if (SqlQueryType.DDL == queryType && outerTx != null) {
-            throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't 
support transactions.");
-        }
-
-        if (parseResult.dynamicParamsCount() != params.length) {
-            String message = format(
-                    "Unexpected number of query parameters. Provided {} but 
there is only {} dynamic parameter(s).",
-                    params.length, parseResult.dynamicParamsCount()
-            );
-
-            throw new SqlException(QUERY_INVALID_ERR, message);
-        }
-
-        for (Object param : params) {
-            if (!TypeUtils.supportParamInstance(param)) {
-                String message = format(
-                        "Unsupported dynamic parameter defined. Provided '{}' 
is not supported.", param.getClass().getName());
-
-                throw new SqlException(QUERY_INVALID_ERR, message);
-            }
-        }
-    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
index 7a42727412..71657400bb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.sql.engine.prepare;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.calcite.sql.SqlNode;
+import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 
@@ -29,5 +29,5 @@ public interface PrepareService extends LifecycleAware {
     /**
      * Prepare query plan.
      */
-    CompletableFuture<QueryPlan> prepareAsync(SqlNode sqlNode, 
BaseQueryContext ctx);
+    CompletableFuture<QueryPlan> prepareAsync(String query,  QueryContext 
queryContext, BaseQueryContext ctx);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 6bebcbf8f7..0d3aeac2c3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -17,14 +17,19 @@
 
 package org.apache.ignite.internal.sql.engine.prepare;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_INVALID_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.QUERY_VALIDATION_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_SQL_OPERATION_KIND_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -38,15 +43,21 @@ import org.apache.calcite.runtime.CalciteContextException;
 import org.apache.calcite.sql.SqlDdl;
 import org.apache.calcite.sql.SqlExplain;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.api.ColumnMetadataImpl;
 import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
 import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
+import org.apache.ignite.internal.sql.engine.sql.ParseResult;
+import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -55,6 +66,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -72,6 +84,7 @@ public class PrepareServiceImpl implements PrepareService {
     private final DdlSqlToCommandConverter ddlConverter;
 
     private final ConcurrentMap<CacheKey, CompletableFuture<QueryPlan>> cache;
+    private final ConcurrentMap<CacheKey, String> parseCache;
 
     private final String nodeName;
 
@@ -123,6 +136,10 @@ public class PrepareServiceImpl implements PrepareService {
                 .maximumSize(cacheSize)
                 .<CacheKey, CompletableFuture<QueryPlan>>build()
                 .asMap();
+        parseCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .<CacheKey, String>build()
+                .asMap();
     }
 
     /** {@inheritDoc} */
@@ -146,8 +163,59 @@ public class PrepareServiceImpl implements PrepareService {
         planningPool.shutdownNow();
     }
 
+    /** Drop cached query plans. */
+    public void invalidateCachedPlans() {
+        cache.clear();
+    }
+
+    @TestOnly
+    public void invalidateParserCache() {
+        parseCache.clear();
+    }
+
+    @TestOnly
+    public int planCacheSize() {
+        return cache.size();
+    }
+
+    @TestOnly
+    public int parseCacheSize() {
+        return parseCache.size();
+    }
+
     /** {@inheritDoc} */
     @Override
+    public CompletableFuture<QueryPlan> prepareAsync(String query, 
QueryContext queryContext, BaseQueryContext ctx) {
+        CacheKey cacheKey = createCacheKey(query, ctx);
+
+        String normalizedQuery = parseCache.get(cacheKey);
+
+        if (normalizedQuery == null) {
+            SqlNode sqlNode;
+            try {
+                sqlNode = parse(query, queryContext, ctx);
+            } catch (Exception ex) {
+                return failedFuture(ex);
+            }
+
+            if (skipCache(sqlNode)) {
+                return prepareAsync(sqlNode, ctx);
+            }
+
+            parseCache.putIfAbsent(cacheKey, sqlNode.toString());
+
+            return cache.computeIfAbsent(createCacheKey(sqlNode.toString(), 
ctx), k -> prepareAsync(sqlNode, ctx))
+                    .thenApply(QueryPlan::copy);
+        }
+
+        return cache.computeIfAbsent(createCacheKey(normalizedQuery, ctx), k ->
+                        supplyAsync(() -> parse(query, queryContext, 
ctx)).thenCompose(sqlNode -> prepareAsync(sqlNode, ctx)))
+                .thenApply(QueryPlan::copy);
+    }
+
+    /**
+     * Prepares a query plan for an already parsed statement.
+     */
     public CompletableFuture<QueryPlan> prepareAsync(SqlNode sqlNode, 
BaseQueryContext ctx) {
         try {
             assert single(sqlNode);
@@ -186,20 +254,10 @@ public class PrepareServiceImpl implements PrepareService 
{
         }
     }
 
-    /** Drop cached query plans. */
-    public void resetCache() {
-        cache.clear();
-    }
-
-    @TestOnly
-    public Map<CacheKey, CompletableFuture<QueryPlan>> cache() {
-        return cache;
-    }
-
     private CompletableFuture<QueryPlan> prepareDdl(SqlNode sqlNode, 
PlanningContext ctx) {
         assert sqlNode instanceof SqlDdl : sqlNode == null ? "null" : 
sqlNode.getClass().getName();
 
-        return CompletableFuture.completedFuture(new 
DdlPlan(ddlConverter.convert((SqlDdl) sqlNode, ctx)));
+        return completedFuture(new DdlPlan(ddlConverter.convert((SqlDdl) 
sqlNode, ctx)));
     }
 
     private CompletableFuture<QueryPlan> prepareExplain(SqlNode sqlNode, 
PlanningContext ctx) {
@@ -213,7 +271,7 @@ public class PrepareServiceImpl implements PrepareService {
             return cachedPlan.thenApply(plan -> new 
ExplainPlan(plan.explain(SqlExplainLevel.ALL_ATTRIBUTES)));
         }
 
-        return CompletableFuture.supplyAsync(() -> {
+        return supplyAsync(() -> {
             IgnitePlanner planner = ctx.planner();
 
             // Validate
@@ -233,9 +291,7 @@ public class PrepareServiceImpl implements PrepareService {
     }
 
     private CompletableFuture<QueryPlan> prepareQuery(SqlNode sqlNode, 
PlanningContext ctx) {
-        CacheKey cacheKey = createCacheKey(sqlNode.toString(), 
ctx.unwrap(BaseQueryContext.class));
-
-        return cache.computeIfAbsent(cacheKey, k -> 
CompletableFuture.supplyAsync(() -> {
+        return supplyAsync(() -> {
             IgnitePlanner planner = ctx.planner();
 
             // Validate
@@ -252,13 +308,11 @@ public class PrepareServiceImpl implements PrepareService 
{
             ResultSetMetadata metadata = 
resultSetMetadata(validated.dataType(), validated.origins());
 
             return new MultiStepQueryPlan(template, metadata, igniteRel);
-        }, planningPool)).thenApply(QueryPlan::copy);
+        }, planningPool);
     }
 
     private CompletableFuture<QueryPlan> prepareDml(SqlNode sqlNode, 
PlanningContext ctx) {
-        CacheKey cacheKey = createCacheKey(sqlNode.toString(), 
ctx.unwrap(BaseQueryContext.class));
-
-        return cache.computeIfAbsent(cacheKey, k -> 
CompletableFuture.supplyAsync(() -> {
+        return supplyAsync(() -> {
             IgnitePlanner planner = ctx.planner();
 
             // Validate
@@ -273,7 +327,7 @@ public class PrepareServiceImpl implements PrepareService {
             QueryTemplate template = new QueryTemplate(fragments);
 
             return new MultiStepDmlPlan(template, igniteRel);
-        }, planningPool)).thenApply(QueryPlan::copy);
+        }, planningPool);
     }
 
     private static CacheKey createCacheKey(String query, BaseQueryContext ctx) 
{
@@ -309,4 +363,67 @@ public class PrepareServiceImpl implements PrepareService {
                 }
         );
     }
+
+    private static SqlNode parse(String query, QueryContext queryContext, 
BaseQueryContext ctx) {
+        StatementParseResult parseResult = IgniteSqlParser.parse(query, 
StatementParseResult.MODE);
+
+        SqlNode sqlNode = parseResult.statement();
+
+        validateParsedStatement(queryContext, parseResult, sqlNode, 
ctx.parameters());
+
+        return sqlNode;
+    }
+
+    private static boolean skipCache(SqlNode sqlNode) {
+        SqlKind kind = sqlNode.getKind();
+
+        switch (kind) {
+            case SELECT:
+            case INSERT:
+                return false;
+            default:
+                return true;
+        }
+    }
+
+    /** Performs additional validation of a parsed statement. **/
+    public static void validateParsedStatement(
+            QueryContext context,
+            ParseResult parseResult,
+            SqlNode node,
+            Object[] params
+    ) {
+        Set<SqlQueryType> allowedTypes = context.allowedQueryTypes();
+        SqlQueryType queryType = Commons.getQueryType(node);
+
+        if (queryType == null) {
+            throw new 
IgniteInternalException(UNSUPPORTED_SQL_OPERATION_KIND_ERR, "Unsupported 
operation ["
+                    + "sqlNodeKind=" + node.getKind() + "; "
+                    + "querySql=\"" + node + "\"]");
+        }
+
+        if (!allowedTypes.contains(queryType)) {
+            String message = format("Invalid SQL statement type in the batch. 
Expected {} but got {}.", allowedTypes, queryType);
+
+            throw new QueryValidationException(message);
+        }
+
+        if (parseResult.dynamicParamsCount() != params.length) {
+            String message = format(
+                    "Unexpected number of query parameters. Provided {} but 
there is only {} dynamic parameter(s).",
+                    params.length, parseResult.dynamicParamsCount()
+            );
+
+            throw new SqlException(QUERY_INVALID_ERR, message);
+        }
+
+        for (Object param : params) {
+            if (!TypeUtils.supportParamInstance(param)) {
+                String message = format(
+                        "Unsupported dynamic parameter defined. Provided '{}' 
is not supported.", param.getClass().getName());
+
+                throw new SqlException(QUERY_INVALID_ERR, message);
+            }
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java
deleted file mode 100644
index 2d5ecb82af..0000000000
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QueryPlanFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.prepare;
-
-/**
- * QueryPlan factory interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- */
-public interface QueryPlanFactory {
-    /**
-     * Create plans from context.
-     *
-     * @param ctx Planning context.
-     * @return Query plan.
-     */
-    QueryPlan create(PlanningContext ctx);
-}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java
index a907b0b90b..dd7336818c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlCreateTable.java
@@ -69,6 +69,13 @@ public class IgniteSqlCreateTable extends SqlCreate {
         return ImmutableNullableList.of(name, columnList, createOptionList);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public SqlNode clone(SqlParserPos pos) {
+        //TODO: add clone for all DDL nodes.
+        return new IgniteSqlCreateTable(pos, ifNotExists, name, columnList, 
colocationColumns, createOptionList);
+    }
+
     /** {@inheritDoc} */
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 0ef105e7c5..bce3a40968 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -66,6 +66,8 @@ import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
 import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import 
org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
@@ -93,8 +95,6 @@ import 
org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
-import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
-import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -538,11 +538,7 @@ public class ExecutionServiceImplTest {
     }
 
     private QueryPlan prepare(String query, BaseQueryContext ctx) {
-        StatementParseResult parseResult = IgniteSqlParser.parse(query, 
StatementParseResult.MODE);
-
-        assertEquals(ctx.parameters().length, 
parseResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
-
-        return await(prepareService.prepareAsync(parseResult.statement(), 
ctx));
+        return await(prepareService.prepareAsync(query, 
QueryContext.create(SqlQueryType.ALL), ctx));
     }
 
     static class TestCluster {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 17d095634e..475ec37fcb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -22,7 +22,6 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
@@ -35,6 +34,8 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.sql.engine.AsyncCursor;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
@@ -58,14 +59,11 @@ import 
org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
 import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.PlannerHelper;
-import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
-import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -82,7 +80,7 @@ import org.apache.ignite.network.TopologyService;
 public class TestNode implements LifecycleAware {
     private final String nodeName;
     private final SchemaPlus schema;
-    private final PrepareService prepareService;
+    private final PrepareServiceImpl prepareService;
     private final ExecutionService executionService;
 
     private final List<LifecycleAware> services = new ArrayList<>();
@@ -188,12 +186,7 @@ public class TestNode implements LifecycleAware {
      * @return A plan to execute.
      */
     public QueryPlan prepare(String query) {
-        StatementParseResult parseResult = IgniteSqlParser.parse(query, 
StatementParseResult.MODE);
-        BaseQueryContext ctx = createContext();
-
-        assertEquals(ctx.parameters().length, 
parseResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
-
-        return await(prepareService.prepareAsync(parseResult.statement(), 
ctx));
+        return await(prepareService.prepareAsync(query, 
QueryContext.create(SqlQueryType.ALL), createContext()));
     }
 
     /**
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
index 933943c003..4834220ae1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PrepareServiceSelfTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.sql.engine.planner;
 
-import static 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.validateParsedStatement;
+import static 
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.validateParsedStatement;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.List;
 import java.util.Map;
@@ -43,7 +43,7 @@ import 
org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
 import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.hamcrest.Matchers;
+import org.apache.ignite.sql.SqlException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -97,38 +97,34 @@ public class PrepareServiceSelfTest extends 
AbstractPlannerTest {
 
     @ParameterizedTest(name = "{0}")
     @MethodSource("queries")
-    public void normalizedQueryCache(String query1, String query2, Object[] 
params) {
-        // Parse query and check nothing cached.
-        SqlNode query1Ast = parse(query1, params);
-        assertThat(service.cache(), Matchers.anEmptyMap());
-        Mockito.verifyNoInteractions(queryPlannerSpy);
-
-        // Preparing AST caches a normalized query plan.
-        assertThat(service.prepareAsync(query1Ast, createContext(params)), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+    public void queryCache(String query1, String query2, Object[] params) {
+        // Preparing query caches plan for both query and normalized query.
+        assertThat(service.prepareAsync(query1, queryCtx, 
createContext(params)), willBe(notNullValue()));
+        assertEquals(1, service.parseCacheSize());
+        assertEquals(1, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), 
Mockito.any());
 
-        // Preparing same AST returns plan from cache.
-        query1Ast = parse(query1, params);
-        assertThat(service.prepareAsync(query1Ast, createContext(params)), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        // Preparing same query returns plan from cache.
+        assertThat(service.prepareAsync(query1, queryCtx, 
createContext(params)), willBe(notNullValue()));
+        assertEquals(1, service.parseCacheSize());
+        assertEquals(1, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), 
Mockito.any());
 
-        // Prepare similar query returns plan from cache.
-        SqlNode query2Ast = parse(query2, params);
-
-        assertThat(service.prepareAsync(query2Ast, createContext(params)), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        // Preparing a similar query reuses the cached plan and only adds a new 
parse cache entry for that query.
+        assertThat(service.prepareAsync(query2, queryCtx, 
createContext(params)), willBe(notNullValue()));
+        assertEquals(2, service.parseCacheSize());
+        assertEquals(1, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), 
Mockito.any());
     }
 
+
     private SqlNode parse(String sql, Object... params) {
         // Parse query
         StatementParseResult parseResult = IgniteSqlParser.parse(sql, 
StatementParseResult.MODE);
         SqlNode sqlNode = parseResult.statement();
 
         // Validate statement
-        validateParsedStatement(queryCtx, null, parseResult, sqlNode, params);
+        validateParsedStatement(queryCtx, parseResult, sqlNode, params);
 
         return sqlNode;
     }
@@ -137,8 +133,8 @@ public class PrepareServiceSelfTest extends 
AbstractPlannerTest {
     public void ddlBypassCache() {
         String query = "CREATE TABLE tbl0(id INTEGER PRIMARY KEY, val 
VARCHAR);";
 
-        assertThat(service.prepareAsync(parse(query), createContext()), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(0));
+        assertThat(service.prepareAsync(query, queryCtx, createContext()), 
willBe(notNullValue()));
+        assertEquals(0, service.planCacheSize());
         Mockito.verify(ddlPlannerSpy, Mockito.times(1)).convert(Mockito.any(), 
Mockito.any());
         // DDL goes a separate flow via ddl converter.
         Mockito.verifyNoInteractions(queryPlannerSpy);
@@ -146,11 +142,16 @@ public class PrepareServiceSelfTest extends 
AbstractPlannerTest {
 
     @Test
     public void errors() {
-        String query = "SELECT * FROM tbl2 WHERE id > 0"; // Invalid table 
name.
+        String query = "invalid query"; // Not a parsable SQL statement.
+
+        assertThat(service.prepareAsync(query, queryCtx, createContext()), 
willThrow(SqlException.class));
+        assertEquals(0, service.planCacheSize());
+        Mockito.verifyNoInteractions(queryPlannerSpy);
+
+        query = "SELECT * FROM tbl2 WHERE id > 0"; // Invalid table name.
 
-        assertThat(service.prepareAsync(parse(query), createContext()), 
willThrow(CalciteContextException.class));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
-        
assertTrue(service.cache().values().iterator().next().isCompletedExceptionally());
+        assertThat(service.prepareAsync(query, queryCtx, createContext()), 
willThrow(CalciteContextException.class));
+        assertEquals(1, service.planCacheSize());
         Mockito.verifyNoInteractions(queryPlannerSpy);
     }
 
@@ -162,23 +163,23 @@ public class PrepareServiceSelfTest extends 
AbstractPlannerTest {
 
         String query = "SELECT * FROM tbl WHERE id > 0";
 
-        assertThat(service.prepareAsync(parse(query), createContext()), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(0));
+        assertThat(service.prepareAsync(query, queryCtx, createContext()), 
willBe(notNullValue()));
+        assertEquals(0, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), 
Mockito.any());
 
-        assertThat(service.prepareAsync(parse(query), createContext()), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(0));
+        assertThat(service.prepareAsync(query, queryCtx, createContext()), 
willBe(notNullValue()));
+        assertEquals(0, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), 
Mockito.any());
     }
 
     @Test
-    public void normalizedQueryCaching() {
+    public void normalizedQuery() {
         SqlNode queryAst = parse("SELECT NULL");
 
         String normalizedQuery = queryAst.toString();
 
-        assertThat(service.prepareAsync(parse(normalizedQuery), 
createContext()), willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        assertThat(service.prepareAsync(normalizedQuery, queryCtx, 
createContext()), willBe(notNullValue()));
+        assertEquals(1, service.planCacheSize());
     }
 
     @Test
@@ -188,42 +189,59 @@ public class PrepareServiceSelfTest extends 
AbstractPlannerTest {
         String explainQuery2 = "EXPLAIN PLAN FOR SELECT * /* comment */ FROM 
tbl WHERE id > 0";
 
         // Ensure explain don't cache anything.
-        assertThat(service.prepareAsync(parse(explainQuery), createContext()), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.anEmptyMap());
+        assertThat(service.prepareAsync(explainQuery, queryCtx, 
createContext()), willBe(notNullValue()));
+        assertEquals(0, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(1)).apply(Mockito.any(), 
Mockito.any());
 
         // Cache query plan.
-        assertThat(service.prepareAsync(parse(query), createContext()), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        assertThat(service.prepareAsync(query, queryCtx, createContext()), 
willBe(notNullValue()));
+        assertEquals(1, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), 
Mockito.any());
 
         // Check explain gets plan from cache.
-        assertThat(service.prepareAsync(parse(explainQuery), createContext()), 
willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        assertThat(service.prepareAsync(explainQuery, queryCtx, 
createContext()), willBe(notNullValue()));
+        assertEquals(1, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), 
Mockito.any());
 
         // Check explain gets plan from cache for similar query.
-        assertThat(service.prepareAsync(parse(explainQuery2), 
createContext()), willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        assertThat(service.prepareAsync(explainQuery2, queryCtx, 
createContext()), willBe(notNullValue()));
+        assertEquals(1, service.planCacheSize());
         Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), 
Mockito.any());
     }
 
     @Test
     public void resetCache() {
-        assertThat(service.cache(), Matchers.anEmptyMap());
+        assertEquals(0, service.planCacheSize());
+
+        // Fill caches.
+        assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0", 
queryCtx, createContext()), willBe(notNullValue()));
+        assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 1", 
queryCtx, createContext()), willBe(notNullValue()));
+
+        assertEquals(2, service.parseCacheSize());
+        assertEquals(2, service.planCacheSize());
+        Mockito.verify(queryPlannerSpy, Mockito.times(2)).apply(Mockito.any(), 
Mockito.any());
+
+        // Drop cached plans.
+        service.invalidateCachedPlans();
+
+        assertEquals(2, service.parseCacheSize());
+        assertEquals(0, service.planCacheSize());
+
+        assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0", 
queryCtx, createContext()), willBe(notNullValue()));
+        assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 1", 
queryCtx, createContext()), willBe(notNullValue()));
 
-        assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 
0"), createContext()), willBe(notNullValue()));
-        assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 
1"), createContext()), willBe(notNullValue()));
-        assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 
2"), createContext()), willBe(notNullValue()));
-        assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 
3"), createContext()), willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(4));
+        assertEquals(2, service.parseCacheSize());
+        assertEquals(2, service.planCacheSize());
+        Mockito.verify(queryPlannerSpy, Mockito.times(4)).apply(Mockito.any(), 
Mockito.any());
 
-        service.resetCache();
+        // Invalidate parser cache and check reusing cached plans.
+        service.invalidateParserCache();
 
-        assertThat(service.cache(), Matchers.anEmptyMap());
+        assertThat(service.prepareAsync("SELECT * FROM tbl WHERE id > 0", 
queryCtx, createContext()), willBe(notNullValue()));
 
-        assertThat(service.prepareAsync(parse("SELECT * FROM tbl WHERE id > 
0"), createContext()), willBe(notNullValue()));
-        assertThat(service.cache(), Matchers.aMapWithSize(1));
+        assertEquals(1, service.parseCacheSize());
+        assertEquals(2, service.planCacheSize());
+        Mockito.verify(queryPlannerSpy, Mockito.times(4)).apply(Mockito.any(), 
Mockito.any());
     }
 
     private BaseQueryContext createContext(Object... params) {

Reply via email to