This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-19497
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit bbf8eb5ed167fbbe41624285f81fe843afaec846
Author: amashenkov <[email protected]>
AuthorDate: Tue Aug 1 20:53:41 2023 +0300

    Use schema versions for queries.
    * Use actual schema for query in TX.
    * Pass schema version to remote fragment.
    * Wait for catalog schema ready on the remote side.
---
 .../java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java | 5 +++++
 .../java/org/apache/ignite/internal/catalog/CatalogService.java     | 6 ++++++
 .../org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java    | 4 +++-
 .../ignite/internal/sql/engine/exec/ExecutionServiceImpl.java       | 6 +++---
 .../ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java  | 2 +-
 .../apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java  | 1 +
 .../ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java   | 2 +-
 .../internal/sql/engine/framework/PredefinedSchemaManager.java      | 2 +-
 .../org/apache/ignite/internal/sql/engine/framework/TestNode.java   | 6 +++---
 9 files changed, 24 insertions(+), 10 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index eb606160e3..16a9a1e8e0 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -275,6 +275,11 @@ public class CatalogManagerImpl extends 
Producer<CatalogEvent, CatalogEventParam
         return catalogByVer.lastEntry().getKey();
     }
 
+    @Override
+    public CompletableFuture<Void> catalogReadyFuture(int ver) {
+        return versionTracker.waitFor(ver);
+    }
+
     private Catalog catalog(int version) {
         return catalogByVer.get(version);
     }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index c1d30f1b4c..314ad24075 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.catalog;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -79,4 +80,9 @@ public interface CatalogService {
     int latestCatalogVersion();
 
     void listen(CatalogEvent evt, EventListener<CatalogEventParameters> 
closure);
+
+    /**
+     * Returns a future, which completes, when catalog of given version will 
be available.
+     */
+    CompletableFuture<Void> catalogReadyFuture(int ver);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1ba38288b1..9098326e46 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -423,7 +423,9 @@ public class SqlQueryProcessor implements QueryProcessor {
 
                     tx.set(implicitTxRequired ? txManager.begin(!rwOp, null) : 
outerTx);
 
-                    SchemaPlus schema = sqlSchemaManager.schema(schemaName);
+                    int catalogVersion = 
catalogManager.activeCatalogVersion(tx.get().startTimestamp().longValue());
+
+                    SchemaPlus schema = sqlSchemaManager.schema(schemaName, 
catalogVersion);
 
                     if (schema == null) {
                         return CompletableFuture.failedFuture(new 
SchemaNotFoundException(schemaName));
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 6b483f8b0e..50428f0b91 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -239,13 +239,13 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         return queryManager.execute(tx, plan);
     }
 
-    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String 
schema, Object[] params) {
+    private BaseQueryContext createQueryContext(UUID queryId, @Nullable String 
schema, int catalogVersion, Object[] params) {
         return BaseQueryContext.builder()
                 .queryId(queryId)
                 .parameters(params)
                 .frameworkConfig(
                         Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                .defaultSchema(sqlSchemaManager.schema(schema))
+                                .defaultSchema(sqlSchemaManager.schema(schema, 
catalogVersion))
                                 .build()
                 )
                 .logger(LOG)
@@ -427,7 +427,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
     private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest 
msg) {
         return queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
-            BaseQueryContext ctx = createQueryContext(key, msg.schema(), 
msg.parameters());
+            BaseQueryContext ctx = createQueryContext(key, msg.schema(), 
(int)msg.schemaVersion(), msg.parameters());
 
             return new DistributedQueryManager(ctx);
         });
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
index ed27b83e91..371f885abc 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java
@@ -93,7 +93,7 @@ public class CatalogSqlSchemaManager implements 
SqlSchemaManager {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<?> actualSchemaAsync(long ver) {
-        return CompletableFuture.completedFuture(catalogManager.schema((int) 
ver));
+        return catalogManager.catalogReadyFuture((int) ver);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
index ef45bc8598..e2c1fb6e1b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
@@ -47,6 +47,7 @@ public interface SqlSchemaManager {
     /**
      * Wait for {@code ver} schema version, just a stub, need to be removed 
after IGNITE-18733.
      */
+    @Deprecated
     CompletableFuture<?> actualSchemaAsync(long ver);
 
     /**
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index e57170b284..ec61eca2fb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -610,7 +610,7 @@ public class ExecutionServiceImplTest {
         rootSch.add(schema.getName(), schema);
         SchemaPlus plus = rootSch.plus().getSubSchema(schema.getName());
 
-        when(schemaManagerMock.schema(any())).thenReturn(plus);
+        when(schemaManagerMock.schema(any(), anyInt())).thenReturn(plus);
 
         var executionService = new ExecutionServiceImpl<>(
                 messageService,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index c98b745510..73ddc9098b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -81,7 +81,7 @@ public class PredefinedSchemaManager implements 
SqlSchemaManager {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<?> actualSchemaAsync(long ver) {
-        return CompletableFuture.completedFuture(root);
+        return CompletableFuture.completedFuture(null);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index de4e900d96..88f270c581 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -79,12 +79,12 @@ import org.apache.ignite.network.TopologyService;
  */
 public class TestNode implements LifecycleAware {
     private final String nodeName;
-    private final SchemaPlus schema;
     private final PrepareService prepareService;
     private final ExecutionService executionService;
     private final ParserService parserService;
 
     private final List<LifecycleAware> services = new ArrayList<>();
+    private final SqlSchemaManager schemaManager;
 
     /**
      * Constructs the object.
@@ -101,7 +101,7 @@ public class TestNode implements LifecycleAware {
     ) {
         this.nodeName = nodeName;
         this.prepareService = registerService(new PrepareServiceImpl(nodeName, 
0, mock(DdlSqlToCommandConverter.class), PLANNING_TIMEOUT));
-        this.schema = schemaManager.schema("PUBLIC");
+        this.schemaManager = schemaManager;
 
         TopologyService topologyService = clusterService.topologyService();
         MessagingService messagingService = clusterService.messagingService();
@@ -223,7 +223,7 @@ public class TestNode implements LifecycleAware {
                 .cancel(new QueryCancel())
                 .frameworkConfig(
                         Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                .defaultSchema(schema)
+                                
.defaultSchema(schemaManager.latestSchema("PUBLIC"))
                                 .build()
                 )
                 .build();

Reply via email to