This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19497 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit bbf8eb5ed167fbbe41624285f81fe843afaec846 Author: amashenkov <[email protected]> AuthorDate: Tue Aug 1 20:53:41 2023 +0300 Use schema versions for queries. * Use actual schema for query in TX. * Pass schema version to remote fragment. * Wait for catalog schema ready on the remote side. --- .../java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java | 5 +++++ .../java/org/apache/ignite/internal/catalog/CatalogService.java | 6 ++++++ .../org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java | 4 +++- .../ignite/internal/sql/engine/exec/ExecutionServiceImpl.java | 6 +++--- .../ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java | 2 +- .../apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java | 1 + .../ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java | 2 +- .../internal/sql/engine/framework/PredefinedSchemaManager.java | 2 +- .../org/apache/ignite/internal/sql/engine/framework/TestNode.java | 6 +++--- 9 files changed, 24 insertions(+), 10 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index eb606160e3..16a9a1e8e0 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -275,6 +275,11 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam return catalogByVer.lastEntry().getKey(); } + @Override + public CompletableFuture<Void> catalogReadyFuture(int ver) { + return versionTracker.waitFor(ver); + } + private Catalog catalog(int version) { return catalogByVer.get(version); } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java index c1d30f1b4c..314ad24075 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.catalog; import java.util.Collection; +import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; @@ -79,4 +80,9 @@ public interface CatalogService { int latestCatalogVersion(); void listen(CatalogEvent evt, EventListener<CatalogEventParameters> closure); + + /** + * Returns a future, which completes, when catalog of given version will be available. + */ + CompletableFuture<Void> catalogReadyFuture(int ver); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 1ba38288b1..9098326e46 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -423,7 +423,9 @@ public class SqlQueryProcessor implements QueryProcessor { tx.set(implicitTxRequired ? txManager.begin(!rwOp, null) : outerTx); - SchemaPlus schema = sqlSchemaManager.schema(schemaName); + int catalogVersion = catalogManager.activeCatalogVersion(tx.get().startTimestamp().longValue()); + + SchemaPlus schema = sqlSchemaManager.schema(schemaName, catalogVersion); if (schema == null) { return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName)); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 6b483f8b0e..50428f0b91 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -239,13 +239,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return queryManager.execute(tx, plan); } - private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, Object[] params) { + private BaseQueryContext createQueryContext(UUID queryId, @Nullable String schema, int catalogVersion, Object[] params) { return BaseQueryContext.builder() .queryId(queryId) .parameters(params) .frameworkConfig( Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(sqlSchemaManager.schema(schema)) + .defaultSchema(sqlSchemaManager.schema(schema, catalogVersion)) .build() ) .logger(LOG) @@ -427,7 +427,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest msg) { return queryManagerMap.computeIfAbsent(msg.queryId(), key -> { - BaseQueryContext ctx = createQueryContext(key, msg.schema(), msg.parameters()); + BaseQueryContext ctx = createQueryContext(key, msg.schema(), (int)msg.schemaVersion(), msg.parameters()); return new DistributedQueryManager(ctx); }); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java index ed27b83e91..371f885abc 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java @@ -93,7 +93,7 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { /** {@inheritDoc} */ @Override public CompletableFuture<?> actualSchemaAsync(long ver) { - return CompletableFuture.completedFuture(catalogManager.schema((int) ver)); + return catalogManager.catalogReadyFuture((int) ver); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java index ef45bc8598..e2c1fb6e1b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java @@ -47,6 +47,7 @@ public interface SqlSchemaManager { /** * Wait for {@code ver} schema version, just a stub, need to be removed after IGNITE-18733. */ + @Deprecated CompletableFuture<?> actualSchemaAsync(long ver); /** diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index e57170b284..ec61eca2fb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -610,7 +610,7 @@ public class ExecutionServiceImplTest { rootSch.add(schema.getName(), schema); SchemaPlus plus = rootSch.plus().getSubSchema(schema.getName()); - when(schemaManagerMock.schema(any())).thenReturn(plus); + when(schemaManagerMock.schema(any(), anyInt())).thenReturn(plus); var executionService = new ExecutionServiceImpl<>( messageService, diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java index c98b745510..73ddc9098b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java @@ -81,7 +81,7 @@ public class PredefinedSchemaManager implements SqlSchemaManager { /** {@inheritDoc} */ @Override public CompletableFuture<?> actualSchemaAsync(long ver) { - return CompletableFuture.completedFuture(root); + return CompletableFuture.completedFuture(null); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index de4e900d96..88f270c581 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -79,12 +79,12 @@ import org.apache.ignite.network.TopologyService; */ public class TestNode implements LifecycleAware { private final String nodeName; - private final SchemaPlus schema; private final PrepareService prepareService; private final ExecutionService executionService; private final ParserService parserService; private final List<LifecycleAware> services = new ArrayList<>(); + private final SqlSchemaManager schemaManager; /** * Constructs the object. @@ -101,7 +101,7 @@ public class TestNode implements LifecycleAware { ) { this.nodeName = nodeName; this.prepareService = registerService(new PrepareServiceImpl(nodeName, 0, mock(DdlSqlToCommandConverter.class), PLANNING_TIMEOUT)); - this.schema = schemaManager.schema("PUBLIC"); + this.schemaManager = schemaManager; TopologyService topologyService = clusterService.topologyService(); MessagingService messagingService = clusterService.messagingService(); @@ -223,7 +223,7 @@ public class TestNode implements LifecycleAware { .cancel(new QueryCancel()) .frameworkConfig( Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(schema) + .defaultSchema(schemaManager.latestSchema("PUBLIC")) .build() ) .build();
