This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 608087a806 IGNITE-17431: Sql. Index support by optimizer (#1039)
608087a806 is described below

commit 608087a806096fd33f3c72adf87d4ddce8fac6e5
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Fri Sep 2 19:00:28 2022 +0300

    IGNITE-17431: Sql. Index support by optimizer (#1039)
    
    Co-authored-by: korlov42 <[email protected]>
---
 .../apache/ignite/internal/index/IndexManager.java |  38 ----
 .../ignite/internal/index/ItIndexManagerTest.java  |   2 +-
 .../internal/sql/engine/ItIndexSpoolTest.java      |  36 +---
 .../internal/sql/engine/ItSortAggregateTest.java   |  73 +-------
 .../sql/engine/exec/ExecutionServiceImpl.java      |  26 ++-
 .../sql/engine/exec/ddl/DdlCommandHandler.java     | 198 ++++++++++-----------
 .../internal/sql/engine/schema/IgniteSchema.java   |  24 +--
 .../sql/engine/schema/IgniteTableImpl.java         |  19 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    | 165 +++++++++++------
 .../sql/engine/exec/MockedStructuresTest.java      | 133 +++++++-------
 .../engine/exec/schema/SqlSchemaManagerTest.java   |  71 +++++++-
 .../planner/SortedIndexSpoolPlannerTest.java       |  50 ++++++
 .../internal/table/distributed/TableManager.java   |  16 +-
 13 files changed, 455 insertions(+), 396 deletions(-)

diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 2da927850c..95c2129f2c 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -122,27 +122,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         LOG.info("Index manager stopped");
     }
 
-    /**
-     * Creates index from provided configuration changer.
-     *
-     * @param schemaName A name of the schema to create index in.
-     * @param indexName A name of the index to create.
-     * @param tableName A name of the table to create index for.
-     * @param indexChange A consumer that suppose to change the configuration 
in order to provide description of an index.
-     * @param failIfExists Flag indicates whether exception be thrown if index 
exists or not.
-     * @return {@code True} if index was created successfully, {@code false} 
otherwise.
-     * @throws IndexAlreadyExistsException If index already exists and
-     */
-    public boolean createIndex(
-            String schemaName,
-            String indexName,
-            String tableName,
-            boolean failIfExists,
-            Consumer<TableIndexChange> indexChange
-    ) {
-        return join(createIndexAsync(schemaName, indexName, tableName, 
failIfExists, indexChange)) != null;
-    }
-
     /**
      * Creates index from provided configuration changer.
      *
@@ -251,23 +230,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         }
     }
 
-    /**
-     * Drops the index with a given name.
-     *
-     * @param schemaName A name of the schema the index belong to.
-     * @param indexName A name of the index to drop.
-     * @param failIfNotExist Flag, which force failure, when {@code trues} if 
index doen't not exists.
-     * @return {@code True} if index was removed, {@code false} otherwise.
-     * @throws IndexNotFoundException If index doesn't exist and {@code 
failIfNotExist} param was {@code true}.
-     */
-    public boolean dropIndex(
-            String schemaName,
-            String indexName,
-            boolean failIfNotExist
-    ) {
-        return join(dropIndexAsync(schemaName, indexName, failIfNotExist));
-    }
-
     /**
      * Drops the index with a given name asynchronously.
      *
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index 95d9e30eaa..950e774b70 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -94,7 +94,7 @@ public class ItIndexManagerTest extends 
AbstractBasicIntegrationTest {
         assertThat(createEventParamHolder.get(), notNullValue());
         assertThat(index, sameInstance(createEventParamHolder.get().index()));
 
-        indexManager.dropIndex("PUBLIC", "INAME", true);
+        indexManager.dropIndexAsync("PUBLIC", "INAME", true).join();
 
         assertThat(dropEventParamHolder.get(), notNullValue());
         assertThat(index.id(), 
sameInstance(dropEventParamHolder.get().indexId()));
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index 4918964b4d..bc02b015d8 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -24,10 +24,6 @@ import static 
org.junit.jupiter.params.ParameterizedTest.ARGUMENTS_PLACEHOLDER;
 import java.util.List;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.schema.definition.ColumnType;
-import org.apache.ignite.schema.definition.TableDefinition;
 import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -52,7 +48,7 @@ public class ItIndexSpoolTest extends 
AbstractBasicIntegrationTest {
         // Remove this, indices must be dropped together with the table.
         CLUSTER_NODES.get(0).tables().tables().stream()
                 .map(Table::name)
-                .forEach(name -> sql("DROP INDEX " + name + "_JID_IDX"));
+                .forEach(name -> sql("DROP INDEX IF EXISTS " + name + 
"_JID_IDX"));
 
         CLUSTER_NODES.get(0).tables().tables().stream()
                 .map(Table::name)
@@ -81,29 +77,6 @@ public class ItIndexSpoolTest extends 
AbstractBasicIntegrationTest {
         res.forEach(r -> assertThat(r.get(0), is(r.get(1))));
     }
 
-    private Table createTable(String tableName) {
-        TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", 
tableName)
-                .columns(
-                        SchemaBuilders.column("ID", ColumnType.INT32).build(),
-                        SchemaBuilders.column("JID", 
ColumnType.INT32).asNullable(true).build(),
-                        SchemaBuilders.column("VAL", 
ColumnType.string()).asNullable(true).build()
-                )
-                .withPrimaryKey("ID")
-                .build();
-
-        Table table = 
CLUSTER_NODES.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
-                SchemaConfigurationConverter.convert(schTbl1, tblCh)
-                        .changeReplicas(2)
-                        .changePartitions(10)
-        );
-
-        CLUSTER_NODES.get(0).tables().alterTable(schTbl1.canonicalName(), 
tblCh -> SchemaConfigurationConverter.addIndex(
-                SchemaBuilders.sortedIndex(tableName + 
"_JID_IDX").addIndexColumn("JID").done().build(), tblCh)
-        );
-
-        return table;
-    }
-
     private void prepareDataSet(int rowsCount) {
         Object[][] dataRows = new Object[rowsCount][];
 
@@ -112,9 +85,12 @@ public class ItIndexSpoolTest extends 
AbstractBasicIntegrationTest {
         }
 
         for (String name : List.of("TEST0", "TEST1")) {
-            Table tbl = createTable(name);
+            sql("CREATE TABLE " + name + "(id INT PRIMARY KEY, jid INT, val 
VARCHAR) WITH replicas=2,partitions=10");
+
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-17304 
uncomment this
+            // sql("CREATE INDEX " + name + "_jid_idx ON " + name + "(jid)");
 
-            insertData(tbl, new String[]{"ID", "JID", "VAL"}, dataRows);
+            insertData("PUBLIC." + name, new String[]{"ID", "JID", "VAL"}, 
dataRows);
         }
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
index d047780b3d..e0379a401a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
@@ -19,13 +19,6 @@ package org.apache.ignite.internal.sql.engine;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.schema.definition.ColumnType;
-import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.Table;
-import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -40,68 +33,16 @@ public class ItSortAggregateTest extends 
AbstractBasicIntegrationTest {
      */
     @BeforeAll
     static void initTestData() {
-        TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "TEST")
-                .columns(
-                        SchemaBuilders.column("ID", ColumnType.INT32).build(),
-                        SchemaBuilders.column("GRP0", 
ColumnType.INT32).asNullable(true).build(),
-                        SchemaBuilders.column("GRP1", 
ColumnType.INT32).asNullable(true).build(),
-                        SchemaBuilders.column("VAL0", 
ColumnType.INT32).asNullable(true).build(),
-                        SchemaBuilders.column("VAL1", 
ColumnType.INT32).asNullable(true).build()
-                )
-                .withPrimaryKey("ID")
-                .build();
+        sql("CREATE TABLE test (id INT PRIMARY KEY, grp0 INT, grp1 INT, val0 
INT, val1 INT) WITH replicas=2,partitions=10");
+        sql("CREATE TABLE test_one_col_idx (pk INT PRIMARY KEY, col0 INT)");
 
-        TableDefinition schTbl2 = SchemaBuilders.tableBuilder("PUBLIC", 
"TEST_ONE_COL_IDX")
-                .columns(
-                        SchemaBuilders.column("PK", ColumnType.INT32).build(),
-                        SchemaBuilders.column("COL0", 
ColumnType.INT32).asNullable(true).build()
-                )
-                .withPrimaryKey("PK")
-                .build();
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-17304 uncomment 
this
+        // sql("CREATE INDEX test_idx ON test(grp0, grp1)");
+        // sql("CREATE INDEX test_one_col_idx_idx ON test_one_col_idx(col0)");
 
-        Table table = 
CLUSTER_NODES.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
-                SchemaConfigurationConverter.convert(schTbl1, tblCh)
-                        .changeReplicas(2)
-                        .changePartitions(10)
-        );
-
-        Table tblOneColIdx = 
CLUSTER_NODES.get(0).tables().createTable(schTbl2.canonicalName(), tblCh ->
-                SchemaConfigurationConverter.convert(schTbl2, tblCh)
-        );
-
-        CLUSTER_NODES.get(0).tables().alterTable(schTbl1.canonicalName(), 
tblCh ->
-                
SchemaConfigurationConverter.addIndex(SchemaBuilders.sortedIndex(schTbl1.name() 
+ "IDX")
-                        .addIndexColumn("GRP0").done()
-                        .addIndexColumn("GRP1").done()
-                        .build(), tblCh)
-        );
-        CLUSTER_NODES.get(0).tables().alterTable(schTbl2.canonicalName(), 
tblCh ->
-                
SchemaConfigurationConverter.addIndex(SchemaBuilders.sortedIndex(schTbl2.name() 
+ "IDX")
-                        .addIndexColumn("COL0").desc().done()
-                        .build(), tblCh)
-        );
-
-        RecordView<Tuple> view = table.recordView();
-        for (int i = 0; i < ROWS; i++) {
-            view.insert(
-                    null,
-                    Tuple.create()
-                            .set("ID", i)
-                            .set("GRP0", i / 10)
-                            .set("GRP1", i / 100)
-                            .set("VAL0", 1)
-                            .set("VAL1", 2)
-            );
-        }
-
-        RecordView<Tuple> view1 = tblOneColIdx.recordView();
         for (int i = 0; i < ROWS; i++) {
-            view1.insert(
-                    null,
-                    Tuple.create()
-                            .set("PK", i)
-                            .set("COL0", i)
-            );
+            sql("INSERT INTO test (id, grp0, grp1, val0, val1) VALUES (?, ?, 
?, ?, ?)", i, i / 10, i / 100, 1, 2);
+            sql("INSERT INTO test_one_col_idx (pk, col0) VALUES (?, ?)", i, i);
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 189e0949c0..5b43afe8f9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -22,14 +22,15 @@ import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -69,6 +70,7 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
@@ -248,14 +250,26 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
     }
 
     private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
-        try {
-            boolean ret = ddlCmdHnd.handle(plan.command());
+        CompletableFuture<Iterator<List<Object>>> ret = 
ddlCmdHnd.handle(plan.command())
+                .thenApply(applied -> 
List.of(List.<Object>of(applied)).iterator())
+                .exceptionally(th -> {
+                    throw convertDdlException(th);
+                });
+
+        return new AsyncWrapper<>(ret, Runnable::run);
+    }
 
-            return new 
AsyncWrapper<>(Collections.singletonList(Collections.<Object>singletonList(ret)).iterator());
-        } catch (IgniteInternalCheckedException e) {
-            throw new IgniteInternalException("Failed to execute DDL statement 
[stmt=" /*+ qry.sql()*/
+    private static RuntimeException convertDdlException(Throwable e) {
+        if (e instanceof CompletionException) {
+            e = e.getCause();
+        }
+
+        if (e instanceof IgniteInternalCheckedException) {
+            return new IgniteInternalException("Failed to execute DDL 
statement [stmt=" /*+ qry.sql()*/
                     + ", err=" + e.getMessage() + ']', e);
         }
+
+        return (e instanceof RuntimeException) ? (RuntimeException) e : new 
IgniteException(e);
     }
 
     private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 6ad4badcfd..5e1cf624eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.ddl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
 import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convertDefaultToConfiguration;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -25,8 +27,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.NamedListView;
@@ -88,7 +94,7 @@ public class DdlCommandHandler {
     }
 
     /** Handles ddl commands. */
-    public boolean handle(DdlCommand cmd) throws 
IgniteInternalCheckedException {
+    public CompletableFuture<Boolean> handle(DdlCommand cmd) {
         validateCommand(cmd);
 
         if (cmd instanceof CreateTableCommand) {
@@ -104,9 +110,9 @@ public class DdlCommandHandler {
         } else if (cmd instanceof DropIndexCommand) {
             return handleDropIndex((DropIndexCommand) cmd);
         } else {
-            throw new IgniteInternalCheckedException("Unsupported DDL 
operation ["
+            return failedFuture(new 
IgniteInternalCheckedException("Unsupported DDL operation ["
                     + "cmdName=" + (cmd == null ? null : 
cmd.getClass().getSimpleName()) + "; "
-                    + "cmd=\"" + cmd + "\"]");
+                    + "cmd=\"" + cmd + "\"]"));
         }
     }
 
@@ -122,7 +128,7 @@ public class DdlCommandHandler {
     }
 
     /** Handles create table command. */
-    private boolean handleCreateTable(CreateTableCommand cmd) {
+    private CompletableFuture<Boolean> handleCreateTable(CreateTableCommand 
cmd) {
         Consumer<TableChange> tblChanger = tableChange -> {
             tableChange.changeColumns(columnsChange -> {
                 for (var col : cmd.columns()) {
@@ -157,42 +163,27 @@ public class DdlCommandHandler {
                 IgniteObjectName.quote(cmd.tableName())
         );
 
-        try {
-            tableManager.createTable(fullName, tblChanger);
-
-            return true;
-        } catch (TableAlreadyExistsException ex) {
-            if (!cmd.ifTableExists()) {
-                throw ex;
-            } else {
-                return false;
-            }
-        }
+        return tableManager.createTableAsync(fullName, tblChanger)
+                .thenApply(Objects::nonNull)
+                .handle(handleTableModificationResult(cmd.ifTableExists()));
     }
 
     /** Handles drop table command. */
-    private boolean handleDropTable(DropTableCommand cmd) {
+    private CompletableFuture<Boolean> handleDropTable(DropTableCommand cmd) {
         String fullName = SchemaUtils.canonicalName(
                 IgniteObjectName.quote(cmd.schemaName()),
                 IgniteObjectName.quote(cmd.tableName())
         );
-        try {
-            tableManager.dropTable(fullName);
-
-            return true;
-        } catch (TableNotFoundException ex) {
-            if (!cmd.ifTableExists()) {
-                throw ex;
-            } else {
-                return false;
-            }
-        }
+
+        return tableManager.dropTableAsync(fullName)
+                .thenApply(v -> Boolean.TRUE)
+                .handle(handleTableModificationResult(cmd.ifTableExists()));
     }
 
     /** Handles add column command. */
-    private boolean handleAlterAddColumn(AlterTableAddCommand cmd) {
+    private CompletableFuture<Boolean> 
handleAlterAddColumn(AlterTableAddCommand cmd) {
         if (nullOrEmpty(cmd.columns())) {
-            return false;
+            return completedFuture(Boolean.FALSE);
         }
 
         String fullName = SchemaUtils.canonicalName(
@@ -200,21 +191,14 @@ public class DdlCommandHandler {
                 IgniteObjectName.quote(cmd.tableName())
         );
 
-        try {
-            return addColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnNotExists());
-        } catch (TableNotFoundException ex) {
-            if (!cmd.ifTableExists()) {
-                throw ex;
-            } else {
-                return false;
-            }
-        }
+        return addColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnNotExists())
+                .handle(handleTableModificationResult(cmd.ifTableExists()));
     }
 
     /** Handles drop column command. */
-    private boolean handleAlterDropColumn(AlterTableDropCommand cmd) {
+    private CompletableFuture<Boolean> 
handleAlterDropColumn(AlterTableDropCommand cmd) {
         if (nullOrEmpty(cmd.columns())) {
-            return false;
+            return completedFuture(Boolean.FALSE);
         }
 
         String fullName = SchemaUtils.canonicalName(
@@ -222,35 +206,45 @@ public class DdlCommandHandler {
                 IgniteObjectName.quote(cmd.tableName())
         );
 
-        try {
-            return dropColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnExists());
-        } catch (TableNotFoundException ex) {
-            if (!cmd.ifTableExists()) {
-                throw ex;
-            } else {
-                return false;
+        return dropColumnInternal(fullName, cmd.columns(), 
cmd.ifColumnExists())
+                .handle(handleTableModificationResult(cmd.ifTableExists()));
+    }
+
+    private static BiFunction<Object, Throwable, Boolean> 
handleTableModificationResult(boolean ignoreTableExistenceErrors) {
+        return (val, err) -> {
+            if (err == null) {
+                return val instanceof Boolean ? (Boolean) val : Boolean.TRUE;
+            } else if (ignoreTableExistenceErrors) {
+                Throwable err0 = err instanceof CompletionException ? 
err.getCause() : err;
+
+                if (err0 instanceof TableAlreadyExistsException || err0 
instanceof TableNotFoundException) {
+                    return Boolean.FALSE;
+                }
             }
-        }
+
+            throw (err instanceof RuntimeException) ? (RuntimeException) err : 
new CompletionException(err);
+        };
     }
 
     /** Handles create index command. */
-    private boolean handleCreateIndex(CreateIndexCommand cmd) {
+    private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand 
cmd) {
         Consumer<TableIndexChange> indexChanger = tableIndexChange -> {
             // Only sorted idx for now.
             createSortedIndexInternal(cmd, 
tableIndexChange.convert(SortedIndexChange.class));
         };
 
-        return indexManager.createIndex(
-                cmd.schemaName(),
-                cmd.indexName(),
-                cmd.tableName(),
-                !cmd.ifIndexNotExists(),
-                indexChanger);
+        return indexManager.createIndexAsync(
+                        cmd.schemaName(),
+                        cmd.indexName(),
+                        cmd.tableName(),
+                        !cmd.ifIndexNotExists(),
+                        indexChanger)
+                .thenApply(Objects::nonNull);
     }
 
     /** Handles drop index command. */
-    private boolean handleDropIndex(DropIndexCommand cmd) {
-        return indexManager.dropIndex(cmd.schemaName(), cmd.indexName(), 
!cmd.ifNotExists());
+    private CompletableFuture<Boolean> handleDropIndex(DropIndexCommand cmd) {
+        return indexManager.dropIndexAsync(cmd.schemaName(), cmd.indexName(), 
!cmd.ifNotExists());
     }
 
     /**
@@ -263,7 +257,7 @@ public class DdlCommandHandler {
         indexChange.changeColumns(colsInit -> {
             for (Pair<String, Boolean> col : cmd.columns()) {
                 //TODO: https://issues.apache.org/jira/browse/IGNITE-17563 
Pass null ordering for columns.
-                colsInit.create(col.getFirst(), colInit -> 
colInit.changeAsc(col.getSecond()));
+                colsInit.create(col.getFirst(), colInit -> 
colInit.changeAsc(!col.getSecond()));
             }
         });
     }
@@ -272,30 +266,21 @@ public class DdlCommandHandler {
      * Adds a column according to the column definition.
      *
      * @param fullName Table with schema name.
-     * @param colsDef  Columns defenitions.
-     * @param colNotExist Flag indicates exceptionally behavior in case of 
already existing column.
-     *
+     * @param colsDef Columns defenitions.
+     * @param ignoreColumnExistance Flag indicates exceptionally behavior in 
case of already existing column.
      * @return {@code true} if the full columns set is applied successfully. 
Otherwise, returns {@code false}.
      */
-    private boolean addColumnInternal(String fullName, List<ColumnDefinition> 
colsDef, boolean colNotExist) {
+    private CompletableFuture<Boolean> addColumnInternal(String fullName, 
List<ColumnDefinition> colsDef, boolean ignoreColumnExistance) {
         AtomicBoolean ret = new AtomicBoolean(true);
-        tableManager.alterTable(
+
+        return tableManager.alterTableAsync(
                 fullName,
                 chng -> chng.changeColumns(cols -> {
                     Map<String, String> colNamesToOrders = 
columnOrdersToNames(chng.columns());
 
                     List<ColumnDefinition> colsDef0;
 
-                    if (!colNotExist) {
-                        colsDef.stream()
-                                .filter(k -> 
colNamesToOrders.containsKey(k.name()))
-                                .findAny()
-                                .ifPresent(c -> {
-                                    throw new 
ColumnAlreadyExistsException(c.name());
-                                });
-
-                        colsDef0 = colsDef;
-                    } else {
+                    if (ignoreColumnExistance) {
                         colsDef0 = colsDef.stream().filter(k -> {
                             if (colNamesToOrders.containsKey(k.name())) {
                                 ret.set(false);
@@ -305,14 +290,22 @@ public class DdlCommandHandler {
                                 return true;
                             }
                         }).collect(Collectors.toList());
+                    } else {
+                        colsDef.stream()
+                                .filter(k -> 
colNamesToOrders.containsKey(k.name()))
+                                .findAny()
+                                .ifPresent(c -> {
+                                    throw new 
ColumnAlreadyExistsException(c.name());
+                                });
+
+                        colsDef0 = colsDef;
                     }
 
                     for (ColumnDefinition col : colsDef0) {
                         cols.create(col.name(), colChg -> 
convertColumnDefinition(col, colChg));
                     }
-                }));
-
-        return ret.get();
+                })
+        ).thenApply(v -> ret.get());
     }
 
     private void convertColumnDefinition(ColumnDefinition definition, 
ColumnChange columnChange) {
@@ -354,44 +347,43 @@ public class DdlCommandHandler {
      *
      * @param fullName Table with schema name.
      * @param colNames Columns definitions.
-     * @param colExist Flag indicates exceptionally behavior in case of 
already existing column.
+     * @param ignoreColumnExistance Flag indicates exceptionally behavior in 
case of already existing column.
      * @return {@code true} if the full columns set is applied successfully. 
Otherwise, returns {@code false}.
      */
-    private boolean dropColumnInternal(String fullName, Set<String> colNames, 
boolean colExist) {
+    private CompletableFuture<Boolean> dropColumnInternal(String fullName, 
Set<String> colNames, boolean ignoreColumnExistance) {
         AtomicBoolean ret = new AtomicBoolean(true);
 
-        tableManager.alterTable(
-                fullName,
-                chng -> chng.changeColumns(cols -> {
-                    PrimaryKeyView priKey = chng.primaryKey();
+        return tableManager.alterTableAsync(
+                        fullName,
+                        chng -> chng.changeColumns(cols -> {
+                            PrimaryKeyView priKey = chng.primaryKey();
 
-                    Map<String, String> colNamesToOrders = 
columnOrdersToNames(chng.columns());
+                            Map<String, String> colNamesToOrders = 
columnOrdersToNames(chng.columns());
 
-                    Set<String> colNames0 = new HashSet<>();
+                            Set<String> colNames0 = new HashSet<>();
 
-                    Set<String> primaryCols = Set.of(priKey.columns());
+                            Set<String> primaryCols = Set.of(priKey.columns());
 
-                    for (String colName : colNames) {
-                        if (!colNamesToOrders.containsKey(colName)) {
-                            ret.set(false);
+                            for (String colName : colNames) {
+                                if (!colNamesToOrders.containsKey(colName)) {
+                                    ret.set(false);
 
-                            if (!colExist) {
-                                throw new ColumnNotFoundException(colName, 
fullName);
-                            }
-                        } else {
-                            colNames0.add(colName);
-                        }
-
-                        if (primaryCols.contains(colName)) {
-                            throw new IgniteException(IgniteStringFormatter
-                                    .format("Can`t delete column, belongs to 
primary key: [name={}]", colName));
-                        }
-                    }
+                                    if (!ignoreColumnExistance) {
+                                        throw new 
ColumnNotFoundException(colName, fullName);
+                                    }
+                                } else {
+                                    colNames0.add(colName);
+                                }
 
-                    colNames0.forEach(k -> 
cols.delete(colNamesToOrders.get(k)));
-                }));
+                                if (primaryCols.contains(colName)) {
+                                    throw new 
IgniteException(IgniteStringFormatter
+                                            .format("Can`t delete column, 
belongs to primary key: [name={}]", colName));
+                                }
+                            }
 
-        return ret.get();
+                            colNames0.forEach(k -> 
cols.delete(colNamesToOrders.get(k)));
+                        }))
+                .thenApply(v -> ret.get());
     }
 
     /** Map column name to order. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index f983e3c5f1..243fa682d3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -54,6 +54,10 @@ public class IgniteSchema extends AbstractSchema {
         this(schemaName, null, null);
     }
 
+    public static IgniteSchema copy(IgniteSchema old) {
+        return new IgniteSchema(old.schemaName, old.tblMap, old.idxMap);
+    }
+
     /**
      * Get schema name.
      *
@@ -99,10 +103,6 @@ public class IgniteSchema extends AbstractSchema {
      * @param index Index.
      */
     public void addIndex(UUID indexId, IgniteIndex index) {
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17474
-        // Decouple tables and indices.
-        index.table().addIndex(index);
-
         idxMap.put(indexId, index);
     }
 
@@ -110,20 +110,10 @@ public class IgniteSchema extends AbstractSchema {
      * Remove index.
      *
      * @param indexId Index id.
-     * @return {@code True} if index was removed or {@code false} otherwise.
+     * @return Removed index.
      */
-    public boolean removeIndex(UUID indexId) {
-        IgniteIndex rmv = idxMap.remove(indexId);
-
-        if (rmv == null) {
-            return false;
-        }
-
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17474
-        // Decouple tables and indices.
-        rmv.table().removeIndex(rmv.name());
-
-        return true;
+    public IgniteIndex removeIndex(UUID indexId) {
+        return idxMap.remove(indexId);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index cd9258cad5..b3f4992089 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -22,11 +22,11 @@ import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.Convention;
@@ -81,7 +81,7 @@ public class IgniteTableImpl extends AbstractTable implements 
InternalIgniteTabl
 
     private final Statistic statistic;
 
-    private final Map<String, IgniteIndex> indexes = new ConcurrentHashMap<>();
+    private Map<String, IgniteIndex> indexes = new HashMap<>();
 
     private final List<ColumnDescriptor> columnsOrderedByPhysSchema;
 
@@ -115,6 +115,21 @@ public class IgniteTableImpl extends AbstractTable 
implements InternalIgniteTabl
         statistic = new StatisticsImpl();
     }
 
+    private IgniteTableImpl(IgniteTableImpl t) {
+        this.desc = t.desc;
+        this.ver = t.ver;
+        this.table = t.table;
+        this.schemaRegistry = t.schemaRegistry;
+        this.schemaDescriptor = t.schemaDescriptor;
+        this.statistic = t.statistic;
+        this.columnsOrderedByPhysSchema = t.columnsOrderedByPhysSchema;
+        this.indexes.putAll(t.indexes);
+    }
+
+    public static IgniteTableImpl copyOf(IgniteTableImpl v) {
+        return new IgniteTableImpl(v);
+    }
+
     /** {@inheritDoc} */
     @Override
     public UUID id() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 861500261a..fd393e22b8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -66,7 +67,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager 
{
 
     private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
 
-    private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
+    private final VersionedValue<Map<UUID, InternalIgniteTable>> tablesVv;
 
     private final VersionedValue<Map<UUID, IgniteIndex>> indicesVv;
 
@@ -213,7 +214,8 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
-                IgniteSchema schema = res.computeIfAbsent(schemaName, 
IgniteSchema::new);
+                IgniteSchema schema = res.compute(schemaName,
+                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
 
                 CompletableFuture<IgniteTableImpl> igniteTableFuture = 
convert(causalityToken, table);
 
@@ -223,11 +225,18 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                                         return failedFuture(ex);
                                     }
 
-                                    Map<UUID, IgniteTable> resTbls = new 
HashMap<>(tables);
+                                    Map<UUID, InternalIgniteTable> resTbls = 
new HashMap<>(tables);
 
                                     return igniteTableFuture
                                             .thenApply(igniteTable -> 
inBusyLock(busyLock, () -> {
-                                                resTbls.put(igniteTable.id(), 
igniteTable);
+                                                InternalIgniteTable oldTable = 
resTbls.put(igniteTable.id(), igniteTable);
+
+                                                // looks like this is UPDATE 
operation
+                                                if (oldTable != null) {
+                                                    for (var index : 
oldTable.indexes().values()) {
+                                                        
igniteTable.addIndex(index);
+                                                    }
+                                                }
 
                                                 return resTbls;
                                             }));
@@ -281,7 +290,8 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
-                IgniteSchema schema = res.computeIfAbsent(schemaName, 
IgniteSchema::new);
+                IgniteSchema schema = res.compute(schemaName,
+                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
 
                 String calciteTableName = objectSimpleName(schemaName, 
tableName);
 
@@ -295,7 +305,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                             return failedFuture(ex);
                         }
 
-                        Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+                        Map<UUID, InternalIgniteTable> resTbls = new 
HashMap<>(tables);
 
                         resTbls.remove(table.id());
 
@@ -363,15 +373,9 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         );
     }
 
-    private CompletableFuture<IgniteIndex> convert(long causalityToken, 
Index<?> index) {
-        return schemaManager.schemaRegistry(causalityToken, index.tableId())
-                .thenApply(schemaRegistry -> inBusyLock(busyLock, () -> 
convert(index, schemaRegistry)));
-    }
-
-    private IgniteIndex convert(Index<?> index, SchemaRegistry schemaRegistry) 
{
+    private IgniteIndex convert(Index<?> index, SchemaRegistry schemaRegistry, 
IgniteTable table) {
         IndexDescriptor desc = index.descriptor();
         SchemaDescriptor schema = schemaRegistry.schema();
-        IgniteTable table = tablesVv.latest().get(index.tableId());
 
         assert table != null;
 
@@ -421,47 +425,63 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
             return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
         }
         try {
-            schemasVv.update(
-                    causalityToken,
-                    (schemas, e) -> inBusyLock(busyLock, () -> {
-                        if (e != null) {
-                            return failedFuture(e);
-                        }
+            schemasVv.update(causalityToken, (schemas, e) -> 
inBusyLock(busyLock, () -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
 
-                        Map<String, IgniteSchema> res = new HashMap<>(schemas);
+                Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
-                        IgniteSchema schema = res.computeIfAbsent(schemaName, 
IgniteSchema::new);
+                IgniteSchema schema = res.compute(schemaName,
+                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
 
-                        CompletableFuture<IgniteIndex> igniteIndexFuture = 
convert(causalityToken, index);
+                return tablesVv.update(
+                        causalityToken,
+                        (tables, tblEx) -> inBusyLock(busyLock, () -> {
+                            if (tblEx != null) {
+                                return failedFuture(tblEx);
+                            }
 
-                        return indicesVv
-                                .update(
-                                        causalityToken,
-                                        (indices, ex) -> inBusyLock(busyLock, 
() -> {
-                                            if (ex != null) {
-                                                return failedFuture(ex);
-                                            }
+                            Map<UUID, InternalIgniteTable> resTbls = new 
HashMap<>(tables);
 
-                                            Map<UUID, IgniteIndex> resIdxs = 
new HashMap<>(indices);
+                            InternalIgniteTable table = 
resTbls.compute(index.tableId(),
+                                    (k, v) -> 
IgniteTableImpl.copyOf((IgniteTableImpl) v));
 
-                                            return igniteIndexFuture
-                                                    .thenApply(igniteIndex -> {
-                                                        
resIdxs.put(index.id(), igniteIndex);
+                            CompletableFuture<IgniteIndex> igniteIndexFuture = 
schemaManager
+                                    .schemaRegistry(causalityToken, 
index.tableId())
+                                    .thenApply(reg -> convert(index, reg, 
table));
 
-                                                        return resIdxs;
-                                                    });
-                                        })
-                                )
-                                .thenCombine(
-                                        igniteIndexFuture,
-                                        (v, igniteIndex) -> 
inBusyLock(busyLock, () -> {
-                                            schema.addIndex(index.id(), 
igniteIndex);
+                            return indicesVv.update(
+                                    causalityToken,
+                                    (indices, idxEx) -> inBusyLock(busyLock, 
() -> {
+                                        if (idxEx != null) {
+                                            return failedFuture(idxEx);
+                                        }
 
-                                            return null;
-                                        })
-                                )
-                                .thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
-                    }));
+                                        Map<UUID, IgniteIndex> resIdxs = new 
HashMap<>(indices);
+
+                                        return igniteIndexFuture
+                                                .thenApply(igniteIndex -> {
+                                                    resIdxs.put(index.id(), 
igniteIndex);
+
+                                                    return resIdxs;
+                                                });
+                                    })
+                            ).thenCombine(
+                                    igniteIndexFuture,
+                                    (v, igniteIndex) -> inBusyLock(busyLock, 
() -> {
+                                        String tblName = tableNameById(schema, 
index.tableId());
+
+                                        table.addIndex(igniteIndex);
+                                        schema.addTable(tblName, 
(InternalIgniteTable) table);
+                                        schema.addIndex(index.id(), 
igniteIndex);
+
+                                        return v;
+                                    })
+                            ).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(resTbls)));
+                        })
+                ).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
+            }));
 
             return calciteSchemaVv.get(causalityToken);
         } finally {
@@ -469,6 +489,16 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         }
     }
 
+    private static String tableNameById(IgniteSchema schema, UUID tableId) {
+        return schema.getTableMap()
+                .entrySet()
+                .stream()
+                .filter(entry -> tableId
+                        .equals(((InternalIgniteTable) entry.getValue()).id()))
+                .map(Entry::getKey)
+                .findFirst().get();
+    }
+
     /**
      * Index dropped callback method deregisters index from Calcite schema.
      *
@@ -489,21 +519,44 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                 Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
-                IgniteSchema schema = res.computeIfAbsent(schemaName, 
IgniteSchema::new);
+                IgniteSchema schema = res.compute(schemaName,
+                        (k, v) -> v == null ? new IgniteSchema(schemaName) : 
IgniteSchema.copy(v));
 
-                if (schema.removeIndex(indexId)) {
-                    return indicesVv.update(causalityToken, (indices, ex) -> 
inBusyLock(busyLock, () -> {
-                                if (ex != null) {
-                                    return failedFuture(ex);
+                IgniteIndex rmvIndex = schema.removeIndex(indexId);
+                if (rmvIndex != null) {
+                    return tablesVv.update(
+                            causalityToken,
+                            (tables, tlbEx) -> inBusyLock(busyLock, () -> {
+                                if (tlbEx != null) {
+                                    return failedFuture(tlbEx);
                                 }
 
-                                Map<UUID, IgniteIndex> resIdxs = new 
HashMap<>(indices);
+                                Map<UUID, InternalIgniteTable> resTbls = new 
HashMap<>(tables);
 
-                                resIdxs.remove(indexId);
+                                InternalIgniteTable table = 
resTbls.compute(rmvIndex.table().id(),
+                                        (k, v) -> 
IgniteTableImpl.copyOf((IgniteTableImpl) v));
 
-                                return completedFuture(resIdxs);
-                            }
-                    )).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
+                                table.removeIndex(rmvIndex.name());
+
+                                return indicesVv.update(causalityToken, 
(indices, idxEx) -> inBusyLock(busyLock, () -> {
+                                            if (idxEx != null) {
+                                                return failedFuture(idxEx);
+                                            }
+
+                                            Map<UUID, IgniteIndex> resIdxs = 
new HashMap<>(indices);
+
+                                            IgniteIndex rmvIdx = 
resIdxs.remove(indexId);
+
+                                            assert 
table.id().equals(rmvIdx.table().id());
+
+                                            String tblName = 
tableNameById(schema, rmvIdx.table().id());
+                                            schema.addTable(tblName, table);
+
+                                            return completedFuture(resIdxs);
+                                        }
+                                )).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(resTbls)));
+                            })
+                    ).thenCompose(v -> inBusyLock(busyLock, () -> 
completedFuture(res)));
                 }
 
                 return completedFuture(res);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 417c22bba4..cf5db348c2 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.SchemaUtils;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.QueryContext;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
@@ -330,25 +331,25 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varbinary(255)) "
                 + "with partitions=1,replicas=1", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
 
         assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
 
         String finalNewTblSql1 = newTblSql;
 
-        assertThrows(TableAlreadyExistsException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
+        assertThrows(TableAlreadyExistsException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
 
         String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1 
int PRIMARY KEY, c2 varbinary(255)) "
                 + "with partitions=1,replicas=1", curMethodName);
 
-        assertThrows(TableAlreadyExistsException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
+        assertThrows(TableAlreadyExistsException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
 
         // todo: correct exception need to be thrown 
https://issues.apache.org/jira/browse/IGNITE-16084
-        assertThrows(SqlException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(SqlException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with 
partitions__wrong=1,replicas=1")));
 
-        assertThrows(SqlException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(SqlException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with 
partitions=1,replicas__wrong=1")));
 
         newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varchar(255))",
@@ -368,7 +369,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int, c2 int NOT 
NULL DEFAULT 1, c3 int, primary key(c1, c2))", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
 
         assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -383,24 +384,24 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varchar(255))", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + 
curMethodName));
+        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + 
curMethodName));
 
         SqlQueryProcessor finalQueryProc = queryProc;
 
-        assertThrows(TableNotFoundException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(TableNotFoundException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 "DROP TABLE " + curMethodName + "_not_exist")));
 
-        assertThrows(TableNotFoundException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(TableNotFoundException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 "DROP TABLE " + curMethodName)));
 
-        assertThrows(TableNotFoundException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(TableNotFoundException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 "DROP TABLE PUBLIC." + curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS 
PUBLIC." + curMethodName + "_not_exist"));
+        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS 
PUBLIC." + curMethodName + "_not_exist"));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS 
PUBLIC." + curMethodName));
+        readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS 
PUBLIC." + curMethodName));
 
         assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -417,40 +418,40 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varchar(255))", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
 
         String alterCmd = String.format("ALTER TABLE %s ADD COLUMN (c3 
varchar, c4 int)", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", alterCmd));
+        readFirst(queryProc.queryAsync("PUBLIC", alterCmd));
 
         String alterCmd1 = String.format("ALTER TABLE %s ADD COLUMN c5 int NOT 
NULL DEFAULT 1", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
+        readFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
 
-        assertThrows(ColumnAlreadyExistsException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
+        assertThrows(ColumnAlreadyExistsException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
 
         String alterCmdNoTbl = String.format("ALTER TABLE %s ADD COLUMN (c3 
varchar, c4 int)", curMethodName + "_notExist");
 
-        assertThrows(TableNotFoundException.class, () -> 
awaitFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
+        assertThrows(TableNotFoundException.class, () -> 
readFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
 
         String alterIfExistsCmd = String.format("ALTER TABLE IF EXISTS %s ADD 
COLUMN (c3 varchar, c4 int)", curMethodName + "NotExist");
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
+        readFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
 
-        assertThrows(ColumnAlreadyExistsException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
+        assertThrows(ColumnAlreadyExistsException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
 
-        awaitFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER 
TABLE %s DROP COLUMN c4", curMethodName)));
+        readFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER 
TABLE %s DROP COLUMN c4", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE 
%s ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE 
%s DROP COLUMN c3", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
DROP COLUMN c3", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE 
%s DROP COLUMN IF EXISTS c3", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
DROP COLUMN IF EXISTS c3", curMethodName)));
 
-        assertThrows(ColumnNotFoundException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(ColumnNotFoundException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 String.format("ALTER TABLE %s DROP COLUMN (c3, c4)", 
curMethodName))));
 
-        assertThrows(IgniteException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(IgniteException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 String.format("ALTER TABLE %s DROP COLUMN c1", 
curMethodName))));
     }
 
@@ -461,20 +462,20 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     public void testAlterColumnsAddBatch() {
         String curMethodName = getCurrentMethodName();
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE 
%s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE 
%s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE 
%s ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
 
-        awaitFirst(queryProc
+        readFirst(queryProc
                 .queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN 
IF NOT EXISTS (c3 varchar, c4 varchar)", curMethodName)));
 
-        awaitFirst(
+        readFirst(
                 queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar, c5 varchar)",
                         curMethodName)));
 
         SqlQueryProcessor finalQueryProc = queryProc;
 
-        assertThrows(ColumnAlreadyExistsException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(ColumnAlreadyExistsException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 String.format("ALTER TABLE %s ADD COLUMN (c5 varchar)", 
curMethodName))));
     }
 
@@ -485,16 +486,16 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     public void testAlterColumnsDropBatch() {
         String curMethodName = getCurrentMethodName();
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE 
%s "
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE 
%s "
                 + "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 
varchar, c5 varchar)", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE 
%s DROP COLUMN c4", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
DROP COLUMN c4", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE 
%s DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s 
DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
 
         SqlQueryProcessor finalQueryProc = queryProc;
 
-        assertThrows(ColumnNotFoundException.class, () -> 
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+        assertThrows(ColumnNotFoundException.class, () -> 
readFirst(finalQueryProc.queryAsync("PUBLIC",
                 String.format("ALTER TABLE %s DROP COLUMN c4", 
curMethodName))));
     }
 
@@ -510,35 +511,35 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, 
c2 varbinary(255)) with partitions=1", curMethodName);
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+        readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
 
         assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
                 .equalsIgnoreCase("PUBLIC." + curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index1 ON %s (c1)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index1 ON %s (c1)", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index2 ON %s (c1)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index2 ON %s (c1)", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index3 ON %s (c2)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index3 ON %s (c2)", curMethodName)));
 
         assertThrows(IndexAlreadyExistsException.class, () ->
-                awaitFirst(finalQueryProc.queryAsync("PUBLIC", 
String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
+                readFirst(finalQueryProc.queryAsync("PUBLIC", 
String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
 
         assertThrows(IgniteException.class, () ->
-                awaitFirst(finalQueryProc
+                readFirst(finalQueryProc
                         .queryAsync("PUBLIC", String.format("CREATE INDEX 
index_3 ON %s (c1)", curMethodName + "_nonExist"))));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX 
index4 ON %s", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX 
index4 ON %s", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX 
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX 
index4 ON %s", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX 
index4 ON %s", curMethodName)));
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF 
EXISTS index4 ON %s", curMethodName)));
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF 
EXISTS index4 ON %s", curMethodName)));
     }
 
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-17197";)
@@ -547,7 +548,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
         String method = getCurrentMethodName();
 
         // Without engine.
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255))", method + 0)
         )));
@@ -555,7 +556,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
         assertThat(tableView(method + 0).dataStorage(), 
instanceOf(RocksDbDataStorageView.class));
 
         // With existing engine.
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format(
                         "CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) engine %s",
@@ -567,7 +568,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
         assertThat(tableView(method + 1).dataStorage(), 
instanceOf(TestConcurrentHashMapDataStorageView.class));
 
         // With existing engine in mixed case
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) engine %s", method + 2, "\"RocksDb\"")
         )));
@@ -576,7 +577,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         IgniteException exception = assertThrows(
                 IgniteException.class,
-                () -> awaitFirst(queryProc.queryAsync(
+                () -> readFirst(queryProc.queryAsync(
                         "PUBLIC",
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) engine %s", method + 3, method)
                 ))
@@ -588,7 +589,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         exception = assertThrows(
                 IgniteException.class,
-                () -> awaitFirst(queryProc.queryAsync(
+                () -> readFirst(queryProc.queryAsync(
                         "PUBLIC",
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255))", method + 4)
                 ))
@@ -601,34 +602,34 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     void createTableWithTableOptions() {
         String method = getCurrentMethodName();
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with replicas=1", method + 0)
         )));
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with REPLICAS=1", method + 1)
         )));
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with \"replicas\"=1", method + 2)
         )));
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with \"replICAS\"=1", method + 3)
         )));
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with replicas=1, partitions=1", method + 4)
         )));
 
         IgniteException exception = assertThrows(
                 IgniteException.class,
-                () -> awaitFirst(queryProc.queryAsync(
+                () -> readFirst(queryProc.queryAsync(
                         "PUBLIC",
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with replicas='%s'", method + 5, method)
                 ))
@@ -638,7 +639,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         exception = assertThrows(
                 IgniteException.class,
-                () -> awaitFirst(queryProc.queryAsync(
+                () -> readFirst(queryProc.queryAsync(
                         "PUBLIC",
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with %s='%s'", method + 6, method, method)
                 ))
@@ -648,7 +649,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
 
         exception = assertThrows(
                 IgniteException.class,
-                () -> awaitFirst(queryProc.queryAsync(
+                () -> readFirst(queryProc.queryAsync(
                         "PUBLIC",
                         String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with replicas=-1", method + 7)
                 ))
@@ -661,7 +662,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     void createTableWithDataStorageOptions() {
         String method = getCurrentMethodName();
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with dataRegion='default'", method + 0)
         )));
@@ -671,7 +672,7 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
                 equalTo(DEFAULT_DATA_REGION_NAME)
         );
 
-        assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+        assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
                 "PUBLIC",
                 String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 
varbinary(255)) with DATAREGION='test_region'", method + 1)
         )));
@@ -691,14 +692,14 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
     public void testSchemaForTheFutureUpdate() throws Exception {
         String curMethodName = getCurrentMethodName();
 
-        awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE 
%s "
+        readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE 
%s "
                 + "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 
varchar, c5 varchar)", curMethodName)));
 
         SchemaRegistry schemaRegistry = (((TableImpl) 
tblManager.tables().get(0)).schemaView());
 
         runAsync(() -> {
             Thread.sleep(3000);
-            awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER 
TABLE %s DROP COLUMN c4", curMethodName)));
+            readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER 
TABLE %s DROP COLUMN c4", curMethodName)));
         });
 
         int lastSchemaVersion = schemaRegistry.lastSchemaVersion();
@@ -794,8 +795,8 @@ public class MockedStructuresTest extends 
IgniteAbstractTest {
         return tableManager;
     }
 
-    private <T> AsyncSqlCursor<T> 
awaitFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
-        return await(cursors.get(0));
+    private <T> BatchedResult<T> 
readFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
+        return await(await(cursors.get(0)).requestNextAsync(512));
     }
 
     private @Nullable TableView tableView(String tableName) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index b9d2f64351..0355c749df 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -42,6 +43,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.ignite.internal.index.Index;
 import org.apache.ignite.internal.index.IndexDescriptor;
@@ -54,6 +56,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTableImpl;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
@@ -83,7 +86,7 @@ public class SqlSchemaManagerTest {
             tableVer,
             new Column[]{new Column(0, "ID", NativeTypes.INT64, false)},
             new Column[]{new Column(1, "VAL", NativeTypes.INT64, false)}
-        );
+    );
 
     @Mock
     private TableManager tableManager;
@@ -312,6 +315,68 @@ public class SqlSchemaManagerTest {
         verifyNoMoreInteractions(tableManager);
     }
 
+
+    @Test
+    public void testIndexEventsProcessed() {
+        InternalTable mock = mock(InternalTable.class);
+        when(mock.tableId()).thenReturn(tableId);
+
+        when(table.name()).thenReturn("TEST_SCHEMA.T");
+        when(table.internalTable()).thenReturn(mock);
+        when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
+        
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
+        when(schemaManager.schemaRegistry(anyLong(), 
any())).thenReturn(completedFuture(schemaRegistry));
+
+        sqlSchemaManager.onTableCreated("TEST_SCHEMA", table, 
testRevisionRegister.actualToken() + 1);
+        testRevisionRegister.moveForward();
+
+        IndexDescriptor descMock = mock(IndexDescriptor.class);
+        when(descMock.columns()).thenReturn(List.of());
+
+        when(index.name()).thenReturn("I");
+        when(index.id()).thenReturn(indexId);
+        when(index.tableId()).thenReturn(tableId);
+        when(index.descriptor()).thenReturn(descMock);
+
+        {
+            SchemaPlus schema1 = sqlSchemaManager.schema("TEST_SCHEMA");
+
+            sqlSchemaManager.onIndexCreated("TEST_SCHEMA", index, 
testRevisionRegister.actualToken() + 1);
+            testRevisionRegister.moveForward();
+
+            SchemaPlus schema2 = sqlSchemaManager.schema("TEST_SCHEMA");
+
+            // Validate schema snapshot.
+            assertNotSame(schema1, schema2);
+            assertNotSame(schema1.getTable("T"), schema2.getTable("T"));
+
+            assertNull(schema1.unwrap(IgniteSchema.class).index(indexId));
+            assertNotNull(schema2.unwrap(IgniteSchema.class).index(indexId));
+
+            assertNull(((InternalIgniteTable) 
schema1.getTable("T")).getIndex("I"));
+            assertNotNull(((InternalIgniteTable) 
schema2.getTable("T")).getIndex("I"));
+        }
+        {
+            sqlSchemaManager.onIndexDropped("TEST_SCHEMA", indexId, 
testRevisionRegister.actualToken() + 1);
+            SchemaPlus schema1 = sqlSchemaManager.schema("TEST_SCHEMA");
+            testRevisionRegister.moveForward();
+
+            SchemaPlus schema2 = sqlSchemaManager.schema("TEST_SCHEMA");
+
+            // Validate schema snapshot.
+            assertNotSame(schema1, schema2);
+            assertNotSame(schema1.getTable("T"), schema2.getTable("T"));
+
+            assertNotNull(schema1.unwrap(IgniteSchema.class).index(indexId));
+            assertNull(schema2.unwrap(IgniteSchema.class).index(indexId));
+
+            assertNull(((InternalIgniteTable) 
schema2.getTable("T")).getIndex("I"));
+            assertNotNull(((InternalIgniteTable) 
schema1.getTable("T")).getIndex("I"));
+        }
+
+        verifyNoMoreInteractions(tableManager);
+    }
+
     /**
      * Test revision register.
      */
@@ -346,8 +411,8 @@ public class SqlSchemaManagerTest {
                 Function<Long, CompletableFuture<?>> old = moveRevision;
 
                 moveRevision = rev -> allOf(
-                    old.apply(rev),
-                    function.apply(rev)
+                        old.apply(rev),
+                        function.apply(rev)
                 );
             }
         }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index a4d9cd36af..0a13813ff8 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -19,19 +19,24 @@ package org.apache.ignite.internal.sql.engine.planner;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.index.ColumnCollation;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
 import org.junit.jupiter.api.Test;
@@ -193,4 +198,49 @@ public class SortedIndexSpoolPlannerTest extends 
AbstractPlannerTest {
         assertTrue(((RexLiteral) lowerBound.get(3)).isNull());
         assertTrue(upperBound.get(1) instanceof RexFieldAccess);
     }
+
+    /**
+     * Check colocated fields with DESC ordering.
+     */
+    @Test
+    public void testDescFields() throws Exception {
+        IgniteSchema publicSchema = createSchema(
+                createTable("T0", 10, IgniteDistributions.affinity(0, "T0", 
"hash"),
+                        "ID", Integer.class, "JID", Integer.class, "VAL", 
String.class)
+                        .addIndex("t0_jid_idx", 1),
+                createTable("T1", 100, IgniteDistributions.affinity(0, "T1", 
"hash"),
+                        "ID", Integer.class, "JID", Integer.class, "VAL", 
String.class)
+                        
.addIndex(RelCollations.of(TraitUtils.createFieldCollation(1, 
ColumnCollation.DESC_NULLS_FIRST)), "t1_jid_idx")
+        );
+
+        String sql = "select * "
+                + "from t0 "
+                + "join t1 on t1.jid < t0.jid";
+
+        assertPlan(sql, publicSchema,
+                isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                        .and(input(1, 
isInstanceOf(IgniteSortedIndexSpool.class)
+                                .and(spool -> {
+                                    List<RexNode> lowerBound = 
spool.indexCondition().lowerBound();
+
+                                    // Condition is LESS_THEN, but we have 
DESC field and condition should be in lower bound
+                                    // instead of upper bound.
+                                    assertNotNull(lowerBound);
+                                    assertEquals(3, lowerBound.size());
+
+                                    assertTrue(((RexLiteral) 
lowerBound.get(0)).isNull());
+                                    assertTrue(lowerBound.get(1) instanceof 
RexFieldAccess);
+                                    assertTrue(((RexLiteral) 
lowerBound.get(2)).isNull());
+
+                                    List<RexNode> upperBound = 
spool.indexCondition().upperBound();
+
+                                    assertNull(upperBound);
+
+                                    return true;
+                                })
+                                .and(hasChildThat(isIndexScan("T1", 
"t1_jid_idx")))
+                        )),
+                "MergeJoinConverter", "NestedLoopJoinConverter", 
"FilterSpoolMergeToHashIndexSpoolRule"
+        );
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 32bb02cf2f..a6a2722686 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1156,6 +1156,10 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                     }
                 });
             }
+        }).exceptionally(th -> {
+            tblFut.completeExceptionally(th);
+
+            return null;
         });
 
         return tblFut;
@@ -1305,16 +1309,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                     return allOf(tableFuts).thenApply(unused -> 
inBusyLock(busyLock, () -> {
                         var tables = new ArrayList<Table>(tableIds.size());
 
-                        try {
-                            for (var fut : tableFuts) {
-                                var table = fut.get();
+                        for (var fut : tableFuts) {
+                            var table = fut.join();
 
-                                if (table != null) {
-                                    tables.add((Table) table);
-                                }
+                            if (table != null) {
+                                tables.add((Table) table);
                             }
-                        } catch (Throwable t) {
-                            throw new CompletionException(t);
                         }
 
                         return tables;

Reply via email to