This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 608087a806 IGNITE-17431: Sql. Index support by optimizer (#1039)
608087a806 is described below
commit 608087a806096fd33f3c72adf87d4ddce8fac6e5
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Fri Sep 2 19:00:28 2022 +0300
IGNITE-17431: Sql. Index support by optimizer (#1039)
Co-authored-by: korlov42 <[email protected]>
---
.../apache/ignite/internal/index/IndexManager.java | 38 ----
.../ignite/internal/index/ItIndexManagerTest.java | 2 +-
.../internal/sql/engine/ItIndexSpoolTest.java | 36 +---
.../internal/sql/engine/ItSortAggregateTest.java | 73 +-------
.../sql/engine/exec/ExecutionServiceImpl.java | 26 ++-
.../sql/engine/exec/ddl/DdlCommandHandler.java | 198 ++++++++++-----------
.../internal/sql/engine/schema/IgniteSchema.java | 24 +--
.../sql/engine/schema/IgniteTableImpl.java | 19 +-
.../sql/engine/schema/SqlSchemaManagerImpl.java | 165 +++++++++++------
.../sql/engine/exec/MockedStructuresTest.java | 133 +++++++-------
.../engine/exec/schema/SqlSchemaManagerTest.java | 71 +++++++-
.../planner/SortedIndexSpoolPlannerTest.java | 50 ++++++
.../internal/table/distributed/TableManager.java | 16 +-
13 files changed, 455 insertions(+), 396 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 2da927850c..95c2129f2c 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -122,27 +122,6 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
LOG.info("Index manager stopped");
}
- /**
- * Creates index from provided configuration changer.
- *
- * @param schemaName A name of the schema to create index in.
- * @param indexName A name of the index to create.
- * @param tableName A name of the table to create index for.
- * @param indexChange A consumer that suppose to change the configuration
in order to provide description of an index.
- * @param failIfExists Flag indicates whether exception be thrown if index
exists or not.
- * @return {@code True} if index was created successfully, {@code false}
otherwise.
- * @throws IndexAlreadyExistsException If index already exists and
- */
- public boolean createIndex(
- String schemaName,
- String indexName,
- String tableName,
- boolean failIfExists,
- Consumer<TableIndexChange> indexChange
- ) {
- return join(createIndexAsync(schemaName, indexName, tableName,
failIfExists, indexChange)) != null;
- }
-
/**
* Creates index from provided configuration changer.
*
@@ -251,23 +230,6 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
}
}
- /**
- * Drops the index with a given name.
- *
- * @param schemaName A name of the schema the index belong to.
- * @param indexName A name of the index to drop.
- * @param failIfNotExist Flag, which force failure, when {@code trues} if
index doen't not exists.
- * @return {@code True} if index was removed, {@code false} otherwise.
- * @throws IndexNotFoundException If index doesn't exist and {@code
failIfNotExist} param was {@code true}.
- */
- public boolean dropIndex(
- String schemaName,
- String indexName,
- boolean failIfNotExist
- ) {
- return join(dropIndexAsync(schemaName, indexName, failIfNotExist));
- }
-
/**
* Drops the index with a given name asynchronously.
*
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index 95d9e30eaa..950e774b70 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -94,7 +94,7 @@ public class ItIndexManagerTest extends
AbstractBasicIntegrationTest {
assertThat(createEventParamHolder.get(), notNullValue());
assertThat(index, sameInstance(createEventParamHolder.get().index()));
- indexManager.dropIndex("PUBLIC", "INAME", true);
+ indexManager.dropIndexAsync("PUBLIC", "INAME", true).join();
assertThat(dropEventParamHolder.get(), notNullValue());
assertThat(index.id(),
sameInstance(dropEventParamHolder.get().indexId()));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index 4918964b4d..bc02b015d8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -24,10 +24,6 @@ import static
org.junit.jupiter.params.ParameterizedTest.ARGUMENTS_PLACEHOLDER;
import java.util.List;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.schema.definition.ColumnType;
-import org.apache.ignite.schema.definition.TableDefinition;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -52,7 +48,7 @@ public class ItIndexSpoolTest extends
AbstractBasicIntegrationTest {
// Remove this, indices must be dropped together with the table.
CLUSTER_NODES.get(0).tables().tables().stream()
.map(Table::name)
- .forEach(name -> sql("DROP INDEX " + name + "_JID_IDX"));
+ .forEach(name -> sql("DROP INDEX IF EXISTS " + name +
"_JID_IDX"));
CLUSTER_NODES.get(0).tables().tables().stream()
.map(Table::name)
@@ -81,29 +77,6 @@ public class ItIndexSpoolTest extends
AbstractBasicIntegrationTest {
res.forEach(r -> assertThat(r.get(0), is(r.get(1))));
}
- private Table createTable(String tableName) {
- TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC",
tableName)
- .columns(
- SchemaBuilders.column("ID", ColumnType.INT32).build(),
- SchemaBuilders.column("JID",
ColumnType.INT32).asNullable(true).build(),
- SchemaBuilders.column("VAL",
ColumnType.string()).asNullable(true).build()
- )
- .withPrimaryKey("ID")
- .build();
-
- Table table =
CLUSTER_NODES.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
- SchemaConfigurationConverter.convert(schTbl1, tblCh)
- .changeReplicas(2)
- .changePartitions(10)
- );
-
- CLUSTER_NODES.get(0).tables().alterTable(schTbl1.canonicalName(),
tblCh -> SchemaConfigurationConverter.addIndex(
- SchemaBuilders.sortedIndex(tableName +
"_JID_IDX").addIndexColumn("JID").done().build(), tblCh)
- );
-
- return table;
- }
-
private void prepareDataSet(int rowsCount) {
Object[][] dataRows = new Object[rowsCount][];
@@ -112,9 +85,12 @@ public class ItIndexSpoolTest extends
AbstractBasicIntegrationTest {
}
for (String name : List.of("TEST0", "TEST1")) {
- Table tbl = createTable(name);
+ sql("CREATE TABLE " + name + "(id INT PRIMARY KEY, jid INT, val
VARCHAR) WITH replicas=2,partitions=10");
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17304
uncomment this
+ // sql("CREATE INDEX " + name + "_jid_idx ON " + name + "(jid)");
- insertData(tbl, new String[]{"ID", "JID", "VAL"}, dataRows);
+ insertData("PUBLIC." + name, new String[]{"ID", "JID", "VAL"},
dataRows);
}
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
index d047780b3d..e0379a401a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSortAggregateTest.java
@@ -19,13 +19,6 @@ package org.apache.ignite.internal.sql.engine;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.schema.definition.ColumnType;
-import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.Table;
-import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -40,68 +33,16 @@ public class ItSortAggregateTest extends
AbstractBasicIntegrationTest {
*/
@BeforeAll
static void initTestData() {
- TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "TEST")
- .columns(
- SchemaBuilders.column("ID", ColumnType.INT32).build(),
- SchemaBuilders.column("GRP0",
ColumnType.INT32).asNullable(true).build(),
- SchemaBuilders.column("GRP1",
ColumnType.INT32).asNullable(true).build(),
- SchemaBuilders.column("VAL0",
ColumnType.INT32).asNullable(true).build(),
- SchemaBuilders.column("VAL1",
ColumnType.INT32).asNullable(true).build()
- )
- .withPrimaryKey("ID")
- .build();
+ sql("CREATE TABLE test (id INT PRIMARY KEY, grp0 INT, grp1 INT, val0
INT, val1 INT) WITH replicas=2,partitions=10");
+ sql("CREATE TABLE test_one_col_idx (pk INT PRIMARY KEY, col0 INT)");
- TableDefinition schTbl2 = SchemaBuilders.tableBuilder("PUBLIC",
"TEST_ONE_COL_IDX")
- .columns(
- SchemaBuilders.column("PK", ColumnType.INT32).build(),
- SchemaBuilders.column("COL0",
ColumnType.INT32).asNullable(true).build()
- )
- .withPrimaryKey("PK")
- .build();
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17304 uncomment
this
+ // sql("CREATE INDEX test_idx ON test(grp0, grp1)");
+ // sql("CREATE INDEX test_one_col_idx_idx ON test_one_col_idx(col0)");
- Table table =
CLUSTER_NODES.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
- SchemaConfigurationConverter.convert(schTbl1, tblCh)
- .changeReplicas(2)
- .changePartitions(10)
- );
-
- Table tblOneColIdx =
CLUSTER_NODES.get(0).tables().createTable(schTbl2.canonicalName(), tblCh ->
- SchemaConfigurationConverter.convert(schTbl2, tblCh)
- );
-
- CLUSTER_NODES.get(0).tables().alterTable(schTbl1.canonicalName(),
tblCh ->
-
SchemaConfigurationConverter.addIndex(SchemaBuilders.sortedIndex(schTbl1.name()
+ "IDX")
- .addIndexColumn("GRP0").done()
- .addIndexColumn("GRP1").done()
- .build(), tblCh)
- );
- CLUSTER_NODES.get(0).tables().alterTable(schTbl2.canonicalName(),
tblCh ->
-
SchemaConfigurationConverter.addIndex(SchemaBuilders.sortedIndex(schTbl2.name()
+ "IDX")
- .addIndexColumn("COL0").desc().done()
- .build(), tblCh)
- );
-
- RecordView<Tuple> view = table.recordView();
- for (int i = 0; i < ROWS; i++) {
- view.insert(
- null,
- Tuple.create()
- .set("ID", i)
- .set("GRP0", i / 10)
- .set("GRP1", i / 100)
- .set("VAL0", 1)
- .set("VAL1", 2)
- );
- }
-
- RecordView<Tuple> view1 = tblOneColIdx.recordView();
for (int i = 0; i < ROWS; i++) {
- view1.insert(
- null,
- Tuple.create()
- .set("PK", i)
- .set("COL0", i)
- );
+ sql("INSERT INTO test (id, grp0, grp1, val0, val1) VALUES (?, ?,
?, ?, ?)", i, i / 10, i / 100, 1, 2);
+ sql("INSERT INTO test_one_col_idx (pk, col0) VALUES (?, ?)", i, i);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 189e0949c0..5b43afe8f9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -22,14 +22,15 @@ import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -69,6 +70,7 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -248,14 +250,26 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
- try {
- boolean ret = ddlCmdHnd.handle(plan.command());
+ CompletableFuture<Iterator<List<Object>>> ret =
ddlCmdHnd.handle(plan.command())
+ .thenApply(applied ->
List.of(List.<Object>of(applied)).iterator())
+ .exceptionally(th -> {
+ throw convertDdlException(th);
+ });
+
+ return new AsyncWrapper<>(ret, Runnable::run);
+ }
- return new
AsyncWrapper<>(Collections.singletonList(Collections.<Object>singletonList(ret)).iterator());
- } catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException("Failed to execute DDL statement
[stmt=" /*+ qry.sql()*/
+ private static RuntimeException convertDdlException(Throwable e) {
+ if (e instanceof CompletionException) {
+ e = e.getCause();
+ }
+
+ if (e instanceof IgniteInternalCheckedException) {
+ return new IgniteInternalException("Failed to execute DDL
statement [stmt=" /*+ qry.sql()*/
+ ", err=" + e.getMessage() + ']', e);
}
+
+ return (e instanceof RuntimeException) ? (RuntimeException) e : new
IgniteException(e);
}
private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 6ad4badcfd..5e1cf624eb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec.ddl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
import static
org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convertDefaultToConfiguration;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -25,8 +27,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListView;
@@ -88,7 +94,7 @@ public class DdlCommandHandler {
}
/** Handles ddl commands. */
- public boolean handle(DdlCommand cmd) throws
IgniteInternalCheckedException {
+ public CompletableFuture<Boolean> handle(DdlCommand cmd) {
validateCommand(cmd);
if (cmd instanceof CreateTableCommand) {
@@ -104,9 +110,9 @@ public class DdlCommandHandler {
} else if (cmd instanceof DropIndexCommand) {
return handleDropIndex((DropIndexCommand) cmd);
} else {
- throw new IgniteInternalCheckedException("Unsupported DDL
operation ["
+ return failedFuture(new
IgniteInternalCheckedException("Unsupported DDL operation ["
+ "cmdName=" + (cmd == null ? null :
cmd.getClass().getSimpleName()) + "; "
- + "cmd=\"" + cmd + "\"]");
+ + "cmd=\"" + cmd + "\"]"));
}
}
@@ -122,7 +128,7 @@ public class DdlCommandHandler {
}
/** Handles create table command. */
- private boolean handleCreateTable(CreateTableCommand cmd) {
+ private CompletableFuture<Boolean> handleCreateTable(CreateTableCommand
cmd) {
Consumer<TableChange> tblChanger = tableChange -> {
tableChange.changeColumns(columnsChange -> {
for (var col : cmd.columns()) {
@@ -157,42 +163,27 @@ public class DdlCommandHandler {
IgniteObjectName.quote(cmd.tableName())
);
- try {
- tableManager.createTable(fullName, tblChanger);
-
- return true;
- } catch (TableAlreadyExistsException ex) {
- if (!cmd.ifTableExists()) {
- throw ex;
- } else {
- return false;
- }
- }
+ return tableManager.createTableAsync(fullName, tblChanger)
+ .thenApply(Objects::nonNull)
+ .handle(handleTableModificationResult(cmd.ifTableExists()));
}
/** Handles drop table command. */
- private boolean handleDropTable(DropTableCommand cmd) {
+ private CompletableFuture<Boolean> handleDropTable(DropTableCommand cmd) {
String fullName = SchemaUtils.canonicalName(
IgniteObjectName.quote(cmd.schemaName()),
IgniteObjectName.quote(cmd.tableName())
);
- try {
- tableManager.dropTable(fullName);
-
- return true;
- } catch (TableNotFoundException ex) {
- if (!cmd.ifTableExists()) {
- throw ex;
- } else {
- return false;
- }
- }
+
+ return tableManager.dropTableAsync(fullName)
+ .thenApply(v -> Boolean.TRUE)
+ .handle(handleTableModificationResult(cmd.ifTableExists()));
}
/** Handles add column command. */
- private boolean handleAlterAddColumn(AlterTableAddCommand cmd) {
+ private CompletableFuture<Boolean>
handleAlterAddColumn(AlterTableAddCommand cmd) {
if (nullOrEmpty(cmd.columns())) {
- return false;
+ return completedFuture(Boolean.FALSE);
}
String fullName = SchemaUtils.canonicalName(
@@ -200,21 +191,14 @@ public class DdlCommandHandler {
IgniteObjectName.quote(cmd.tableName())
);
- try {
- return addColumnInternal(fullName, cmd.columns(),
cmd.ifColumnNotExists());
- } catch (TableNotFoundException ex) {
- if (!cmd.ifTableExists()) {
- throw ex;
- } else {
- return false;
- }
- }
+ return addColumnInternal(fullName, cmd.columns(),
cmd.ifColumnNotExists())
+ .handle(handleTableModificationResult(cmd.ifTableExists()));
}
/** Handles drop column command. */
- private boolean handleAlterDropColumn(AlterTableDropCommand cmd) {
+ private CompletableFuture<Boolean>
handleAlterDropColumn(AlterTableDropCommand cmd) {
if (nullOrEmpty(cmd.columns())) {
- return false;
+ return completedFuture(Boolean.FALSE);
}
String fullName = SchemaUtils.canonicalName(
@@ -222,35 +206,45 @@ public class DdlCommandHandler {
IgniteObjectName.quote(cmd.tableName())
);
- try {
- return dropColumnInternal(fullName, cmd.columns(),
cmd.ifColumnExists());
- } catch (TableNotFoundException ex) {
- if (!cmd.ifTableExists()) {
- throw ex;
- } else {
- return false;
+ return dropColumnInternal(fullName, cmd.columns(),
cmd.ifColumnExists())
+ .handle(handleTableModificationResult(cmd.ifTableExists()));
+ }
+
+ private static BiFunction<Object, Throwable, Boolean>
handleTableModificationResult(boolean ignoreTableExistenceErrors) {
+ return (val, err) -> {
+ if (err == null) {
+ return val instanceof Boolean ? (Boolean) val : Boolean.TRUE;
+ } else if (ignoreTableExistenceErrors) {
+ Throwable err0 = err instanceof CompletionException ?
err.getCause() : err;
+
+ if (err0 instanceof TableAlreadyExistsException || err0
instanceof TableNotFoundException) {
+ return Boolean.FALSE;
+ }
}
- }
+
+ throw (err instanceof RuntimeException) ? (RuntimeException) err :
new CompletionException(err);
+ };
}
/** Handles create index command. */
- private boolean handleCreateIndex(CreateIndexCommand cmd) {
+ private CompletableFuture<Boolean> handleCreateIndex(CreateIndexCommand
cmd) {
Consumer<TableIndexChange> indexChanger = tableIndexChange -> {
// Only sorted idx for now.
createSortedIndexInternal(cmd,
tableIndexChange.convert(SortedIndexChange.class));
};
- return indexManager.createIndex(
- cmd.schemaName(),
- cmd.indexName(),
- cmd.tableName(),
- !cmd.ifIndexNotExists(),
- indexChanger);
+ return indexManager.createIndexAsync(
+ cmd.schemaName(),
+ cmd.indexName(),
+ cmd.tableName(),
+ !cmd.ifIndexNotExists(),
+ indexChanger)
+ .thenApply(Objects::nonNull);
}
/** Handles drop index command. */
- private boolean handleDropIndex(DropIndexCommand cmd) {
- return indexManager.dropIndex(cmd.schemaName(), cmd.indexName(),
!cmd.ifNotExists());
+ private CompletableFuture<Boolean> handleDropIndex(DropIndexCommand cmd) {
+ return indexManager.dropIndexAsync(cmd.schemaName(), cmd.indexName(),
!cmd.ifNotExists());
}
/**
@@ -263,7 +257,7 @@ public class DdlCommandHandler {
indexChange.changeColumns(colsInit -> {
for (Pair<String, Boolean> col : cmd.columns()) {
//TODO: https://issues.apache.org/jira/browse/IGNITE-17563
Pass null ordering for columns.
- colsInit.create(col.getFirst(), colInit ->
colInit.changeAsc(col.getSecond()));
+ colsInit.create(col.getFirst(), colInit ->
colInit.changeAsc(!col.getSecond()));
}
});
}
@@ -272,30 +266,21 @@ public class DdlCommandHandler {
* Adds a column according to the column definition.
*
* @param fullName Table with schema name.
- * @param colsDef Columns defenitions.
- * @param colNotExist Flag indicates exceptionally behavior in case of
already existing column.
- *
+ * @param colsDef Columns definitions.
+ * @param ignoreColumnExistance If {@code true}, columns that already exist
are skipped instead of raising an error.
* @return {@code true} if the full columns set is applied successfully.
Otherwise, returns {@code false}.
*/
- private boolean addColumnInternal(String fullName, List<ColumnDefinition>
colsDef, boolean colNotExist) {
+ private CompletableFuture<Boolean> addColumnInternal(String fullName,
List<ColumnDefinition> colsDef, boolean ignoreColumnExistance) {
AtomicBoolean ret = new AtomicBoolean(true);
- tableManager.alterTable(
+
+ return tableManager.alterTableAsync(
fullName,
chng -> chng.changeColumns(cols -> {
Map<String, String> colNamesToOrders =
columnOrdersToNames(chng.columns());
List<ColumnDefinition> colsDef0;
- if (!colNotExist) {
- colsDef.stream()
- .filter(k ->
colNamesToOrders.containsKey(k.name()))
- .findAny()
- .ifPresent(c -> {
- throw new
ColumnAlreadyExistsException(c.name());
- });
-
- colsDef0 = colsDef;
- } else {
+ if (ignoreColumnExistance) {
colsDef0 = colsDef.stream().filter(k -> {
if (colNamesToOrders.containsKey(k.name())) {
ret.set(false);
@@ -305,14 +290,22 @@ public class DdlCommandHandler {
return true;
}
}).collect(Collectors.toList());
+ } else {
+ colsDef.stream()
+ .filter(k ->
colNamesToOrders.containsKey(k.name()))
+ .findAny()
+ .ifPresent(c -> {
+ throw new
ColumnAlreadyExistsException(c.name());
+ });
+
+ colsDef0 = colsDef;
}
for (ColumnDefinition col : colsDef0) {
cols.create(col.name(), colChg ->
convertColumnDefinition(col, colChg));
}
- }));
-
- return ret.get();
+ })
+ ).thenApply(v -> ret.get());
}
private void convertColumnDefinition(ColumnDefinition definition,
ColumnChange columnChange) {
@@ -354,44 +347,43 @@ public class DdlCommandHandler {
*
* @param fullName Table with schema name.
* @param colNames Columns definitions.
- * @param colExist Flag indicates exceptionally behavior in case of
already existing column.
+ * @param ignoreColumnExistance If {@code true}, columns that do not exist
are skipped instead of raising an error.
* @return {@code true} if the full columns set is applied successfully.
Otherwise, returns {@code false}.
*/
- private boolean dropColumnInternal(String fullName, Set<String> colNames,
boolean colExist) {
+ private CompletableFuture<Boolean> dropColumnInternal(String fullName,
Set<String> colNames, boolean ignoreColumnExistance) {
AtomicBoolean ret = new AtomicBoolean(true);
- tableManager.alterTable(
- fullName,
- chng -> chng.changeColumns(cols -> {
- PrimaryKeyView priKey = chng.primaryKey();
+ return tableManager.alterTableAsync(
+ fullName,
+ chng -> chng.changeColumns(cols -> {
+ PrimaryKeyView priKey = chng.primaryKey();
- Map<String, String> colNamesToOrders =
columnOrdersToNames(chng.columns());
+ Map<String, String> colNamesToOrders =
columnOrdersToNames(chng.columns());
- Set<String> colNames0 = new HashSet<>();
+ Set<String> colNames0 = new HashSet<>();
- Set<String> primaryCols = Set.of(priKey.columns());
+ Set<String> primaryCols = Set.of(priKey.columns());
- for (String colName : colNames) {
- if (!colNamesToOrders.containsKey(colName)) {
- ret.set(false);
+ for (String colName : colNames) {
+ if (!colNamesToOrders.containsKey(colName)) {
+ ret.set(false);
- if (!colExist) {
- throw new ColumnNotFoundException(colName,
fullName);
- }
- } else {
- colNames0.add(colName);
- }
-
- if (primaryCols.contains(colName)) {
- throw new IgniteException(IgniteStringFormatter
- .format("Can`t delete column, belongs to
primary key: [name={}]", colName));
- }
- }
+ if (!ignoreColumnExistance) {
+ throw new
ColumnNotFoundException(colName, fullName);
+ }
+ } else {
+ colNames0.add(colName);
+ }
- colNames0.forEach(k ->
cols.delete(colNamesToOrders.get(k)));
- }));
+ if (primaryCols.contains(colName)) {
+ throw new
IgniteException(IgniteStringFormatter
+ .format("Can`t delete column,
belongs to primary key: [name={}]", colName));
+ }
+ }
- return ret.get();
+ colNames0.forEach(k ->
cols.delete(colNamesToOrders.get(k)));
+ }))
+ .thenApply(v -> ret.get());
}
/** Map column name to order. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index f983e3c5f1..243fa682d3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -54,6 +54,10 @@ public class IgniteSchema extends AbstractSchema {
this(schemaName, null, null);
}
+ public static IgniteSchema copy(IgniteSchema old) {
+ return new IgniteSchema(old.schemaName, old.tblMap, old.idxMap);
+ }
+
/**
* Get schema name.
*
@@ -99,10 +103,6 @@ public class IgniteSchema extends AbstractSchema {
* @param index Index.
*/
public void addIndex(UUID indexId, IgniteIndex index) {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17474
- // Decouple tables and indices.
- index.table().addIndex(index);
-
idxMap.put(indexId, index);
}
@@ -110,20 +110,10 @@ public class IgniteSchema extends AbstractSchema {
* Remove index.
*
* @param indexId Index id.
- * @return {@code True} if index was removed or {@code false} otherwise.
+ * @return Removed index.
*/
- public boolean removeIndex(UUID indexId) {
- IgniteIndex rmv = idxMap.remove(indexId);
-
- if (rmv == null) {
- return false;
- }
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17474
- // Decouple tables and indices.
- rmv.table().removeIndex(rmv.name());
-
- return true;
+ public IgniteIndex removeIndex(UUID indexId) {
+ return idxMap.remove(indexId);
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index cd9258cad5..b3f4992089 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -22,11 +22,11 @@ import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Convention;
@@ -81,7 +81,7 @@ public class IgniteTableImpl extends AbstractTable implements
InternalIgniteTabl
private final Statistic statistic;
- private final Map<String, IgniteIndex> indexes = new ConcurrentHashMap<>();
+ private Map<String, IgniteIndex> indexes = new HashMap<>();
private final List<ColumnDescriptor> columnsOrderedByPhysSchema;
@@ -115,6 +115,21 @@ public class IgniteTableImpl extends AbstractTable
implements InternalIgniteTabl
statistic = new StatisticsImpl();
}
+ private IgniteTableImpl(IgniteTableImpl t) {
+ this.desc = t.desc;
+ this.ver = t.ver;
+ this.table = t.table;
+ this.schemaRegistry = t.schemaRegistry;
+ this.schemaDescriptor = t.schemaDescriptor;
+ this.statistic = t.statistic;
+ this.columnsOrderedByPhysSchema = t.columnsOrderedByPhysSchema;
+ this.indexes.putAll(t.indexes);
+ }
+
+ public static IgniteTableImpl copyOf(IgniteTableImpl v) {
+ return new IgniteTableImpl(v);
+ }
+
/** {@inheritDoc} */
@Override
public UUID id() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 861500261a..fd393e22b8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -66,7 +67,7 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager
{
private final VersionedValue<Map<String, IgniteSchema>> schemasVv;
- private final VersionedValue<Map<UUID, IgniteTable>> tablesVv;
+ private final VersionedValue<Map<UUID, InternalIgniteTable>> tablesVv;
private final VersionedValue<Map<UUID, IgniteIndex>> indicesVv;
@@ -213,7 +214,8 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
- IgniteSchema schema = res.computeIfAbsent(schemaName,
IgniteSchema::new);
+ IgniteSchema schema = res.compute(schemaName,
+ (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
CompletableFuture<IgniteTableImpl> igniteTableFuture =
convert(causalityToken, table);
@@ -223,11 +225,18 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
return failedFuture(ex);
}
- Map<UUID, IgniteTable> resTbls = new
HashMap<>(tables);
+ Map<UUID, InternalIgniteTable> resTbls =
new HashMap<>(tables);
return igniteTableFuture
.thenApply(igniteTable ->
inBusyLock(busyLock, () -> {
- resTbls.put(igniteTable.id(),
igniteTable);
+ InternalIgniteTable oldTable =
resTbls.put(igniteTable.id(), igniteTable);
+
+ // A previous entry exists, so this is an update:
+ // carry its indexes over to the new table instance.
+ if (oldTable != null) {
+ for (var index :
oldTable.indexes().values()) {
+
igniteTable.addIndex(index);
+ }
+ }
return resTbls;
}));
@@ -281,7 +290,8 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
- IgniteSchema schema = res.computeIfAbsent(schemaName,
IgniteSchema::new);
+ IgniteSchema schema = res.compute(schemaName,
+ (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
String calciteTableName = objectSimpleName(schemaName,
tableName);
@@ -295,7 +305,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
return failedFuture(ex);
}
- Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+ Map<UUID, InternalIgniteTable> resTbls = new
HashMap<>(tables);
resTbls.remove(table.id());
@@ -363,15 +373,9 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
);
}
- private CompletableFuture<IgniteIndex> convert(long causalityToken,
Index<?> index) {
- return schemaManager.schemaRegistry(causalityToken, index.tableId())
- .thenApply(schemaRegistry -> inBusyLock(busyLock, () ->
convert(index, schemaRegistry)));
- }
-
- private IgniteIndex convert(Index<?> index, SchemaRegistry schemaRegistry)
{
+ private IgniteIndex convert(Index<?> index, SchemaRegistry schemaRegistry,
IgniteTable table) {
IndexDescriptor desc = index.descriptor();
SchemaDescriptor schema = schemaRegistry.schema();
- IgniteTable table = tablesVv.latest().get(index.tableId());
assert table != null;
@@ -421,47 +425,63 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- schemasVv.update(
- causalityToken,
- (schemas, e) -> inBusyLock(busyLock, () -> {
- if (e != null) {
- return failedFuture(e);
- }
+ schemasVv.update(causalityToken, (schemas, e) ->
inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
+ Map<String, IgniteSchema> res = new HashMap<>(schemas);
- IgniteSchema schema = res.computeIfAbsent(schemaName,
IgniteSchema::new);
+ IgniteSchema schema = res.compute(schemaName,
+ (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
- CompletableFuture<IgniteIndex> igniteIndexFuture =
convert(causalityToken, index);
+ return tablesVv.update(
+ causalityToken,
+ (tables, tblEx) -> inBusyLock(busyLock, () -> {
+ if (tblEx != null) {
+ return failedFuture(tblEx);
+ }
- return indicesVv
- .update(
- causalityToken,
- (indices, ex) -> inBusyLock(busyLock,
() -> {
- if (ex != null) {
- return failedFuture(ex);
- }
+ Map<UUID, InternalIgniteTable> resTbls = new
HashMap<>(tables);
- Map<UUID, IgniteIndex> resIdxs =
new HashMap<>(indices);
+ InternalIgniteTable table =
resTbls.compute(index.tableId(),
+ (k, v) ->
IgniteTableImpl.copyOf((IgniteTableImpl) v));
- return igniteIndexFuture
- .thenApply(igniteIndex -> {
-
resIdxs.put(index.id(), igniteIndex);
+ CompletableFuture<IgniteIndex> igniteIndexFuture =
schemaManager
+ .schemaRegistry(causalityToken,
index.tableId())
+ .thenApply(reg -> convert(index, reg,
table));
- return resIdxs;
- });
- })
- )
- .thenCombine(
- igniteIndexFuture,
- (v, igniteIndex) ->
inBusyLock(busyLock, () -> {
- schema.addIndex(index.id(),
igniteIndex);
+ return indicesVv.update(
+ causalityToken,
+ (indices, idxEx) -> inBusyLock(busyLock,
() -> {
+ if (idxEx != null) {
+ return failedFuture(idxEx);
+ }
- return null;
- })
- )
- .thenCompose(v -> inBusyLock(busyLock, () ->
completedFuture(res)));
- }));
+ Map<UUID, IgniteIndex> resIdxs = new
HashMap<>(indices);
+
+ return igniteIndexFuture
+ .thenApply(igniteIndex -> {
+ resIdxs.put(index.id(),
igniteIndex);
+
+ return resIdxs;
+ });
+ })
+ ).thenCombine(
+ igniteIndexFuture,
+ (v, igniteIndex) -> inBusyLock(busyLock,
() -> {
+ String tblName = tableNameById(schema,
index.tableId());
+
+ table.addIndex(igniteIndex);
+ schema.addTable(tblName,
(InternalIgniteTable) table);
+ schema.addIndex(index.id(),
igniteIndex);
+
+ return v;
+ })
+ ).thenCompose(v -> inBusyLock(busyLock, () ->
completedFuture(resTbls)));
+ })
+ ).thenCompose(v -> inBusyLock(busyLock, () ->
completedFuture(res)));
+ }));
return calciteSchemaVv.get(causalityToken);
} finally {
@@ -469,6 +489,16 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
}
}
+ private static String tableNameById(IgniteSchema schema, UUID tableId) {
+ return schema.getTableMap()
+ .entrySet()
+ .stream()
+ .filter(entry -> tableId
+ .equals(((InternalIgniteTable) entry.getValue()).id()))
+ .map(Entry::getKey)
+ .findFirst().get();
+ }
+
/**
* Index dropped callback method deregisters index from Calcite schema.
*
@@ -489,21 +519,44 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
Map<String, IgniteSchema> res = new HashMap<>(schemas);
- IgniteSchema schema = res.computeIfAbsent(schemaName,
IgniteSchema::new);
+ IgniteSchema schema = res.compute(schemaName,
+ (k, v) -> v == null ? new IgniteSchema(schemaName) :
IgniteSchema.copy(v));
- if (schema.removeIndex(indexId)) {
- return indicesVv.update(causalityToken, (indices, ex) ->
inBusyLock(busyLock, () -> {
- if (ex != null) {
- return failedFuture(ex);
+ IgniteIndex rmvIndex = schema.removeIndex(indexId);
+ if (rmvIndex != null) {
+ return tablesVv.update(
+ causalityToken,
+ (tables, tlbEx) -> inBusyLock(busyLock, () -> {
+ if (tlbEx != null) {
+ return failedFuture(tlbEx);
}
- Map<UUID, IgniteIndex> resIdxs = new
HashMap<>(indices);
+ Map<UUID, InternalIgniteTable> resTbls = new
HashMap<>(tables);
- resIdxs.remove(indexId);
+ InternalIgniteTable table =
resTbls.compute(rmvIndex.table().id(),
+ (k, v) ->
IgniteTableImpl.copyOf((IgniteTableImpl) v));
- return completedFuture(resIdxs);
- }
- )).thenCompose(v -> inBusyLock(busyLock, () ->
completedFuture(res)));
+ table.removeIndex(rmvIndex.name());
+
+ return indicesVv.update(causalityToken,
(indices, idxEx) -> inBusyLock(busyLock, () -> {
+ if (idxEx != null) {
+ return failedFuture(idxEx);
+ }
+
+ Map<UUID, IgniteIndex> resIdxs =
new HashMap<>(indices);
+
+ IgniteIndex rmvIdx =
resIdxs.remove(indexId);
+
+ assert
table.id().equals(rmvIdx.table().id());
+
+ String tblName =
tableNameById(schema, rmvIdx.table().id());
+ schema.addTable(tblName, table);
+
+ return completedFuture(resIdxs);
+ }
+ )).thenCompose(v -> inBusyLock(busyLock, () ->
completedFuture(resTbls)));
+ })
+ ).thenCompose(v -> inBusyLock(busyLock, () ->
completedFuture(res)));
}
return completedFuture(res);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 417c22bba4..cf5db348c2 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.SchemaUtils;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
@@ -330,25 +331,25 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varbinary(255)) "
+ "with partitions=1,replicas=1", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
String finalNewTblSql1 = newTblSql;
- assertThrows(TableAlreadyExistsException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
+ assertThrows(TableAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1
int PRIMARY KEY, c2 varbinary(255)) "
+ "with partitions=1,replicas=1", curMethodName);
- assertThrows(TableAlreadyExistsException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
+ assertThrows(TableAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
// todo: correct exception need to be thrown
https://issues.apache.org/jira/browse/IGNITE-16084
- assertThrows(SqlException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with
partitions__wrong=1,replicas=1")));
- assertThrows(SqlException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with
partitions=1,replicas__wrong=1")));
newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varchar(255))",
@@ -368,7 +369,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String newTblSql = String.format("CREATE TABLE %s (c1 int, c2 int NOT
NULL DEFAULT 1, c3 int, primary key(c1, c2))", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -383,24 +384,24 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varchar(255))", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
- awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " +
curMethodName));
+ readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " +
curMethodName));
SqlQueryProcessor finalQueryProc = queryProc;
- assertThrows(TableNotFoundException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
"DROP TABLE " + curMethodName + "_not_exist")));
- assertThrows(TableNotFoundException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
"DROP TABLE " + curMethodName)));
- assertThrows(TableNotFoundException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
"DROP TABLE PUBLIC." + curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS
PUBLIC." + curMethodName + "_not_exist"));
+ readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS
PUBLIC." + curMethodName + "_not_exist"));
- awaitFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS
PUBLIC." + curMethodName));
+ readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS
PUBLIC." + curMethodName));
assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -417,40 +418,40 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varchar(255))", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
String alterCmd = String.format("ALTER TABLE %s ADD COLUMN (c3
varchar, c4 int)", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", alterCmd));
+ readFirst(queryProc.queryAsync("PUBLIC", alterCmd));
String alterCmd1 = String.format("ALTER TABLE %s ADD COLUMN c5 int NOT
NULL DEFAULT 1", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
+ readFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
- assertThrows(ColumnAlreadyExistsException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
+ assertThrows(ColumnAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
String alterCmdNoTbl = String.format("ALTER TABLE %s ADD COLUMN (c3
varchar, c4 int)", curMethodName + "_notExist");
- assertThrows(TableNotFoundException.class, () ->
awaitFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
+ assertThrows(TableNotFoundException.class, () ->
readFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
String alterIfExistsCmd = String.format("ALTER TABLE IF EXISTS %s ADD
COLUMN (c3 varchar, c4 int)", curMethodName + "NotExist");
- awaitFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
+ readFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
- assertThrows(ColumnAlreadyExistsException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
+ assertThrows(ColumnAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
- awaitFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER
TABLE %s DROP COLUMN c4", curMethodName)));
+ readFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER
TABLE %s DROP COLUMN c4", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE
%s ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE
%s DROP COLUMN c3", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
DROP COLUMN c3", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE
%s DROP COLUMN IF EXISTS c3", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
DROP COLUMN IF EXISTS c3", curMethodName)));
- assertThrows(ColumnNotFoundException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(ColumnNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s DROP COLUMN (c3, c4)",
curMethodName))));
- assertThrows(IgniteException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(IgniteException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s DROP COLUMN c1",
curMethodName))));
}
@@ -461,20 +462,20 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
public void testAlterColumnsAddBatch() {
String curMethodName = getCurrentMethodName();
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE
%s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE
%s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE
%s ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
- awaitFirst(queryProc
+ readFirst(queryProc
.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN
IF NOT EXISTS (c3 varchar, c4 varchar)", curMethodName)));
- awaitFirst(
+ readFirst(
queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar, c5 varchar)",
curMethodName)));
SqlQueryProcessor finalQueryProc = queryProc;
- assertThrows(ColumnAlreadyExistsException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(ColumnAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s ADD COLUMN (c5 varchar)",
curMethodName))));
}
@@ -485,16 +486,16 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
public void testAlterColumnsDropBatch() {
String curMethodName = getCurrentMethodName();
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE
%s "
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE
%s "
+ "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4
varchar, c5 varchar)", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE
%s DROP COLUMN c4", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
DROP COLUMN c4", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE
%s DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s
DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
SqlQueryProcessor finalQueryProc = queryProc;
- assertThrows(ColumnNotFoundException.class, () ->
awaitFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(ColumnNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("ALTER TABLE %s DROP COLUMN c4",
curMethodName))));
}
@@ -510,35 +511,35 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varbinary(255)) with partitions=1", curMethodName);
- awaitFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index1 ON %s (c1)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index1 ON %s (c1)", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index2 ON %s (c1)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index2 ON %s (c1)", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index3 ON %s (c2)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index3 ON %s (c2)", curMethodName)));
assertThrows(IndexAlreadyExistsException.class, () ->
- awaitFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
+ readFirst(finalQueryProc.queryAsync("PUBLIC",
String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
assertThrows(IgniteException.class, () ->
- awaitFirst(finalQueryProc
+ readFirst(finalQueryProc
.queryAsync("PUBLIC", String.format("CREATE INDEX
index_3 ON %s (c1)", curMethodName + "_nonExist"))));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX
index4 ON %s", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX
index4 ON %s", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX
index4 ON %s (c2 desc, c1 asc)", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX
index4 ON %s", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX
index4 ON %s", curMethodName)));
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF
EXISTS index4 ON %s", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF
EXISTS index4 ON %s", curMethodName)));
}
@Disabled("https://issues.apache.org/jira/browse/IGNITE-17197")
@@ -547,7 +548,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String method = getCurrentMethodName();
// Without engine.
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255))", method + 0)
)));
@@ -555,7 +556,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
assertThat(tableView(method + 0).dataStorage(),
instanceOf(RocksDbDataStorageView.class));
// With existing engine.
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) engine %s",
@@ -567,7 +568,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
assertThat(tableView(method + 1).dataStorage(),
instanceOf(TestConcurrentHashMapDataStorageView.class));
// With existing engine in mixed case
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) engine %s", method + 2, "\"RocksDb\"")
)));
@@ -576,7 +577,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
IgniteException exception = assertThrows(
IgniteException.class,
- () -> awaitFirst(queryProc.queryAsync(
+ () -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) engine %s", method + 3, method)
))
@@ -588,7 +589,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
exception = assertThrows(
IgniteException.class,
- () -> awaitFirst(queryProc.queryAsync(
+ () -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255))", method + 4)
))
@@ -601,34 +602,34 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
void createTableWithTableOptions() {
String method = getCurrentMethodName();
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with replicas=1", method + 0)
)));
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with REPLICAS=1", method + 1)
)));
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with \"replicas\"=1", method + 2)
)));
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with \"replICAS\"=1", method + 3)
)));
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with replicas=1, partitions=1", method + 4)
)));
IgniteException exception = assertThrows(
IgniteException.class,
- () -> awaitFirst(queryProc.queryAsync(
+ () -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with replicas='%s'", method + 5, method)
))
@@ -638,7 +639,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
exception = assertThrows(
IgniteException.class,
- () -> awaitFirst(queryProc.queryAsync(
+ () -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with %s='%s'", method + 6, method, method)
))
@@ -648,7 +649,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
exception = assertThrows(
IgniteException.class,
- () -> awaitFirst(queryProc.queryAsync(
+ () -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with replicas=-1", method + 7)
))
@@ -661,7 +662,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
void createTableWithDataStorageOptions() {
String method = getCurrentMethodName();
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with dataRegion='default'", method + 0)
)));
@@ -671,7 +672,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
equalTo(DEFAULT_DATA_REGION_NAME)
);
- assertDoesNotThrow(() -> awaitFirst(queryProc.queryAsync(
+ assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
"PUBLIC",
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with DATAREGION='test_region'", method + 1)
)));
@@ -691,14 +692,14 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
public void testSchemaForTheFutureUpdate() throws Exception {
String curMethodName = getCurrentMethodName();
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE
%s "
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE
%s "
+ "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4
varchar, c5 varchar)", curMethodName)));
SchemaRegistry schemaRegistry = (((TableImpl)
tblManager.tables().get(0)).schemaView());
runAsync(() -> {
Thread.sleep(3000);
- awaitFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER
TABLE %s DROP COLUMN c4", curMethodName)));
+ readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER
TABLE %s DROP COLUMN c4", curMethodName)));
});
int lastSchemaVersion = schemaRegistry.lastSchemaVersion();
@@ -794,8 +795,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
return tableManager;
}
- private <T> AsyncSqlCursor<T>
awaitFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
- return await(cursors.get(0));
+ private <T> BatchedResult<T>
readFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
+ return await(await(cursors.get(0)).requestNextAsync(512));
}
private @Nullable TableView tableView(String tableName) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index b9d2f64351..0355c749df 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -42,6 +43,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.ignite.internal.index.Index;
import org.apache.ignite.internal.index.IndexDescriptor;
@@ -54,6 +56,7 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.IgniteTableImpl;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManagerImpl;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
@@ -83,7 +86,7 @@ public class SqlSchemaManagerTest {
tableVer,
new Column[]{new Column(0, "ID", NativeTypes.INT64, false)},
new Column[]{new Column(1, "VAL", NativeTypes.INT64, false)}
- );
+ );
@Mock
private TableManager tableManager;
@@ -312,6 +315,68 @@ public class SqlSchemaManagerTest {
verifyNoMoreInteractions(tableManager);
}
+
+ @Test
+ public void testIndexEventsProcessed() {
+ InternalTable mock = mock(InternalTable.class);
+ when(mock.tableId()).thenReturn(tableId);
+
+ when(table.name()).thenReturn("TEST_SCHEMA.T");
+ when(table.internalTable()).thenReturn(mock);
+ when(schemaRegistry.schema()).thenReturn(schemaDescriptor);
+
when(schemaRegistry.lastSchemaVersion()).thenReturn(schemaDescriptor.version());
+ when(schemaManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(schemaRegistry));
+
+ sqlSchemaManager.onTableCreated("TEST_SCHEMA", table,
testRevisionRegister.actualToken() + 1);
+ testRevisionRegister.moveForward();
+
+ IndexDescriptor descMock = mock(IndexDescriptor.class);
+ when(descMock.columns()).thenReturn(List.of());
+
+ when(index.name()).thenReturn("I");
+ when(index.id()).thenReturn(indexId);
+ when(index.tableId()).thenReturn(tableId);
+ when(index.descriptor()).thenReturn(descMock);
+
+ {
+ SchemaPlus schema1 = sqlSchemaManager.schema("TEST_SCHEMA");
+
+ sqlSchemaManager.onIndexCreated("TEST_SCHEMA", index,
testRevisionRegister.actualToken() + 1);
+ testRevisionRegister.moveForward();
+
+ SchemaPlus schema2 = sqlSchemaManager.schema("TEST_SCHEMA");
+
+ // Validate schema snapshot.
+ assertNotSame(schema1, schema2);
+ assertNotSame(schema1.getTable("T"), schema2.getTable("T"));
+
+ assertNull(schema1.unwrap(IgniteSchema.class).index(indexId));
+ assertNotNull(schema2.unwrap(IgniteSchema.class).index(indexId));
+
+ assertNull(((InternalIgniteTable)
schema1.getTable("T")).getIndex("I"));
+ assertNotNull(((InternalIgniteTable)
schema2.getTable("T")).getIndex("I"));
+ }
+ {
+ sqlSchemaManager.onIndexDropped("TEST_SCHEMA", indexId,
testRevisionRegister.actualToken() + 1);
+ SchemaPlus schema1 = sqlSchemaManager.schema("TEST_SCHEMA");
+ testRevisionRegister.moveForward();
+
+ SchemaPlus schema2 = sqlSchemaManager.schema("TEST_SCHEMA");
+
+ // Validate schema snapshot.
+ assertNotSame(schema1, schema2);
+ assertNotSame(schema1.getTable("T"), schema2.getTable("T"));
+
+ assertNotNull(schema1.unwrap(IgniteSchema.class).index(indexId));
+ assertNull(schema2.unwrap(IgniteSchema.class).index(indexId));
+
+ assertNull(((InternalIgniteTable)
schema2.getTable("T")).getIndex("I"));
+ assertNotNull(((InternalIgniteTable)
schema1.getTable("T")).getIndex("I"));
+ }
+
+ verifyNoMoreInteractions(tableManager);
+ }
+
/**
* Test revision register.
*/
@@ -346,8 +411,8 @@ public class SqlSchemaManagerTest {
Function<Long, CompletableFuture<?>> old = moveRevision;
moveRevision = rev -> allOf(
- old.apply(rev),
- function.apply(rev)
+ old.apply(rev),
+ function.apply(rev)
);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index a4d9cd36af..0a13813ff8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -19,19 +19,24 @@ package org.apache.ignite.internal.sql.engine.planner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexFieldAccess;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.index.ColumnCollation;
+import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSortedIndexSpool;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
import org.junit.jupiter.api.Test;
@@ -193,4 +198,49 @@ public class SortedIndexSpoolPlannerTest extends
AbstractPlannerTest {
assertTrue(((RexLiteral) lowerBound.get(3)).isNull());
assertTrue(upperBound.get(1) instanceof RexFieldAccess);
}
+
+ /**
+ * Checks that a less-than join condition on a DESC-collated index field ends up in the lower bound of the sorted index spool.
+ */
+ @Test
+ public void testDescFields() throws Exception {
+ IgniteSchema publicSchema = createSchema(
+ createTable("T0", 10, IgniteDistributions.affinity(0, "T0",
"hash"),
+ "ID", Integer.class, "JID", Integer.class, "VAL",
String.class)
+ .addIndex("t0_jid_idx", 1),
+ createTable("T1", 100, IgniteDistributions.affinity(0, "T1",
"hash"),
+ "ID", Integer.class, "JID", Integer.class, "VAL",
String.class)
+
.addIndex(RelCollations.of(TraitUtils.createFieldCollation(1,
ColumnCollation.DESC_NULLS_FIRST)), "t1_jid_idx")
+ );
+
+ String sql = "select * "
+ + "from t0 "
+ + "join t1 on t1.jid < t0.jid";
+
+ assertPlan(sql, publicSchema,
+ isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(input(1,
isInstanceOf(IgniteSortedIndexSpool.class)
+ .and(spool -> {
+ List<RexNode> lowerBound =
spool.indexCondition().lowerBound();
+
+ // Condition is LESS_THAN, but we have
DESC field and condition should be in lower bound
+ // instead of upper bound.
+ assertNotNull(lowerBound);
+ assertEquals(3, lowerBound.size());
+
+ assertTrue(((RexLiteral)
lowerBound.get(0)).isNull());
+ assertTrue(lowerBound.get(1) instanceof
RexFieldAccess);
+ assertTrue(((RexLiteral)
lowerBound.get(2)).isNull());
+
+ List<RexNode> upperBound =
spool.indexCondition().upperBound();
+
+ assertNull(upperBound);
+
+ return true;
+ })
+ .and(hasChildThat(isIndexScan("T1",
"t1_jid_idx")))
+ )),
+ "MergeJoinConverter", "NestedLoopJoinConverter",
"FilterSpoolMergeToHashIndexSpoolRule"
+ );
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 32bb02cf2f..a6a2722686 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1156,6 +1156,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
});
}
+ }).exceptionally(th -> {
+ tblFut.completeExceptionally(th);
+
+ return null;
});
return tblFut;
@@ -1305,16 +1309,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return allOf(tableFuts).thenApply(unused ->
inBusyLock(busyLock, () -> {
var tables = new ArrayList<Table>(tableIds.size());
- try {
- for (var fut : tableFuts) {
- var table = fut.get();
+ for (var fut : tableFuts) {
+ var table = fut.join();
- if (table != null) {
- tables.add((Table) table);
- }
+ if (table != null) {
+ tables.add((Table) table);
}
- } catch (Throwable t) {
- throw new CompletionException(t);
}
return tables;