This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a0be8d223 IGNITE-18878 Sql. Introduce schema synchronisation for 
query processing, naive fix - Fixes #1754.
0a0be8d223 is described below

commit 0a0be8d2236f42a95f8ba2c593ab14a0bcdb169c
Author: zstan <[email protected]>
AuthorDate: Thu Mar 16 15:08:39 2023 +0300

    IGNITE-18878 Sql. Introduce schema synchronisation for query processing, 
naive fix - Fixes #1754.
    
    Signed-off-by: zstan <[email protected]>
---
 .../ignite/internal/causality/VersionedValue.java  |   2 +-
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 -
 .../apache/ignite/internal/index/IndexManager.java |   8 +-
 .../internal/index/event/IndexEventParameters.java |   2 +-
 .../internal/runner/app/ItDataSchemaSyncTest.java  |  47 ++++++-
 .../runner/app/ItIgniteNodeRestartTest.java        |  66 +++++++++-
 .../internal/sql/engine/ItFunctionsTest.java       |   2 +-
 .../internal/sql/engine/ItMixedQueriesTest.java    |   2 -
 .../internal/sql/engine/SqlQueryProcessor.java     |   4 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  94 +++++++++-----
 .../sql/engine/externalize/RelInputEx.java         |   8 --
 .../sql/engine/externalize/RelJsonReader.java      |  42 ++-----
 .../sql/engine/message/QueryStartRequest.java      |   5 +
 .../internal/sql/engine/rel/IgniteTableModify.java |   8 +-
 .../engine/rel/ProjectableFilterableTableScan.java |  14 +--
 .../internal/sql/engine/schema/IgniteSchema.java   |  28 ++++-
 .../sql/engine/schema/SqlSchemaManager.java        |  12 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    |  92 ++++++++------
 .../internal/sql/engine/util/BaseQueryContext.java |  18 ++-
 .../sql/engine/util/HashFunctionFactoryImpl.java   |   2 +-
 .../internal/sql/engine/StopCalciteModuleTest.java |  18 ++-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  30 +++--
 .../engine/exec/schema/SqlSchemaManagerTest.java   | 140 +++------------------
 .../sql/engine/externalize/RelJsonReaderTest.java  |  77 ------------
 .../engine/framework/PredefinedSchemaManager.java  |  12 +-
 .../sql/engine/framework/TestBuilders.java         |   2 +-
 .../sql/engine/planner/AbstractPlannerTest.java    |  24 +++-
 27 files changed, 381 insertions(+), 381 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index efca7c5fa1..d309379728 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -49,7 +49,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class VersionedValue<T> {
     /** Token until the value is initialized. */
-    private static final long NOT_INITIALIZED = -1L;
+    public static final long NOT_INITIALIZED = -1L;
 
     /** Default history size. */
     private static final int DEFAULT_MAX_HISTORY_SIZE = 10;
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 07e5e5ca37..0ce7e3f6b2 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -157,9 +157,6 @@ public class ErrorGroups {
         /** Object already exists in schema. */
         public static final int OBJECT_ALREADY_EXISTS_ERR = 
SQL_ERR_GROUP.registerErrorCode(17);
 
-        /** Table version not found. */
-        public static final int TABLE_VER_NOT_FOUND_ERR = 
SQL_ERR_GROUP.registerErrorCode(18);
-
         /** Query mapping error. */
         public static final int QUERY_MAPPING_ERR = 
SQL_ERR_GROUP.registerErrorCode(19);
 
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 424df53627..e6439336c1 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -233,7 +233,7 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
 
             return future;
         } catch (Exception ex) {
-            return CompletableFuture.failedFuture(ex);
+            return failedFuture(ex);
         } finally {
             busyLock.leaveBusy();
         }
@@ -253,7 +253,7 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
             boolean failIfNotExists
     ) {
         if (!busyLock.enterBusy()) {
-            return CompletableFuture.failedFuture(new NodeStoppingException());
+            return failedFuture(new NodeStoppingException());
         }
 
         LOG.debug("Going to drop index [schema={}, index={}]", schemaName, 
indexName);
@@ -547,7 +547,7 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
                 String newName,
                 ConfigurationNotificationEvent<TableIndexView> ctx
         ) {
-            return CompletableFuture.failedFuture(new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-16196";));
+            return failedFuture(new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-16196";));
         }
 
         /** {@inheritDoc} */
@@ -559,7 +559,7 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         /** {@inheritDoc} */
         @Override
         public @NotNull CompletableFuture<?> onUpdate(@NotNull 
ConfigurationNotificationEvent<TableIndexView> ctx) {
-            return CompletableFuture.failedFuture(new 
IllegalStateException("Should not be called"));
+            return failedFuture(new IllegalStateException("Should not be 
called"));
         }
     }
 }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
index 3324a2463c..f9f244a23c 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
@@ -59,7 +59,7 @@ public class IndexEventParameters extends EventParameters {
      * @param indexId An index identifier.
      * @param index An index instance.
      */
-    protected IndexEventParameters(long revision, UUID indexId, @Nullable 
Index<?> index) {
+    private IndexEventParameters(long revision, UUID indexId, @Nullable 
Index<?> index) {
         super(revision);
 
         this.indexId = indexId;
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 10c2abe728..3771263772 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -27,11 +27,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
@@ -187,6 +190,44 @@ public class ItDataSchemaSyncTest extends 
IgniteAbstractTest {
         assertTrue(waitForCondition(() -> 
table1.schemaView().schema().version() == 2, 5_000));
     }
 
+    /**
+     * Check that sql query will wait until appropriate schema is not 
propagated into all nodes.
+     */
+    @Test
+    public void queryWaitAppropriateSchema() throws Exception {
+        Ignite ignite0 = clusterNodes.get(0);
+        IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+
+        createTable(ignite0, TABLE_NAME);
+
+        WatchListenerInhibitor listenerInhibitor = 
WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
+
+        listenerInhibitor.startInhibit();
+
+        sql(ignite0, "CREATE INDEX idx1 ON " + TABLE_NAME + "(valint)");
+
+        CompletableFuture<Void> fut = CompletableFuture.runAsync(() -> 
sql(ignite0, "SELECT * FROM "
+                + TABLE_NAME + " WHERE valint > 0"));
+
+        try {
+            // wait a timeout to observe that query can`t be executed.
+            fut.get(1, TimeUnit.SECONDS);
+
+            fail();
+        } catch (TimeoutException e) {
+            // Expected, no op.
+        }
+
+        listenerInhibitor.stopInhibit();
+
+        // only check that request is executed without timeout.
+        ResultSet<SqlRow> rs = sql(ignite0, "SELECT * FROM " + TABLE_NAME + " 
WHERE valint > 0");
+
+        assertNotNull(rs);
+
+        rs.close();
+    }
+
     /**
      * Test correctness of schemes recovery after node restart.
      */
@@ -314,9 +355,11 @@ public class ItDataSchemaSyncTest extends 
IgniteAbstractTest {
         sql(node, "ALTER TABLE " + tableName + " ADD COLUMN valstr2 VARCHAR 
NOT NULL DEFAULT 'default'");
     }
 
-    protected void sql(Ignite node, String query, Object... args) {
+    protected ResultSet<SqlRow> sql(Ignite node, String query, Object... args) 
{
+        ResultSet<SqlRow> rs = null;
         try (Session session = node.sql().createSession()) {
-            session.execute(null, query, args);
+            rs = session.execute(null, query, args);
         }
+        return rs;
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 23eafef4d1..548e113b96 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -113,7 +113,9 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.TransactionException;
@@ -663,6 +665,68 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
         assertEquals(DEFAULT_NODE_PORT, nodePort);
     }
 
+    /**
+     * Check correctness of return results after node restart.
+     * Scenario:
+     * <ol>
+     *     <li>Start two nodes and fill the data.</li>
+     *     <li>Create index.</li>
+     *     <li>Check explain contain index scan.</li>
+     *     <li>Check return results.</li>
+     *     <li>Restart one node.</li>
+     *     <li>Run query and compare results.</li>
+     * </ol>
+     */
+    @Test
+    public void testQueryCorrectnessAfterNodeRestart() throws 
InterruptedException {
+        IgniteImpl ignite1 = startNode(0);
+
+        createTableWithoutData(ignite1, TABLE_NAME, 2, 1);
+
+        IgniteImpl ignite2 = startNode(1);
+
+        String sql = "SELECT id FROM " + TABLE_NAME + " WHERE id > 0 ORDER BY 
id";
+
+        int intRes;
+
+        try (Session session1 = ignite1.sql().createSession(); Session 
session2 = ignite2.sql().createSession()) {
+            session1.execute(null, "CREATE INDEX idx1 ON " + TABLE_NAME + 
"(id)");
+
+            waitForIndex(List.of(ignite1, ignite2), "idx1");
+
+            createTableWithData(List.of(ignite1), TABLE_NAME, 2, 1);
+
+            ResultSet<SqlRow> plan = session1.execute(null, "EXPLAIN PLAN FOR 
" + sql);
+
+            String planStr = plan.next().stringValue(0);
+
+            assertTrue(planStr.contains("IndexScan"));
+
+            ResultSet<SqlRow> res1 = session1.execute(null, sql);
+
+            ResultSet<SqlRow> res2 = session2.execute(null, sql);
+
+            intRes = res1.next().intValue(0);
+
+            assertEquals(intRes, res2.next().intValue(0));
+
+            res1.close();
+
+            res2.close();
+        }
+
+        // TODO: Uncomment after IGNITE-18203
+        /*stopNode(0);
+
+        ignite1 = startNode(0);
+
+        try (Session session1 = ignite1.sql().createSession()) {
+            ResultSet<SqlRow> res3 = session1.execute(null, sql);
+
+            assertEquals(intRes, res3.next().intValue(0));
+        }*/
+    }
+
     /**
      * Restarts a node with changing configuration.
      */
@@ -1055,7 +1119,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     private void createTableWithData(List<IgniteImpl> nodes, String name, int 
replicas, int partitions)
             throws InterruptedException {
         try (Session session = nodes.get(0).sql().createSession()) {
-            session.execute(null, "CREATE TABLE " + name
+            session.execute(null, "CREATE TABLE IF NOT EXISTS " + name
                     + "(id INT PRIMARY KEY, name VARCHAR) WITH replicas=" + 
replicas + ", partitions=" + partitions);
 
             waitForIndex(nodes, name + "_PK");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
index 827158a6d2..309bcfc1ed 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.Test;
  * Test Ignite SQL functions.
  */
 public class ItFunctionsTest extends ClusterPerClassIntegrationTest {
-    private static final Object[] NULL_RESULT = new Object[] { null };
+    private static final Object[] NULL_RESULT = { null };
 
     @Test
     public void testTimestampDiffWithFractionsOfSecond() {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
index fa249c48d7..74cab4acc8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
@@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import java.util.Arrays;
 import java.util.List;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -208,7 +207,6 @@ public class ItMixedQueriesTest extends 
ClusterPerClassIntegrationTest {
      * Verifies that table modification events are passed to a calcite schema 
modification listener.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-16679";)
     public void testIgniteSchemaAwaresAlterTableCommand() {
         String selectAllQry = "select * from test_tbl";
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index c4ea27f00a..8d084d192c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -100,10 +100,10 @@ public class SqlQueryProcessor implements QueryProcessor {
     private static final long PLANNER_TIMEOUT = 15000L;
 
     /** Size of the cache for query plans. */
-    public static final int PLAN_CACHE_SIZE = 1024;
+    private static final int PLAN_CACHE_SIZE = 1024;
 
     /** Session expiration check period in milliseconds. */
-    public static final long SESSION_EXPIRE_CHECK_PERIOD = 
TimeUnit.SECONDS.toMillis(1);
+    private static final long SESSION_EXPIRE_CHECK_PERIOD = 
TimeUnit.SECONDS.toMillis(1);
 
     /** Name of the default schema. */
     public static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 817259d19f..fd4ffaa2c1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -245,8 +245,8 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                 .build();
     }
 
-    private QueryPlan prepareFragment(String jsonFragment) {
-        IgniteRel plan = physNodesCache.computeIfAbsent(jsonFragment, ser -> 
fromJson(sqlSchemaManager, ser));
+    private QueryPlan prepareFragment(String jsonFragment, BaseQueryContext 
ctx) {
+        IgniteRel plan = physNodesCache.computeIfAbsent(jsonFragment, ser -> 
fromJson(ctx, ser));
 
         return new FragmentPlan(plan);
     }
@@ -260,8 +260,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             case DML:
                 // TODO a barrier between previous operation and this one
             case QUERY:
-                return executeQuery(tx, ctx, (MultiStepPlan) plan
-                );
+                return executeQuery(tx, ctx, (MultiStepPlan) plan);
             case EXPLAIN:
                 return executeExplain((ExplainPlan) plan);
             case DDL:
@@ -321,13 +320,20 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     private void onMessage(String nodeName, QueryStartRequest msg) {
         assert nodeName != null && msg != null;
 
-        DistributedQueryManager queryManager = 
queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
-            BaseQueryContext ctx = createQueryContext(key, msg.schema(), 
msg.parameters());
+        CompletableFuture<?> fut = 
sqlSchemaManager.actualSchemaAsync(msg.schemaVersion());
 
-            return new DistributedQueryManager(ctx);
-        });
+        if (fut.isDone()) {
+            submitFragment(nodeName, msg);
+        } else {
+            fut.whenComplete((mgr, ex) -> {
+                if (ex != null) {
+                    handleError(ex, nodeName, msg);
+                    return;
+                }
 
-        queryManager.submitFragment(nodeName, msg.root(), 
msg.fragmentDescription(), msg.txAttributes());
+                taskExecutor.execute(msg.queryId(), msg.fragmentId(), () -> 
submitFragment(nodeName, msg));
+            });
+        }
     }
 
     private void onMessage(String nodeName, QueryStartResponse msg) {
@@ -395,6 +401,28 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         return mgr.localFragments();
     }
 
+    private void submitFragment(String nodeName, QueryStartRequest msg) {
+        DistributedQueryManager queryManager = getOrCreateQueryManager(msg);
+
+        queryManager.submitFragment(nodeName, msg.root(), 
msg.fragmentDescription(), msg.txAttributes());
+    }
+
+    private void handleError(Throwable ex, String nodeName, QueryStartRequest 
msg) {
+        DistributedQueryManager queryManager = getOrCreateQueryManager(msg);
+
+        queryManager.handleError(ex, nodeName, 
msg.fragmentDescription().fragmentId());
+    }
+
+    private DistributedQueryManager getOrCreateQueryManager(QueryStartRequest 
msg) {
+        DistributedQueryManager queryManager = 
queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
+            BaseQueryContext ctx = createQueryContext(key, msg.schema(), 
msg.parameters());
+
+            return new DistributedQueryManager(ctx);
+        });
+
+        return queryManager;
+    }
+
     /**
      * A convenient class that manages the initialization and termination of 
distributed queries.
      */
@@ -414,9 +442,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private volatile Long rootFragmentId = null;
 
 
-        private DistributedQueryManager(
-                BaseQueryContext ctx
-        ) {
+        private DistributedQueryManager(BaseQueryContext ctx) {
             this.ctx = ctx;
 
             var root = new CompletableFuture<AsyncRootNode<RowT, 
List<Object>>>();
@@ -445,6 +471,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
                     .fragmentDescription(desc)
                     .parameters(ctx.parameters())
                     .txAttributes(txAttributes)
+                    .schemaVersion(ctx.schemaVersion())
                     .build();
 
             var fut = new CompletableFuture<Void>();
@@ -554,30 +581,39 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
             );
         }
 
-        private void submitFragment(String initiatorNode, String 
fragmentString, FragmentDescription desc, TxAttributes txAttributes) {
+        private void submitFragment(
+                String initiatorNode,
+                String fragmentString,
+                FragmentDescription desc,
+                TxAttributes txAttributes
+        ) {
             try {
-                QueryPlan qryPlan = prepareFragment(fragmentString);
+                QueryPlan qryPlan = prepareFragment(fragmentString, ctx);
 
                 FragmentPlan plan = (FragmentPlan) qryPlan;
 
                 executeFragment(plan, createContext(initiatorNode, desc, 
txAttributes));
             } catch (Throwable ex) {
-                LOG.debug("Unable to start query fragment", ex);
+                handleError(ex, initiatorNode, desc.fragmentId());
+            }
+        }
 
-                try {
-                    msgSrvc.send(
-                            initiatorNode,
-                            FACTORY.queryStartResponse()
-                                    .queryId(ctx.queryId())
-                                    .fragmentId(desc.fragmentId())
-                                    .error(ex)
-                                    .build()
-                    );
-                } catch (Exception e) {
-                    LOG.info("Unable to send error message", e);
-
-                    close(true);
-                }
+        private void handleError(Throwable ex, String initiatorNode, long 
fragmentId) {
+            LOG.debug("Unable to start query fragment", ex);
+
+            try {
+                msgSrvc.send(
+                        initiatorNode,
+                        FACTORY.queryStartResponse()
+                                .queryId(ctx.queryId())
+                                .fragmentId(fragmentId)
+                                .error(ex)
+                                .build()
+                );
+            } catch (Exception e) {
+                LOG.info("Unable to send error message", e);
+
+                close(true);
             }
         }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
index a614438074..15b0e9f2eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelInputEx.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.externalize;
 
 import java.util.List;
-import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
@@ -37,13 +36,6 @@ public interface RelInputEx extends RelInput {
      */
     RelCollation getCollation(String tag);
 
-    /**
-     * Returns table by its id.
-     *
-     * @return A table with given id.
-     */
-    RelOptTable getTableById();
-
     /**
      * Returns search bounds.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index 645256b1a9..dd929a2ac2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -30,12 +30,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.UUID;
 import java.util.function.Function;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
@@ -50,8 +49,7 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.sql.SqlException;
 
@@ -61,15 +59,14 @@ import org.apache.ignite.sql.SqlException;
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class RelJsonReader {
-    private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF 
= new TypeReference<>() {
-    };
+    private static final TypeReference<LinkedHashMap<String, Object>> TYPE_REF 
= new TypeReference<>() {};
 
     private final ObjectMapper mapper = new 
ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
 
-    private final SqlSchemaManager schemaManager;
-
     private final RelJson relJson;
 
+    private final RelOptSchema relOptSchema;
+
     private final Map<String, RelNode> relMap = new LinkedHashMap<>();
 
     private RelNode lastRel;
@@ -78,8 +75,8 @@ public class RelJsonReader {
      * FromJson.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public static <T extends RelNode> T fromJson(SqlSchemaManager 
schemaManager, String json) {
-        RelJsonReader reader = new RelJsonReader(schemaManager);
+    public static <T extends RelNode> T fromJson(BaseQueryContext ctx, String 
json) {
+        RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
 
         return (T) reader.read(json);
     }
@@ -88,8 +85,8 @@ public class RelJsonReader {
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
-    public RelJsonReader(SqlSchemaManager schemaManager) {
-        this.schemaManager = schemaManager;
+    public RelJsonReader(RelOptSchema relOptSchema) {
+        this.relOptSchema = relOptSchema;
 
         relJson = new RelJson();
     }
@@ -147,25 +144,8 @@ public class RelJsonReader {
         /** {@inheritDoc} */
         @Override
         public RelOptTable getTable(String table) {
-            // For deserialization #getTableById() should be used instead 
because
-            // it's the only way to find out that someone just recreate the 
table
-            // (probably with different schema) with the same name while the 
plan
-            // was serialized
-            throw new AssertionError("Unexpected method was called");
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public RelOptTable getTableById() {
-            String tableId = getString("tableId");
-            int ver = ((Number) get("tableVer")).intValue();
-
-            IgniteTable table = 
schemaManager.tableById(UUID.fromString(tableId), ver);
-
-            List<String> tableName = getStringList("table");
-
-            return RelOptTableImpl.create(null, 
table.getRowType(Commons.typeFactory()), tableName,
-                    table, c -> null);
+            List<String> list = getStringList(table);
+            return relOptSchema.getTableForMember(list);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index d8f000b490..f104188038 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -54,4 +54,9 @@ public interface QueryStartRequest extends 
ExecutionContextAwareMessage {
      */
     @Marshallable
     TxAttributes txAttributes();
+
+    /**
+     * Return last schema version, just a stub, need to be removed after 
IGNITE-18733.
+     */
+    long schemaVersion();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
index e78d754877..e4b3fc3bf6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteTableModify.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rex.RexNode;
-import org.apache.ignite.internal.sql.engine.externalize.RelInputEx;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
@@ -77,7 +76,7 @@ public class IgniteTableModify extends TableModify implements 
IgniteRel {
         this(
                 input.getCluster(),
                 input.getTraitSet().replace(IgniteConvention.INSTANCE),
-                ((RelInputEx) input).getTableById(),
+                input.getTable("table"),
                 input.getInput(),
                 input.getEnum("operation", Operation.class),
                 input.getStringList("updateColumnList"),
@@ -115,10 +114,9 @@ public class IgniteTableModify extends TableModify 
implements IgniteRel {
 
     @Override
     public RelWriter explainTerms(RelWriter pw) {
+        // for correct rel obtaining from ExecutionServiceImpl#physNodesCache.
         return super.explainTerms(pw)
                 .itemIf("tableId", 
getTable().unwrap(IgniteTable.class).id().toString(),
-                        pw.getDetailLevel() == ALL_ATTRIBUTES)
-                .itemIf("tableVer", 
getTable().unwrap(IgniteTable.class).version(),
-                        pw.getDetailLevel() == ALL_ATTRIBUTES);
+                pw.getDetailLevel() == ALL_ATTRIBUTES);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
index 516cb77249..e264aded24 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/ProjectableFilterableTableScan.java
@@ -44,7 +44,6 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.ControlFlowException;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.sql.engine.externalize.RelInputEx;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -88,13 +87,7 @@ public abstract class ProjectableFilterableTableScan extends 
TableScan {
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     protected ProjectableFilterableTableScan(RelInput input) {
-        super(
-                input.getCluster(),
-                input.getTraitSet(),
-                List.of(),
-                ((RelInputEx) input).getTableById()
-        );
-
+        super(input);
         condition = input.getExpression("filters");
         projects = input.get("projects") == null ? null : 
input.getExpressionList("projects");
         requiredColumns = input.get("requiredColumns") == null ? null : 
input.getBitSet("requiredColumns");
@@ -135,10 +128,7 @@ public abstract class ProjectableFilterableTableScan 
extends TableScan {
         return explainTerms0(pw
                 .item("table", table.getQualifiedName())
                 .itemIf("tableId", 
table.unwrap(IgniteTable.class).id().toString(),
-                        pw.getDetailLevel() == ALL_ATTRIBUTES)
-                .itemIf("tableVer", table.unwrap(IgniteTable.class).version(),
-                        pw.getDetailLevel() == ALL_ATTRIBUTES)
-        );
+                pw.getDetailLevel() == ALL_ATTRIBUTES));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 01d2fba58d..2ecdfdbe0e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -35,6 +35,8 @@ public class IgniteSchema extends AbstractSchema {
 
     private final Map<UUID, IgniteIndex> idxMap;
 
+    private final long schemaVersion;
+
     /**
      * Creates a Schema with given tables and indexes.
      *
@@ -45,11 +47,13 @@ public class IgniteSchema extends AbstractSchema {
     public IgniteSchema(
             String schemaName,
             @Nullable Map<String, Table> tableMap,
-            @Nullable Map<UUID, IgniteIndex> indexMap
+            @Nullable Map<UUID, IgniteIndex> indexMap,
+            long schemaVersion
     ) {
         this.schemaName = schemaName;
         this.tblMap = tableMap == null ? new ConcurrentHashMap<>() : new 
ConcurrentHashMap<>(tableMap);
         this.idxMap = indexMap == null ? new ConcurrentHashMap<>() : new 
ConcurrentHashMap<>(indexMap);
+        this.schemaVersion = schemaVersion;
     }
 
     /**
@@ -58,11 +62,20 @@ public class IgniteSchema extends AbstractSchema {
      * @param schemaName A name of the schema to create.
      */
     public IgniteSchema(String schemaName) {
-        this(schemaName, null, null);
+        this(schemaName, null, null, -1);
+    }
+
+    /**
+     * Creates an empty Schema.
+     *
+     * @param schemaName A name of the schema to create.
+     */
+    public IgniteSchema(String schemaName, long schemaVersion) {
+        this(schemaName, null, null, schemaVersion);
     }
 
-    public static IgniteSchema copy(IgniteSchema old) {
-        return new IgniteSchema(old.schemaName, old.tblMap, old.idxMap);
+    public static IgniteSchema copy(IgniteSchema old, long schemaVersion) {
+        return new IgniteSchema(old.schemaName, old.tblMap, old.idxMap, 
schemaVersion);
     }
 
     /**
@@ -127,4 +140,11 @@ public class IgniteSchema extends AbstractSchema {
     public IgniteIndex index(UUID indexId) {
         return idxMap.get(indexId);
     }
+
+    /**
+     * Return actual schema version.
+     */
+    public long schemaVersion() {
+        return schemaVersion;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
index 251d4994f9..75462e7a47 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManager.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.sql.engine.schema;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.calcite.schema.SchemaPlus;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * SchemaHolder interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Sql schemas operations interface.
  */
 public interface SqlSchemaManager {
     /**
@@ -35,9 +35,13 @@ public interface SqlSchemaManager {
      * Returns a table by given id.
      *
      * @param id An id of required table.
-     * @param ver Minimal required version.
      *
      * @return The table.
      */
-    IgniteTable tableById(UUID id, int ver);
+    IgniteTable tableById(UUID id);
+
+    /**
+     * Wait for {@code ver} schema version, just a stub, need to be removed 
after IGNITE-18733.
+     */
+    CompletableFuture<?> actualSchemaAsync(long ver);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index db5a68e378..ee91d68700 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.sql.engine.schema;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.causality.VersionedValue.NOT_INITIALIZED;
 import static 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.OBJECT_NOT_FOUND_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.SCHEMA_EVALUATION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Sql.TABLE_VER_NOT_FOUND_ERR;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
@@ -35,12 +36,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.causality.OutdatedTokenException;
 import org.apache.ignite.internal.causality.VersionedValue;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.index.Index;
@@ -57,7 +60,6 @@ import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -69,10 +71,10 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
     private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
 
     private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
+    private final Map<UUID, CompletableFuture<?>> pkIdxReady = new 
ConcurrentHashMap<>();
 
     private final VersionedValue<Map<UUID, IgniteIndex>> indicesVv;
 
-    private final TableManager tableManager;
     private final SchemaManager schemaManager;
     private final ReplicaService replicaService;
     private final HybridClock clock;
@@ -96,7 +98,6 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager 
{
             Consumer<Function<Long, CompletableFuture<?>>> registry,
             IgniteSpinBusyLock busyLock
     ) {
-        this.tableManager = tableManager;
         this.schemaManager = schemaManager;
         this.replicaService = replicaService;
         this.clock = clock;
@@ -145,63 +146,62 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
     public SchemaPlus schema(@Nullable String schema) {
         SchemaPlus schemaPlus = calciteSchemaVv.latest();
 
+        // stub for waiting pk indexes, more clear place is IgniteSchema
+        
CompletableFuture.allOf(pkIdxReady.values().toArray(CompletableFuture[]::new)).join();
+
         return schema != null ? schemaPlus.getSubSchema(schema) : 
schemaPlus.getSubSchema(DEFAULT_SCHEMA_NAME);
     }
 
     /** {@inheritDoc} */
     @Override
-    @NotNull
-    public IgniteTable tableById(UUID id, int ver) {
+    public CompletableFuture<?> actualSchemaAsync(long ver) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
         try {
-            IgniteTable table = tablesVv.latest().get(id);
-
-            // there is a chance that someone tries to resolve table before
-            // the distributed event of that table creation has been processed
-            // by TableManager, so we need to get in sync with the TableManager
-            if (table == null || ver > table.version()) {
-                table = awaitLatestTableSchema(id);
+            if (ver == NOT_INITIALIZED) {
+                return completedFuture(null);
             }
 
-            if (table == null) {
-                throw new IgniteInternalException(OBJECT_NOT_FOUND_ERR,
-                        IgniteStringFormatter.format("Table not found 
[tableId={}]", id));
-            }
+            CompletableFuture<SchemaPlus> lastSchemaFut;
 
-            if (table.version() < ver) {
-                throw new IgniteInternalException(TABLE_VER_NOT_FOUND_ERR,
-                        IgniteStringFormatter.format("Table version not found 
[tableId={}, requiredVer={}, latestKnownVer={}]",
-                                id, ver, table.version()));
+            try {
+                lastSchemaFut = calciteSchemaVv.get(ver);
+            } catch (OutdatedTokenException e) {
+                return completedFuture(null);
             }
 
-            return table;
+            return lastSchemaFut;
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    public void registerListener(SchemaUpdateListener listener) {
-        listeners.add(listener);
-    }
-
-    private @Nullable IgniteTable awaitLatestTableSchema(UUID tableId) {
+    /** {@inheritDoc} */
+    @Override
+    @NotNull
+    public IgniteTable tableById(UUID id) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
         try {
-            TableImpl table = tableManager.table(tableId);
+            IgniteTable table = tablesVv.latest().get(id);
 
             if (table == null) {
-                return null;
+                throw new IgniteInternalException(OBJECT_NOT_FOUND_ERR,
+                        format("Table not found [tableId={}]", id));
             }
 
-            table.schemaView().waitLatestSchema();
-
-            return convert(table);
-        } catch (NodeStoppingException e) {
-            throw new IgniteInternalException(NODE_STOPPING_ERR, e);
+            return table;
+        } finally {
+            busyLock.leaveBusy();
         }
     }
 
+    public void registerListener(SchemaUpdateListener listener) {
+        listeners.add(listener);
+    }
+
     /**
      * OnSqlTypeCreated.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -223,10 +223,12 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                 IgniteSchema schema = res.compute(schemaName,
-                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
+                        (k, v) -> v == null ? new IgniteSchema(schemaName, 
causalityToken) : IgniteSchema.copy(v, causalityToken));
 
                 CompletableFuture<IgniteTableImpl> igniteTableFuture = 
convert(causalityToken, table);
 
+                pkIdxReady.put(table.tableId(), new CompletableFuture<>());
+
                 return tablesVv.update(causalityToken, (tables, ex) ->
                                 inBusyLock(busyLock, () -> {
                                     if (ex != null) {
@@ -295,13 +297,15 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                 IgniteSchema schema = res.compute(schemaName,
-                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
+                        (k, v) -> v == null ? new IgniteSchema(schemaName, 
causalityToken) : IgniteSchema.copy(v, causalityToken));
 
                 IgniteTable table = (IgniteTable) schema.getTable(tableName);
 
                 if (table != null) {
                     schema.removeTable(tableName);
 
+                    pkIdxReady.remove(table.id());
+
                     return tablesVv.update(causalityToken, (tables, ex) -> 
inBusyLock(busyLock, () -> {
                         if (ex != null) {
                             return failedFuture(ex);
@@ -414,7 +418,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                 IgniteSchema schema = res.compute(schemaName,
-                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
+                        (k, v) -> v == null ? new IgniteSchema(schemaName, 
causalityToken) : IgniteSchema.copy(v, causalityToken));
 
                 return tablesVv.update(
                         causalityToken,
@@ -441,7 +445,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                                         resIdxs.put(index.id(), schemaIndex);
 
-                                        return 
CompletableFuture.completedFuture(resIdxs);
+                                        return completedFuture(resIdxs);
                                     })
                             ).thenCompose(ignore -> {
                                 table.addIndex(schemaIndex);
@@ -454,6 +458,15 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 ).thenCompose(v -> completedFuture(res));
             }));
 
+            // this stub is necessary for observing pk index creation.
+            schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) 
-> {
+                CompletableFuture<?> pkFut = pkIdxReady.get(index.tableId());
+                // this listener is called repeatedly on node stop.
+                if (pkFut != null) {
+                    pkFut.complete(null);
+                }
+            });
+
             return calciteSchemaVv.get(causalityToken);
         } finally {
             busyLock.leaveBusy();
@@ -481,9 +494,10 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
                 IgniteSchema schema = res.compute(schemaName,
-                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
+                        (k, v) -> v == null ? new IgniteSchema(schemaName, 
causalityToken) : IgniteSchema.copy(v, causalityToken));
 
                 IgniteIndex rmvIndex = schema.removeIndex(indexId);
+
                 if (rmvIndex != null) {
                     return tablesVv.update(
                             causalityToken,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 66bf84df95..3985f225c8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -50,9 +50,9 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
-import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
 import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.util.ArrayUtils;
 
@@ -157,7 +157,7 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
     private CalciteCatalogReader catalogReader;
 
-    private long plannerTimeout;
+    private final long plannerTimeout;
 
     /**
      * Private constructor, used by a builder.
@@ -232,6 +232,10 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
         return plannerTimeout;
     }
 
+    public long schemaVersion() {
+        return 
Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).schemaVersion();
+    }
+
     /**
      * Returns calcite catalog reader.
      */
@@ -292,8 +296,6 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
 
         private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY;
 
-        private TxAttributes txAttributes;
-
         private long plannerTimeout;
 
         public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
@@ -321,18 +323,14 @@ public final class BaseQueryContext extends 
AbstractQueryContext {
             return this;
         }
 
-        public Builder txAttributes(TxAttributes txAttributes) {
-            this.txAttributes = txAttributes;
-            return this;
-        }
-
         public Builder plannerTimeout(long plannerTimeout) {
             this.plannerTimeout = plannerTimeout;
             return this;
         }
 
         public BaseQueryContext build() {
-            return new BaseQueryContext(queryId, frameworkCfg, cancel, 
parameters, log, plannerTimeout);
+            return new BaseQueryContext(queryId, frameworkCfg, cancel, 
parameters,
+                    log, plannerTimeout);
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
index 10c60656dd..9cac261113 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/HashFunctionFactoryImpl.java
@@ -46,7 +46,7 @@ public class HashFunctionFactoryImpl<T> implements 
HashFunctionFactory<T> {
     public RowHashFunction<T> create(int[] fields, UUID tableId) {
         int fieldCnt = fields.length;
         NativeType[] fieldTypes = new NativeType[fieldCnt];
-        TableDescriptor tblDesc = sqlSchemaManager.tableById(tableId, 
-1).descriptor();
+        TableDescriptor tblDesc = 
sqlSchemaManager.tableById(tableId).descriptor();
         ImmutableIntList colocationColumns = tblDesc.distribution().getKeys();
 
         assert colocationColumns.size() == fieldCnt : "fieldsCount=" + 
fieldCnt + ", colocationColumns=" + colocationColumns;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 5c800ed23c..3c672e2748 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -49,6 +49,8 @@ import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.index.IndexManager;
+import org.apache.ignite.internal.index.event.IndexEvent;
+import org.apache.ignite.internal.index.event.IndexEventParameters;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.EventListener;
@@ -64,6 +66,7 @@ import 
org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.InternalTable;
@@ -140,6 +143,8 @@ public class StopCalciteModuleTest {
 
     private final ClusterNode localNode = new ClusterNode("mock-node-id", 
NODE_NAME, null);
 
+    private UUID tblId = UUID.randomUUID();
+
     /**
      * Before.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -168,12 +173,21 @@ public class StopCalciteModuleTest {
         doAnswer(invocation -> {
             EventListener<TableEventParameters> clo = 
(EventListener<TableEventParameters>) invocation.getArguments()[1];
 
-            clo.notify(new TableEventParameters(0, UUID.randomUUID(), "TEST", 
new TableImpl(tbl, schemaReg, new HeapLockManager())),
+            clo.notify(new TableEventParameters(0, tblId, "TEST", new 
TableImpl(tbl, schemaReg, new HeapLockManager())),
                     null);
 
             return null;
         }).when(tableManager).listen(eq(TableEvent.CREATE), any());
 
+        doAnswer(invocation -> {
+            EventListener<IndexEventParameters> clo = 
(EventListener<IndexEventParameters>) invocation.getArguments()[1];
+
+            clo.notify(new IndexEventParameters(0, 
TestHashIndex.create(List.of("ID"), "pk_idx", tblId)),
+                    null);
+
+            return null;
+        }).when(indexManager).listen(eq(IndexEvent.CREATE), any());
+
         RowAssembler asm = new RowAssembler(schemaReg.schema());
 
         asm.appendInt(0);
@@ -227,7 +241,7 @@ public class StopCalciteModuleTest {
                 clock
         );
 
-        when(tbl.tableId()).thenReturn(UUID.randomUUID());
+        when(tbl.tableId()).thenReturn(tblId);
         when(tbl.primaryReplicas()).thenReturn(List.of(new 
PrimaryReplica(localNode, -1L)));
 
         when(txManager.begin(anyBoolean())).thenReturn(new 
NoOpTransaction(localNode.name()));
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index aee1b4c8a8..5f944a52ff 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -30,7 +30,7 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,12 +39,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
+import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
@@ -119,7 +121,7 @@ public class ExecutionServiceImplTest {
     private final TestTable table = createTable("TEST_TBL", 1_000_000, 
IgniteDistributions.random(),
             "ID", NativeTypes.INT32, "VAL", NativeTypes.INT32);
 
-    private final IgniteSchema schema = new IgniteSchema("PUBLIC", 
Map.of(table.name(), table), null);
+    private final IgniteSchema schema = new IgniteSchema("PUBLIC", 
Map.of(table.name(), table), null, -1);
 
     private TestCluster testCluster;
     private List<ExecutionServiceImpl<?>> executionServices;
@@ -147,7 +149,7 @@ public class ExecutionServiceImplTest {
     public void testCloseByCursor() throws Exception {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
@@ -182,7 +184,7 @@ public class ExecutionServiceImplTest {
     public void testCancelOnInitiator() throws InterruptedException {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
@@ -217,7 +219,7 @@ public class ExecutionServiceImplTest {
     public void testInitializationFailedOnRemoteNode() throws 
InterruptedException {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
@@ -270,7 +272,7 @@ public class ExecutionServiceImplTest {
 
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
@@ -297,7 +299,7 @@ public class ExecutionServiceImplTest {
     public void testCancelOnRemote() throws InterruptedException {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
 
@@ -333,7 +335,7 @@ public class ExecutionServiceImplTest {
     public void testCursorIsClosedAfterAllDataRead() throws 
InterruptedException {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         InternalTransaction tx = new NoOpTransaction(nodeNames.get(0));
         AsyncCursor<List<Object>> cursor = execService.executePlan(tx, plan, 
ctx);
@@ -360,7 +362,7 @@ public class ExecutionServiceImplTest {
     public void testCursorIsClosedAfterAllDataRead2() throws 
InterruptedException {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("SELECT *  FROM test_tbl", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         InternalTransaction tx = new NoOpTransaction(nodeNames.get(0));
         AsyncCursor<List<Object>> cursor = execService.executePlan(tx, plan, 
ctx);
@@ -400,7 +402,15 @@ public class ExecutionServiceImplTest {
 
         when(topologyService.localMember()).thenReturn(clusterNode);
 
-        when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
+        when(schemaManagerMock.tableById(any())).thenReturn(table);
+
+        
when(schemaManagerMock.actualSchemaAsync(isA(long.class))).thenReturn(CompletableFuture.completedFuture(null));
+
+        CalciteSchema rootSch = CalciteSchema.createRootSchema(false);
+        rootSch.add(schema.getName(), schema);
+        SchemaPlus plus = rootSch.plus();
+
+        when(schemaManagerMock.schema(any())).thenReturn(plus);
 
         var executionService = new ExecutionServiceImpl<>(
                 messageService,
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 3f19350c2d..5a6bb03f48 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -20,21 +20,15 @@ package org.apache.ignite.internal.sql.engine.exec.schema;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
@@ -55,6 +49,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
@@ -64,8 +59,6 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.NodeStoppingException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -82,10 +75,8 @@ public class SqlSchemaManagerTest {
 
     private final UUID indexId = UUID.randomUUID();
 
-    private final int tableVer = 1;
-
     private final SchemaDescriptor schemaDescriptor = new SchemaDescriptor(
-            tableVer,
+            1,
             new Column[]{new Column(0, "ID", NativeTypes.INT64, false)},
             new Column[]{new Column(1, "VAL", NativeTypes.INT64, false)}
     );
@@ -136,116 +127,6 @@ public class SqlSchemaManagerTest {
         testRevisionRegister.moveForward();
     }
 
-    @Test
-    public void testNonExistingTable() throws NodeStoppingException {
-        UUID tblId = UUID.randomUUID();
-
-        IgniteInternalException ex = 
assertThrows(IgniteInternalException.class, () -> 
sqlSchemaManager.tableById(tblId, tableVer));
-        assertThat(ex.getMessage(), containsString("Table not found"));
-
-        Mockito.verify(tableManager).table(eq(tblId));
-
-        verifyNoMoreInteractions(tableManager);
-    }
-
-    @Test
-    public void testTableEventIsNotProcessed() throws NodeStoppingException {
-        when(tableManager.table(eq(tableId))).thenReturn(table);
-        when(table.schemaView()).thenReturn(schemaRegistry);
-
-        InternalTable mock = mock(InternalTable.class);
-        when(mock.tableId()).thenReturn(tableId);
-
-        when(table.internalTable()).thenReturn(mock);
-        when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-        
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
-
-        when(schemaManager.schemaRegistry(any())).thenReturn(schemaRegistry);
-
-        IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer);
-
-        assertEquals(tableId, actTable.id());
-
-        Mockito.verify(tableManager).table(eq(tableId));
-
-        verifyNoMoreInteractions(tableManager);
-    }
-
-    @Test
-    public void testTableEventIsProcessedRequiredVersionIsSame() {
-        InternalTable mock = mock(InternalTable.class);
-        when(mock.tableId()).thenReturn(tableId);
-        when(mock.name()).thenReturn("PUBLIC.T");
-
-        when(table.internalTable()).thenReturn(mock);
-        when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-        
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
-
-        when(schemaManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(schemaRegistry));
-
-        sqlSchemaManager.onTableCreated("PUBLIC", table, 
testRevisionRegister.actualToken() + 1);
-        testRevisionRegister.moveForward();
-
-        IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer);
-
-        assertEquals(tableId, actTable.id());
-
-        verifyNoMoreInteractions(tableManager);
-    }
-
-    @Test
-    public void testTableEventIsProcessedRequiredVersionIsLess() {
-        InternalTable mock = mock(InternalTable.class);
-        when(mock.tableId()).thenReturn(tableId);
-        when(mock.name()).thenReturn("PUBLIC.T");
-
-        when(table.internalTable()).thenReturn(mock);
-        when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-        
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
-
-        when(schemaManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(schemaRegistry));
-
-        sqlSchemaManager.onTableCreated("PUBLIC", table, 
testRevisionRegister.actualToken() + 1);
-        testRevisionRegister.moveForward();
-
-        IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer - 
1);
-
-        assertEquals(tableId, actTable.id());
-
-        verifyNoMoreInteractions(tableManager);
-    }
-
-    @Test
-    public void testTableEventIsProcessedRequiredVersionIsGreater() throws 
NodeStoppingException {
-        when(table.schemaView()).thenReturn(schemaRegistry);
-
-        InternalTable mock = mock(InternalTable.class);
-        when(mock.tableId()).thenReturn(tableId);
-        when(mock.name()).thenReturn("PUBLIC.T");
-
-        when(table.internalTable()).thenReturn(mock);
-        when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
-        when(schemaRegistry.lastSchemaVersion()).thenReturn(tableVer - 1);
-        when(schemaManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(schemaRegistry));
-        when(schemaManager.schemaRegistry(any())).thenReturn(schemaRegistry);
-
-        sqlSchemaManager.onTableCreated("PUBLIC", table, 
testRevisionRegister.actualToken() + 1);
-        testRevisionRegister.moveForward();
-
-        when(tableManager.table(eq(tableId))).thenReturn(table);
-        when(schemaRegistry.lastSchemaVersion()).thenReturn(tableVer);
-
-        IgniteTable actTable = sqlSchemaManager.tableById(tableId, tableVer);
-        assertEquals(tableId, actTable.id());
-
-        IgniteInternalException ex = 
assertThrows(IgniteInternalException.class, () -> 
sqlSchemaManager.tableById(tableId, tableVer + 1));
-        assertThat(ex.getMessage(), containsString("Table version not found"));
-
-        Mockito.verify(tableManager, times(2)).table(eq(tableId));
-
-        verifyNoMoreInteractions(tableManager);
-    }
-
     @Test
     public void testOnTableDroppedHandler() {
         when(table.name()).thenReturn("T");
@@ -255,6 +136,7 @@ public class SqlSchemaManagerTest {
         when(mock.name()).thenReturn("T");
 
         when(table.internalTable()).thenReturn(mock);
+        when(table.tableId()).thenReturn(tableId);
         when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
         
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
 
@@ -262,6 +144,9 @@ public class SqlSchemaManagerTest {
 
         sqlSchemaManager.onTableCreated("PUBLIC", table, 
testRevisionRegister.actualToken() + 1);
         testRevisionRegister.moveForward();
+        sqlSchemaManager.onIndexCreated(
+                TestHashIndex.create(List.of("ID"), "pk_idx", tableId), 
testRevisionRegister.actualToken() + 1);
+        testRevisionRegister.moveForward();
 
         Table schemaTable = sqlSchemaManager.schema("PUBLIC").getTable("T");
 
@@ -282,6 +167,7 @@ public class SqlSchemaManagerTest {
         when(mock.name()).thenReturn("T");
 
         when(table.internalTable()).thenReturn(mock);
+        when(table.tableId()).thenReturn(tableId);
         when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
         
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
         when(schemaManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(schemaRegistry));
@@ -289,7 +175,11 @@ public class SqlSchemaManagerTest {
         sqlSchemaManager.onTableCreated("PUBLIC", table, 
testRevisionRegister.actualToken() + 1);
         testRevisionRegister.moveForward();
 
-        assertTrue(((IgniteTableImpl) 
sqlSchemaManager.schema("PUBLIC").getTable("T")).indexes().isEmpty());
+        sqlSchemaManager.onIndexCreated(
+                TestHashIndex.create(List.of("ID"), "pk_idx", tableId), 
testRevisionRegister.actualToken() + 1);
+        testRevisionRegister.moveForward();
+
+        assertEquals(1, ((IgniteTableImpl) 
sqlSchemaManager.schema("PUBLIC").getTable("T")).indexes().size());
 
         IndexDescriptor descMock = mock(IndexDescriptor.class);
         when(descMock.columns()).thenReturn(List.of());
@@ -323,18 +213,22 @@ public class SqlSchemaManagerTest {
 
 
     @Test
-    public void testIndexEventsProcessed() throws Exception {
+    public void testIndexEventsProcessed() {
         InternalTable mock = mock(InternalTable.class);
         when(mock.tableId()).thenReturn(tableId);
         when(mock.name()).thenReturn("T");
 
         when(table.internalTable()).thenReturn(mock);
+        when(table.tableId()).thenReturn(tableId);
         when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
         
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
         when(schemaManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(schemaRegistry));
 
         sqlSchemaManager.onTableCreated("PUBLIC", table, 
testRevisionRegister.actualToken() + 1);
         testRevisionRegister.moveForward();
+        sqlSchemaManager.onIndexCreated(
+                TestHashIndex.create(List.of("ID"), "pk_idx", tableId), 
testRevisionRegister.actualToken() + 1);
+        testRevisionRegister.moveForward();
 
         IndexDescriptor descMock = mock(IndexDescriptor.class);
         when(descMock.columns()).thenReturn(List.of());
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReaderTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReaderTest.java
deleted file mode 100644
index 0d61f3a638..0000000000
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReaderTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.externalize;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.UUID;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.schema.Statistic;
-import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests to verify {@link RelJsonReader} behaviour.
- */
-public class RelJsonReaderTest {
-
-    /**
-     * Test verifies that during deserialization table being resolved by its 
ID.
-     */
-    @Test
-    void fromJson() {
-        UUID tableId = UUID.randomUUID();
-        int tableVer = 2;
-
-        IgniteTable igniteTableMock = mock(IgniteTable.class);
-        when(igniteTableMock.getStatistic()).thenReturn(new Statistic() {});
-        
when(igniteTableMock.getRowType(any())).thenReturn(mock(RelDataType.class));
-
-        SqlSchemaManager schemaMock = mock(SqlSchemaManager.class);
-        when(schemaMock.tableById(tableId, 
tableVer)).thenReturn(igniteTableMock);
-
-        String json = ""
-                + "{\n"
-                + "  \"rels\" : [ {\n"
-                + "    \"id\" : \"0\",\n"
-                + "    \"relOp\" : \"IgniteTableScan\",\n"
-                + "    \"table\" : [\"PUBLIC\", \"TEST\"],\n"
-                + "    \"tableId\" : \"" + tableId + "\",\n"
-                + "    \"tableVer\" : " + tableVer + ",\n"
-                + "    \"inputs\" : [ ]\n"
-                + "  } ]\n"
-                + "}";
-
-        RelNode node = RelJsonReader.fromJson(schemaMock, json);
-
-        assertThat(node, isA(IgniteTableScan.class));
-        assertThat(node.getTable(), notNullValue());
-        assertThat(node.getTable().unwrap(IgniteTable.class), 
is(igniteTableMock));
-        Mockito.verify(schemaMock).tableById(tableId, tableVer);
-    }
-}
\ No newline at end of file
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
index bd9e9bc4ce..a34dff4c1f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/PredefinedSchemaManager.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.calcite.schema.SchemaPlus;
@@ -45,12 +46,12 @@ public class PredefinedSchemaManager implements 
SqlSchemaManager {
     private final Map<UUID, IgniteTable> tableById;
 
     /** Constructs schema manager from a single schema. */
-    public PredefinedSchemaManager(IgniteSchema schema) {
+    PredefinedSchemaManager(IgniteSchema schema) {
         this(List.of(schema));
     }
 
     /** Constructs schema manager from a collection of schemas. */
-    public PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
+    PredefinedSchemaManager(Collection<IgniteSchema> schemas) {
         this.root = Frameworks.createRootSchema(false);
         this.tableById = new HashMap<>();
 
@@ -72,9 +73,14 @@ public class PredefinedSchemaManager implements 
SqlSchemaManager {
         return schema == null ? root : root.getSubSchema(schema);
     }
 
+    @Override
+    public CompletableFuture<?> actualSchemaAsync(long ver) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     /** {@inheritDoc} */
     @Override
-    public IgniteTable tableById(UUID id, int ver) {
+    public IgniteTable tableById(UUID id) {
         return tableById.get(id);
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 6e82df41da..98ff797af7 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -267,7 +267,7 @@ public class TestBuilders {
                     .map(ClusterTableBuilderImpl::build)
                     .collect(Collectors.toMap(TestTable::name, 
Function.identity()));
 
-            var schemaManager = new PredefinedSchemaManager(new 
IgniteSchema("PUBLIC", tableMap, null));
+            var schemaManager = new PredefinedSchemaManager(new 
IgniteSchema("PUBLIC", tableMap, null, -1));
 
             Map<String, TestNode> nodes = nodeNames.stream()
                     .map(name -> new TestNode(name, 
clusterService.forNode(name), schemaManager))
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 454cbfe40a..c31027e6d3 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -94,7 +94,6 @@ import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
-import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
 import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.prepare.Cloner;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
@@ -698,8 +697,10 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
 
         List<RelNode> deserializedNodes = new ArrayList<>();
 
+        BaseQueryContext ctx = baseQueryContext(schemas, null);
+
         for (String s : serialized) {
-            RelJsonReader reader = new RelJsonReader(new 
PredefinedSchemaManager(schemas));
+            RelJsonReader reader = new RelJsonReader(ctx.catalogReader());
             deserializedNodes.add(reader.read(s));
         }
 
@@ -1246,20 +1247,33 @@ public abstract class AbstractPlannerTest extends 
IgniteAbstractTest {
         }
     }
 
-    static class TestHashIndex implements Index<IndexDescriptor> {
+    /** Test Hash index implementation. */
+    public static class TestHashIndex implements Index<IndexDescriptor> {
         private final UUID id = UUID.randomUUID();
 
-        private final UUID tableId = UUID.randomUUID();
+        private UUID tableId = UUID.randomUUID();
 
         private final IndexDescriptor descriptor;
 
+        /** Create index. */
+        public static TestHashIndex create(List<String> indexedColumns, String 
name, UUID tableId) {
+            var descriptor = new IndexDescriptor(name, indexedColumns);
+
+            TestHashIndex idx = new TestHashIndex(descriptor);
+
+            idx.tableId = tableId;
+
+            return idx;
+        }
+
+        /** Create index. */
         public static TestHashIndex create(List<String> indexedColumns, String 
name) {
             var descriptor = new IndexDescriptor(name, indexedColumns);
 
             return new TestHashIndex(descriptor);
         }
 
-        public TestHashIndex(IndexDescriptor descriptor) {
+        TestHashIndex(IndexDescriptor descriptor) {
             this.descriptor = descriptor;
         }
 

Reply via email to