This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-17765-2 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 05cf5031dc0271e417031bb9dba4e64a29b71a14 Author: amashenkov <[email protected]> AuthorDate: Tue Jun 13 19:06:06 2023 +0300 fix --- .../internal/sql/engine/SqlQueryProcessor.java | 323 +++++++++++---------- .../internal/sql/engine/prepare/CacheKey.java | 19 +- .../sql/engine/schema/CatalogSqlSchemaManager.java | 5 + .../sql/engine/schema/SqlSchemaManager.java | 5 + 4 files changed, 197 insertions(+), 155 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index ffa661fe87..f3dd71b238 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.sql.engine; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.internal.sql.engine.SqlQueryType.DML; import static org.apache.ignite.internal.sql.engine.SqlQueryType.QUERY; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; @@ -33,13 +34,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -53,6 +52,7 @@ import org.apache.calcite.util.Pair; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.index.event.IndexEvent; import org.apache.ignite.internal.index.event.IndexEventParameters; @@ -90,7 +90,6 @@ import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser; import org.apache.ignite.internal.sql.engine.sql.ParseResult; import org.apache.ignite.internal.sql.engine.sql.StatementParseResult; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; -import org.apache.ignite.internal.sql.engine.util.BaseQueryContext.Builder; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.DataStorageManager; @@ -110,7 +109,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * SqlQueryProcessor. + * SqlQueryProcessor. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ public class SqlQueryProcessor implements QueryProcessor { @@ -126,8 +125,8 @@ public class SqlQueryProcessor implements QueryProcessor { private static final long SESSION_EXPIRE_CHECK_PERIOD = TimeUnit.SECONDS.toMillis(1); /** - * Duration in milliseconds after which the session will be considered expired if no action have been performed - * on behalf of this session during this period. + * Duration in milliseconds after which the session will be considered expired if no action have been performed on behalf of this + * session during this period. */ private static final long DEFAULT_SESSION_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(15); @@ -390,22 +389,123 @@ public class SqlQueryProcessor implements QueryProcessor { private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0( SessionId sessionId, - QueryContext context, + QueryContext queryContext, String sql, Object... params ) { Session session = sessionManager.session(sessionId); if (session == null) { - return CompletableFuture.failedFuture( + return failedFuture( new SqlException(SESSION_NOT_FOUND_ERR, format("Session not found [{}]", sessionId))); } + QueryCancel queryCancel; + + try { + queryCancel = createQueryCancel(session); + } catch (IllegalStateException ex) { + return failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR, + format("Session has been expired [{}]", session.sessionId()), ex)); + } + String schemaName = session.properties().get(QueryProperty.DEFAULT_SCHEMA); - InternalTransaction outerTx = context.unwrap(InternalTransaction.class); + InternalTransaction outerTx = queryContext.unwrap(InternalTransaction.class); + long timestamp = outerTx == null ? clock.nowLong() : outerTx.startTimestamp().longValue(); + + boolean implicitTx = outerTx == null; + + int plannerCatalogVersion = sqlSchemaManager.actualCatalogVersion(timestamp); + SchemaPlus schema = sqlSchemaManager.schema(schemaName, plannerCatalogVersion); + + if (schema == null) { + return failedFuture(new SchemaNotFoundException(schemaName)); + } + + CacheKey cacheKey = new CacheKey(schemaName, plannerCatalogVersion, sql, params); + + BaseQueryContext plannerContext = BaseQueryContext.builder() + .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()) + .logger(LOG) + .cancel(queryCancel) + .parameters(params) + .plannerTimeout(PLANNER_TIMEOUT) + .build(); + + CompletableFuture<QueryPlan> planFuture = queryCache.get(cacheKey); + + if (planFuture == null) { + planFuture = CompletableFuture.supplyAsync(() -> { + // Parse query + StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE); + SqlNode sqlNode = parseResult.statement(); - QueryCancel queryCancel = new QueryCancel(); + // Validate statement + validateParsedStatement(queryContext, parseResult, sqlNode, params); + + return sqlNode; + }).thenCompose(sqlNode -> { + if (skipCache(Commons.getQueryType(sqlNode))) { + // Prepare query plan without caching. + return prepareSvc.prepareAsync(sqlNode, plannerContext); + } + + // Try query plan for normalized query, or create a new one asynchronously. + CacheKey normalizedQueryCacheKey = new CacheKey(schemaName, plannerCatalogVersion, sqlNode.toString(), params); + + CompletableFuture<QueryPlan> planFuture0 = queryCache.computeIfAbsent( + normalizedQueryCacheKey, + k -> prepareSvc.prepareAsync(sqlNode, plannerContext) + ); + + queryCache.putIfAbsent(cacheKey, planFuture0); + + // Copy shared plan. + return planFuture0.thenApply(QueryPlan::copy); + }); + } else { + // Copy shared plan. + planFuture = planFuture.thenApply(QueryPlan::copy); + } + + return planFuture + .thenCompose(plan -> { + // Validate plan + if (SqlQueryType.DDL == plan.type() && outerTx != null) { + return failedFuture(new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support transactions.")); + } + + InternalTransaction tx = implicitTx ? txManager.begin(plan.type() != DML) : outerTx; + + try { + int txCatalogVersion = catalogManager.activeCatalogVersion(tx.startTimestamp().longValue()); + + if (implicitTx && plannerCatalogVersion != txCatalogVersion) { + LOG.info("Retry query planning: plannerCatalogVersion={}, txCatalogVersion={}", + plannerCatalogVersion, txCatalogVersion); + + return tx.rollbackAsync() + .thenComposeAsync(ignore -> querySingle0(sessionId, queryContext, sql, params), taskExecutor); + } + + return CompletableFuture.completedFuture(executePlan(session, tx, plan, plannerContext)); + } catch (Throwable th) { + handleQueryException(th, queryCancel, implicitTx ? tx : null); + + return failedFuture(th); + } + }); + } + + private static boolean skipCache(@Nullable SqlQueryType queryType) { + return queryType != QUERY && queryType != DML; + } + + @NotNull + private QueryCancel createQueryCancel(Session session) { + QueryCancel queryCancel; + queryCancel = new QueryCancel(); AsyncCloseable closeableResource = () -> CompletableFuture.runAsync( queryCancel::cancel, @@ -413,114 +513,52 @@ public class SqlQueryProcessor implements QueryProcessor { ); queryCancel.add(() -> session.unregisterResource(closeableResource)); + session.registerResource(closeableResource); + return queryCancel; + } - try { - session.registerResource(closeableResource); - } catch (IllegalStateException ex) { - return CompletableFuture.failedFuture(new IgniteInternalException(SESSION_EXPIRED_ERR, - format("Session has been expired [{}]", session.sessionId()), ex)); - } - CompletableFuture<Void> start = new CompletableFuture<>(); - - boolean implicitTxRequired = outerTx == null; - AtomicReference<InternalTransaction> tx = new AtomicReference<>(); - - CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start - .thenCompose(v -> { - Builder contextBuilder = BaseQueryContext.builder() - .logger(LOG) - .cancel(queryCancel) - .parameters(params) - .plannerTimeout(PLANNER_TIMEOUT); - - CompletableFuture<PlanWithContext>[] newPlanHolder = new CompletableFuture[1]; - - CompletableFuture<QueryPlan> cachedPlan = queryCache.computeIfAbsent(new CacheKey(schemaName, sql, params), (k) -> { - // Parse query. - StatementParseResult parseResult = IgniteSqlParser.parse(sql, StatementParseResult.MODE); - SqlNode sqlNode = parseResult.statement(); - - validateParsedStatement(context, outerTx, parseResult, sqlNode, params); - - // Build context. - tx.set(implicitTxRequired ? txManager.begin(!dataModificationOp(sqlNode)) : outerTx); - SchemaPlus schema = resolveSchema(schemaName); - BaseQueryContext ctx = contextBuilder.frameworkConfig( - Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()).build(); - - CompletableFuture<QueryPlan> planFuture = prepareSvc.prepareAsync(sqlNode, ctx); - - SqlQueryType queryType = Commons.getQueryType(sqlNode); - boolean putPlanIntoCache = queryType == QUERY || queryType == DML; - - newPlanHolder[0] = planFuture.thenApply(plan -> new PlanWithContext(putPlanIntoCache ? plan.copy() : plan, ctx)); - - return putPlanIntoCache ? planFuture : null; - }); - - return Objects.requireNonNullElseGet( - newPlanHolder[0], - () -> cachedPlan.thenApply(plan -> { - // Build query context for execution. - tx.set(implicitTxRequired ? txManager.begin(plan.type() != DML) : outerTx); - SchemaPlus schema = resolveSchema(schemaName); - BaseQueryContext ctx = contextBuilder.frameworkConfig( - Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build()).build(); - - return new PlanWithContext(plan.copy(), ctx); - }) - ).thenApply(planWithContext -> { - QueryPlan plan = planWithContext.plan; - BaseQueryContext ctx = planWithContext.context; - - var dataCursor = executionSrvc.executePlan(tx.get(), plan, ctx); - - SqlQueryType queryType = plan.type(); - assert queryType != null : "Expected a full plan but got a fragment: " + plan; - - return new AsyncSqlCursorImpl<>( - queryType, - plan.metadata(), - implicitTxRequired ? tx.get() : null, - new AsyncCursor<List<Object>>() { - @Override - public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { - session.touch(); - - return dataCursor.requestNextAsync(rows); - } - - @Override - public CompletableFuture<Void> closeAsync() { - session.touch(); - - return dataCursor.closeAsync(); - } - } - ); - }); - }); + private AsyncSqlCursor<List<Object>> executePlan(Session session, InternalTransaction tx, QueryPlan plan, BaseQueryContext ctx) { + var dataCursor = executionSrvc.executePlan(tx, plan, ctx); - stage.whenComplete((cur, ex) -> { - if (ex instanceof CancellationException) { - queryCancel.cancel(); - } + SqlQueryType queryType = plan.type(); + assert queryType != null : "Expected a full plan but got a fragment: " + plan; + + return new AsyncSqlCursorImpl<>( + queryType, + plan.metadata(), + tx, + new AsyncCursor<>() { + @Override + public CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) { + session.touch(); + + return dataCursor.requestNextAsync(rows); + } + + @Override + public CompletableFuture<Void> closeAsync() { + session.touch(); - if (ex != null && outerTx == null) { - InternalTransaction tx0 = tx.get(); - if (tx0 != null) { - tx0.rollback(); + return dataCursor.closeAsync(); + } } - } - }); + ); + } - start.completeAsync(() -> null, taskExecutor); + private static void handleQueryException(Throwable th, QueryCancel queryCancel, @Nullable InternalTransaction tx) { + if (th instanceof CancellationException) { + queryCancel.cancel(); + } - return stage; + if (th != null) { + if (tx != null) { + tx.rollback(); + } + } } - private SchemaPlus resolveSchema(String schemaName) { + private SchemaPlus resolveSchema(String schemaName, HybridTimestamp timestamp) { SchemaPlus schema = sqlSchemaManager.schema(schemaName); if (schema == null) { @@ -555,11 +593,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onTableCreated( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -573,11 +611,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onTableUpdated( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -591,11 +629,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onTableDropped( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -609,12 +647,12 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onIndexDropped( - // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas - DEFAULT_SCHEMA_NAME, - parameters.tableId(), - parameters.indexId(), - parameters.causalityToken() - ) + // TODO: https://issues.apache.org/jira/browse/IGNITE-17694 Hardcoded schemas + DEFAULT_SCHEMA_NAME, + parameters.tableId(), + parameters.indexId(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -628,11 +666,11 @@ public class SqlQueryProcessor implements QueryProcessor { @Override public CompletableFuture<Boolean> notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) { return schemaHolder.onIndexCreated( - parameters.tableId(), - parameters.indexId(), - parameters.indexDescriptor(), - parameters.causalityToken() - ) + parameters.tableId(), + parameters.indexId(), + parameters.indexDescriptor(), + parameters.causalityToken() + ) .thenApply(v -> false); } } @@ -645,7 +683,6 @@ public class SqlQueryProcessor implements QueryProcessor { /** Performs additional validation of a parsed statement. **/ private static void validateParsedStatement( QueryContext context, - InternalTransaction outerTx, ParseResult parseResult, SqlNode node, Object[] params @@ -665,10 +702,6 @@ public class SqlQueryProcessor implements QueryProcessor { throw new QueryValidationException(message); } - if (SqlQueryType.DDL == queryType && outerTx != null) { - throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, "DDL doesn't support transactions."); - } - if (parseResult.dynamicParamsCount() != params.length) { String message = format( "Unexpected number of query parameters. Provided {} but there is only {} dynamic parameter(s).", @@ -687,14 +720,4 @@ public class SqlQueryProcessor implements QueryProcessor { } } } - - private static final class PlanWithContext { - private final QueryPlan plan; - private final BaseQueryContext context; - - private PlanWithContext(QueryPlan plan, BaseQueryContext context) { - this.plan = plan; - this.context = context; - } - } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java index b6a1c023aa..dbe0f0f957 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/CacheKey.java @@ -29,19 +29,25 @@ public class CacheKey { private final String schemaName; + private final int catalogVersion; + private final String query; private final Class[] paramTypes; + /** * Constructor. * * @param schemaName Schema name. - * @param query Query string. - * @param params Dynamic parameters. + * @param catalogVersion Catalog version. + * @param query Query string. + * @param params Dynamic parameters. */ - public CacheKey(String schemaName, String query, Object[] params) { + public CacheKey(String schemaName, int catalogVersion, String query, Object[] params) { + //TODO: IGNITE-17765 use schema id instead of name. this.schemaName = schemaName; + this.catalogVersion = catalogVersion; this.query = query; this.paramTypes = params.length == 0 ? EMPTY_CLASS_ARRAY @@ -60,6 +66,9 @@ public class CacheKey { CacheKey cacheKey = (CacheKey) o; + if (catalogVersion != cacheKey.catalogVersion){ + return false; + } if (!schemaName.equals(cacheKey.schemaName)) { return false; } @@ -67,14 +76,14 @@ public class CacheKey { return false; } - return Arrays.deepEquals(paramTypes, cacheKey.paramTypes); + return Arrays.equals(paramTypes, cacheKey.paramTypes); } @Override public int hashCode() { int result = schemaName.hashCode(); result = 31 * result + query.hashCode(); - result = 31 * result + Arrays.deepHashCode(paramTypes); + result = 31 * result + Arrays.hashCode(paramTypes); return result; } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java index d18bece8f9..9975062ef9 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java @@ -107,6 +107,11 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { return cache.computeIfAbsent(entry, (v) -> createSqlSchema(v.getValue(), descriptor)); } + @Override + public int actualCatalogVersion(long timestamp) { + return catalogManager.activeCatalogVersion(timestamp); + } + private SchemaPlus createSqlSchema(int version, CatalogSchemaDescriptor descriptor) { String schemaName = descriptor.name(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java index ef45bc8598..3031aa7bf6 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java @@ -53,4 +53,9 @@ public interface SqlSchemaManager { * Returns a required schema if specified, or default schema otherwise. */ SchemaPlus activeSchema(@Nullable String name, long timestamp); + + /** + * Return actual schema version. + */ + int actualCatalogVersion(long timestamp); }
