This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 09116a30f7 IGNITE-21205 Get rid of metastorage in schema manager (#3013) 09116a30f7 is described below commit 09116a30f7b5c92e50375d5b588821d4ec1b705a Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Wed Jan 10 09:44:10 2024 +0300 IGNITE-21205 Get rid of metastorage in schema manager (#3013) --- .../catalog/commands/CreateTableCommand.java | 6 +- .../descriptors/CatalogTableDescriptor.java | 63 +++++- .../descriptors/CatalogTableSchemaVersions.java | 99 +++++++++ .../internal/catalog/storage/AlterColumnEntry.java | 14 +- .../internal/catalog/storage/DropColumnsEntry.java | 11 +- .../internal/catalog/storage/NewColumnsEntry.java | 11 +- .../MakeIndexAvailableCommandValidationTest.java | 6 +- .../handler/requests/jdbc/JdbcMetadataCatalog.java | 2 +- .../ignite/client/handler/FakeCatalogService.java | 2 +- .../RebalanceUtilUpdateAssignmentsTest.java | 6 +- .../ignite/internal/schema/SchemaManager.java | 154 ++++---------- .../apache/ignite/internal/schema/SchemaUtils.java | 2 +- .../CatalogToSchemaDescriptorConverter.java | 14 +- .../ignite/internal/schema/SchemaManagerTest.java | 225 +-------------------- .../CatalogToSchemaDescriptorConverterTest.java | 8 +- .../storage/AbstractMvTableStorageTest.java | 6 +- .../storage/index/AbstractIndexStorageTest.java | 6 +- .../replication/PartitionReplicaListenerTest.java | 7 +- .../schema/CatalogValidationSchemasSourceTest.java | 12 +- 19 files changed, 244 insertions(+), 410 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java index f423f11578..22e8c90b61 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.catalog.commands; import static java.util.Objects.requireNonNullElse; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.ensureNoTableIndexOrSysViewExistsWithGivenName; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow; @@ -111,12 +110,9 @@ public class CreateTableCommand extends AbstractTableCommand { pkIndexId, tableName, zone.id(), - CatalogTableDescriptor.INITIAL_TABLE_VERSION, columns.stream().map(CatalogUtils::fromParams).collect(toList()), primaryKeyColumns, - colocationColumns, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN + colocationColumns ); String indexName = pkIndexName(tableName); diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java index 89edf8bb98..b9157c2650 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion; import org.apache.ignite.internal.tostring.IgniteToStringExclude; import org.apache.ignite.internal.tostring.S; import org.jetbrains.annotations.Nullable; @@ -45,7 +46,7 @@ public class CatalogTableDescriptor extends CatalogObjectDescriptor { private final int pkIndexId; - private final int tableVersion; + private final CatalogTableSchemaVersions schemaVersions; private final List<CatalogTableColumnDescriptor> columns; private final List<String> primaryKeyColumns; @@ -57,28 +58,51 @@ public class CatalogTableDescriptor extends CatalogObjectDescriptor { private long creationToken; /** - * Constructor. + * Constructor for new table. + * + * @param id Table id. + * @param pkIndexId Primary key index id. + * @param name Table name. + * @param zoneId Distribution zone ID. + * @param columns Table column descriptors. + * @param pkCols Primary key column names. + */ + public CatalogTableDescriptor( + int id, + int schemaId, + int pkIndexId, + String name, + int zoneId, + List<CatalogTableColumnDescriptor> columns, + List<String> pkCols, + @Nullable List<String> colocationCols + ) { + this(id, schemaId, pkIndexId, name, zoneId, columns, pkCols, colocationCols, + new CatalogTableSchemaVersions(new TableVersion(columns)), INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN); + } + + /** + * Internal constructor. * * @param id Table id. * @param pkIndexId Primary key index id. * @param name Table name. * @param zoneId Distribution zone ID. - * @param tableVersion Version of the table. * @param columns Table column descriptors. * @param pkCols Primary key column names. * @param causalityToken Token of the update of the descriptor. * @param creationToken Token of the creation of the table descriptor. */ - public CatalogTableDescriptor( + private CatalogTableDescriptor( int id, int schemaId, int pkIndexId, String name, int zoneId, - int tableVersion, List<CatalogTableColumnDescriptor> columns, List<String> pkCols, @Nullable List<String> colocationCols, + CatalogTableSchemaVersions schemaVersions, long causalityToken, long creationToken ) { @@ -87,13 +111,14 @@ public class CatalogTableDescriptor extends CatalogObjectDescriptor { this.schemaId = schemaId; this.pkIndexId = pkIndexId; this.zoneId = zoneId; - this.tableVersion = tableVersion; this.columns = Objects.requireNonNull(columns, "No columns defined."); primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns."); colocationColumns = colocationCols == null ? pkCols : colocationCols; this.columnsMap = columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name, Function.identity())); + this.schemaVersions = schemaVersions; + this.creationToken = creationToken; // TODO: IGNITE-19082 Throw proper exceptions. @@ -103,6 +128,26 @@ public class CatalogTableDescriptor extends CatalogObjectDescriptor { assert Set.copyOf(primaryKeyColumns).containsAll(colocationColumns); } + /** + * Creates new table descriptor, using existing one as a template. + */ + public CatalogTableDescriptor newDescriptor( + String name, + int tableVersion, + List<CatalogTableColumnDescriptor> columns, + long causalityToken + ) { + CatalogTableSchemaVersions newSchemaVersions = tableVersion == schemaVersions.latestVersion() + ? schemaVersions + : schemaVersions.append(new TableVersion(columns), tableVersion); + + return new CatalogTableDescriptor( + id(), schemaId, pkIndexId, name, zoneId, columns, primaryKeyColumns, colocationColumns, + newSchemaVersions, + causalityToken, creationToken + ); + } + /** * Returns column descriptor for column with given name. */ @@ -114,6 +159,10 @@ public class CatalogTableDescriptor extends CatalogObjectDescriptor { return schemaId; } + public CatalogTableSchemaVersions schemaVersions() { + return schemaVersions; + } + public int zoneId() { return zoneId; } @@ -123,7 +172,7 @@ public class CatalogTableDescriptor extends CatalogObjectDescriptor { } public int tableVersion() { - return tableVersion; + return schemaVersions.latestVersion(); } public List<String> primaryKeyColumns() { diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableSchemaVersions.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableSchemaVersions.java new file mode 100644 index 0000000000..dafb831d69 --- /dev/null +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableSchemaVersions.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.catalog.descriptors; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.ignite.internal.util.ArrayUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Class that holds a list of table version descriptors. + */ +public class CatalogTableSchemaVersions implements Serializable { + private static final long serialVersionUID = 4353352473287209850L; + + /** + * Descriptor of a single table version. + */ + public static class TableVersion implements Serializable { + private static final long serialVersionUID = -5185852983239967322L; + + private final List<CatalogTableColumnDescriptor> columns; + + TableVersion(List<CatalogTableColumnDescriptor> columns) { + this.columns = columns; + } + + public List<CatalogTableColumnDescriptor> columns() { + return Collections.unmodifiableList(columns); + } + } + + private final int base; + private final TableVersion[] versions; + + /** + * Constructor. + * + * @param versions Array of table versions. + */ + CatalogTableSchemaVersions(TableVersion... versions) { + this(CatalogTableDescriptor.INITIAL_TABLE_VERSION, versions); + } + + private CatalogTableSchemaVersions(int base, TableVersion... versions) { + this.base = base; + this.versions = versions; + } + + /** + * Returns earliest known table version. + */ + public int earliestVersion() { + return base; + } + + /** + * Returns latest known table version. + */ + public int latestVersion() { + return base + versions.length - 1; + } + + /** + * Returns an existing table version, or {@code null} if it's not found. + */ + public @Nullable TableVersion get(int version) { + if (version < base || version >= base + versions.length) { + return null; + } + + return versions[version - base]; + } + + /** + * Creates a new instance of {@link CatalogTableSchemaVersions} with one new version appended. + */ + public CatalogTableSchemaVersions append(TableVersion tableVersion, int version) { + assert version == latestVersion() + 1; + + return new CatalogTableSchemaVersions(base, ArrayUtils.concat(versions, tableVersion)); + } +} diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java index 615387953b..75bce1b9ee 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java @@ -89,23 +89,13 @@ public class AlterColumnEntry implements UpdateEntry, Fireable { schema.id(), schema.name(), Arrays.stream(schema.tables()) - .map(table -> table.id() != tableId - ? table - : new CatalogTableDescriptor( - table.id(), - table.schemaId(), - table.primaryKeyIndexId(), + .map(table -> table.id() == tableId ? table.newDescriptor( table.name(), - table.zoneId(), table.tableVersion() + 1, table.columns().stream() .map(source -> source.name().equals(column.name()) ? column : source) .collect(toList()), - table.primaryKeyColumns(), - table.colocationColumns(), - causalityToken, - table.creationToken() - ) + causalityToken) : table ) .toArray(CatalogTableDescriptor[]::new), schema.indexes(), diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java index 2a481153d1..682cb66766 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java @@ -87,20 +87,13 @@ public class DropColumnsEntry implements UpdateEntry, Fireable { schema.id(), schema.name(), Arrays.stream(schema.tables()) - .map(table -> table.id() == tableId ? new CatalogTableDescriptor( - table.id(), - table.schemaId(), - table.primaryKeyIndexId(), + .map(table -> table.id() == tableId ? table.newDescriptor( table.name(), - table.zoneId(), table.tableVersion() + 1, table.columns().stream() .filter(col -> !columns.contains(col.name())) .collect(toList()), - table.primaryKeyColumns(), - table.colocationColumns(), - causalityToken, - table.creationToken()) : table + causalityToken) : table ) .toArray(CatalogTableDescriptor[]::new), schema.indexes(), diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java index 0230374f8f..c79e855c00 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java @@ -86,18 +86,11 @@ public class NewColumnsEntry implements UpdateEntry, Fireable { schema.id(), schema.name(), Arrays.stream(schema.tables()) - .map(table -> table.id() == tableId ? new CatalogTableDescriptor( - table.id(), - table.schemaId(), - table.primaryKeyIndexId(), + .map(table -> table.id() == tableId ? table.newDescriptor( table.name(), - table.zoneId(), table.tableVersion() + 1, CollectionUtils.concat(table.columns(), descriptors), - table.primaryKeyColumns(), - table.colocationColumns(), - causalityToken, - table.creationToken()) : table + causalityToken) : table ) .toArray(CatalogTableDescriptor[]::new), schema.indexes(), diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java index 0914fdec66..494e41ab76 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.catalog.commands; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE; -import static org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.sql.ColumnType.INT32; @@ -92,12 +91,9 @@ public class MakeIndexAvailableCommandValidationTest extends AbstractCommandVali pkIndexId, "TEST_TABLE", zoneId, - INITIAL_TABLE_VERSION, List.of(tableColumn(columnName)), List.of(columnName), - null, - 1, - 1 + null ); } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java index a78eb71316..f654274307 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java @@ -173,7 +173,7 @@ public class JdbcMetadataCatalog { .filter(t -> tableNameAndSchemaMatches(t, schemaNameRegex, tlbNameRegex)) .flatMap( tbl -> { - SchemaDescriptor schema = CatalogToSchemaDescriptorConverter.convert(tbl); + SchemaDescriptor schema = CatalogToSchemaDescriptorConverter.convert(tbl, tbl.tableVersion()); return Stream.concat(Arrays.stream(schema.keyColumns().columns()), Arrays.stream(schema.valueColumns().columns())) .map(column -> new Pair<>(tbl.name(), column)); diff --git a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java index e27f13307f..5301f19c77 100644 --- a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java +++ b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java @@ -56,7 +56,7 @@ public class FakeCatalogService implements CatalogService { @Override public CatalogTableDescriptor table(int tableId, long timestamp) { return new CatalogTableDescriptor( - tableId, 0, 0, "table", 0, 0, List.of(mock(CatalogTableColumnDescriptor.class)), List.of(), null, 0, 0); + tableId, 0, 0, "table", 0, List.of(mock(CatalogTableColumnDescriptor.class)), List.of(), null); } @Override diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java index 2b920568b1..8ca3b6b5e5 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.distributionzones.rebalance; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.util.ByteUtils.toBytes; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -91,12 +90,9 @@ public class RebalanceUtilUpdateAssignmentsTest extends IgniteAbstractTest { -1, "table1", 0, - 1, List.of(new CatalogTableColumnDescriptor("k1", ColumnType.INT32, false, 0, 0, 0, null)), List.of("k1"), - null, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN + null ); private static final int partNum = 2; diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java index 555a80d36a..10164d7ccf 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java @@ -17,45 +17,35 @@ package org.apache.ignite.internal.schema; -import static java.util.Collections.emptyList; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; -import static java.util.stream.Collectors.toSet; -import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; -import static org.apache.ignite.internal.metastorage.dsl.Operations.put; -import static org.apache.ignite.internal.util.ByteUtils.intToBytes; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.LongFunction; -import java.util.stream.IntStream; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; import org.apache.ignite.internal.catalog.events.TableEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; -import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.IgniteComponent; -import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl; +import org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter; import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl; -import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.lang.IgniteException; @@ -65,10 +55,6 @@ import org.jetbrains.annotations.Nullable; * This class services management of table schemas. */ public class SchemaManager implements IgniteComponent { - /** Schema history key predicate part. */ - private static final String SCHEMA_STORE_PREFIX = ".sch-hist."; - private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = ".sch-hist-latest"; - /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); @@ -103,14 +89,35 @@ public class SchemaManager implements IgniteComponent { } private void registerExistingTables() { - // TODO: IGNITE-20051 - add proper recovery (consider tables that are removed now; take token and catalog version - // exactly matching the tables). - long causalityToken = metastorageMgr.appliedRevision(); - int catalogVersion = catalogService.latestCatalogVersion(); - for (CatalogTableDescriptor tableDescriptor : catalogService.tables(catalogVersion)) { - onTableCreated(new CreateTableEventParameters(causalityToken, catalogVersion, tableDescriptor), null); + for (int catalogVer = catalogService.latestCatalogVersion(); catalogVer >= catalogService.earliestCatalogVersion(); catalogVer--) { + Collection<CatalogTableDescriptor> tables = catalogService.tables(catalogVer); + + registriesVv.update(causalityToken, (registries, throwable) -> { + for (CatalogTableDescriptor tableDescriptor : tables) { + int tableId = tableDescriptor.id(); + + if (registries.containsKey(tableId)) { + continue; + } + + SchemaDescriptor prevSchema = null; + CatalogTableSchemaVersions schemaVersions = tableDescriptor.schemaVersions(); + for (int tableVer = schemaVersions.earliestVersion(); tableVer <= schemaVersions.latestVersion(); tableVer++) { + SchemaDescriptor newSchema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer); + + if (prevSchema != null) { + newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema)); + } + + prevSchema = newSchema; + registries = registerSchema(registries, tableId, newSchema); + } + } + + return completedFuture(registries); + }); } registriesVv.complete(causalityToken); @@ -169,13 +176,12 @@ public class SchemaManager implements IgniteComponent { return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> { if (e != null) { - return failedFuture(new IgniteInternalException(IgniteStringFormatter.format( + return failedFuture(new IgniteInternalException(format( "Cannot create a schema for the table [tblId={}, ver={}]", tableId, newSchemaVersion), e) ); } - return saveSchemaDescriptor(tableId, newSchema) - .thenApply(t -> registerSchema(registries, tableId, newSchema)); + return completedFuture(registerSchema(registries, tableId, newSchema)); })).thenApply(ignored -> false); } finally { busyLock.leaveBusy(); @@ -208,38 +214,21 @@ public class SchemaManager implements IgniteComponent { * @return Schema representation. */ private SchemaDescriptor loadSchemaDescriptor(int tblId, int ver) { - Entry entry = metastorageMgr.getLocally(schemaWithVerHistKey(tblId, ver), Long.MAX_VALUE); + int catalogVersion = catalogService.latestCatalogVersion(); - assert !entry.tombstone() : "Table " + tblId + ", version " + ver; + while (catalogVersion >= catalogService.earliestCatalogVersion()) { + CatalogTableDescriptor tableDescriptor = catalogService.table(tblId, catalogVersion); - byte[] value = entry.value(); + if (tableDescriptor == null) { + catalogVersion--; - assert value != null; + continue; + } - return SchemaSerializerImpl.INSTANCE.deserialize(value); - } + return CatalogToSchemaDescriptorConverter.convert(tableDescriptor, ver); + } - /** - * Saves a schema in the MetaStorage. - * - * @param tableId Table id. - * @param schema Schema descriptor. - * @return Future that will be completed when the schema gets saved. - */ - private CompletableFuture<Void> saveSchemaDescriptor(int tableId, SchemaDescriptor schema) { - ByteArray schemaKey = schemaWithVerHistKey(tableId, schema.version()); - ByteArray latestSchemaVersionKey = latestSchemaVersionKey(tableId); - - byte[] serializedSchema = SchemaSerializerImpl.INSTANCE.serialize(schema); - - return metastorageMgr.invoke( - notExists(schemaKey), - List.of( - put(schemaKey, serializedSchema), - put(latestSchemaVersionKey, intToBytes(schema.version())) - ), - emptyList() - ).thenApply(unused -> null); + throw new AssertionError(format("Schema descriptor is not found [tableId={}, schemaId={}]", tblId, ver)); } /** @@ -340,16 +329,10 @@ public class SchemaManager implements IgniteComponent { * @param tableId Table id. */ public CompletableFuture<?> dropRegistry(long causalityToken, int tableId) { - return removeRegistry(causalityToken, tableId).thenCompose(unused -> { - return destroySchemas(tableId); - }); - } - - private CompletableFuture<?> removeRegistry(long causalityToken, int tableId) { return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> { if (e != null) { return failedFuture(new IgniteInternalException( - IgniteStringFormatter.format("Cannot remove a schema registry for the table [tblId={}]", tableId), e)); + format("Cannot remove a schema registry for the table [tblId={}]", tableId), e)); } Map<Integer, SchemaRegistryImpl> regs = new HashMap<>(registries); @@ -361,23 +344,6 @@ public class SchemaManager implements IgniteComponent { })); } - private CompletableFuture<?> destroySchemas(int tableId) { - return latestSchemaVersion(tableId) - .thenCompose(latestVersion -> { - if (latestVersion == null) { - // Nothing to remove. - return nullCompletedFuture(); - } - - Set<ByteArray> keysToRemove = IntStream.rangeClosed(CatalogTableDescriptor.INITIAL_TABLE_VERSION, latestVersion) - .mapToObj(version -> schemaWithVerHistKey(tableId, version)) - .collect(toSet()); - keysToRemove.add(latestSchemaVersionKey(tableId)); - - return metastorageMgr.removeAll(keysToRemove); - }); - } - @Override public void stop() throws Exception { if (!stopGuard.compareAndSet(false, true)) { @@ -389,36 +355,4 @@ public class SchemaManager implements IgniteComponent { //noinspection ConstantConditions IgniteUtils.closeAllManually(registriesVv.latest().values()); } - - /** - * Gets the latest version of the table schema which is available in Metastore or {@code null} if nothing is available. - * - * @param tableId Table id. - * @return The latest schema version or {@code null} if nothing is available. - */ - private CompletableFuture<Integer> latestSchemaVersion(int tableId) { - return metastorageMgr.get(latestSchemaVersionKey(tableId)) - .thenApply(entry -> { - if (entry == null || entry.value() == null) { - return null; - } else { - return ByteUtils.bytesToInt(entry.value()); - } - }); - } - - /** - * Forms schema history key. - * - * @param tblId Table id. - * @param ver Schema version. - * @return {@link ByteArray} representation. - */ - private static ByteArray schemaWithVerHistKey(int tblId, int ver) { - return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver); - } - - private static ByteArray latestSchemaVersionKey(int tableId) { - return ByteArray.fromString(tableId + LATEST_SCHEMA_VERSION_STORE_SUFFIX); - } } diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java index 72bfa76cb3..da9193349e 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java @@ -35,7 +35,7 @@ public class SchemaUtils { * @return Schema descriptor. */ public static SchemaDescriptor prepareSchemaDescriptor(CatalogTableDescriptor tableDescriptor) { - return CatalogToSchemaDescriptorConverter.convert(tableDescriptor); + return CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableDescriptor.tableVersion()); } /** diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java index 588baebb62..a215c0fab1 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.schema.catalog; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,6 +30,7 @@ import org.apache.ignite.internal.catalog.commands.DefaultValue.FunctionCall; import org.apache.ignite.internal.catalog.commands.DefaultValue.Type; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.DefaultValueGenerator; import org.apache.ignite.internal.schema.DefaultValueProvider; @@ -149,7 +152,7 @@ public final class CatalogToSchemaDescriptorConverter { * @param tableDescriptor Descriptor to convert. * @return A {@link SchemaDescriptor} object representing the table descriptor. */ - public static SchemaDescriptor convert(CatalogTableDescriptor tableDescriptor) { + public static SchemaDescriptor convert(CatalogTableDescriptor tableDescriptor, int tableVersion) { Set<String> keyColumnsNames = Set.copyOf(tableDescriptor.primaryKeyColumns()); List<Column> keyCols = new ArrayList<>(keyColumnsNames.size()); @@ -157,7 +160,12 @@ public final class CatalogToSchemaDescriptorConverter { int idx = 0; - for (CatalogTableColumnDescriptor column : tableDescriptor.columns()) { + TableVersion tableVersionInstance = tableDescriptor.schemaVersions().get(tableVersion); + + assert tableVersionInstance != null + : format("Cannot find table version {} in table descriptor {}", tableVersion, tableDescriptor); + + for (CatalogTableColumnDescriptor column : tableVersionInstance.columns()) { if (keyColumnsNames.contains(column.name())) { keyCols.add(convert(idx, column)); } else { @@ -168,7 +176,7 @@ public final class CatalogToSchemaDescriptorConverter { } return new SchemaDescriptor( - tableDescriptor.tableVersion(), + tableVersion, keyCols.toArray(Column[]::new), tableDescriptor.colocationColumns().toArray(String[]::new), valCols.toArray(Column[]::new) diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java index cddac3f6a1..aa769bd006 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java @@ -17,20 +17,16 @@ package org.apache.ignite.internal.schema; - -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; +import static org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -44,7 +40,6 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Flow.Publisher; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongFunction; @@ -52,21 +47,15 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.events.AddColumnEventParameters; -import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters; import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; import org.apache.ignite.internal.catalog.events.DropColumnEventParameters; import org.apache.ignite.internal.event.EventListener; -import org.apache.ignite.internal.lang.ByteArray; -import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; -import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.apache.ignite.internal.type.NativeTypeSpec; -import org.apache.ignite.internal.util.subscription.ListAccumulator; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; import org.apache.ignite.sql.ColumnType; @@ -82,9 +71,6 @@ import org.mockito.quality.Strictness; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class SchemaManagerTest extends BaseIgniteAbstractTest { - private static final String SCHEMA_STORE_PREFIX = ".sch-hist."; - private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = ".sch-hist-latest"; - private static final int TABLE_ID = 3; private static final String TABLE_NAME = "t"; @@ -145,28 +131,6 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { vaultManager.stop(); } - @Test - void savesSchemaOnTableCreation() { - createSomeTable(); - - SchemaDescriptor schemaDescriptor = getSchemaDescriptor(1); - - assertThat(schemaDescriptor.version(), is(1)); - assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1")); - - Column k1 = schemaDescriptor.column("k1"); - assertThat(k1, is(notNullValue())); - - assertThat(k1.name(), is("k1")); - assertThat(k1.type().spec(), is(NativeTypeSpec.INT16)); - - Column v1 = schemaDescriptor.column("v1"); - assertThat(v1, is(notNullValue())); - - assertThat(v1.name(), is("v1")); - assertThat(v1.type().spec(), is(NativeTypeSpec.INT32)); - } - private void createSomeTable() { List<CatalogTableColumnDescriptor> columns = List.of( new CatalogTableColumnDescriptor("k1", ColumnType.INT16, false, 0, 0, 0, null), @@ -174,7 +138,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { new CatalogTableColumnDescriptor("v1", ColumnType.INT32, false, 0, 0, 0, null) ); CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor( - TABLE_ID, -1, -1, TABLE_NAME, 0, 1, columns, List.of("k1", "k2"), null, INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN + TABLE_ID, -1, -1, TABLE_NAME, 0, columns, List.of("k1", "k2"), null ); CompletableFuture<Boolean> future = tableCreatedListener() @@ -193,49 +157,6 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { return Objects.requireNonNull(tableAlteredListener, "tableAlteredListener is not registered with CatalogService"); } - private SchemaDescriptor getSchemaDescriptor(int schemaVersion) { - Entry entry = metaStorageKvStorage.get(schemaWithVerHistKey(TABLE_ID, schemaVersion).bytes()); - assertThat(entry, is(notNullValue())); - - byte[] value = entry.value(); - assertThat(value, is(notNullValue())); - - return SchemaSerializerImpl.INSTANCE.deserialize(value); - } - - private static ByteArray schemaWithVerHistKey(int tblId, int ver) { - return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver); - } - - @Test - void savesSchemaOnColumnAddition() { - createSomeTable(); - - when(catalogService.table(TABLE_ID, CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAddition()); - - AddColumnEventParameters event = new AddColumnEventParameters( - CAUSALITY_TOKEN_2, - CATALOG_VERSION_2, - TABLE_ID, - List.of(new CatalogTableColumnDescriptor("v2", ColumnType.STRING, false, 0, 0, 0, null)) - ); - - CompletableFuture<Boolean> future = tableAlteredListener().notify(event, null); - - assertThat(future, willBe(false)); - - SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2); - - assertThat(schemaDescriptor.version(), is(2)); - assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1", "v2")); - - Column v2 = schemaDescriptor.column("v2"); - assertThat(v2, is(notNullValue())); - - assertThat(v2.name(), is("v2")); - assertThat(v2.type().spec(), is(NativeTypeSpec.STRING)); - } - private static CatalogTableDescriptor tableDescriptorAfterColumnAddition() { List<CatalogTableColumnDescriptor> columns = List.of( new CatalogTableColumnDescriptor("k1", ColumnType.INT16, false, 0, 0, 0, null), @@ -250,111 +171,19 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { -1, TABLE_NAME, 0, - 2, columns, List.of("k1", "k2"), - null, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN - ); - } - - private void completeCausalityToken(long causalityToken) { - assertThat(onMetastoreRevisionCompleteHolder.get().apply(causalityToken), willCompleteSuccessfully()); - } - - @Test - void savesSchemaOnColumnRemoval() { - createSomeTable(); - - when(catalogService.table(TABLE_ID, CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnRemoval()); - - DropColumnEventParameters event = new DropColumnEventParameters( - CAUSALITY_TOKEN_2, - CATALOG_VERSION_2, - TABLE_ID, - List.of("v1") - ); - - CompletableFuture<Boolean> future = tableAlteredListener().notify(event, null); - - assertThat(future, willBe(false)); - - SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2); - - assertThat(schemaDescriptor.version(), is(2)); - assertThat(schemaDescriptor.columnNames(), contains("k1", "k2")); - } - - private static CatalogTableDescriptor tableDescriptorAfterColumnRemoval() { - List<CatalogTableColumnDescriptor> columns = List.of( - new CatalogTableColumnDescriptor("k1", ColumnType.INT16, false, 0, 0, 0, null), - new CatalogTableColumnDescriptor("k2", ColumnType.STRING, false, 0, 0, 0, null) - ); - - return new CatalogTableDescriptor( - TABLE_ID, - -1, - -1, + null + ).newDescriptor( TABLE_NAME, - 0, - 2, + INITIAL_TABLE_VERSION + 1, columns, - List.of("k1", "k2"), - null, - INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN ); } - @Test - void savesSchemaOnColumnAlteration() { - createSomeTable(); - - when(catalogService.table(TABLE_ID, CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAlteration()); - - AlterColumnEventParameters event = new AlterColumnEventParameters( - CAUSALITY_TOKEN_2, - CATALOG_VERSION_2, - TABLE_ID, - new CatalogTableColumnDescriptor("v1", ColumnType.INT64, false, 0, 0, 0, null) - ); - - CompletableFuture<Boolean> future = tableAlteredListener().notify(event, null); - - assertThat(future, willBe(false)); - - SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2); - - assertThat(schemaDescriptor.version(), is(2)); - - Column v1 = schemaDescriptor.column("v1"); - assertThat(v1, is(notNullValue())); - - assertThat(v1.name(), is("v1")); - assertThat(v1.type().spec(), is(NativeTypeSpec.INT64)); - } - - private static CatalogTableDescriptor tableDescriptorAfterColumnAlteration() { - List<CatalogTableColumnDescriptor> columns = List.of( - new CatalogTableColumnDescriptor("k1", ColumnType.INT32, false, 0, 0, 0, null), - new CatalogTableColumnDescriptor("k2", ColumnType.STRING, false, 0, 0, 0, null), - new CatalogTableColumnDescriptor("v1", ColumnType.INT64, false, 0, 0, 0, null) - ); - - return new CatalogTableDescriptor( - TABLE_ID, - -1, - -1, - TABLE_NAME, - 0, - 2, - columns, - List.of("k1", "k2"), - null, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN - ); + private void completeCausalityToken(long causalityToken) { + assertThat(onMetastoreRevisionCompleteHolder.get().apply(causalityToken), willCompleteSuccessfully()); } @Test @@ -476,46 +305,6 @@ class SchemaManagerTest extends BaseIgniteAbstractTest { assertThat(schemaRegistry, is(nullValue())); } - @Test - void dropRegistryRemovesSchemasFromMetastorage() { - createSomeTable(); - - assertThat(schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID), willCompleteSuccessfully()); - - completeCausalityToken(CAUSALITY_TOKEN_2); - - assertThatNoSchemasExist(TABLE_ID); - assertThatNoLatestSchemaVersionExists(TABLE_ID); - } - - private void assertThatNoSchemasExist(int tableId) { - CompletableFuture<List<String>> schemaEntryKeysFuture = new CompletableFuture<>(); - - Publisher<Entry> publisher = metaStorageManager.prefix(ByteArray.fromString(tableId + SCHEMA_STORE_PREFIX)); - publisher.subscribe( - new ListAccumulator<Entry, String>(entry -> new String(entry.key(), UTF_8)) - .toSubscriber(schemaEntryKeysFuture) - ); - - assertThat(schemaEntryKeysFuture, willBe(empty())); - } - - private void assertThatNoLatestSchemaVersionExists(int tableId) { - CompletableFuture<Entry> future = metaStorageManager.get(latestSchemaVersionKey(tableId)); - - assertThat(future, willCompleteSuccessfully()); - - Entry entry = future.join(); - - if (entry != null) { - assertThat(entry.value(), is(nullValue())); - } - } - - private static ByteArray latestSchemaVersionKey(int tableId) { - return ByteArray.fromString(tableId + LATEST_SCHEMA_VERSION_STORE_SUFFIX); - } - @Test void loadingPreExistingSchemasWorks() throws Exception { create2TableVersions(); diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java index 5f7a2499f0..b32ada814c 100644 --- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java +++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.schema.catalog; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.schema.SchemaTestUtils.specToType; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -134,7 +133,6 @@ public class CatalogToSchemaDescriptorConverterTest extends AbstractSchemaConver -1, "test", 0, - 1, List.of( new CatalogTableColumnDescriptor("C1", ColumnType.INT32, false, 0, 0, 0, null), new CatalogTableColumnDescriptor("K2", ColumnType.INT32, false, 0, 0, 0, null), @@ -142,12 +140,10 @@ public class CatalogToSchemaDescriptorConverterTest extends AbstractSchemaConver new CatalogTableColumnDescriptor("K1", ColumnType.INT32, false, 0, 0, 0, null) ), List.of("K1", "K2"), - List.of("K2"), - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN + List.of("K2") ); - SchemaDescriptor schema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor); + SchemaDescriptor schema = CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableDescriptor.tableVersion()); assertThat(schema.keyColumns().length(), equalTo(2)); assertThat(schema.keyColumns().column(0).name(), equalTo("K1")); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java index 1ea34a901a..3b9c3bfe52 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST; import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow; import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS; @@ -775,7 +774,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { hashIndexId, TABLE_NAME, zoneId, - 1, List.of( CatalogUtils.fromParams(ColumnParams.builder().name("INTKEY").type(INT32).build()), CatalogUtils.fromParams(ColumnParams.builder().name("STRKEY").length(100).type(STRING).build()), @@ -783,9 +781,7 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest { CatalogUtils.fromParams(ColumnParams.builder().name("STRVAL").length(100).type(STRING).build()) ), List.of("INTKEY"), - null, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN + null ); CatalogSortedIndexDescriptor sortedIndex = new CatalogSortedIndexDescriptor( diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java index 23a95a62d9..76ef74662b 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage.index; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.storage.BaseMvStoragesTest.getOrCreateMvPartition; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -158,12 +157,9 @@ public abstract class AbstractIndexStorageTest<S extends IndexStorage, D extends 1, TABLE_NAME, zoneId, - 1, Stream.concat(Stream.of(pkColumn), ALL_TYPES_COLUMN_PARAMS.stream()).map(CatalogUtils::fromParams).collect(toList()), List.of("pk"), - null, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN + null ); when(catalogService.table(eq(TABLE_NAME), anyLong())).thenReturn(tableDescriptor); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 6cbf88d8c4..58c24ba8a9 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.table.distributed.replication; import static java.util.Collections.singletonList; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow; import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast; @@ -333,7 +332,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { private KvMarshaller<TestKey, TestValue> kvMarshallerVersion2; private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor( - TABLE_ID, 1, 2, "table", 1, CURRENT_SCHEMA_VERSION, + TABLE_ID, 1, 2, "table", 1, List.of( new CatalogTableColumnDescriptor("intKey", ColumnType.INT32, false, 0, 0, 0, null), new CatalogTableColumnDescriptor("strKey", ColumnType.STRING, false, 0, 0, 0, null), @@ -341,9 +340,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { new CatalogTableColumnDescriptor("strVal", ColumnType.STRING, false, 0, 0, 0, null) ), List.of("intKey", "strKey"), - null, - INITIAL_CAUSALITY_TOKEN, - INITIAL_CAUSALITY_TOKEN + null ); /** Partition replication listener to test. */ diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java index 48847f641f..5369b569dd 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.distributed.schema; -import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; @@ -35,6 +34,7 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; @@ -113,9 +113,15 @@ class CatalogValidationSchemasSourceTest extends BaseIgniteAbstractTest { new CatalogTableColumnDescriptor("v1", ColumnType.INT32, false, 0, 0, 0, null) ); - return new CatalogTableDescriptor( - tableId, -1, -1, "test", 0, tableVersion, columns, List.of("k1"), null, INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN + CatalogTableDescriptor descriptor = new CatalogTableDescriptor( + tableId, -1, -1, "test", 0, columns, List.of("k1"), null ); + + for (int ver = CatalogTableDescriptor.INITIAL_TABLE_VERSION + 1; ver <= tableVersion; ver++) { + descriptor = descriptor.newDescriptor("test", ver, columns, CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN); + } + + return descriptor; } @Test