This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3a690ba4a9 IGNITE-19501 SchemaManager should use CatalogService for
building BinaryRow descriptors (#2356)
3a690ba4a9 is described below
commit 3a690ba4a977de0311c27c6e375b6290facf20be
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Aug 1 14:05:43 2023 +0400
IGNITE-19501 SchemaManager should use CatalogService for building BinaryRow
descriptors (#2356)
---
.../internal/catalog/commands/CatalogUtils.java | 1 +
.../descriptors/CatalogTableDescriptor.java | 11 +
.../catalog/events/AddColumnEventParameters.java | 11 +-
.../catalog/events/AlterColumnEventParameters.java | 14 +-
.../catalog/events/CreateTableEventParameters.java | 4 +-
.../catalog/events/DropColumnEventParameters.java | 11 +-
.../catalog/events/DropTableEventParameters.java | 13 +-
...ntParameters.java => TableEventParameters.java} | 12 +-
.../internal/catalog/storage/AlterColumnEntry.java | 1 +
.../internal/catalog/storage/DropColumnsEntry.java | 1 +
.../internal/catalog/storage/NewColumnsEntry.java | 1 +
.../internal/catalog/CatalogManagerSelfTest.java | 79 ++++
.../DistributionZoneRebalanceEngineTest.java | 3 +
.../RebalanceUtilUpdateAssignmentsTest.java | 1 +
modules/schema/build.gradle | 4 +
.../internal/schema/CatalogDescriptorUtils.java | 2 +
...chemaManager.java => CatalogSchemaManager.java} | 330 +++++++------
.../ignite/internal/schema/SchemaManager.java | 2 +-
.../apache/ignite/internal/schema/SchemaUtils.java | 12 +
.../CatalogToSchemaDescriptorConverter.java | 179 ++++++++
.../AbstractSchemaConverterTest.java | 10 +-
.../internal/schema/CatalogSchemaManagerTest.java | 509 +++++++++++++++++++++
.../CatalogToSchemaDescriptorConverterTest.java | 199 ++++++++
...nfigurationToSchemaDescriptorConverterTest.java | 1 +
.../testutils/SchemaConfigurationConverter.java | 3 +
.../engine/schema/CatalogSqlSchemaManagerTest.java | 10 +-
26 files changed, 1219 insertions(+), 205 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index 7823b09d47..d085f4ca0d 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -60,6 +60,7 @@ public class CatalogUtils {
id,
params.tableName(),
zoneId,
+ CatalogTableDescriptor.INITIAL_TABLE_VERSION,
params.columns().stream().map(CatalogUtils::fromParams).collect(toList()),
params.primaryKeyColumns(),
params.colocationColumns()
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index 58a744d74c..2948aa90f7 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -35,8 +35,12 @@ import org.jetbrains.annotations.Nullable;
public class CatalogTableDescriptor extends CatalogObjectDescriptor {
private static final long serialVersionUID = -2021394971104316570L;
+ public static final int INITIAL_TABLE_VERSION = 1;
+
private final int zoneId;
+ private final int tableVersion;
+
private final List<CatalogTableColumnDescriptor> columns;
private final List<String> primaryKeyColumns;
private final List<String> colocationColumns;
@@ -50,6 +54,7 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
* @param id Table id.
* @param name Table name.
* @param zoneId Distribution zone ID.
+ * @param tableVersion Version of the table.
* @param columns Table column descriptors.
* @param pkCols Primary key column names.
* @param colocationCols Colocation column names.
@@ -58,6 +63,7 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
int id,
String name,
int zoneId,
+ int tableVersion,
List<CatalogTableColumnDescriptor> columns,
List<String> pkCols,
@Nullable List<String> colocationCols
@@ -65,6 +71,7 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
super(id, Type.TABLE, name);
this.zoneId = zoneId;
+ this.tableVersion = tableVersion;
this.columns = Objects.requireNonNull(columns, "No columns defined.");
primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key
columns.");
colocationColumns = colocationCols == null ? pkCols : colocationCols;
@@ -89,6 +96,10 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
return zoneId;
}
+ public int tableVersion() {
+ return tableVersion;
+ }
+
public List<String> primaryKeyColumns() {
return primaryKeyColumns;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
index 6f0226d80f..382dd8f72c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
@@ -23,9 +23,8 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
/**
* Add column event parameters contain descriptors of added columns.
*/
-public class AddColumnEventParameters extends CatalogEventParameters {
+public class AddColumnEventParameters extends TableEventParameters {
- private final int tableId;
private final List<CatalogTableColumnDescriptor> columnDescriptors;
/**
@@ -42,17 +41,11 @@ public class AddColumnEventParameters extends
CatalogEventParameters {
int tableId,
List<CatalogTableColumnDescriptor> columnDescriptors
) {
- super(causalityToken, catalogVersion);
+ super(causalityToken, catalogVersion, tableId);
- this.tableId = tableId;
this.columnDescriptors = columnDescriptors;
}
- /** Returns table id. */
- public int tableId() {
- return tableId;
- }
-
/**
* Returns descriptors of columns to add.
*/
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
index fdd124198d..b73f057495 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
@@ -22,9 +22,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
/**
* Alter column event parameters contain a column descriptor for the modified
column.
*/
-public class AlterColumnEventParameters extends CatalogEventParameters {
-
- private final int tableId;
+public class AlterColumnEventParameters extends TableEventParameters {
private final CatalogTableColumnDescriptor columnDescriptor;
@@ -33,21 +31,15 @@ public class AlterColumnEventParameters extends
CatalogEventParameters {
*
* @param causalityToken Causality token.
* @param catalogVersion Catalog version.
- * @param tableId Returns an id the table to be modified.
+ * @param tableId ID of the table to be modified.
* @param columnDescriptor Descriptor for the column to be replaced.
*/
public AlterColumnEventParameters(long causalityToken, int catalogVersion,
int tableId, CatalogTableColumnDescriptor columnDescriptor) {
- super(causalityToken, catalogVersion);
+ super(causalityToken, catalogVersion, tableId);
- this.tableId = tableId;
this.columnDescriptor = columnDescriptor;
}
- /** Returns an id of a modified table. */
- public int tableId() {
- return tableId;
- }
-
/** Returns column descriptor for the column to be replaced. */
public CatalogTableColumnDescriptor columnDescriptor() {
return columnDescriptor;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
index a8cd6f6df3..dada194914 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
@@ -22,7 +22,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
/**
* Create table event parameters contain a table descriptor for the newly created
table.
*/
-public class CreateTableEventParameters extends CatalogEventParameters {
+public class CreateTableEventParameters extends TableEventParameters {
private final CatalogTableDescriptor tableDescriptor;
@@ -34,7 +34,7 @@ public class CreateTableEventParameters extends
CatalogEventParameters {
* @param tableDescriptor Newly created table descriptor.
*/
public CreateTableEventParameters(long causalityToken, int catalogVersion,
CatalogTableDescriptor tableDescriptor) {
- super(causalityToken, catalogVersion);
+ super(causalityToken, catalogVersion, tableDescriptor.id());
this.tableDescriptor = tableDescriptor;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
index 46a17becb5..0b9d98a1ee 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
@@ -22,9 +22,8 @@ import java.util.Collection;
/**
* Drop column event parameters contain names of dropped columns.
*/
-public class DropColumnEventParameters extends CatalogEventParameters {
+public class DropColumnEventParameters extends TableEventParameters {
- private final int tableId;
private final Collection<String> columns;
/**
@@ -36,17 +35,11 @@ public class DropColumnEventParameters extends
CatalogEventParameters {
* @param columns Names of columns to drop.
*/
public DropColumnEventParameters(long causalityToken, int catalogVersion,
int tableId, Collection<String> columns) {
- super(causalityToken, catalogVersion);
+ super(causalityToken, catalogVersion, tableId);
- this.tableId = tableId;
this.columns = columns;
}
- /** Returns table id. */
- public int tableId() {
- return tableId;
- }
-
/**
* Returns names of columns to drop.
*/
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
index 241e849a2d..aab6e9b790 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.catalog.events;
/**
* Drop table event parameters contains an id of dropped table.
*/
-public class DropTableEventParameters extends CatalogEventParameters {
-
- private final int tableId;
+public class DropTableEventParameters extends TableEventParameters {
/**
* Constructor.
@@ -32,13 +30,6 @@ public class DropTableEventParameters extends
CatalogEventParameters {
* @param tableId An id of dropped table.
*/
public DropTableEventParameters(long causalityToken, int catalogVersion,
int tableId) {
- super(causalityToken, catalogVersion);
-
- this.tableId = tableId;
- }
-
- /** Returns an id of dropped table. */
- public int tableId() {
- return tableId;
+ super(causalityToken, catalogVersion, tableId);
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
similarity index 78%
copy from
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
copy to
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
index 241e849a2d..089a1be64e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
@@ -18,10 +18,9 @@
package org.apache.ignite.internal.catalog.events;
/**
- * Drop table event parameters contains an id of dropped table.
+ * Parameters of an event that is related to a table.
*/
-public class DropTableEventParameters extends CatalogEventParameters {
-
+public abstract class TableEventParameters extends CatalogEventParameters {
private final int tableId;
/**
@@ -29,15 +28,14 @@ public class DropTableEventParameters extends
CatalogEventParameters {
*
* @param causalityToken Causality token.
* @param catalogVersion Catalog version.
- * @param tableId An id of dropped table.
+ * @param tableId ID of the table to which the event relates.
*/
- public DropTableEventParameters(long causalityToken, int catalogVersion,
int tableId) {
+ public TableEventParameters(long causalityToken, int catalogVersion, int
tableId) {
super(causalityToken, catalogVersion);
-
this.tableId = tableId;
}
- /** Returns an id of dropped table. */
+ /** Returns the ID of the table to which the event relates. */
public int tableId() {
return tableId;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index 910a363d81..5ef3f9965d 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -92,6 +92,7 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
table.id(),
table.name(),
table.zoneId(),
+ table.tableVersion() + 1,
table.columns().stream()
.map(source ->
source.name().equals(column.name()) ? column : source)
.collect(toList()),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index e29c250283..a3c5dcadd0 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -89,6 +89,7 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
table.id(),
table.name(),
table.zoneId(),
+ table.tableVersion() + 1,
table.columns().stream()
.filter(col ->
!columns.contains(col.name()))
.collect(toList()),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 1e1aff017e..936e7c0bf5 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -89,6 +89,7 @@ public class NewColumnsEntry implements UpdateEntry, Fireable
{
table.id(),
table.name(),
table.zoneId(),
+ table.tableVersion() + 1,
CollectionUtils.concat(table.columns(), descriptors),
table.primaryKeyColumns(),
table.colocationColumns()) : table
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index a1ef062a31..1a2c34c4d6 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1873,6 +1873,85 @@ public class CatalogManagerSelfTest extends
BaseIgniteAbstractTest {
assertThat(manager.indexes(2), hasItems(index(2,
createPkIndexName(TABLE_NAME)), index(2, INDEX_NAME)));
}
+ @Test
+ public void createTableProducesTableVersion1() {
+ createSomeTable(TABLE_NAME);
+
+ CatalogTableDescriptor table = manager.table(TABLE_NAME,
Long.MAX_VALUE);
+
+ assertThat(table.tableVersion(), is(1));
+ }
+
+ @Test
+ public void addColumnIncrementsTableVersion() {
+ createSomeTable(TABLE_NAME);
+
+ CompletableFuture<Void> future = manager.addColumn(
+ AlterTableAddColumnParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+
.columns(List.of(ColumnParams.builder().name("val2").type(ColumnType.INT32).build()))
+ .build()
+ );
+ assertThat(future, willCompleteSuccessfully());
+
+ CatalogTableDescriptor table = manager.table(TABLE_NAME,
Long.MAX_VALUE);
+
+ assertThat(table.tableVersion(), is(2));
+ }
+
+ @Test
+ public void dropColumnIncrementsTableVersion() {
+ createSomeTable(TABLE_NAME);
+
+ CompletableFuture<Void> future = manager.dropColumn(
+ AlterTableDropColumnParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+ .columns(Set.of("val1"))
+ .build()
+ );
+ assertThat(future, willCompleteSuccessfully());
+
+ CatalogTableDescriptor table = manager.table(TABLE_NAME,
Long.MAX_VALUE);
+
+ assertThat(table.tableVersion(), is(2));
+ }
+
+ @Test
+ public void alterColumnIncrementsTableVersion() {
+ createSomeTable(TABLE_NAME);
+
+ CompletableFuture<Void> future = manager.alterColumn(
+ AlterColumnParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+ .columnName("val1")
+ .type(ColumnType.INT64)
+ .build()
+ );
+ assertThat(future, willCompleteSuccessfully());
+
+ CatalogTableDescriptor table = manager.table(TABLE_NAME,
Long.MAX_VALUE);
+
+ assertThat(table.tableVersion(), is(2));
+ }
+
+ private void createSomeTable(String tableName) {
+ CreateTableParams params = CreateTableParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(tableName)
+ .zone(ZONE_NAME)
+ .columns(List.of(
+
ColumnParams.builder().name("key1").type(ColumnType.INT32).build(),
+
ColumnParams.builder().name("val1").type(ColumnType.INT32).build()
+ ))
+ .primaryKeyColumns(List.of("key1"))
+ .build();
+
+ assertThat(manager.createTable(params), willCompleteSuccessfully());
+ }
+
private CompletableFuture<Void> changeColumn(
String tab,
String col,
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 6382aba231..75adff45d3 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -461,6 +462,8 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
}));
tableChange.changePrimaryKey(primaryKeyChange ->
primaryKeyChange.changeColumns("k1"));
+
+ ((ExtendedTableChange)
tableChange).changeSchemaId(1);
}));
});
});
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index 93800f158e..1187527d71 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -98,6 +98,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
1,
"table1",
0,
+ 1,
List.of(new CatalogTableColumnDescriptor("k1", ColumnType.INT32,
false, 0, 0, 0, null)),
List.of("k1"),
null
diff --git a/modules/schema/build.gradle b/modules/schema/build.gradle
index 92e8100a45..0fd37e6cdd 100644
--- a/modules/schema/build.gradle
+++ b/modules/schema/build.gradle
@@ -41,11 +41,15 @@ dependencies {
testAnnotationProcessor libs.jmh.annotation.processor
testImplementation project(':ignite-configuration')
testImplementation project(':ignite-core')
+ testImplementation project(':ignite-vault')
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-configuration')))
+ testImplementation(testFixtures(project(':ignite-metastorage')))
+ testImplementation(testFixtures(project(':ignite-vault')))
testImplementation libs.hamcrest.core
testImplementation libs.hamcrest.optional
testImplementation libs.mockito.core
+ testImplementation libs.mockito.junit
testImplementation libs.jmh.core
testImplementation libs.javax.annotations
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
index 8acc7aa194..101cddf769 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
@@ -33,6 +33,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.schema.configuration.ColumnTypeView;
import org.apache.ignite.internal.schema.configuration.ColumnView;
import
org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
import org.apache.ignite.internal.schema.configuration.PrimaryKeyView;
import org.apache.ignite.internal.schema.configuration.TableView;
import
org.apache.ignite.internal.schema.configuration.ValueSerializationHelper;
@@ -65,6 +66,7 @@ public class CatalogDescriptorUtils {
config.id(),
config.name(),
config.zoneId(),
+ ((ExtendedTableView) config).schemaId(),
config.columns().stream().map(CatalogDescriptorUtils::toTableColumnDescriptor).collect(toList()),
List.of(primaryKeyConfig.columns()),
List.of(primaryKeyConfig.colocationColumns())
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
similarity index 56%
copy from
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
copy to
modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
index 51f0d71c8a..becabab496 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
@@ -17,42 +17,45 @@
package org.apache.ignite.internal.schema;
+import static java.util.Collections.emptyList;
+import static java.util.Objects.requireNonNullElse;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongFunction;
-import org.apache.ignite.configuration.NamedListView;
-import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.TableEventParameters;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
-import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.schema.configuration.ColumnView;
-import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
@@ -61,16 +64,14 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
/**
- * The class services a management of table schemas.
+ * This class services management of table schemas.
*/
-public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters> implements IgniteComponent {
- private static final IgniteLogger LOGGER =
Loggers.forClass(SchemaManager.class);
-
- /** Initial version for schemas. */
- public static final int INITIAL_SCHEMA_VERSION = 1;
+public class CatalogSchemaManager extends Producer<SchemaEvent,
SchemaEventParameters> implements IgniteComponent {
+ private static final IgniteLogger LOGGER =
Loggers.forClass(CatalogSchemaManager.class);
/** Schema history key predicate part. */
private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
+ private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX =
".sch-hist-latest";
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -78,61 +79,93 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
- /** Tables configuration. */
- private final TablesConfiguration tablesCfg;
+ private final CatalogService catalogService;
- /** Versioned store for tables by name. */
+ /** Versioned store for tables by ID. */
private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>>
registriesVv;
/** Meta storage manager. */
private final MetaStorageManager metastorageMgr;
/** Constructor. */
- public SchemaManager(
+ public CatalogSchemaManager(
Consumer<LongFunction<CompletableFuture<?>>> registry,
- TablesConfiguration tablesCfg,
+ CatalogService catalogService,
MetaStorageManager metastorageMgr
) {
this.registriesVv = new IncrementalVersionedValue<>(registry,
HashMap::new);
- this.tablesCfg = tablesCfg;
+ this.catalogService = catalogService;
this.metastorageMgr = metastorageMgr;
}
- /** {@inheritDoc} */
@Override
public void start() {
- tablesCfg.tables().any().columns().listen(this::onSchemaChange);
+ catalogService.listen(CatalogEvent.TABLE_CREATE, this::onTableCreated);
+ catalogService.listen(CatalogEvent.TABLE_ALTER, this::onTableAltered);
+
+ registerExistingTables();
}
- /**
- * Listener of schema configuration changes.
- *
- * @param ctx Configuration context.
- * @return A future.
- */
- private CompletableFuture<?>
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
- if (!busyLock.enterBusy()) {
- return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
+ private void registerExistingTables() {
+ // TODO: IGNITE-20051 - add proper recovery (consider tables that are
removed now; take token and catalog version
+ // exactly matching the tables).
+
+ long causalityToken = metastorageMgr.appliedRevision();
+ int catalogVersion = catalogService.latestCatalogVersion();
+
+ for (CatalogTableDescriptor tableDescriptor :
catalogService.tables(catalogVersion)) {
+ onTableCreated(new CreateTableEventParameters(causalityToken,
catalogVersion, tableDescriptor), null);
}
- try {
- ExtendedTableView tblCfg = ctx.newValue(ExtendedTableView.class);
+ registriesVv.complete(causalityToken);
+ }
+
+ private CompletableFuture<Boolean> onTableCreated(CatalogEventParameters
event, @Nullable Throwable ex) {
+ if (ex != null) {
+ return failedFuture(ex);
+ }
+
+ CreateTableEventParameters creationEvent =
(CreateTableEventParameters) event;
+
+ return onTableCreatedOrAltered(creationEvent.tableDescriptor(),
creationEvent.causalityToken());
+ }
+
+ private CompletableFuture<Boolean> onTableAltered(CatalogEventParameters
event, @Nullable Throwable ex) {
+ if (ex != null) {
+ return failedFuture(ex);
+ }
+
+ assert event instanceof TableEventParameters;
+
+ TableEventParameters tableEvent = ((TableEventParameters) event);
+
+ CatalogTableDescriptor tableDescriptor =
catalogService.table(tableEvent.tableId(), tableEvent.catalogVersion());
+
+ assert tableDescriptor != null;
- int newSchemaVersion = tblCfg.schemaId();
+ return onTableCreatedOrAltered(tableDescriptor,
event.causalityToken());
+ }
- int tblId = tblCfg.id();
+ private CompletableFuture<Boolean>
onTableCreatedOrAltered(CatalogTableDescriptor tableDescriptor, long
causalityToken) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
- if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
+ try {
+ int tableId = tableDescriptor.id();
+ int newSchemaVersion = tableDescriptor.tableVersion();
+
+ if (searchSchemaByVersion(tableId, newSchemaVersion) != null) {
return completedFuture(null);
}
- SchemaDescriptor newSchema =
SchemaUtils.prepareSchemaDescriptor(newSchemaVersion, tblCfg);
+ SchemaDescriptor newSchema =
SchemaUtils.prepareSchemaDescriptor(tableDescriptor);
- // This is intentionally a blocking call to enforce configuration
listener execution order. Unfortunately it is not possible
+ // This is intentionally a blocking call to enforce catalog
listener execution order. Unfortunately it is not possible
// to execute this method asynchronously, because the schema
descriptor is needed to fire the CREATE event as a synchronous part
- // of the configuration listener.
+ // of the catalog listener.
try {
- setColumnMapping(newSchema, tblId);
+ setColumnMapping(newSchema, tableId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -141,10 +174,8 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
return failedFuture(e);
}
- long causalityToken = ctx.storageRevision();
-
// Fire event early, because dependent listeners have to register
VersionedValues' update futures
- var eventParams = new SchemaEventParameters(causalityToken, tblId,
newSchema);
+ var eventParams = new SchemaEventParameters(causalityToken,
tableId, newSchema);
fireEvent(SchemaEvent.CREATE, eventParams)
.whenComplete((v, e) -> {
@@ -156,37 +187,20 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
return registriesVv.update(causalityToken, (registries, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(new
IgniteInternalException(IgniteStringFormatter.format(
- "Cannot create a schema for the table [tblId={},
ver={}]", tblId, newSchemaVersion), e)
+ "Cannot create a schema for the table [tblId={},
ver={}]", tableId, newSchemaVersion), e)
);
}
- return saveSchemaDescriptor(tblId, newSchema)
- .thenApply(t -> registerSchema(tblId, newSchema,
registries));
- }));
+ return saveSchemaDescriptor(tableId, newSchema)
+ .thenApply(t -> registerSchema(registries, tableId,
newSchema));
+ })).thenApply(ignored -> false);
} finally {
busyLock.leaveBusy();
}
}
- private Map<Integer, SchemaRegistryImpl> registerSchema(int tblId,
SchemaDescriptor newSchema,
- Map<Integer, SchemaRegistryImpl> registries) {
- SchemaRegistryImpl reg = registries.get(tblId);
-
- if (reg == null) {
- Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
-
- copy.put(tblId, createSchemaRegistry(tblId, newSchema));
-
- return copy;
- } else {
- reg.onSchemaRegistered(newSchema);
-
- return registries;
- }
- }
-
private void setColumnMapping(SchemaDescriptor schema, int tableId) throws
ExecutionException, InterruptedException {
- if (schema.version() == INITIAL_SCHEMA_VERSION) {
+ if (schema.version() == CatalogTableDescriptor.INITIAL_TABLE_VERSION) {
return;
}
@@ -197,32 +211,79 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
if (prevSchema == null) {
// This is intentionally a blocking call, because this method is
used in a synchronous part of the configuration listener.
// See the call site for more details.
- prevSchema = schemaByVersion(tableId, prevVersion).get();
+ prevSchema = loadSchemaDescriptor(tableId, prevVersion).get();
}
schema.columnMapping(SchemaUtils.columnMapper(prevSchema, schema));
}
/**
- * Gets a schema descriptor from the configuration storage.
+ * Loads the table schema descriptor by version from Metastore.
*
- * @param tableId Table ID.
- * @param schemaVer Schema version.
- * @return Schema descriptor.
+ * @param tblId Table id.
+ * @param ver Schema version.
+ * @return Schema representation if schema found, {@code null} otherwise.
*/
- private CompletableFuture<SchemaDescriptor> loadSchemaDescriptor(int
tableId, int schemaVer) {
- CompletableFuture<Entry> ent =
metastorageMgr.get(schemaWithVerHistKey(tableId, schemaVer));
+ private CompletableFuture<SchemaDescriptor> loadSchemaDescriptor(int
tblId, int ver) {
+ return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
+ .thenApply(entry -> {
+ byte[] value = entry.value();
- return ent.thenApply(e ->
SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
+ assert value != null;
+
+ return SchemaSerializerImpl.INSTANCE.deserialize(value);
+ });
}
- /** Saves a schema descriptor to the configuration storage. */
- private CompletableFuture<Boolean> saveSchemaDescriptor(int tableId,
SchemaDescriptor schema) {
- ByteArray key = schemaWithVerHistKey(tableId, schema.version());
+ /**
+ * Saves a schema in the MetaStorage.
+ *
+ * @param tableId Table id.
+ * @param schema Schema descriptor.
+ * @return Future that will be completed when the schema gets saved.
+ */
+ private CompletableFuture<Void> saveSchemaDescriptor(int tableId,
SchemaDescriptor schema) {
+ ByteArray schemaKey = schemaWithVerHistKey(tableId, schema.version());
+ ByteArray latestSchemaVersionKey = latestSchemaVersionKey(tableId);
byte[] serializedSchema =
SchemaSerializerImpl.INSTANCE.serialize(schema);
- return metastorageMgr.invoke(notExists(key), put(key,
serializedSchema), noop());
+ return metastorageMgr.invoke(
+ notExists(schemaKey),
+ List.of(
+ put(schemaKey, serializedSchema),
+ put(latestSchemaVersionKey,
intToBytes(schema.version()))
+ ),
+ emptyList()
+ ).thenApply(unused -> null);
+ }
+
+ /**
+ * Registers the new schema in the registries.
+ *
+ * @param registries Registries before registering this schema.
+ * @param tableId ID of the table to which the schema belongs.
+ * @param schema The schema to register.
+ * @return Registries after registering this schema.
+ */
+ private Map<Integer, SchemaRegistryImpl> registerSchema(
+ Map<Integer, SchemaRegistryImpl> registries,
+ int tableId,
+ SchemaDescriptor schema
+ ) {
+ SchemaRegistryImpl reg = registries.get(tableId);
+
+ if (reg == null) {
+ Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
+
+ copy.put(tableId, createSchemaRegistry(tableId, schema));
+
+ return copy;
+ } else {
+ reg.onSchemaRegistered(schema);
+
+ return registries;
+ }
}
/**
@@ -235,7 +296,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
private SchemaRegistryImpl createSchemaRegistry(int tableId,
SchemaDescriptor initialSchema) {
return new SchemaRegistryImpl(
ver -> inBusyLock(busyLock, () ->
loadSchemaDescriptor(tableId, ver)),
- () -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
+ () -> inBusyLock(busyLock, () ->
latestSchemaVersionOrDefault(tableId)),
initialSchema
);
}
@@ -290,12 +351,18 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}
/**
- * Drop schema registry for the given table id.
+ * Drops schema registry for the given table id (along with the
corresponding schemas).
*
* @param causalityToken Causality token.
* @param tableId Table id.
*/
public CompletableFuture<?> dropRegistry(long causalityToken, int tableId)
{
+ return removeRegistry(causalityToken, tableId).thenCompose(unused -> {
+ return destroySchemas(tableId);
+ });
+ }
+
+ private CompletableFuture<?> removeRegistry(long causalityToken, int
tableId) {
return registriesVv.update(causalityToken, (registries, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(new IgniteInternalException(
@@ -310,7 +377,23 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}));
}
- /** {@inheritDoc} */
+ private CompletableFuture<?> destroySchemas(int tableId) {
+ return latestSchemaVersion(tableId)
+ .thenCompose(latestVersion -> {
+ if (latestVersion == null) {
+ // Nothing to remove.
+ return completedFuture(null);
+ }
+
+ Set<ByteArray> keysToRemove =
IntStream.rangeClosed(CatalogTableDescriptor.INITIAL_TABLE_VERSION,
latestVersion)
+ .mapToObj(version -> schemaWithVerHistKey(tableId,
version))
+ .collect(toSet());
+ keysToRemove.add(latestSchemaVersionKey(tableId));
+
+ return metastorageMgr.removeAll(keysToRemove);
+ });
+ }
+
@Override
public void stop() throws Exception {
if (!stopGuard.compareAndSet(false, true)) {
@@ -318,77 +401,36 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
}
busyLock.block();
-
- IgniteUtils.closeAllManually(registriesVv.latest().values().stream());
}
/**
- * Gets the latest version of the table schema which available in
Metastore.
+ * Gets the latest version of the table schema which is available in
Metastore or the default (1) if nothing is available.
*
- * @param tblId Table id.
+ * @param tableId Table id.
* @return The latest schema version.
*/
- private CompletableFuture<Integer> latestSchemaVersion(int tblId) {
- var latestVersionFuture = new CompletableFuture<Integer>();
-
- metastorageMgr.prefix(schemaHistPrefix(tblId)).subscribe(new
Subscriber<>() {
- private int lastVer = INITIAL_SCHEMA_VERSION;
-
- @Override
- public void onSubscribe(Subscription subscription) {
- // Request unlimited demand.
- subscription.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Entry item) {
- String key = new String(item.key(), StandardCharsets.UTF_8);
- int descVer = extractVerFromSchemaKey(key);
-
- if (descVer > lastVer) {
- lastVer = descVer;
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- latestVersionFuture.completeExceptionally(throwable);
- }
-
- @Override
- public void onComplete() {
- latestVersionFuture.complete(lastVer);
- }
- });
-
- return latestVersionFuture;
+ private CompletableFuture<Integer> latestSchemaVersionOrDefault(int
tableId) {
+ return latestSchemaVersion(tableId)
+ .thenApply(versionOrNull -> requireNonNullElse(versionOrNull,
CatalogTableDescriptor.INITIAL_TABLE_VERSION));
}
/**
- * Gets the defined version of the table schema which available in
Metastore.
+ * Gets the latest version of the table schema which is available in
Metastore or {@code null} if nothing is available.
*
- * @param tblId Table id.
- * @return Schema representation if schema found, {@code null} otherwise.
+ * @param tableId Table id.
+ * @return The latest schema version or {@code null} if nothing is
available.
*/
- private CompletableFuture<SchemaDescriptor> schemaByVersion(int tblId, int
ver) {
- return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
+ private CompletableFuture<Integer> latestSchemaVersion(int tableId) {
+ return metastorageMgr.get(latestSchemaVersionKey(tableId))
.thenApply(entry -> {
- byte[] value = entry.value();
-
- assert value != null;
-
- return SchemaSerializerImpl.INSTANCE.deserialize(value);
+ if (entry == null || entry.value() == null) {
+ return null;
+ } else {
+ return ByteUtils.bytesToInt(entry.value());
+ }
});
}
- private int extractVerFromSchemaKey(String key) {
- int pos = key.lastIndexOf('.');
- assert pos != -1 : "Unexpected key: " + key;
-
- key = key.substring(pos + 1);
- return Integer.parseInt(key);
- }
-
/**
* Forms schema history key.
*
@@ -400,13 +442,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
}
- /**
- * Forms schema history predicate.
- *
- * @param tblId Table id.
- * @return {@link ByteArray} representation.
- */
- private static ByteArray schemaHistPrefix(int tblId) {
- return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX);
+ private static ByteArray latestSchemaVersionKey(int tableId) {
+ return ByteArray.fromString(tableId +
LATEST_SCHEMA_VERSION_STORE_SUFFIX);
}
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 51f0d71c8a..e23b1ad562 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -81,7 +81,7 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Tables configuration. */
private final TablesConfiguration tablesCfg;
- /** Versioned store for tables by name. */
+ /** Versioned store for tables by ID. */
private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>>
registriesVv;
/** Meta storage manager. */
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
index aa0bbc51b3..a420ec4cbd 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.schema;
import java.util.Arrays;
import java.util.Comparator;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import
org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
import
org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.mapping.ColumnMapper;
@@ -39,6 +41,16 @@ public class SchemaUtils {
return ConfigurationToSchemaDescriptorConverter.convert(schemaVer,
tableView);
}
+ /**
+ * Creates schema descriptor for the table with specified descriptor.
+ *
+ * @param tableDescriptor Table descriptor.
+ * @return Schema descriptor.
+ */
+ public static SchemaDescriptor
prepareSchemaDescriptor(CatalogTableDescriptor tableDescriptor) {
+ return CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
+ }
+
/**
* Prepares column mapper.
*
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
new file mode 100644
index 0000000000..d6a26fedf6
--- /dev/null
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.catalog;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.commands.DefaultValue.ConstantValue;
+import org.apache.ignite.internal.catalog.commands.DefaultValue.FunctionCall;
+import org.apache.ignite.internal.catalog.commands.DefaultValue.Type;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.DefaultValueGenerator;
+import org.apache.ignite.internal.schema.DefaultValueProvider;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Utility class that converts a table descriptor from the Catalog, as well as all
related objects, to the {@link SchemaDescriptor} domain.
+ */
+public final class CatalogToSchemaDescriptorConverter {
+ private static final Map<String, NativeType> FIX_SIZED_TYPES;
+
+ static {
+ List<NativeType> types =
IgniteUtils.collectStaticFields(NativeTypes.class, NativeType.class);
+
+ Map<String, NativeType> tmp = new HashMap<>(types.size(), 1.0f);
+
+ for (NativeType type : types) {
+ if (!type.spec().fixedLength()) {
+ continue;
+ }
+
+ tmp.put(type.spec().name(), type);
+ }
+
+ FIX_SIZED_TYPES = Map.copyOf(tmp);
+ }
+
+ /**
+ * Converts type of a column descriptor to a {@link NativeType}.
+ *
+ * @param columnDescriptor Descriptor to convert.
+ * @return A {@link NativeType} object represented by given descriptor.
+ */
+ public static NativeType convertType(CatalogTableColumnDescriptor
columnDescriptor) {
+ String typeName = columnDescriptor.type().name();
+ NativeType res = FIX_SIZED_TYPES.get(typeName);
+
+ if (res != null) {
+ return res;
+ }
+
+ switch (typeName) {
+ case "BITMASK":
+ int bitmaskLen = columnDescriptor.length();
+
+ return NativeTypes.bitmaskOf(bitmaskLen);
+
+ case "STRING":
+ int strLen = columnDescriptor.length();
+
+ return NativeTypes.stringOf(strLen);
+
+ case "BYTES":
+ case "BYTE_ARRAY":
+ int blobLen = columnDescriptor.length();
+
+ return NativeTypes.blobOf(blobLen);
+
+ case "DECIMAL":
+ int prec = columnDescriptor.precision();
+ int scale = columnDescriptor.scale();
+
+ return NativeTypes.decimalOf(prec, scale);
+
+ case "NUMBER":
+ return NativeTypes.numberOf(columnDescriptor.precision());
+
+ case "TIME":
+ return NativeTypes.time(columnDescriptor.precision());
+
+ case "DATETIME":
+ return NativeTypes.datetime(columnDescriptor.precision());
+
+ case "TIMESTAMP":
+ return NativeTypes.timestamp(columnDescriptor.precision());
+
+ default:
+ throw new IllegalArgumentException("Unknown type " + typeName);
+ }
+ }
+
+ /**
+ * Converts given column descriptor to a {@link Column}.
+ *
+ * @param columnOrder Number of the current column.
+ * @param columnDescriptor Descriptor to convert.
+ * @return A {@link Column} object representing the table column
descriptor.
+ */
+ public static Column convert(int columnOrder, CatalogTableColumnDescriptor
columnDescriptor) {
+ NativeType type = convertType(columnDescriptor);
+
+ DefaultValue defaultValue = columnDescriptor.defaultValue();
+
+ DefaultValueProvider defaultValueProvider;
+
+ if (defaultValue == null) {
+ defaultValueProvider = DefaultValueProvider.NULL_PROVIDER;
+ } else if (defaultValue.type() == Type.CONSTANT) {
+ ConstantValue constantValue = (ConstantValue) defaultValue;
+ defaultValueProvider =
DefaultValueProvider.constantProvider(constantValue.value());
+ } else if (defaultValue.type() == Type.FUNCTION_CALL) {
+ FunctionCall functionCall = (FunctionCall) defaultValue;
+ defaultValueProvider = DefaultValueProvider.forValueGenerator(
+ DefaultValueGenerator.valueOf(functionCall.functionName())
+ );
+ } else {
+ throw new IllegalStateException("Unknown value supplier class " +
defaultValue.getClass().getName());
+ }
+
+ return new Column(columnOrder, columnDescriptor.name(), type,
columnDescriptor.nullable(), defaultValueProvider);
+ }
+
+ /**
+ * Converts given table descriptor to a {@link SchemaDescriptor}.
+ *
+ * @param tableDescriptor Descriptor to convert.
+ * @return A {@link SchemaDescriptor} object representing the table
descriptor.
+ */
+ public static SchemaDescriptor convert(CatalogTableDescriptor
tableDescriptor) {
+ Set<String> keyColumnsNames =
Set.copyOf(tableDescriptor.primaryKeyColumns());
+
+ List<Column> keyCols = new ArrayList<>(keyColumnsNames.size());
+ List<Column> valCols = new
ArrayList<>(tableDescriptor.columns().size() - keyColumnsNames.size());
+
+ int idx = 0;
+
+ for (CatalogTableColumnDescriptor column : tableDescriptor.columns()) {
+ if (keyColumnsNames.contains(column.name())) {
+ keyCols.add(convert(idx, column));
+ } else {
+ valCols.add(convert(idx, column));
+ }
+
+ idx++;
+ }
+
+ return new SchemaDescriptor(
+ tableDescriptor.tableVersion(),
+ keyCols.toArray(Column[]::new),
+ tableDescriptor.colocationColumns().toArray(String[]::new),
+ valCols.toArray(Column[]::new)
+ );
+ }
+
+ private CatalogToSchemaDescriptorConverter() { }
+}
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractSchemaConverterTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/AbstractSchemaConverterTest.java
similarity index 95%
rename from
modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractSchemaConverterTest.java
rename to
modules/schema/src/test/java/org/apache/ignite/internal/schema/AbstractSchemaConverterTest.java
index b7c7a5154d..81b7a67055 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractSchemaConverterTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/AbstractSchemaConverterTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.schema.configuration;
+package org.apache.ignite.internal.schema;
import static java.math.RoundingMode.HALF_UP;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -35,10 +35,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.schema.DecimalNativeType;
-import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.schema.NativeTypes;
import
org.apache.ignite.internal.schema.testutils.definition.ColumnType.DecimalColumnType;
import org.apache.ignite.internal.util.ArrayUtils;
@@ -184,8 +180,8 @@ public class AbstractSchemaConverterTest {
* Class represents a default value of particular type.
*/
protected static class DefaultValueArg {
- final NativeType type;
- final Object defaultValue;
+ public final NativeType type;
+ public final Object defaultValue;
/**
* Constructor.
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
new file mode 100644
index 0000000000..f1404eb2b8
--- /dev/null
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.catalog.CatalogService;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
+import org.apache.ignite.internal.util.subscription.ListAccumulator;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CatalogSchemaManagerTest {
+ private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
+ private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX =
".sch-hist-latest";
+
+ private static final int TABLE_ID = 3;
+ private static final String TABLE_NAME = "t";
+
+ private static final long CAUSALITY_TOKEN_1 = 42;
+ private static final long CAUSALITY_TOKEN_2 = 45;
+
+ private static final int CATALOG_VERSION_1 = 10;
+ private static final int CATALOG_VERSION_2 = 11;
+
+ private final AtomicReference<LongFunction<CompletableFuture<?>>>
onMetastoreRevisionCompleteHolder = new AtomicReference<>();
+
+ private final Consumer<LongFunction<CompletableFuture<?>>> registry =
onMetastoreRevisionCompleteHolder::set;
+
+ @Mock
+ private CatalogService catalogService;
+
+ private VaultManager vaultManager;
+
+ private MetaStorageManager metaStorageManager;
+
+ private final SimpleInMemoryKeyValueStorage metaStorageKvStorage = new
SimpleInMemoryKeyValueStorage("test");
+
+ private CatalogSchemaManager schemaManager;
+
+ private EventListener<CatalogEventParameters> tableCreatedListener;
+ private EventListener<CatalogEventParameters> tableAlteredListener;
+
+ private final Exception cause = new Exception("Oops");
+
+ @BeforeEach
+ void setUp() {
+ vaultManager = new VaultManager(new InMemoryVaultService());
+ vaultManager.start();
+
+ metaStorageManager =
spy(StandaloneMetaStorageManager.create(vaultManager, metaStorageKvStorage));
+ metaStorageManager.start();
+
+ doAnswer(invocation -> {
+ tableCreatedListener = invocation.getArgument(1);
+ return null;
+ }).when(catalogService).listen(eq(CatalogEvent.TABLE_CREATE), any());
+
+ doAnswer(invocation -> {
+ tableAlteredListener = invocation.getArgument(1);
+ return null;
+ }).when(catalogService).listen(eq(CatalogEvent.TABLE_ALTER), any());
+
+ schemaManager = new CatalogSchemaManager(registry, catalogService,
metaStorageManager);
+ schemaManager.start();
+
+ assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ schemaManager.stop();
+ metaStorageManager.stop();
+ vaultManager.stop();
+ }
+
+ @Test
+ void savesSchemaOnTableCreation() {
+ createSomeTable();
+
+ SchemaDescriptor schemaDescriptor = getSchemaDescriptor(1);
+
+ assertThat(schemaDescriptor.version(), is(1));
+ assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1"));
+
+ Column k1 = schemaDescriptor.column("k1");
+ assertThat(k1, is(notNullValue()));
+
+ assertThat(k1.name(), is("k1"));
+ assertThat(k1.type().spec(), is(NativeTypeSpec.INT16));
+
+ Column v1 = schemaDescriptor.column("v1");
+ assertThat(v1, is(notNullValue()));
+
+ assertThat(v1.name(), is("v1"));
+ assertThat(v1.type().spec(), is(NativeTypeSpec.INT32));
+ }
+
+ private void createSomeTable() {
+ List<CatalogTableColumnDescriptor> columns = List.of(
+ new CatalogTableColumnDescriptor("k1", ColumnType.INT16,
false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("k2", ColumnType.STRING,
false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("v1", ColumnType.INT32,
false, 0, 0, 0, null)
+ );
+ CatalogTableDescriptor tableDescriptor = new
CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 1, columns, List.of("k1",
"k2"), null);
+
+ CompletableFuture<Boolean> future = tableCreatedListener()
+ .notify(new CreateTableEventParameters(CAUSALITY_TOKEN_1,
CATALOG_VERSION_1, tableDescriptor), null);
+
+ assertThat(future, willBe(false));
+
+ completeCausalityToken(CAUSALITY_TOKEN_1);
+ }
+
+ private EventListener<CatalogEventParameters> tableCreatedListener() {
+ return Objects.requireNonNull(tableCreatedListener,
"tableCreatedListener is not registered with CatalogService");
+ }
+
+ private EventListener<CatalogEventParameters> tableAlteredListener() {
+ return Objects.requireNonNull(tableAlteredListener,
"tableAlteredListener is not registered with CatalogService");
+ }
+
+ private SchemaDescriptor getSchemaDescriptor(int schemaVersion) {
+ Entry entry = metaStorageKvStorage.get(schemaWithVerHistKey(TABLE_ID,
schemaVersion).bytes());
+ assertThat(entry, is(notNullValue()));
+
+ byte[] value = entry.value();
+ assertThat(value, is(notNullValue()));
+
+ return SchemaSerializerImpl.INSTANCE.deserialize(value);
+ }
+
+ private static ByteArray schemaWithVerHistKey(int tblId, int ver) {
+ return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
+ }
+
+ @Test
+ void savesSchemaOnColumnAddition() {
+ createSomeTable();
+
+ when(catalogService.table(TABLE_ID,
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAddition());
+
+ AddColumnEventParameters event = new AddColumnEventParameters(
+ CAUSALITY_TOKEN_2,
+ CATALOG_VERSION_2,
+ TABLE_ID,
+ List.of(new CatalogTableColumnDescriptor("v2",
ColumnType.STRING, false, 0, 0, 0, null))
+ );
+
+ CompletableFuture<Boolean> future =
tableAlteredListener().notify(event, null);
+
+ assertThat(future, willBe(false));
+
+ SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
+
+ assertThat(schemaDescriptor.version(), is(2));
+ assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1",
"v2"));
+
+ Column v2 = schemaDescriptor.column("v2");
+ assertThat(v2, is(notNullValue()));
+
+ assertThat(v2.name(), is("v2"));
+ assertThat(v2.type().spec(), is(NativeTypeSpec.STRING));
+ }
+
+ private static CatalogTableDescriptor tableDescriptorAfterColumnAddition()
{
+ List<CatalogTableColumnDescriptor> columns = List.of(
+ new CatalogTableColumnDescriptor("k1", ColumnType.INT16,
false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("k2", ColumnType.STRING,
false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("v1", ColumnType.INT32,
false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("v2", ColumnType.STRING,
false, 0, 0, 0, null)
+ );
+
+ return new CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 2, columns,
List.of("k1", "k2"), null);
+ }
+
+    /**
+     * Invokes the callback held by {@code onMetastoreRevisionCompleteHolder} with the given token and asserts
+     * that the returned future completes successfully.
+     *
+     * @param causalityToken Token to pass to the callback.
+     */
+    private void completeCausalityToken(long causalityToken) {
+        assertThat(
+                onMetastoreRevisionCompleteHolder.get().apply(causalityToken),
+                willCompleteSuccessfully()
+        );
+    }
+
+    /** Checks that a drop-column event makes schema version 2 (without the dropped column) readable from the storage. */
+    @Test
+    void savesSchemaOnColumnRemoval() {
+        createSomeTable();
+
+        CatalogTableDescriptor tableWithoutColumn = tableDescriptorAfterColumnRemoval();
+        when(catalogService.table(TABLE_ID, CATALOG_VERSION_2)).thenReturn(tableWithoutColumn);
+
+        DropColumnEventParameters event = new DropColumnEventParameters(CAUSALITY_TOKEN_2, CATALOG_VERSION_2, TABLE_ID, List.of("v1"));
+
+        CompletableFuture<Boolean> notification = tableAlteredListener().notify(event, null);
+        assertThat(notification, willBe(false));
+
+        SchemaDescriptor savedSchema = getSchemaDescriptor(2);
+
+        assertThat(savedSchema.version(), is(2));
+        assertThat(savedSchema.columnNames(), contains("k1", "k2"));
+    }
+
+    /** Creates a descriptor of table version 2 that only has the key columns k1 and k2 left; the primary key is (k1, k2). */
+    private static CatalogTableDescriptor tableDescriptorAfterColumnRemoval() {
+        CatalogTableColumnDescriptor k1 = new CatalogTableColumnDescriptor("k1", ColumnType.INT16, false, 0, 0, 0, null);
+        CatalogTableColumnDescriptor k2 = new CatalogTableColumnDescriptor("k2", ColumnType.STRING, false, 0, 0, 0, null);
+
+        return new CatalogTableDescriptor(
+                TABLE_ID,
+                TABLE_NAME,
+                0,
+                2,
+                List.of(k1, k2),
+                List.of("k1", "k2"),
+                null
+        );
+    }
+
+    /** Checks that an alter-column event makes schema version 2 (with the altered column type) readable from the storage. */
+    @Test
+    void savesSchemaOnColumnAlteration() {
+        createSomeTable();
+
+        CatalogTableDescriptor tableWithAlteredColumn = tableDescriptorAfterColumnAlteration();
+        when(catalogService.table(TABLE_ID, CATALOG_VERSION_2)).thenReturn(tableWithAlteredColumn);
+
+        CatalogTableColumnDescriptor alteredColumn = new CatalogTableColumnDescriptor("v1", ColumnType.INT64, false, 0, 0, 0, null);
+        AlterColumnEventParameters event = new AlterColumnEventParameters(
+                CAUSALITY_TOKEN_2,
+                CATALOG_VERSION_2,
+                TABLE_ID,
+                alteredColumn
+        );
+
+        CompletableFuture<Boolean> notification = tableAlteredListener().notify(event, null);
+        assertThat(notification, willBe(false));
+
+        SchemaDescriptor savedSchema = getSchemaDescriptor(2);
+        assertThat(savedSchema.version(), is(2));
+
+        Column alteredSchemaColumn = savedSchema.column("v1");
+        assertThat(alteredSchemaColumn, is(notNullValue()));
+        assertThat(alteredSchemaColumn.name(), is("v1"));
+        assertThat(alteredSchemaColumn.type().spec(), is(NativeTypeSpec.INT64));
+    }
+
+    /**
+     * Creates a descriptor of table version 2 in which only the value column v1 has been altered (its type is INT64).
+     *
+     * <p>The key columns k1 and k2 are declared exactly as in the sibling descriptors built for the column addition and
+     * removal cases, so that the alteration fixture differs from them in the altered column only.
+     */
+    private static CatalogTableDescriptor tableDescriptorAfterColumnAlteration() {
+        List<CatalogTableColumnDescriptor> columns = List.of(
+                new CatalogTableColumnDescriptor("k1", ColumnType.INT16, false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("v1", ColumnType.INT64, false, 0, 0, 0, null)
+        );
+
+        return new CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 2, columns, List.of("k1", "k2"), null);
+    }
+
+    /** Checks that an exception passed to the table creation listener is propagated through the returned future. */
+    @Test
+    void propagatesExceptionFromCatalogOnTableCreation() {
+        CreateTableEventParameters event = mock(CreateTableEventParameters.class);
+
+        CompletableFuture<Boolean> notification = tableCreatedListener().notify(event, cause);
+
+        assertThat(notification, willThrow(equalTo(cause)));
+    }
+
+    /** Checks that an exception passed to the table alteration listener with an add-column event is propagated. */
+    @Test
+    void propagatesExceptionFromCatalogOnColumnAddition() {
+        AddColumnEventParameters event = mock(AddColumnEventParameters.class);
+
+        CompletableFuture<Boolean> notification = tableAlteredListener().notify(event, cause);
+
+        assertThat(notification, willThrow(equalTo(cause)));
+    }
+
+    /** Checks that an exception passed to the table alteration listener with a drop-column event is propagated. */
+    @Test
+    void propagatesExceptionFromCatalogOnColumnRemoval() {
+        DropColumnEventParameters event = mock(DropColumnEventParameters.class);
+
+        CompletableFuture<Boolean> notification = tableAlteredListener().notify(event, cause);
+
+        assertThat(notification, willThrow(equalTo(cause)));
+    }
+
+    /**
+     * Checks that an exception passed to the table alteration listener with an alter-column event is propagated
+     * through the returned future.
+     */
+    @Test
+    void propagatesExceptionFromCatalogOnColumnAlteration() {
+        // The event must be an alter-column one; mocking AddColumnEventParameters here would merely repeat
+        // propagatesExceptionFromCatalogOnColumnAddition() and leave the alteration path untested.
+        CompletableFuture<Boolean> future = tableAlteredListener().notify(mock(AlterColumnEventParameters.class), cause);
+
+        assertThat(future, willThrow(equalTo(cause)));
+    }
+
+    /** Checks that no schema registry is returned for the table when no table has been created in the test. */
+    @Test
+    void latestSchemaRegistryIsUnavailableUntilSomeSchemaVersionIsProcessed() {
+        SchemaRegistry registryBeforeAnySchema = schemaManager.schemaRegistry(TABLE_ID);
+
+        assertThat(registryBeforeAnySchema, is(nullValue()));
+    }
+
+    /** Checks that, once a table is created, its registry exposes schema version 1 both as the latest one and by number. */
+    @Test
+    void latestSchemaRegistryIsAvailable() {
+        createSomeTable();
+
+        SchemaRegistry registry = schemaManager.schemaRegistry(TABLE_ID);
+
+        assertThat(registry.schema().version(), is(1));
+        assertThat(registry.schema(1).version(), is(1));
+    }
+
+    /** Checks that the registry future requested for {@code CAUSALITY_TOKEN_2} is not completed right after table creation. */
+    @Test
+    void schemaRegistryByCausalityTokenIsUnavailableTillTokenIsCompleted() {
+        createSomeTable();
+
+        CompletableFuture<SchemaRegistry> registryFuture = schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+
+        assertThat(registryFuture, willTimeoutFast());
+    }
+
+    /** Checks that the registry requested for {@code CAUSALITY_TOKEN_1} is available after table creation and has schema version 1. */
+    @Test
+    void schemaRegistryByCausalityTokenIsAvailable() {
+        createSomeTable();
+
+        CompletableFuture<SchemaRegistry> registryFuture = schemaManager.schemaRegistry(CAUSALITY_TOKEN_1, TABLE_ID);
+        assertThat(registryFuture, willCompleteSuccessfully());
+
+        SchemaRegistry registry = registryFuture.join();
+
+        assertThat(registry.schema().version(), is(1));
+        assertThat(registry.schema(1).version(), is(1));
+    }
+
+    /** Checks that both schema versions 1 and 2 can be obtained from the registry after the second version is created. */
+    @Test
+    void previousSchemaVersionsRemainAvailable() {
+        create2TableVersions();
+
+        CompletableFuture<SchemaRegistry> registryFuture = schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+        assertThat(registryFuture, willCompleteSuccessfully());
+
+        SchemaRegistry registry = registryFuture.join();
+
+        SchemaDescriptor firstVersion = registry.schema(1);
+        assertThat(firstVersion.version(), is(1));
+
+        SchemaDescriptor secondVersion = registry.schema(2);
+        assertThat(secondVersion.version(), is(2));
+    }
+
+    /** Creates the test table and then adds a column to it, which yields two versions of the table. */
+    private void create2TableVersions() {
+        // Version 1.
+        createSomeTable();
+
+        // Version 2.
+        addSomeColumn();
+    }
+
+    /**
+     * Emulates addition of column v2: stubs the catalog to return the altered table for {@code CATALOG_VERSION_2},
+     * notifies the table alteration listener and completes {@code CAUSALITY_TOKEN_2}.
+     */
+    private void addSomeColumn() {
+        CatalogTableDescriptor tableWithNewColumn = tableDescriptorAfterColumnAddition();
+        when(catalogService.table(TABLE_ID, CATALOG_VERSION_2)).thenReturn(tableWithNewColumn);
+
+        CatalogTableColumnDescriptor addedColumn = new CatalogTableColumnDescriptor("v2", ColumnType.STRING, false, 0, 0, 0, null);
+        AddColumnEventParameters event = new AddColumnEventParameters(
+                CAUSALITY_TOKEN_2,
+                CATALOG_VERSION_2,
+                TABLE_ID,
+                List.of(addedColumn)
+        );
+
+        CompletableFuture<Boolean> notification = tableAlteredListener().notify(event, null);
+        assertThat(notification, willBe(false));
+
+        completeCausalityToken(CAUSALITY_TOKEN_2);
+    }
+
+    /** Checks that {@code waitLatestSchema()} returns schema version 2 after two table versions have been created. */
+    @Test
+    void waitLatestSchemaReturnsLatestSchema() {
+        create2TableVersions();
+
+        SchemaRegistry registry = schemaManager.schemaRegistry(TABLE_ID);
+        SchemaDescriptor latestSchema = registry.waitLatestSchema();
+
+        assertThat(latestSchema.version(), is(2));
+    }
+
+    /** Checks that, after the registry is dropped and the token is completed, the registry future holds {@code null}. */
+    @Test
+    void dropRegistryMakesItUnavailable() {
+        createSomeTable();
+
+        assertThat(schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID), willCompleteSuccessfully());
+
+        completeCausalityToken(CAUSALITY_TOKEN_2);
+
+        CompletableFuture<SchemaRegistry> registryFuture = schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+        assertThat(registryFuture, is(completedFuture()));
+
+        SchemaRegistry droppedRegistry = registryFuture.join();
+        assertThat(droppedRegistry, is(nullValue()));
+    }
+
+    /** Checks that dropping the registry leaves neither schema entries nor the latest schema version entry for the table. */
+    @Test
+    void dropRegistryRemovesSchemasFromMetastorage() {
+        createSomeTable();
+
+        CompletableFuture<?> dropFuture = schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+        assertThat(dropFuture, willCompleteSuccessfully());
+
+        completeCausalityToken(CAUSALITY_TOKEN_2);
+
+        assertThatNoSchemasExist(TABLE_ID);
+        assertThatNoLatestSchemaVersionExists(TABLE_ID);
+    }
+
+    /**
+     * Asserts that {@code metaStorageManager} returns no entries for the schema key prefix of the given table.
+     *
+     * @param tableId Table ID.
+     */
+    private void assertThatNoSchemasExist(int tableId) {
+        CompletableFuture<List<String>> foundKeysFuture = new CompletableFuture<>();
+
+        ByteArray schemaKeysPrefix = ByteArray.fromString(tableId + SCHEMA_STORE_PREFIX);
+        Publisher<Entry> entriesByPrefix = metaStorageManager.prefix(schemaKeysPrefix);
+
+        // Collect the keys of all the entries found under the prefix as strings.
+        ListAccumulator<Entry, String> keyCollector = new ListAccumulator<>(entry -> new String(entry.key(), UTF_8));
+        entriesByPrefix.subscribe(keyCollector.toSubscriber(foundKeysFuture));
+
+        assertThat(foundKeysFuture, willBe(empty()));
+    }
+
+    /**
+     * Asserts that the latest schema version entry of the given table is either absent or has no value.
+     *
+     * @param tableId Table ID.
+     */
+    private void assertThatNoLatestSchemaVersionExists(int tableId) {
+        CompletableFuture<Entry> latestVersionFuture = metaStorageManager.get(latestSchemaVersionKey(tableId));
+        assertThat(latestVersionFuture, willCompleteSuccessfully());
+
+        Entry latestVersionEntry = latestVersionFuture.join();
+
+        if (latestVersionEntry == null) {
+            // No entry at all: nothing more to check.
+            return;
+        }
+
+        assertThat(latestVersionEntry.value(), is(nullValue()));
+    }
+
+    /**
+     * Builds the storage key under which the latest schema version of a table is looked up.
+     *
+     * @param tableId Table ID.
+     * @return Key composed of the table ID and {@code LATEST_SCHEMA_VERSION_STORE_SUFFIX}.
+     */
+    private static ByteArray latestSchemaVersionKey(int tableId) {
+        String key = tableId + LATEST_SCHEMA_VERSION_STORE_SUFFIX;
+
+        return ByteArray.fromString(key);
+    }
+
+    /**
+     * Checks that a schema manager started anew over the same storage can still provide schema version 1,
+     * which has not been requested from the new manager before.
+     */
+    @Test
+    void loadingPreExistingSchemasWorks() throws Exception {
+        create2TableVersions();
+
+        schemaManager.stop();
+
+        // State that the restarted manager is going to observe.
+        when(catalogService.latestCatalogVersion()).thenReturn(2);
+        when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition()));
+        doReturn(45L).when(metaStorageManager).appliedRevision();
+
+        schemaManager = new CatalogSchemaManager(registry, catalogService, metaStorageManager);
+        schemaManager.start();
+
+        int prevSchemaVersionNotYetTouched = 1;
+
+        SchemaRegistry restoredRegistry = schemaManager.schemaRegistry(TABLE_ID);
+        SchemaDescriptor previousSchema = restoredRegistry.schema(prevSchemaVersionNotYetTouched);
+
+        assertThat(previousSchema.version(), is(prevSchemaVersionNotYetTouched));
+    }
+}
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
new file mode 100644
index 0000000000..8b58543309
--- /dev/null
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.catalog;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.schema.AbstractSchemaConverterTest;
+import org.apache.ignite.internal.schema.BitmaskNativeType;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.DecimalNativeType;
+import
org.apache.ignite.internal.schema.DefaultValueProvider.FunctionalValueProvider;
+import org.apache.ignite.internal.schema.DefaultValueProvider.Type;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.NumberNativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TemporalNativeType;
+import org.apache.ignite.internal.schema.VarlenNativeType;
+import
org.apache.ignite.internal.schema.testutils.definition.DefaultValueGenerators;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify conversion from catalog table descriptors to schema objects.
+ */
+public class CatalogToSchemaDescriptorConverterTest extends AbstractSchemaConverterTest {
+    /** Length passed to the column descriptors built by this test. */
+    private static final int TEST_LENGTH = 15;
+
+    /** Precision passed to the column descriptors built by this test. */
+    private static final int TEST_PRECISION = 8;
+
+    /** Scale passed to the column descriptors built by this test. */
+    private static final int TEST_SCALE = 5;
+
+    /**
+     * Checks that a column descriptor built for each native type spec is converted to a native type of the matching
+     * spec that carries the expected length/precision/scale.
+     *
+     * @param typeSpec Native type spec under test.
+     */
+    @ParameterizedTest
+    @EnumSource(NativeTypeSpec.class)
+    public void convertColumnType(NativeTypeSpec typeSpec) {
+        CatalogTableColumnDescriptor columnDescriptor = TestColumnDescriptors.forSpec(typeSpec);
+
+        NativeType type = CatalogToSchemaDescriptorConverter.convertType(columnDescriptor);
+
+        // BYTE_ARRAY is the only column type whose name differs from the name of the resulting native type spec.
+        if (columnDescriptor.type() == ColumnType.BYTE_ARRAY) {
+            assertThat(type.spec(), is(NativeTypeSpec.BYTES));
+        } else {
+            assertThat(type.spec().name(), equalTo(columnDescriptor.type().name()));
+        }
+
+        if (type instanceof VarlenNativeType) {
+            assertThat(((VarlenNativeType) type).length(), equalTo(TEST_LENGTH));
+        } else if (type instanceof NumberNativeType) {
+            assertThat(((NumberNativeType) type).precision(), equalTo(columnDescriptor.precision()));
+        } else if (type instanceof DecimalNativeType) {
+            assertThat(((DecimalNativeType) type).precision(), equalTo(columnDescriptor.precision()));
+            assertThat(((DecimalNativeType) type).scale(), equalTo(columnDescriptor.scale()));
+        } else if (type instanceof TemporalNativeType) {
+            assertThat(((TemporalNativeType) type).precision(), equalTo(columnDescriptor.precision()));
+        } else if (type instanceof BitmaskNativeType) {
+            assertThat(((BitmaskNativeType) type).bits(), equalTo(TEST_LENGTH));
+        } else {
+            // Anything else is expected to be a plain NativeType without extra parameters.
+            assertThat("Unknown type: " + type.getClass(), type.getClass(), equalTo(NativeType.class));
+        }
+    }
+
+    /**
+     * Checks conversion of a column descriptor that has a constant default value.
+     *
+     * @param arg Native type together with the default value to test.
+     */
+    @ParameterizedTest
+    @MethodSource("generateTestArguments")
+    public void convertColumnDescriptorConstantDefault(DefaultValueArg arg) {
+        String columnName = arg.type.spec().name();
+        CatalogTableColumnDescriptor columnDescriptor = TestColumnDescriptors.forType(arg);
+
+        Column column = CatalogToSchemaDescriptorConverter.convert(0, columnDescriptor);
+
+        assertThat(column.name(), equalTo(columnName));
+        assertThat(column.type(), equalTo(arg.type));
+        assertThat(column.nullable(), equalTo(arg.defaultValue == null));
+        assertThat(column.defaultValueProvider().type(), equalTo(Type.CONSTANT));
+        assertThat(column.defaultValue(), equalTo(arg.defaultValue));
+    }
+
+    /** Checks conversion of a column descriptor whose default value is produced by a function call. */
+    @Test
+    public void convertColumnDescriptorFunctionalDefault() {
+        String columnName = "UUID";
+        String functionName = DefaultValueGenerators.GEN_RANDOM_UUID;
+        DefaultValue defaultValue = DefaultValue.functionCall(functionName);
+
+        // The numeric arguments go in the same order as in TestColumnDescriptors: precision, scale, length.
+        CatalogTableColumnDescriptor columnDescriptor = new CatalogTableColumnDescriptor(
+                NativeTypeSpec.UUID.name(),
+                NativeTypeSpec.UUID.asColumnType(),
+                false,
+                TEST_PRECISION,
+                TEST_SCALE,
+                TEST_LENGTH,
+                defaultValue
+        );
+
+        Column column = CatalogToSchemaDescriptorConverter.convert(0, columnDescriptor);
+
+        assertThat(column.name(), equalTo(columnName));
+        assertThat(column.type(), equalTo(NativeTypes.UUID));
+        assertThat(column.defaultValueProvider().type(), equalTo(Type.FUNCTIONAL));
+        assertThat(((FunctionalValueProvider) column.defaultValueProvider()).name(), equalTo(functionName));
+    }
+
+    /** Checks how columns of a table descriptor are distributed between key, value and colocation columns of the schema. */
+    @Test
+    public void convertTableDescriptor() {
+        CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
+                1,
+                "test",
+                0,
+                1,
+                List.of(
+                        new CatalogTableColumnDescriptor("C1", ColumnType.INT32, false, 0, 0, 0, null),
+                        new CatalogTableColumnDescriptor("K2", ColumnType.INT32, false, 0, 0, 0, null),
+                        new CatalogTableColumnDescriptor("C2", ColumnType.INT32, false, 0, 0, 0, null),
+                        new CatalogTableColumnDescriptor("K1", ColumnType.INT32, false, 0, 0, 0, null)
+                ),
+                List.of("K1", "K2"),
+                List.of("K2")
+        );
+
+        SchemaDescriptor schema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
+
+        assertThat(schema.keyColumns().length(), equalTo(2));
+        assertThat(schema.keyColumns().column(0).name(), equalTo("K1"));
+        assertThat(schema.keyColumns().column(1).name(), equalTo("K2"));
+        assertThat(schema.valueColumns().length(), equalTo(2));
+        assertThat(schema.valueColumns().column(0).name(), equalTo("C1"));
+        assertThat(schema.valueColumns().column(1).name(), equalTo("C2"));
+        assertThat(schema.colocationColumns().length, equalTo(1));
+        assertThat(schema.colocationColumns()[0].name(), equalTo("K2"));
+    }
+
+    /** Produces one argument per (type spec, default value) pair listed in {@code DEFAULT_VALUES_TO_TEST}. */
+    private static Iterable<DefaultValueArg> generateTestArguments() {
+        var paramList = new ArrayList<DefaultValueArg>();
+
+        for (var entry : DEFAULT_VALUES_TO_TEST.entrySet()) {
+            for (var defaultValue : entry.getValue()) {
+                paramList.add(
+                        new DefaultValueArg(specToType(entry.getKey()), adjust(defaultValue))
+                );
+            }
+        }
+        return paramList;
+    }
+
+    /** Factory of catalog column descriptors used by the tests of the enclosing class. */
+    private static class TestColumnDescriptors {
+        /** Creates a non-nullable descriptor named after the spec, with the test precision, scale and length and no default. */
+        static CatalogTableColumnDescriptor forSpec(NativeTypeSpec spec) {
+            return new CatalogTableColumnDescriptor(spec.name(), spec.asColumnType(), false, TEST_PRECISION, TEST_SCALE, TEST_LENGTH, null);
+        }
+
+        /**
+         * Creates a descriptor named after the spec of the given native type, taking length, precision and scale from that type
+         * (0 where the type has no such parameter) and using the given value as a constant default. The column is nullable
+         * if and only if the default value is {@code null}.
+         */
+        static CatalogTableColumnDescriptor forType(DefaultValueArg arg) {
+            NativeType type = arg.type;
+
+            int length = type instanceof VarlenNativeType ? ((VarlenNativeType) type).length()
+                    : type instanceof BitmaskNativeType ? ((BitmaskNativeType) type).bits()
+                    : 0;
+
+            int precision = type instanceof DecimalNativeType ? ((DecimalNativeType) type).precision()
+                    : type instanceof NumberNativeType ? ((NumberNativeType) type).precision()
+                    : type instanceof TemporalNativeType ? ((TemporalNativeType) type).precision()
+                    : 0;
+
+            int scale = type instanceof DecimalNativeType ? ((DecimalNativeType) type).scale() : 0;
+
+            return new CatalogTableColumnDescriptor(
+                    type.spec().name(),
+                    type.spec().asColumnType(),
+                    arg.defaultValue == null,
+                    precision,
+                    scale,
+                    length,
+                    DefaultValue.constant(arg.defaultValue)
+            );
+        }
+    }
+}
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
index 0dd2e8c417..0807c38602 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
import java.util.ArrayList;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.schema.AbstractSchemaConverterTest;
import org.apache.ignite.internal.schema.BitmaskNativeType;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.DecimalNativeType;
diff --git
a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
index 509f510455..13e8242213 100644
---
a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
+++
b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.schema.configuration.ColumnChange;
import org.apache.ignite.internal.schema.configuration.ColumnTypeChange;
import org.apache.ignite.internal.schema.configuration.ColumnTypeView;
import org.apache.ignite.internal.schema.configuration.ColumnView;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.PrimaryKeyView;
import org.apache.ignite.internal.schema.configuration.TableChange;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -412,6 +413,8 @@ public class SchemaConfigurationConverter {
}
});
+ ((ExtendedTableChange) tblChg).changeSchemaId(1);
+
tblChg.changePrimaryKey(pkCng ->
pkCng.changeColumns(tbl.keyColumns().toArray(String[]::new))
.changeColocationColumns(tbl.colocationColumns().toArray(String[]::new)));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
index aecb8d8eaa..18b64e3644 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
@@ -574,7 +574,15 @@ public class CatalogSqlSchemaManagerTest {
columnDescriptors.add(newCol);
}
- return new CatalogTableDescriptor(id, name, zoneId,
columnDescriptors, primaryKey, colocationKey);
+ return new CatalogTableDescriptor(
+ id,
+ name,
+ zoneId,
+ CatalogTableDescriptor.INITIAL_TABLE_VERSION,
+ columnDescriptors,
+ primaryKey,
+ colocationKey
+ );
}
}