This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 09116a30f7 IGNITE-21205 Get rid of metastorage in schema manager (#3013)
09116a30f7 is described below

commit 09116a30f7b5c92e50375d5b588821d4ec1b705a
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Wed Jan 10 09:44:10 2024 +0300

    IGNITE-21205 Get rid of metastorage in schema manager (#3013)
---
 .../catalog/commands/CreateTableCommand.java       |   6 +-
 .../descriptors/CatalogTableDescriptor.java        |  63 +++++-
 .../descriptors/CatalogTableSchemaVersions.java    |  99 +++++++++
 .../internal/catalog/storage/AlterColumnEntry.java |  14 +-
 .../internal/catalog/storage/DropColumnsEntry.java |  11 +-
 .../internal/catalog/storage/NewColumnsEntry.java  |  11 +-
 .../MakeIndexAvailableCommandValidationTest.java   |   6 +-
 .../handler/requests/jdbc/JdbcMetadataCatalog.java |   2 +-
 .../ignite/client/handler/FakeCatalogService.java  |   2 +-
 .../RebalanceUtilUpdateAssignmentsTest.java        |   6 +-
 .../ignite/internal/schema/SchemaManager.java      | 154 ++++----------
 .../apache/ignite/internal/schema/SchemaUtils.java |   2 +-
 .../CatalogToSchemaDescriptorConverter.java        |  14 +-
 .../ignite/internal/schema/SchemaManagerTest.java  | 225 +--------------------
 .../CatalogToSchemaDescriptorConverterTest.java    |   8 +-
 .../storage/AbstractMvTableStorageTest.java        |   6 +-
 .../storage/index/AbstractIndexStorageTest.java    |   6 +-
 .../replication/PartitionReplicaListenerTest.java  |   7 +-
 .../schema/CatalogValidationSchemasSourceTest.java |  12 +-
 19 files changed, 244 insertions(+), 410 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index f423f11578..22e8c90b61 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.catalog.commands;
 
 import static java.util.Objects.requireNonNullElse;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static 
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.ensureNoTableIndexOrSysViewExistsWithGivenName;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
@@ -111,12 +110,9 @@ public class CreateTableCommand extends 
AbstractTableCommand {
                 pkIndexId,
                 tableName,
                 zone.id(),
-                CatalogTableDescriptor.INITIAL_TABLE_VERSION,
                 
columns.stream().map(CatalogUtils::fromParams).collect(toList()),
                 primaryKeyColumns,
-                colocationColumns,
-                INITIAL_CAUSALITY_TOKEN,
-                INITIAL_CAUSALITY_TOKEN
+                colocationColumns
         );
 
         String indexName = pkIndexName(tableName);
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index 89edf8bb98..b9157c2650 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
@@ -45,7 +46,7 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
 
     private final int pkIndexId;
 
-    private final int tableVersion;
+    private final CatalogTableSchemaVersions schemaVersions;
 
     private final List<CatalogTableColumnDescriptor> columns;
     private final List<String> primaryKeyColumns;
@@ -57,28 +58,51 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
     private long creationToken;
 
     /**
-     * Constructor.
+     * Constructor for new table.
+     *
+     * @param id Table id.
+     * @param pkIndexId Primary key index id.
+     * @param name Table name.
+     * @param zoneId Distribution zone ID.
+     * @param columns Table column descriptors.
+     * @param pkCols Primary key column names.
+     */
+    public CatalogTableDescriptor(
+            int id,
+            int schemaId,
+            int pkIndexId,
+            String name,
+            int zoneId,
+            List<CatalogTableColumnDescriptor> columns,
+            List<String> pkCols,
+            @Nullable List<String> colocationCols
+    ) {
+        this(id, schemaId, pkIndexId, name, zoneId, columns, pkCols, 
colocationCols,
+                new CatalogTableSchemaVersions(new TableVersion(columns)), 
INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN);
+    }
+
+    /**
+     * Internal constructor.
      *
      * @param id Table id.
      * @param pkIndexId Primary key index id.
      * @param name Table name.
      * @param zoneId Distribution zone ID.
-     * @param tableVersion Version of the table.
      * @param columns Table column descriptors.
      * @param pkCols Primary key column names.
      * @param causalityToken Token of the update of the descriptor.
      * @param creationToken Token of the creation of the table descriptor.
      */
-    public CatalogTableDescriptor(
+    private CatalogTableDescriptor(
             int id,
             int schemaId,
             int pkIndexId,
             String name,
             int zoneId,
-            int tableVersion,
             List<CatalogTableColumnDescriptor> columns,
             List<String> pkCols,
             @Nullable List<String> colocationCols,
+            CatalogTableSchemaVersions schemaVersions,
             long causalityToken,
             long creationToken
     ) {
@@ -87,13 +111,14 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
         this.schemaId = schemaId;
         this.pkIndexId = pkIndexId;
         this.zoneId = zoneId;
-        this.tableVersion = tableVersion;
         this.columns = Objects.requireNonNull(columns, "No columns defined.");
         primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key 
columns.");
         colocationColumns = colocationCols == null ? pkCols : colocationCols;
 
         this.columnsMap = 
columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name, 
Function.identity()));
 
+        this.schemaVersions = schemaVersions;
+
         this.creationToken = creationToken;
 
         // TODO: IGNITE-19082 Throw proper exceptions.
@@ -103,6 +128,26 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
         assert Set.copyOf(primaryKeyColumns).containsAll(colocationColumns);
     }
 
+    /**
+     * Creates new table descriptor, using existing one as a template.
+     */
+    public CatalogTableDescriptor newDescriptor(
+            String name,
+            int tableVersion,
+            List<CatalogTableColumnDescriptor> columns,
+            long causalityToken
+    ) {
+        CatalogTableSchemaVersions newSchemaVersions = tableVersion == 
schemaVersions.latestVersion()
+                ? schemaVersions
+                : schemaVersions.append(new TableVersion(columns), 
tableVersion);
+
+        return new CatalogTableDescriptor(
+                id(), schemaId, pkIndexId, name, zoneId, columns, 
primaryKeyColumns, colocationColumns,
+                newSchemaVersions,
+                causalityToken, creationToken
+        );
+    }
+
     /**
      * Returns column descriptor for column with given name.
      */
@@ -114,6 +159,10 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
         return schemaId;
     }
 
+    public CatalogTableSchemaVersions schemaVersions() {
+        return schemaVersions;
+    }
+
     public int zoneId() {
         return zoneId;
     }
@@ -123,7 +172,7 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
     }
 
     public int tableVersion() {
-        return tableVersion;
+        return schemaVersions.latestVersion();
     }
 
     public List<String> primaryKeyColumns() {
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableSchemaVersions.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableSchemaVersions.java
new file mode 100644
index 0000000000..dafb831d69
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableSchemaVersions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.descriptors;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class that holds a list of table version descriptors.
+ */
+public class CatalogTableSchemaVersions implements Serializable {
+    private static final long serialVersionUID = 4353352473287209850L;
+
+    /**
+     * Descriptor of a single table version.
+     */
+    public static class TableVersion implements Serializable {
+        private static final long serialVersionUID = -5185852983239967322L;
+
+        private final List<CatalogTableColumnDescriptor> columns;
+
+        TableVersion(List<CatalogTableColumnDescriptor> columns) {
+            this.columns = columns;
+        }
+
+        public List<CatalogTableColumnDescriptor> columns() {
+            return Collections.unmodifiableList(columns);
+        }
+    }
+
+    private final int base;
+    private final TableVersion[] versions;
+
+    /**
+     * Constructor.
+     *
+     * @param versions Array of table versions.
+     */
+    CatalogTableSchemaVersions(TableVersion... versions) {
+        this(CatalogTableDescriptor.INITIAL_TABLE_VERSION, versions);
+    }
+
+    private CatalogTableSchemaVersions(int base, TableVersion... versions) {
+        this.base = base;
+        this.versions = versions;
+    }
+
+    /**
+     * Returns earliest known table version.
+     */
+    public int earliestVersion() {
+        return base;
+    }
+
+    /**
+     * Returns latest known table version.
+     */
+    public int latestVersion() {
+        return base + versions.length - 1;
+    }
+
+    /**
+     * Returns an existing table version, or {@code null} if it's not found.
+     */
+    public @Nullable TableVersion get(int version) {
+        if (version < base || version >= base + versions.length) {
+            return null;
+        }
+
+        return versions[version - base];
+    }
+
+    /**
+     * Creates a new instance of {@link CatalogTableSchemaVersions} with one 
new version appended.
+     */
+    public CatalogTableSchemaVersions append(TableVersion tableVersion, int 
version) {
+        assert version == latestVersion() + 1;
+
+        return new CatalogTableSchemaVersions(base, 
ArrayUtils.concat(versions, tableVersion));
+    }
+}
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index 615387953b..75bce1b9ee 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -89,23 +89,13 @@ public class AlterColumnEntry implements UpdateEntry, 
Fireable {
                         schema.id(),
                         schema.name(),
                         Arrays.stream(schema.tables())
-                                .map(table -> table.id() != tableId
-                                        ? table
-                                        : new CatalogTableDescriptor(
-                                                table.id(),
-                                                table.schemaId(),
-                                                table.primaryKeyIndexId(),
+                                .map(table -> table.id() == tableId ? 
table.newDescriptor(
                                                 table.name(),
-                                                table.zoneId(),
                                                 table.tableVersion() + 1,
                                                 table.columns().stream()
                                                         .map(source -> 
source.name().equals(column.name()) ? column : source)
                                                         .collect(toList()),
-                                                table.primaryKeyColumns(),
-                                                table.colocationColumns(),
-                                                causalityToken,
-                                                table.creationToken()
-                                        )
+                                                causalityToken) : table
                                 )
                                 .toArray(CatalogTableDescriptor[]::new),
                         schema.indexes(),
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index 2a481153d1..682cb66766 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -87,20 +87,13 @@ public class DropColumnsEntry implements UpdateEntry, 
Fireable {
                         schema.id(),
                         schema.name(),
                         Arrays.stream(schema.tables())
-                                .map(table -> table.id() == tableId ? new 
CatalogTableDescriptor(
-                                        table.id(),
-                                        table.schemaId(),
-                                        table.primaryKeyIndexId(),
+                                .map(table -> table.id() == tableId ? 
table.newDescriptor(
                                         table.name(),
-                                        table.zoneId(),
                                         table.tableVersion() + 1,
                                         table.columns().stream()
                                                 .filter(col -> 
!columns.contains(col.name()))
                                                 .collect(toList()),
-                                        table.primaryKeyColumns(),
-                                        table.colocationColumns(),
-                                        causalityToken,
-                                        table.creationToken()) : table
+                                        causalityToken) : table
                                 )
                                 .toArray(CatalogTableDescriptor[]::new),
                         schema.indexes(),
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 0230374f8f..c79e855c00 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -86,18 +86,11 @@ public class NewColumnsEntry implements UpdateEntry, 
Fireable {
                         schema.id(),
                         schema.name(),
                         Arrays.stream(schema.tables())
-                                .map(table -> table.id() == tableId ? new 
CatalogTableDescriptor(
-                                        table.id(),
-                                        table.schemaId(),
-                                        table.primaryKeyIndexId(),
+                                .map(table -> table.id() == tableId ? 
table.newDescriptor(
                                         table.name(),
-                                        table.zoneId(),
                                         table.tableVersion() + 1,
                                         
CollectionUtils.concat(table.columns(), descriptors),
-                                        table.primaryKeyColumns(),
-                                        table.colocationColumns(),
-                                        causalityToken,
-                                        table.creationToken()) : table
+                                        causalityToken) : table
                                 )
                                 .toArray(CatalogTableDescriptor[]::new),
                         schema.indexes(),
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java
index 0914fdec66..494e41ab76 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/MakeIndexAvailableCommandValidationTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.catalog.commands;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_LENGTH;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PRECISION;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_SCALE;
-import static 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.sql.ColumnType.INT32;
@@ -92,12 +91,9 @@ public class MakeIndexAvailableCommandValidationTest extends 
AbstractCommandVali
                 pkIndexId,
                 "TEST_TABLE",
                 zoneId,
-                INITIAL_TABLE_VERSION,
                 List.of(tableColumn(columnName)),
                 List.of(columnName),
-                null,
-                1,
-                1
+                null
         );
     }
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java
index a78eb71316..f654274307 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcMetadataCatalog.java
@@ -173,7 +173,7 @@ public class JdbcMetadataCatalog {
                 .filter(t -> tableNameAndSchemaMatches(t, schemaNameRegex, 
tlbNameRegex))
                 .flatMap(
                     tbl -> {
-                        SchemaDescriptor schema = 
CatalogToSchemaDescriptorConverter.convert(tbl);
+                        SchemaDescriptor schema = 
CatalogToSchemaDescriptorConverter.convert(tbl, tbl.tableVersion());
 
                         return 
Stream.concat(Arrays.stream(schema.keyColumns().columns()), 
Arrays.stream(schema.valueColumns().columns()))
                                 .map(column -> new Pair<>(tbl.name(), column));
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index e27f13307f..5301f19c77 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -56,7 +56,7 @@ public class FakeCatalogService implements CatalogService {
     @Override
     public CatalogTableDescriptor table(int tableId, long timestamp) {
         return new CatalogTableDescriptor(
-                tableId, 0, 0, "table", 0, 0, 
List.of(mock(CatalogTableColumnDescriptor.class)), List.of(), null, 0, 0);
+                tableId, 0, 0, "table", 0, 
List.of(mock(CatalogTableColumnDescriptor.class)), List.of(), null);
     }
 
     @Override
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index 2b920568b1..8ca3b6b5e5 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.distributionzones.rebalance;
 
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,12 +90,9 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
             -1,
             "table1",
             0,
-            1,
             List.of(new CatalogTableColumnDescriptor("k1", ColumnType.INT32, 
false, 0, 0, 0, null)),
             List.of("k1"),
-            null,
-            INITIAL_CAUSALITY_TOKEN,
-            INITIAL_CAUSALITY_TOKEN
+            null
     );
 
     private static final int partNum = 2;
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 555a80d36a..10164d7ccf 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,45 +17,35 @@
 
 package org.apache.ignite.internal.schema;
 
-import static java.util.Collections.emptyList;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
-import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
-import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
-import java.util.stream.IntStream;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.TableEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
+import 
org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
@@ -65,10 +55,6 @@ import org.jetbrains.annotations.Nullable;
  * This class services management of table schemas.
  */
 public class SchemaManager implements IgniteComponent {
-    /** Schema history key predicate part. */
-    private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
-    private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = 
".sch-hist-latest";
-
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -103,14 +89,35 @@ public class SchemaManager implements IgniteComponent {
     }
 
     private void registerExistingTables() {
-        // TODO: IGNITE-20051 - add proper recovery (consider tables that are 
removed now; take token and catalog version
-        // exactly matching the tables).
-
         long causalityToken = metastorageMgr.appliedRevision();
-        int catalogVersion = catalogService.latestCatalogVersion();
 
-        for (CatalogTableDescriptor tableDescriptor : 
catalogService.tables(catalogVersion)) {
-            onTableCreated(new CreateTableEventParameters(causalityToken, 
catalogVersion, tableDescriptor), null);
+        for (int catalogVer = catalogService.latestCatalogVersion(); 
catalogVer >= catalogService.earliestCatalogVersion(); catalogVer--) {
+            Collection<CatalogTableDescriptor> tables = 
catalogService.tables(catalogVer);
+
+            registriesVv.update(causalityToken, (registries, throwable) -> {
+                for (CatalogTableDescriptor tableDescriptor : tables) {
+                    int tableId = tableDescriptor.id();
+
+                    if (registries.containsKey(tableId)) {
+                        continue;
+                    }
+
+                    SchemaDescriptor prevSchema = null;
+                    CatalogTableSchemaVersions schemaVersions = 
tableDescriptor.schemaVersions();
+                    for (int tableVer = schemaVersions.earliestVersion(); 
tableVer <= schemaVersions.latestVersion(); tableVer++) {
+                        SchemaDescriptor newSchema = 
CatalogToSchemaDescriptorConverter.convert(tableDescriptor, tableVer);
+
+                        if (prevSchema != null) {
+                            
newSchema.columnMapping(SchemaUtils.columnMapper(prevSchema, newSchema));
+                        }
+
+                        prevSchema = newSchema;
+                        registries = registerSchema(registries, tableId, 
newSchema);
+                    }
+                }
+
+                return completedFuture(registries);
+            });
         }
 
         registriesVv.complete(causalityToken);
@@ -169,13 +176,12 @@ public class SchemaManager implements IgniteComponent {
 
             return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
                 if (e != null) {
-                    return failedFuture(new 
IgniteInternalException(IgniteStringFormatter.format(
+                    return failedFuture(new IgniteInternalException(format(
                             "Cannot create a schema for the table [tblId={}, 
ver={}]", tableId, newSchemaVersion), e)
                     );
                 }
 
-                return saveSchemaDescriptor(tableId, newSchema)
-                        .thenApply(t -> registerSchema(registries, tableId, 
newSchema));
+                return completedFuture(registerSchema(registries, tableId, 
newSchema));
             })).thenApply(ignored -> false);
         } finally {
             busyLock.leaveBusy();
@@ -208,38 +214,21 @@ public class SchemaManager implements IgniteComponent {
      * @return Schema representation.
      */
     private SchemaDescriptor loadSchemaDescriptor(int tblId, int ver) {
-        Entry entry = metastorageMgr.getLocally(schemaWithVerHistKey(tblId, 
ver), Long.MAX_VALUE);
+        int catalogVersion = catalogService.latestCatalogVersion();
 
-        assert !entry.tombstone() : "Table " + tblId + ", version " + ver;
+        while (catalogVersion >= catalogService.earliestCatalogVersion()) {
+            CatalogTableDescriptor tableDescriptor = 
catalogService.table(tblId, catalogVersion);
 
-        byte[] value = entry.value();
+            if (tableDescriptor == null) {
+                catalogVersion--;
 
-        assert value != null;
+                continue;
+            }
 
-        return SchemaSerializerImpl.INSTANCE.deserialize(value);
-    }
+            return CatalogToSchemaDescriptorConverter.convert(tableDescriptor, 
ver);
+        }
 
-    /**
-     * Saves a schema in the MetaStorage.
-     *
-     * @param tableId Table id.
-     * @param schema Schema descriptor.
-     * @return Future that will be completed when the schema gets saved.
-     */
-    private CompletableFuture<Void> saveSchemaDescriptor(int tableId, 
SchemaDescriptor schema) {
-        ByteArray schemaKey = schemaWithVerHistKey(tableId, schema.version());
-        ByteArray latestSchemaVersionKey = latestSchemaVersionKey(tableId);
-
-        byte[] serializedSchema = 
SchemaSerializerImpl.INSTANCE.serialize(schema);
-
-        return metastorageMgr.invoke(
-                notExists(schemaKey),
-                List.of(
-                        put(schemaKey, serializedSchema),
-                        put(latestSchemaVersionKey, 
intToBytes(schema.version()))
-                ),
-                emptyList()
-        ).thenApply(unused -> null);
+        throw new AssertionError(format("Schema descriptor is not found 
[tableId={}, schemaId={}]", tblId, ver));
     }
 
     /**
@@ -340,16 +329,10 @@ public class SchemaManager implements IgniteComponent {
      * @param tableId Table id.
      */
     public CompletableFuture<?> dropRegistry(long causalityToken, int tableId) 
{
-        return removeRegistry(causalityToken, tableId).thenCompose(unused -> {
-            return destroySchemas(tableId);
-        });
-    }
-
-    private CompletableFuture<?> removeRegistry(long causalityToken, int 
tableId) {
         return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(new IgniteInternalException(
-                        IgniteStringFormatter.format("Cannot remove a schema 
registry for the table [tblId={}]", tableId), e));
+                        format("Cannot remove a schema registry for the table 
[tblId={}]", tableId), e));
             }
 
             Map<Integer, SchemaRegistryImpl> regs = new HashMap<>(registries);
@@ -361,23 +344,6 @@ public class SchemaManager implements IgniteComponent {
         }));
     }
 
-    private CompletableFuture<?> destroySchemas(int tableId) {
-        return latestSchemaVersion(tableId)
-                .thenCompose(latestVersion -> {
-                    if (latestVersion == null) {
-                        // Nothing to remove.
-                        return nullCompletedFuture();
-                    }
-
-                    Set<ByteArray> keysToRemove = 
IntStream.rangeClosed(CatalogTableDescriptor.INITIAL_TABLE_VERSION, 
latestVersion)
-                            .mapToObj(version -> schemaWithVerHistKey(tableId, 
version))
-                            .collect(toSet());
-                    keysToRemove.add(latestSchemaVersionKey(tableId));
-
-                    return metastorageMgr.removeAll(keysToRemove);
-                });
-    }
-
     @Override
     public void stop() throws Exception {
         if (!stopGuard.compareAndSet(false, true)) {
@@ -389,36 +355,4 @@ public class SchemaManager implements IgniteComponent {
         //noinspection ConstantConditions
         IgniteUtils.closeAllManually(registriesVv.latest().values());
     }
-
-    /**
-     * Gets the latest version of the table schema which is available in 
Metastore or {@code null} if nothing is available.
-     *
-     * @param tableId Table id.
-     * @return The latest schema version or {@code null} if nothing is 
available.
-     */
-    private CompletableFuture<Integer> latestSchemaVersion(int tableId) {
-        return metastorageMgr.get(latestSchemaVersionKey(tableId))
-                .thenApply(entry -> {
-                    if (entry == null || entry.value() == null) {
-                        return null;
-                    } else {
-                        return ByteUtils.bytesToInt(entry.value());
-                    }
-                });
-    }
-
-    /**
-     * Forms schema history key.
-     *
-     * @param tblId Table id.
-     * @param ver Schema version.
-     * @return {@link ByteArray} representation.
-     */
-    private static ByteArray schemaWithVerHistKey(int tblId, int ver) {
-        return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
-    }
-
-    private static ByteArray latestSchemaVersionKey(int tableId) {
-        return ByteArray.fromString(tableId + 
LATEST_SCHEMA_VERSION_STORE_SUFFIX);
-    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
index 72bfa76cb3..da9193349e 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
@@ -35,7 +35,7 @@ public class SchemaUtils {
      * @return Schema descriptor.
      */
     public static SchemaDescriptor 
prepareSchemaDescriptor(CatalogTableDescriptor tableDescriptor) {
-        return CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
+        return CatalogToSchemaDescriptorConverter.convert(tableDescriptor, 
tableDescriptor.tableVersion());
     }
 
     /**
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
index 588baebb62..a215c0fab1 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.schema.catalog;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,6 +30,7 @@ import 
org.apache.ignite.internal.catalog.commands.DefaultValue.FunctionCall;
 import org.apache.ignite.internal.catalog.commands.DefaultValue.Type;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.DefaultValueGenerator;
 import org.apache.ignite.internal.schema.DefaultValueProvider;
@@ -149,7 +152,7 @@ public final class CatalogToSchemaDescriptorConverter {
      * @param tableDescriptor Descriptor to convert.
      * @return A {@link SchemaDescriptor} object representing the table 
descriptor.
      */
-    public static SchemaDescriptor convert(CatalogTableDescriptor 
tableDescriptor) {
+    public static SchemaDescriptor convert(CatalogTableDescriptor 
tableDescriptor, int tableVersion) {
         Set<String> keyColumnsNames = 
Set.copyOf(tableDescriptor.primaryKeyColumns());
 
         List<Column> keyCols = new ArrayList<>(keyColumnsNames.size());
@@ -157,7 +160,12 @@ public final class CatalogToSchemaDescriptorConverter {
 
         int idx = 0;
 
-        for (CatalogTableColumnDescriptor column : tableDescriptor.columns()) {
+        TableVersion tableVersionInstance = 
tableDescriptor.schemaVersions().get(tableVersion);
+
+        assert tableVersionInstance != null
+                : format("Cannot find table version {} in table descriptor 
{}", tableVersion, tableDescriptor);
+
+        for (CatalogTableColumnDescriptor column : 
tableVersionInstance.columns()) {
             if (keyColumnsNames.contains(column.name())) {
                 keyCols.add(convert(idx, column));
             } else {
@@ -168,7 +176,7 @@ public final class CatalogToSchemaDescriptorConverter {
         }
 
         return new SchemaDescriptor(
-                tableDescriptor.tableVersion(),
+                tableVersion,
                 keyCols.toArray(Column[]::new),
                 tableDescriptor.colocationColumns().toArray(String[]::new),
                 valCols.toArray(Column[]::new)
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
index cddac3f6a1..aa769bd006 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaManagerTest.java
@@ -17,20 +17,16 @@
 
 package org.apache.ignite.internal.schema;
 
-
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -44,7 +40,6 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
@@ -52,21 +47,15 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
 import org.apache.ignite.internal.event.EventListener;
-import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
-import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.type.NativeTypeSpec;
-import org.apache.ignite.internal.util.subscription.ListAccumulator;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.sql.ColumnType;
@@ -82,9 +71,6 @@ import org.mockito.quality.Strictness;
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 class SchemaManagerTest extends BaseIgniteAbstractTest {
-    private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
-    private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = 
".sch-hist-latest";
-
     private static final int TABLE_ID = 3;
     private static final String TABLE_NAME = "t";
 
@@ -145,28 +131,6 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
         vaultManager.stop();
     }
 
-    @Test
-    void savesSchemaOnTableCreation() {
-        createSomeTable();
-
-        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(1);
-
-        assertThat(schemaDescriptor.version(), is(1));
-        assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1"));
-
-        Column k1 = schemaDescriptor.column("k1");
-        assertThat(k1, is(notNullValue()));
-
-        assertThat(k1.name(), is("k1"));
-        assertThat(k1.type().spec(), is(NativeTypeSpec.INT16));
-
-        Column v1 = schemaDescriptor.column("v1");
-        assertThat(v1, is(notNullValue()));
-
-        assertThat(v1.name(), is("v1"));
-        assertThat(v1.type().spec(), is(NativeTypeSpec.INT32));
-    }
-
     private void createSomeTable() {
         List<CatalogTableColumnDescriptor> columns = List.of(
                 new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
@@ -174,7 +138,7 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
                 new CatalogTableColumnDescriptor("v1", ColumnType.INT32, 
false, 0, 0, 0, null)
         );
         CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
-                TABLE_ID, -1, -1, TABLE_NAME, 0, 1, columns, List.of("k1", 
"k2"), null, INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN
+                TABLE_ID, -1, -1, TABLE_NAME, 0, columns, List.of("k1", "k2"), 
null
         );
 
         CompletableFuture<Boolean> future = tableCreatedListener()
@@ -193,49 +157,6 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
         return Objects.requireNonNull(tableAlteredListener, 
"tableAlteredListener is not registered with CatalogService");
     }
 
-    private SchemaDescriptor getSchemaDescriptor(int schemaVersion) {
-        Entry entry = metaStorageKvStorage.get(schemaWithVerHistKey(TABLE_ID, 
schemaVersion).bytes());
-        assertThat(entry, is(notNullValue()));
-
-        byte[] value = entry.value();
-        assertThat(value, is(notNullValue()));
-
-        return SchemaSerializerImpl.INSTANCE.deserialize(value);
-    }
-
-    private static ByteArray schemaWithVerHistKey(int tblId, int ver) {
-        return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
-    }
-
-    @Test
-    void savesSchemaOnColumnAddition() {
-        createSomeTable();
-
-        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAddition());
-
-        AddColumnEventParameters event = new AddColumnEventParameters(
-                CAUSALITY_TOKEN_2,
-                CATALOG_VERSION_2,
-                TABLE_ID,
-                List.of(new CatalogTableColumnDescriptor("v2", 
ColumnType.STRING, false, 0, 0, 0, null))
-        );
-
-        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
-
-        assertThat(future, willBe(false));
-
-        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
-
-        assertThat(schemaDescriptor.version(), is(2));
-        assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1", 
"v2"));
-
-        Column v2 = schemaDescriptor.column("v2");
-        assertThat(v2, is(notNullValue()));
-
-        assertThat(v2.name(), is("v2"));
-        assertThat(v2.type().spec(), is(NativeTypeSpec.STRING));
-    }
-
     private static CatalogTableDescriptor tableDescriptorAfterColumnAddition() 
{
         List<CatalogTableColumnDescriptor> columns = List.of(
                 new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
@@ -250,111 +171,19 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
                 -1,
                 TABLE_NAME,
                 0,
-                2,
                 columns,
                 List.of("k1", "k2"),
-                null,
-                INITIAL_CAUSALITY_TOKEN,
-                INITIAL_CAUSALITY_TOKEN
-        );
-    }
-
-    private void completeCausalityToken(long causalityToken) {
-        
assertThat(onMetastoreRevisionCompleteHolder.get().apply(causalityToken), 
willCompleteSuccessfully());
-    }
-
-    @Test
-    void savesSchemaOnColumnRemoval() {
-        createSomeTable();
-
-        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnRemoval());
-
-        DropColumnEventParameters event = new DropColumnEventParameters(
-                CAUSALITY_TOKEN_2,
-                CATALOG_VERSION_2,
-                TABLE_ID,
-                List.of("v1")
-        );
-
-        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
-
-        assertThat(future, willBe(false));
-
-        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
-
-        assertThat(schemaDescriptor.version(), is(2));
-        assertThat(schemaDescriptor.columnNames(), contains("k1", "k2"));
-    }
-
-    private static CatalogTableDescriptor tableDescriptorAfterColumnRemoval() {
-        List<CatalogTableColumnDescriptor> columns = List.of(
-                new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
-                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, 
false, 0, 0, 0, null)
-        );
-
-        return new CatalogTableDescriptor(
-                TABLE_ID,
-                -1,
-                -1,
+                null
+        ).newDescriptor(
                 TABLE_NAME,
-                0,
-                2,
+                INITIAL_TABLE_VERSION + 1,
                 columns,
-                List.of("k1", "k2"),
-                null,
-                INITIAL_CAUSALITY_TOKEN,
                 INITIAL_CAUSALITY_TOKEN
         );
     }
 
-    @Test
-    void savesSchemaOnColumnAlteration() {
-        createSomeTable();
-
-        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAlteration());
-
-        AlterColumnEventParameters event = new AlterColumnEventParameters(
-                CAUSALITY_TOKEN_2,
-                CATALOG_VERSION_2,
-                TABLE_ID,
-                new CatalogTableColumnDescriptor("v1", ColumnType.INT64, 
false, 0, 0, 0, null)
-        );
-
-        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
-
-        assertThat(future, willBe(false));
-
-        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
-
-        assertThat(schemaDescriptor.version(), is(2));
-
-        Column v1 = schemaDescriptor.column("v1");
-        assertThat(v1, is(notNullValue()));
-
-        assertThat(v1.name(), is("v1"));
-        assertThat(v1.type().spec(), is(NativeTypeSpec.INT64));
-    }
-
-    private static CatalogTableDescriptor 
tableDescriptorAfterColumnAlteration() {
-        List<CatalogTableColumnDescriptor> columns = List.of(
-                new CatalogTableColumnDescriptor("k1", ColumnType.INT32, 
false, 0, 0, 0, null),
-                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, 
false, 0, 0, 0, null),
-                new CatalogTableColumnDescriptor("v1", ColumnType.INT64, 
false, 0, 0, 0, null)
-        );
-
-        return new CatalogTableDescriptor(
-                TABLE_ID,
-                -1,
-                -1,
-                TABLE_NAME,
-                0,
-                2,
-                columns,
-                List.of("k1", "k2"),
-                null,
-                INITIAL_CAUSALITY_TOKEN,
-                INITIAL_CAUSALITY_TOKEN
-        );
+    private void completeCausalityToken(long causalityToken) {
+        
assertThat(onMetastoreRevisionCompleteHolder.get().apply(causalityToken), 
willCompleteSuccessfully());
     }
 
     @Test
@@ -476,46 +305,6 @@ class SchemaManagerTest extends BaseIgniteAbstractTest {
         assertThat(schemaRegistry, is(nullValue()));
     }
 
-    @Test
-    void dropRegistryRemovesSchemasFromMetastorage() {
-        createSomeTable();
-
-        assertThat(schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID), 
willCompleteSuccessfully());
-
-        completeCausalityToken(CAUSALITY_TOKEN_2);
-
-        assertThatNoSchemasExist(TABLE_ID);
-        assertThatNoLatestSchemaVersionExists(TABLE_ID);
-    }
-
-    private void assertThatNoSchemasExist(int tableId) {
-        CompletableFuture<List<String>> schemaEntryKeysFuture = new 
CompletableFuture<>();
-
-        Publisher<Entry> publisher = 
metaStorageManager.prefix(ByteArray.fromString(tableId + SCHEMA_STORE_PREFIX));
-        publisher.subscribe(
-                new ListAccumulator<Entry, String>(entry -> new 
String(entry.key(), UTF_8))
-                        .toSubscriber(schemaEntryKeysFuture)
-        );
-
-        assertThat(schemaEntryKeysFuture, willBe(empty()));
-    }
-
-    private void assertThatNoLatestSchemaVersionExists(int tableId) {
-        CompletableFuture<Entry> future = 
metaStorageManager.get(latestSchemaVersionKey(tableId));
-
-        assertThat(future, willCompleteSuccessfully());
-
-        Entry entry = future.join();
-
-        if (entry != null) {
-            assertThat(entry.value(), is(nullValue()));
-        }
-    }
-
-    private static ByteArray latestSchemaVersionKey(int tableId) {
-        return ByteArray.fromString(tableId + 
LATEST_SCHEMA_VERSION_STORE_SUFFIX);
-    }
-
     @Test
     void loadingPreExistingSchemasWorks() throws Exception {
         create2TableVersions();
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
index 5f7a2499f0..b32ada814c 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.schema.catalog;
 
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static org.apache.ignite.internal.schema.SchemaTestUtils.specToType;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -134,7 +133,6 @@ public class CatalogToSchemaDescriptorConverterTest extends 
AbstractSchemaConver
                 -1,
                 "test",
                 0,
-                1,
                 List.of(
                         new CatalogTableColumnDescriptor("C1", 
ColumnType.INT32, false, 0, 0, 0, null),
                         new CatalogTableColumnDescriptor("K2", 
ColumnType.INT32, false, 0, 0, 0, null),
@@ -142,12 +140,10 @@ public class CatalogToSchemaDescriptorConverterTest 
extends AbstractSchemaConver
                         new CatalogTableColumnDescriptor("K1", 
ColumnType.INT32, false, 0, 0, 0, null)
                 ),
                 List.of("K1", "K2"),
-                List.of("K2"),
-                INITIAL_CAUSALITY_TOKEN,
-                INITIAL_CAUSALITY_TOKEN
+                List.of("K2")
         );
 
-        SchemaDescriptor schema = 
CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
+        SchemaDescriptor schema = 
CatalogToSchemaDescriptorConverter.convert(tableDescriptor, 
tableDescriptor.tableVersion());
 
         assertThat(schema.keyColumns().length(), equalTo(2));
         assertThat(schema.keyColumns().column(0).name(), equalTo("K1"));
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 1ea34a901a..3b9c3bfe52 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static 
org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
 import static 
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
@@ -775,7 +774,6 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
                 hashIndexId,
                 TABLE_NAME,
                 zoneId,
-                1,
                 List.of(
                         
CatalogUtils.fromParams(ColumnParams.builder().name("INTKEY").type(INT32).build()),
                         
CatalogUtils.fromParams(ColumnParams.builder().name("STRKEY").length(100).type(STRING).build()),
@@ -783,9 +781,7 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
                         
CatalogUtils.fromParams(ColumnParams.builder().name("STRVAL").length(100).type(STRING).build())
                 ),
                 List.of("INTKEY"),
-                null,
-                INITIAL_CAUSALITY_TOKEN,
-                INITIAL_CAUSALITY_TOKEN
+                null
         );
 
         CatalogSortedIndexDescriptor sortedIndex = new 
CatalogSortedIndexDescriptor(
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
index 23a95a62d9..76ef74662b 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.storage.index;
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static 
org.apache.ignite.internal.storage.BaseMvStoragesTest.getOrCreateMvPartition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -158,12 +157,9 @@ public abstract class AbstractIndexStorageTest<S extends 
IndexStorage, D extends
                 1,
                 TABLE_NAME,
                 zoneId,
-                1,
                 Stream.concat(Stream.of(pkColumn), 
ALL_TYPES_COLUMN_PARAMS.stream()).map(CatalogUtils::fromParams).collect(toList()),
                 List.of("pk"),
-                null,
-                INITIAL_CAUSALITY_TOKEN,
-                INITIAL_CAUSALITY_TOKEN
+                null
         );
 
         when(catalogService.table(eq(TABLE_NAME), 
anyLong())).thenReturn(tableDescriptor);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 6cbf88d8c4..58c24ba8a9 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.table.distributed.replication;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
@@ -333,7 +332,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private KvMarshaller<TestKey, TestValue> kvMarshallerVersion2;
 
     private final CatalogTableDescriptor tableDescriptor = new 
CatalogTableDescriptor(
-            TABLE_ID, 1, 2, "table", 1, CURRENT_SCHEMA_VERSION,
+            TABLE_ID, 1, 2, "table", 1,
             List.of(
                     new CatalogTableColumnDescriptor("intKey", 
ColumnType.INT32, false, 0, 0, 0, null),
                     new CatalogTableColumnDescriptor("strKey", 
ColumnType.STRING, false, 0, 0, 0, null),
@@ -341,9 +340,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                     new CatalogTableColumnDescriptor("strVal", 
ColumnType.STRING, false, 0, 0, 0, null)
             ),
             List.of("intKey", "strKey"),
-            null,
-            INITIAL_CAUSALITY_TOKEN,
-            INITIAL_CAUSALITY_TOKEN
+            null
     );
 
     /** Partition replication listener to test. */
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
index 48847f641f..5369b569dd 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.table.distributed.schema;
 
-import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -35,6 +34,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -113,9 +113,15 @@ class CatalogValidationSchemasSourceTest extends 
BaseIgniteAbstractTest {
                 new CatalogTableColumnDescriptor("v1", ColumnType.INT32, 
false, 0, 0, 0, null)
         );
 
-        return new CatalogTableDescriptor(
-                tableId, -1, -1, "test", 0, tableVersion, columns, 
List.of("k1"), null, INITIAL_CAUSALITY_TOKEN, INITIAL_CAUSALITY_TOKEN
+        CatalogTableDescriptor descriptor = new CatalogTableDescriptor(
+                tableId, -1, -1, "test", 0, columns, List.of("k1"), null
         );
+
+        for (int ver = CatalogTableDescriptor.INITIAL_TABLE_VERSION + 1; ver 
<= tableVersion; ver++) {
+            descriptor = descriptor.newDescriptor("test", ver, columns, 
CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN);
+        }
+
+        return descriptor;
     }
 
     @Test

Reply via email to