This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a690ba4a9 IGNITE-19501 SchemaManager should use CatalogService for 
building BinaryRow descriptors (#2356)
3a690ba4a9 is described below

commit 3a690ba4a977de0311c27c6e375b6290facf20be
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Aug 1 14:05:43 2023 +0400

    IGNITE-19501 SchemaManager should use CatalogService for building BinaryRow 
descriptors (#2356)
---
 .../internal/catalog/commands/CatalogUtils.java    |   1 +
 .../descriptors/CatalogTableDescriptor.java        |  11 +
 .../catalog/events/AddColumnEventParameters.java   |  11 +-
 .../catalog/events/AlterColumnEventParameters.java |  14 +-
 .../catalog/events/CreateTableEventParameters.java |   4 +-
 .../catalog/events/DropColumnEventParameters.java  |  11 +-
 .../catalog/events/DropTableEventParameters.java   |  13 +-
 ...ntParameters.java => TableEventParameters.java} |  12 +-
 .../internal/catalog/storage/AlterColumnEntry.java |   1 +
 .../internal/catalog/storage/DropColumnsEntry.java |   1 +
 .../internal/catalog/storage/NewColumnsEntry.java  |   1 +
 .../internal/catalog/CatalogManagerSelfTest.java   |  79 ++++
 .../DistributionZoneRebalanceEngineTest.java       |   3 +
 .../RebalanceUtilUpdateAssignmentsTest.java        |   1 +
 modules/schema/build.gradle                        |   4 +
 .../internal/schema/CatalogDescriptorUtils.java    |   2 +
 ...chemaManager.java => CatalogSchemaManager.java} | 330 +++++++------
 .../ignite/internal/schema/SchemaManager.java      |   2 +-
 .../apache/ignite/internal/schema/SchemaUtils.java |  12 +
 .../CatalogToSchemaDescriptorConverter.java        | 179 ++++++++
 .../AbstractSchemaConverterTest.java               |  10 +-
 .../internal/schema/CatalogSchemaManagerTest.java  | 509 +++++++++++++++++++++
 .../CatalogToSchemaDescriptorConverterTest.java    | 199 ++++++++
 ...nfigurationToSchemaDescriptorConverterTest.java |   1 +
 .../testutils/SchemaConfigurationConverter.java    |   3 +
 .../engine/schema/CatalogSqlSchemaManagerTest.java |  10 +-
 26 files changed, 1219 insertions(+), 205 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index 7823b09d47..d085f4ca0d 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -60,6 +60,7 @@ public class CatalogUtils {
                 id,
                 params.tableName(),
                 zoneId,
+                CatalogTableDescriptor.INITIAL_TABLE_VERSION,
                 
params.columns().stream().map(CatalogUtils::fromParams).collect(toList()),
                 params.primaryKeyColumns(),
                 params.colocationColumns()
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index 58a744d74c..2948aa90f7 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -35,8 +35,12 @@ import org.jetbrains.annotations.Nullable;
 public class CatalogTableDescriptor extends CatalogObjectDescriptor {
     private static final long serialVersionUID = -2021394971104316570L;
 
+    public static final int INITIAL_TABLE_VERSION = 1;
+
     private final int zoneId;
 
+    private final int tableVersion;
+
     private final List<CatalogTableColumnDescriptor> columns;
     private final List<String> primaryKeyColumns;
     private final List<String> colocationColumns;
@@ -50,6 +54,7 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
      * @param id Table id.
      * @param name Table name.
      * @param zoneId Distribution zone ID.
+     * @param tableVersion Version of the table.
      * @param columns Table column descriptors.
      * @param pkCols Primary key column names.
      * @param colocationCols Colocation column names.
@@ -58,6 +63,7 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
             int id,
             String name,
             int zoneId,
+            int tableVersion,
             List<CatalogTableColumnDescriptor> columns,
             List<String> pkCols,
             @Nullable List<String> colocationCols
@@ -65,6 +71,7 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
         super(id, Type.TABLE, name);
 
         this.zoneId = zoneId;
+        this.tableVersion = tableVersion;
         this.columns = Objects.requireNonNull(columns, "No columns defined.");
         primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key 
columns.");
         colocationColumns = colocationCols == null ? pkCols : colocationCols;
@@ -89,6 +96,10 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
         return zoneId;
     }
 
+    public int tableVersion() {
+        return tableVersion;
+    }
+
     public List<String> primaryKeyColumns() {
         return primaryKeyColumns;
     }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
index 6f0226d80f..382dd8f72c 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
@@ -23,9 +23,8 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
 /**
  * Add column event parameters contains descriptors of added columns.
  */
-public class AddColumnEventParameters extends CatalogEventParameters {
+public class AddColumnEventParameters extends TableEventParameters {
 
-    private final int tableId;
     private final List<CatalogTableColumnDescriptor> columnDescriptors;
 
     /**
@@ -42,17 +41,11 @@ public class AddColumnEventParameters extends 
CatalogEventParameters {
             int tableId,
             List<CatalogTableColumnDescriptor> columnDescriptors
     ) {
-        super(causalityToken, catalogVersion);
+        super(causalityToken, catalogVersion, tableId);
 
-        this.tableId = tableId;
         this.columnDescriptors = columnDescriptors;
     }
 
-    /** Returns table id. */
-    public int tableId() {
-        return tableId;
-    }
-
     /**
      * Returns descriptors of columns to add.
      */
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
index fdd124198d..b73f057495 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
@@ -22,9 +22,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
 /**
  * Create table event parameters contains a column descriptor for the modified 
column.
  */
-public class AlterColumnEventParameters extends CatalogEventParameters {
-
-    private final int tableId;
+public class AlterColumnEventParameters extends TableEventParameters {
 
     private final CatalogTableColumnDescriptor columnDescriptor;
 
@@ -33,21 +31,15 @@ public class AlterColumnEventParameters extends 
CatalogEventParameters {
      *
      * @param causalityToken Causality token.
      * @param catalogVersion Catalog version.
-     * @param tableId Returns an id the table to be modified.
+     * @param tableId ID of the table to be modified.
      * @param columnDescriptor Descriptor for the column to be replaced.
      */
     public AlterColumnEventParameters(long causalityToken, int catalogVersion, 
int tableId, CatalogTableColumnDescriptor columnDescriptor) {
-        super(causalityToken, catalogVersion);
+        super(causalityToken, catalogVersion, tableId);
 
-        this.tableId = tableId;
         this.columnDescriptor = columnDescriptor;
     }
 
-    /** Returns an id of a modified table. */
-    public int tableId() {
-        return tableId;
-    }
-
     /** Returns column descriptor for the column to be replaced. */
     public CatalogTableColumnDescriptor columnDescriptor() {
         return columnDescriptor;
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
index a8cd6f6df3..dada194914 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
@@ -22,7 +22,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 /**
  * Create table event parameters contains a table descriptor for newly created 
table.
  */
-public class CreateTableEventParameters extends CatalogEventParameters {
+public class CreateTableEventParameters extends TableEventParameters {
 
     private final CatalogTableDescriptor tableDescriptor;
 
@@ -34,7 +34,7 @@ public class CreateTableEventParameters extends 
CatalogEventParameters {
      * @param tableDescriptor Newly created table descriptor.
      */
     public CreateTableEventParameters(long causalityToken, int catalogVersion, 
CatalogTableDescriptor tableDescriptor) {
-        super(causalityToken, catalogVersion);
+        super(causalityToken, catalogVersion, tableDescriptor.id());
 
         this.tableDescriptor = tableDescriptor;
     }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
index 46a17becb5..0b9d98a1ee 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
@@ -22,9 +22,8 @@ import java.util.Collection;
 /**
  * Drop column event parameters contains descriptors of dropped columns.
  */
-public class DropColumnEventParameters extends CatalogEventParameters {
+public class DropColumnEventParameters extends TableEventParameters {
 
-    private final int tableId;
     private final Collection<String> columns;
 
     /**
@@ -36,17 +35,11 @@ public class DropColumnEventParameters extends 
CatalogEventParameters {
      * @param columns Names of columns to drop.
      */
     public DropColumnEventParameters(long causalityToken, int catalogVersion, 
int tableId, Collection<String> columns) {
-        super(causalityToken, catalogVersion);
+        super(causalityToken, catalogVersion, tableId);
 
-        this.tableId = tableId;
         this.columns = columns;
     }
 
-    /** Returns table id. */
-    public int tableId() {
-        return tableId;
-    }
-
     /**
      * Returns names of columns to drop.
      */
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
index 241e849a2d..aab6e9b790 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.catalog.events;
 /**
  * Drop table event parameters contains an id of dropped table.
  */
-public class DropTableEventParameters extends CatalogEventParameters {
-
-    private final int tableId;
+public class DropTableEventParameters extends TableEventParameters {
 
     /**
      * Constructor.
@@ -32,13 +30,6 @@ public class DropTableEventParameters extends 
CatalogEventParameters {
      * @param tableId An id of dropped table.
      */
     public DropTableEventParameters(long causalityToken, int catalogVersion, 
int tableId) {
-        super(causalityToken, catalogVersion);
-
-        this.tableId = tableId;
-    }
-
-    /** Returns an id of dropped table. */
-    public int tableId() {
-        return tableId;
+        super(causalityToken, catalogVersion, tableId);
     }
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
similarity index 78%
copy from 
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
copy to 
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
index 241e849a2d..089a1be64e 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/TableEventParameters.java
@@ -18,10 +18,9 @@
 package org.apache.ignite.internal.catalog.events;
 
 /**
- * Drop table event parameters contains an id of dropped table.
+ * Event that is related to a table.
  */
-public class DropTableEventParameters extends CatalogEventParameters {
-
+public abstract class TableEventParameters extends CatalogEventParameters {
     private final int tableId;
 
     /**
@@ -29,15 +28,14 @@ public class DropTableEventParameters extends 
CatalogEventParameters {
      *
      * @param causalityToken Causality token.
      * @param catalogVersion Catalog version.
-     * @param tableId An id of dropped table.
+     * @param tableId ID of the table to which the event relates.
      */
-    public DropTableEventParameters(long causalityToken, int catalogVersion, 
int tableId) {
+    public TableEventParameters(long causalityToken, int catalogVersion, int 
tableId) {
         super(causalityToken, catalogVersion);
-
         this.tableId = tableId;
     }
 
-    /** Returns an id of dropped table. */
+    /** Returns an id of a modified table. */
     public int tableId() {
         return tableId;
     }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index 910a363d81..5ef3f9965d 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -92,6 +92,7 @@ public class AlterColumnEntry implements UpdateEntry, 
Fireable {
                                                 table.id(),
                                                 table.name(),
                                                 table.zoneId(),
+                                                table.tableVersion() + 1,
                                                 table.columns().stream()
                                                         .map(source -> 
source.name().equals(column.name()) ? column : source)
                                                         .collect(toList()),
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index e29c250283..a3c5dcadd0 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -89,6 +89,7 @@ public class DropColumnsEntry implements UpdateEntry, 
Fireable {
                                         table.id(),
                                         table.name(),
                                         table.zoneId(),
+                                        table.tableVersion() + 1,
                                         table.columns().stream()
                                                 .filter(col -> 
!columns.contains(col.name()))
                                                 .collect(toList()),
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 1e1aff017e..936e7c0bf5 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -89,6 +89,7 @@ public class NewColumnsEntry implements UpdateEntry, Fireable 
{
                                         table.id(),
                                         table.name(),
                                         table.zoneId(),
+                                        table.tableVersion() + 1,
                                         
CollectionUtils.concat(table.columns(), descriptors),
                                         table.primaryKeyColumns(),
                                         table.colocationColumns()) : table
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index a1ef062a31..1a2c34c4d6 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1873,6 +1873,85 @@ public class CatalogManagerSelfTest extends 
BaseIgniteAbstractTest {
         assertThat(manager.indexes(2), hasItems(index(2, 
createPkIndexName(TABLE_NAME)), index(2, INDEX_NAME)));
     }
 
+    @Test
+    public void createTableProducesTableVersion1() {
+        createSomeTable(TABLE_NAME);
+
+        CatalogTableDescriptor table = manager.table(TABLE_NAME, 
Long.MAX_VALUE);
+
+        assertThat(table.tableVersion(), is(1));
+    }
+
+    @Test
+    public void addColumnIncrementsTableVersion() {
+        createSomeTable(TABLE_NAME);
+
+        CompletableFuture<Void> future = manager.addColumn(
+                AlterTableAddColumnParams.builder()
+                        .schemaName(SCHEMA_NAME)
+                        .tableName(TABLE_NAME)
+                        
.columns(List.of(ColumnParams.builder().name("val2").type(ColumnType.INT32).build()))
+                        .build()
+        );
+        assertThat(future, willCompleteSuccessfully());
+
+        CatalogTableDescriptor table = manager.table(TABLE_NAME, 
Long.MAX_VALUE);
+
+        assertThat(table.tableVersion(), is(2));
+    }
+
+    @Test
+    public void dropColumnIncrementsTableVersion() {
+        createSomeTable(TABLE_NAME);
+
+        CompletableFuture<Void> future = manager.dropColumn(
+                AlterTableDropColumnParams.builder()
+                        .schemaName(SCHEMA_NAME)
+                        .tableName(TABLE_NAME)
+                        .columns(Set.of("val1"))
+                        .build()
+        );
+        assertThat(future, willCompleteSuccessfully());
+
+        CatalogTableDescriptor table = manager.table(TABLE_NAME, 
Long.MAX_VALUE);
+
+        assertThat(table.tableVersion(), is(2));
+    }
+
+    @Test
+    public void alterColumnIncrementsTableVersion() {
+        createSomeTable(TABLE_NAME);
+
+        CompletableFuture<Void> future = manager.alterColumn(
+                AlterColumnParams.builder()
+                        .schemaName(SCHEMA_NAME)
+                        .tableName(TABLE_NAME)
+                        .columnName("val1")
+                        .type(ColumnType.INT64)
+                        .build()
+        );
+        assertThat(future, willCompleteSuccessfully());
+
+        CatalogTableDescriptor table = manager.table(TABLE_NAME, 
Long.MAX_VALUE);
+
+        assertThat(table.tableVersion(), is(2));
+    }
+
+    private void createSomeTable(String tableName) {
+        CreateTableParams params = CreateTableParams.builder()
+                .schemaName(SCHEMA_NAME)
+                .tableName(tableName)
+                .zone(ZONE_NAME)
+                .columns(List.of(
+                        
ColumnParams.builder().name("key1").type(ColumnType.INT32).build(),
+                        
ColumnParams.builder().name("val1").type(ColumnType.INT32).build()
+                ))
+                .primaryKeyColumns(List.of("key1"))
+                .build();
+
+        assertThat(manager.createTable(params), willCompleteSuccessfully());
+    }
+
     private CompletableFuture<Void> changeColumn(
             String tab,
             String col,
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 6382aba231..75adff45d3 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -461,6 +462,8 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
                             }));
 
                             tableChange.changePrimaryKey(primaryKeyChange -> 
primaryKeyChange.changeColumns("k1"));
+
+                            ((ExtendedTableChange) 
tableChange).changeSchemaId(1);
                         }));
             });
         });
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index 93800f158e..1187527d71 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -98,6 +98,7 @@ public class RebalanceUtilUpdateAssignmentsTest extends 
IgniteAbstractTest {
             1,
             "table1",
             0,
+            1,
             List.of(new CatalogTableColumnDescriptor("k1", ColumnType.INT32, 
false, 0, 0, 0, null)),
             List.of("k1"),
             null
diff --git a/modules/schema/build.gradle b/modules/schema/build.gradle
index 92e8100a45..0fd37e6cdd 100644
--- a/modules/schema/build.gradle
+++ b/modules/schema/build.gradle
@@ -41,11 +41,15 @@ dependencies {
     testAnnotationProcessor libs.jmh.annotation.processor
     testImplementation project(':ignite-configuration')
     testImplementation project(':ignite-core')
+    testImplementation project(':ignite-vault')
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-configuration')))
+    testImplementation(testFixtures(project(':ignite-metastorage')))
+    testImplementation(testFixtures(project(':ignite-vault')))
     testImplementation libs.hamcrest.core
     testImplementation libs.hamcrest.optional
     testImplementation libs.mockito.core
+    testImplementation libs.mockito.junit
     testImplementation libs.jmh.core
     testImplementation libs.javax.annotations
 
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
index 8acc7aa194..101cddf769 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogDescriptorUtils.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.schema.configuration.ColumnTypeView;
 import org.apache.ignite.internal.schema.configuration.ColumnView;
 import 
org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
 import org.apache.ignite.internal.schema.configuration.PrimaryKeyView;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import 
org.apache.ignite.internal.schema.configuration.ValueSerializationHelper;
@@ -65,6 +66,7 @@ public class CatalogDescriptorUtils {
                 config.id(),
                 config.name(),
                 config.zoneId(),
+                ((ExtendedTableView) config).schemaId(),
                 
config.columns().stream().map(CatalogDescriptorUtils::toTableColumnDescriptor).collect(toList()),
                 List.of(primaryKeyConfig.columns()),
                 List.of(primaryKeyConfig.colocationColumns())
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
similarity index 56%
copy from 
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
copy to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
index 51f0d71c8a..becabab496 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
@@ -17,42 +17,45 @@
 
 package org.apache.ignite.internal.schema;
 
+import static java.util.Collections.emptyList;
+import static java.util.Objects.requireNonNullElse;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Flow.Subscriber;
-import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
-import org.apache.ignite.configuration.NamedListView;
-import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.TableEventParameters;
 import org.apache.ignite.internal.causality.IncrementalVersionedValue;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.manager.Producer;
-import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.schema.configuration.ColumnView;
-import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
 import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -61,16 +64,14 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * The class services a management of table schemas.
+ * This class services management of table schemas.
  */
-public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters> implements IgniteComponent {
-    private static final IgniteLogger LOGGER = 
Loggers.forClass(SchemaManager.class);
-
-    /** Initial version for schemas. */
-    public static final int INITIAL_SCHEMA_VERSION = 1;
+public class CatalogSchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters> implements IgniteComponent {
+    private static final IgniteLogger LOGGER = 
Loggers.forClass(CatalogSchemaManager.class);
 
     /** Schema history key predicate part. */
     private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
+    private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = 
".sch-hist-latest";
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -78,61 +79,93 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Prevents double stopping of the component. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Tables configuration. */
-    private final TablesConfiguration tablesCfg;
+    private final CatalogService catalogService;
 
-    /** Versioned store for tables by name. */
+    /** Versioned store for tables by ID. */
     private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>> 
registriesVv;
 
     /** Meta storage manager. */
     private final MetaStorageManager metastorageMgr;
 
     /** Constructor. */
-    public SchemaManager(
+    public CatalogSchemaManager(
             Consumer<LongFunction<CompletableFuture<?>>> registry,
-            TablesConfiguration tablesCfg,
+            CatalogService catalogService,
             MetaStorageManager metastorageMgr
     ) {
         this.registriesVv = new IncrementalVersionedValue<>(registry, 
HashMap::new);
-        this.tablesCfg = tablesCfg;
+        this.catalogService = catalogService;
         this.metastorageMgr = metastorageMgr;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void start() {
-        tablesCfg.tables().any().columns().listen(this::onSchemaChange);
+        catalogService.listen(CatalogEvent.TABLE_CREATE, this::onTableCreated);
+        catalogService.listen(CatalogEvent.TABLE_ALTER, this::onTableAltered);
+
+        registerExistingTables();
     }
 
-    /**
-     * Listener of schema configuration changes.
-     *
-     * @param ctx Configuration context.
-     * @return A future.
-     */
-    private CompletableFuture<?> 
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
-        if (!busyLock.enterBusy()) {
-            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, 
new NodeStoppingException()));
+    private void registerExistingTables() {
+        // TODO: IGNITE-20051 - add proper recovery (consider tables that are 
removed now; take token and catalog version
+        // exactly matching the tables).
+
+        long causalityToken = metastorageMgr.appliedRevision();
+        int catalogVersion = catalogService.latestCatalogVersion();
+
+        for (CatalogTableDescriptor tableDescriptor : 
catalogService.tables(catalogVersion)) {
+            onTableCreated(new CreateTableEventParameters(causalityToken, 
catalogVersion, tableDescriptor), null);
         }
 
-        try {
-            ExtendedTableView tblCfg = ctx.newValue(ExtendedTableView.class);
+        registriesVv.complete(causalityToken);
+    }
+
+    private CompletableFuture<Boolean> onTableCreated(CatalogEventParameters 
event, @Nullable Throwable ex) {
+        if (ex != null) {
+            return failedFuture(ex);
+        }
+
+        CreateTableEventParameters creationEvent = 
(CreateTableEventParameters) event;
+
+        return onTableCreatedOrAltered(creationEvent.tableDescriptor(), 
creationEvent.causalityToken());
+    }
+
+    private CompletableFuture<Boolean> onTableAltered(CatalogEventParameters 
event, @Nullable Throwable ex) {
+        if (ex != null) {
+            return failedFuture(ex);
+        }
+
+        assert event instanceof TableEventParameters;
+
+        TableEventParameters tableEvent = ((TableEventParameters) event);
+
+        CatalogTableDescriptor tableDescriptor = 
catalogService.table(tableEvent.tableId(), tableEvent.catalogVersion());
+
+        assert tableDescriptor != null;
 
-            int newSchemaVersion = tblCfg.schemaId();
+        return onTableCreatedOrAltered(tableDescriptor, 
event.causalityToken());
+    }
 
-            int tblId = tblCfg.id();
+    private CompletableFuture<Boolean> 
onTableCreatedOrAltered(CatalogTableDescriptor tableDescriptor, long 
causalityToken) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
 
-            if (searchSchemaByVersion(tblId, newSchemaVersion) != null) {
+        try {
+            int tableId = tableDescriptor.id();
+            int newSchemaVersion = tableDescriptor.tableVersion();
+
+            if (searchSchemaByVersion(tableId, newSchemaVersion) != null) {
                 return completedFuture(null);
             }
 
-            SchemaDescriptor newSchema = 
SchemaUtils.prepareSchemaDescriptor(newSchemaVersion, tblCfg);
+            SchemaDescriptor newSchema = 
SchemaUtils.prepareSchemaDescriptor(tableDescriptor);
 
-            // This is intentionally a blocking call to enforce configuration 
listener execution order. Unfortunately it is not possible
+            // This is intentionally a blocking call to enforce catalog 
listener execution order. Unfortunately it is not possible
             // to execute this method asynchronously, because the schema 
descriptor is needed to fire the CREATE event as a synchronous part
-            // of the configuration listener.
+            // of the catalog listener.
             try {
-                setColumnMapping(newSchema, tblId);
+                setColumnMapping(newSchema, tableId);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
 
@@ -141,10 +174,8 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
                 return failedFuture(e);
             }
 
-            long causalityToken = ctx.storageRevision();
-
             // Fire event early, because dependent listeners have to register 
VersionedValues' update futures
-            var eventParams = new SchemaEventParameters(causalityToken, tblId, 
newSchema);
+            var eventParams = new SchemaEventParameters(causalityToken, 
tableId, newSchema);
 
             fireEvent(SchemaEvent.CREATE, eventParams)
                     .whenComplete((v, e) -> {
@@ -156,37 +187,20 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
             return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
                 if (e != null) {
                     return failedFuture(new 
IgniteInternalException(IgniteStringFormatter.format(
-                            "Cannot create a schema for the table [tblId={}, 
ver={}]", tblId, newSchemaVersion), e)
+                            "Cannot create a schema for the table [tblId={}, 
ver={}]", tableId, newSchemaVersion), e)
                     );
                 }
 
-                return saveSchemaDescriptor(tblId, newSchema)
-                        .thenApply(t -> registerSchema(tblId, newSchema, 
registries));
-            }));
+                return saveSchemaDescriptor(tableId, newSchema)
+                        .thenApply(t -> registerSchema(registries, tableId, 
newSchema));
+            })).thenApply(ignored -> false);
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    private Map<Integer, SchemaRegistryImpl> registerSchema(int tblId, 
SchemaDescriptor newSchema,
-            Map<Integer, SchemaRegistryImpl> registries) {
-        SchemaRegistryImpl reg = registries.get(tblId);
-
-        if (reg == null) {
-            Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
-
-            copy.put(tblId, createSchemaRegistry(tblId, newSchema));
-
-            return copy;
-        } else {
-            reg.onSchemaRegistered(newSchema);
-
-            return registries;
-        }
-    }
-
     private void setColumnMapping(SchemaDescriptor schema, int tableId) throws 
ExecutionException, InterruptedException {
-        if (schema.version() == INITIAL_SCHEMA_VERSION) {
+        if (schema.version() == CatalogTableDescriptor.INITIAL_TABLE_VERSION) {
             return;
         }
 
@@ -197,32 +211,79 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
         if (prevSchema == null) {
             // This is intentionally a blocking call, because this method is 
used in a synchronous part of the configuration listener.
             // See the call site for more details.
-            prevSchema = schemaByVersion(tableId, prevVersion).get();
+            prevSchema = loadSchemaDescriptor(tableId, prevVersion).get();
         }
 
         schema.columnMapping(SchemaUtils.columnMapper(prevSchema, schema));
     }
 
     /**
-     * Gets a schema descriptor from the configuration storage.
+     * Loads the table schema descriptor by version from Metastore.
      *
-     * @param tableId Table ID.
-     * @param schemaVer Schema version.
-     * @return Schema descriptor.
+     * @param tblId Table id.
+     * @param ver Schema version.
+     * @return Schema representation if schema found, {@code null} otherwise.
      */
-    private CompletableFuture<SchemaDescriptor> loadSchemaDescriptor(int 
tableId, int schemaVer) {
-        CompletableFuture<Entry> ent = 
metastorageMgr.get(schemaWithVerHistKey(tableId, schemaVer));
+    private CompletableFuture<SchemaDescriptor> loadSchemaDescriptor(int 
tblId, int ver) {
+        return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
+                .thenApply(entry -> {
+                    byte[] value = entry.value();
 
-        return ent.thenApply(e -> 
SchemaSerializerImpl.INSTANCE.deserialize(e.value()));
+                    assert value != null;
+
+                    return SchemaSerializerImpl.INSTANCE.deserialize(value);
+                });
     }
 
-    /** Saves a schema descriptor to the configuration storage. */
-    private CompletableFuture<Boolean> saveSchemaDescriptor(int tableId, 
SchemaDescriptor schema) {
-        ByteArray key = schemaWithVerHistKey(tableId, schema.version());
+    /**
+     * Saves a schema in the MetaStorage.
+     *
+     * @param tableId Table id.
+     * @param schema Schema descriptor.
+     * @return Future that will be completed when the schema gets saved.
+     */
+    private CompletableFuture<Void> saveSchemaDescriptor(int tableId, 
SchemaDescriptor schema) {
+        ByteArray schemaKey = schemaWithVerHistKey(tableId, schema.version());
+        ByteArray latestSchemaVersionKey = latestSchemaVersionKey(tableId);
 
         byte[] serializedSchema = 
SchemaSerializerImpl.INSTANCE.serialize(schema);
 
-        return metastorageMgr.invoke(notExists(key), put(key, 
serializedSchema), noop());
+        return metastorageMgr.invoke(
+                notExists(schemaKey),
+                List.of(
+                        put(schemaKey, serializedSchema),
+                        put(latestSchemaVersionKey, 
intToBytes(schema.version()))
+                ),
+                emptyList()
+        ).thenApply(unused -> null);
+    }
+
+    /**
+     * Registers the new schema in the registries.
+     *
+     * @param registries Registries before registering this schema.
+     * @param tableId ID of the table to which the schema belongs.
+     * @param schema The schema to register.
+     * @return Registries after registering this schema.
+     */
+    private Map<Integer, SchemaRegistryImpl> registerSchema(
+            Map<Integer, SchemaRegistryImpl> registries,
+            int tableId,
+            SchemaDescriptor schema
+    ) {
+        SchemaRegistryImpl reg = registries.get(tableId);
+
+        if (reg == null) {
+            Map<Integer, SchemaRegistryImpl> copy = new HashMap<>(registries);
+
+            copy.put(tableId, createSchemaRegistry(tableId, schema));
+
+            return copy;
+        } else {
+            reg.onSchemaRegistered(schema);
+
+            return registries;
+        }
     }
 
     /**
@@ -235,7 +296,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     private SchemaRegistryImpl createSchemaRegistry(int tableId, 
SchemaDescriptor initialSchema) {
         return new SchemaRegistryImpl(
                 ver -> inBusyLock(busyLock, () -> 
loadSchemaDescriptor(tableId, ver)),
-                () -> inBusyLock(busyLock, () -> latestSchemaVersion(tableId)),
+                () -> inBusyLock(busyLock, () -> 
latestSchemaVersionOrDefault(tableId)),
                 initialSchema
         );
     }
@@ -290,12 +351,18 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     }
 
     /**
-     * Drop schema registry for the given table id.
+     * Drops schema registry for the given table id (along with the 
corresponding schemas).
      *
      * @param causalityToken Causality token.
      * @param tableId Table id.
      */
     public CompletableFuture<?> dropRegistry(long causalityToken, int tableId) 
{
+        return removeRegistry(causalityToken, tableId).thenCompose(unused -> {
+            return destroySchemas(tableId);
+        });
+    }
+
+    private CompletableFuture<?> removeRegistry(long causalityToken, int 
tableId) {
         return registriesVv.update(causalityToken, (registries, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(new IgniteInternalException(
@@ -310,7 +377,23 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
         }));
     }
 
-    /** {@inheritDoc} */
+    private CompletableFuture<?> destroySchemas(int tableId) {
+        return latestSchemaVersion(tableId)
+                .thenCompose(latestVersion -> {
+                    if (latestVersion == null) {
+                        // Nothing to remove.
+                        return completedFuture(null);
+                    }
+
+                    Set<ByteArray> keysToRemove = 
IntStream.rangeClosed(CatalogTableDescriptor.INITIAL_TABLE_VERSION, 
latestVersion)
+                            .mapToObj(version -> schemaWithVerHistKey(tableId, 
version))
+                            .collect(toSet());
+                    keysToRemove.add(latestSchemaVersionKey(tableId));
+
+                    return metastorageMgr.removeAll(keysToRemove);
+                });
+    }
+
     @Override
     public void stop() throws Exception {
         if (!stopGuard.compareAndSet(false, true)) {
@@ -318,77 +401,36 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
         }
 
         busyLock.block();
-
-        IgniteUtils.closeAllManually(registriesVv.latest().values().stream());
     }
 
     /**
-     * Gets the latest version of the table schema which available in 
Metastore.
+     * Gets the latest version of the table schema which is available in 
Metastore or the default (1) if nothing is available.
      *
-     * @param tblId Table id.
+     * @param tableId Table id.
      * @return The latest schema version.
      */
-    private CompletableFuture<Integer> latestSchemaVersion(int tblId) {
-        var latestVersionFuture = new CompletableFuture<Integer>();
-
-        metastorageMgr.prefix(schemaHistPrefix(tblId)).subscribe(new 
Subscriber<>() {
-            private int lastVer = INITIAL_SCHEMA_VERSION;
-
-            @Override
-            public void onSubscribe(Subscription subscription) {
-                // Request unlimited demand.
-                subscription.request(Long.MAX_VALUE);
-            }
-
-            @Override
-            public void onNext(Entry item) {
-                String key = new String(item.key(), StandardCharsets.UTF_8);
-                int descVer = extractVerFromSchemaKey(key);
-
-                if (descVer > lastVer) {
-                    lastVer = descVer;
-                }
-            }
-
-            @Override
-            public void onError(Throwable throwable) {
-                latestVersionFuture.completeExceptionally(throwable);
-            }
-
-            @Override
-            public void onComplete() {
-                latestVersionFuture.complete(lastVer);
-            }
-        });
-
-        return latestVersionFuture;
+    private CompletableFuture<Integer> latestSchemaVersionOrDefault(int 
tableId) {
+        return latestSchemaVersion(tableId)
+                .thenApply(versionOrNull -> requireNonNullElse(versionOrNull, 
CatalogTableDescriptor.INITIAL_TABLE_VERSION));
     }
 
     /**
-     * Gets the defined version of the table schema which available in 
Metastore.
+     * Gets the latest version of the table schema which is available in 
Metastore or {@code null} if nothing is available.
      *
-     * @param tblId Table id.
-     * @return Schema representation if schema found, {@code null} otherwise.
+     * @param tableId Table id.
+     * @return The latest schema version or {@code null} if nothing is 
available.
      */
-    private CompletableFuture<SchemaDescriptor> schemaByVersion(int tblId, int 
ver) {
-        return metastorageMgr.get(schemaWithVerHistKey(tblId, ver))
+    private CompletableFuture<Integer> latestSchemaVersion(int tableId) {
+        return metastorageMgr.get(latestSchemaVersionKey(tableId))
                 .thenApply(entry -> {
-                    byte[] value = entry.value();
-
-                    assert value != null;
-
-                    return SchemaSerializerImpl.INSTANCE.deserialize(value);
+                    if (entry == null || entry.value() == null) {
+                        return null;
+                    } else {
+                        return ByteUtils.bytesToInt(entry.value());
+                    }
                 });
     }
 
-    private int extractVerFromSchemaKey(String key) {
-        int pos = key.lastIndexOf('.');
-        assert pos != -1 : "Unexpected key: " + key;
-
-        key = key.substring(pos + 1);
-        return Integer.parseInt(key);
-    }
-
     /**
      * Forms schema history key.
      *
@@ -400,13 +442,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
         return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
     }
 
-    /**
-     * Forms schema history predicate.
-     *
-     * @param tblId Table id.
-     * @return {@link ByteArray} representation.
-     */
-    private static ByteArray schemaHistPrefix(int tblId) {
-        return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX);
+    private static ByteArray latestSchemaVersionKey(int tableId) {
+        return ByteArray.fromString(tableId + 
LATEST_SCHEMA_VERSION_STORE_SUFFIX);
     }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 51f0d71c8a..e23b1ad562 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -81,7 +81,7 @@ public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters>
     /** Tables configuration. */
     private final TablesConfiguration tablesCfg;
 
-    /** Versioned store for tables by name. */
+    /** Versioned store for tables by ID. */
     private final IncrementalVersionedValue<Map<Integer, SchemaRegistryImpl>> 
registriesVv;
 
     /** Meta storage manager. */
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
index aa0bbc51b3..a420ec4cbd 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.schema;
 
 import java.util.Arrays;
 import java.util.Comparator;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import 
org.apache.ignite.internal.schema.catalog.CatalogToSchemaDescriptorConverter;
 import 
org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.mapping.ColumnMapper;
@@ -39,6 +41,16 @@ public class SchemaUtils {
         return ConfigurationToSchemaDescriptorConverter.convert(schemaVer, 
tableView);
     }
 
+    /**
+     * Creates schema descriptor for the table with specified descriptor.
+     *
+     * @param tableDescriptor Table descriptor.
+     * @return Schema descriptor.
+     */
+    public static SchemaDescriptor 
prepareSchemaDescriptor(CatalogTableDescriptor tableDescriptor) {
+        return CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
+    }
+
     /**
      * Prepares column mapper.
      *
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
new file mode 100644
index 0000000000..d6a26fedf6
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.catalog;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.commands.DefaultValue.ConstantValue;
+import org.apache.ignite.internal.catalog.commands.DefaultValue.FunctionCall;
+import org.apache.ignite.internal.catalog.commands.DefaultValue.Type;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.DefaultValueGenerator;
+import org.apache.ignite.internal.schema.DefaultValueProvider;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Utility class to convert table descriptor from the Catalog as well as all 
related objects to a {@link SchemaDescriptor} domain.
+ */
+public final class CatalogToSchemaDescriptorConverter {
+    private static final Map<String, NativeType> FIX_SIZED_TYPES;
+
+    static {
+        List<NativeType> types = 
IgniteUtils.collectStaticFields(NativeTypes.class, NativeType.class);
+
+        Map<String, NativeType> tmp = new HashMap<>(types.size(), 1.0f);
+
+        for (NativeType type : types) {
+            if (!type.spec().fixedLength()) {
+                continue;
+            }
+
+            tmp.put(type.spec().name(), type);
+        }
+
+        FIX_SIZED_TYPES = Map.copyOf(tmp);
+    }
+
+    /**
+     * Converts type of a column descriptor to a {@link NativeType}.
+     *
+     * @param columnDescriptor Descriptor to convert.
+     * @return A {@link NativeType} object represented by given descriptor.
+     */
+    public static NativeType convertType(CatalogTableColumnDescriptor 
columnDescriptor) {
+        String typeName = columnDescriptor.type().name();
+        NativeType res = FIX_SIZED_TYPES.get(typeName);
+
+        if (res != null) {
+            return res;
+        }
+
+        switch (typeName) {
+            case "BITMASK":
+                int bitmaskLen = columnDescriptor.length();
+
+                return NativeTypes.bitmaskOf(bitmaskLen);
+
+            case "STRING":
+                int strLen = columnDescriptor.length();
+
+                return NativeTypes.stringOf(strLen);
+
+            case "BYTES":
+            case "BYTE_ARRAY":
+                int blobLen = columnDescriptor.length();
+
+                return NativeTypes.blobOf(blobLen);
+
+            case "DECIMAL":
+                int prec = columnDescriptor.precision();
+                int scale = columnDescriptor.scale();
+
+                return NativeTypes.decimalOf(prec, scale);
+
+            case "NUMBER":
+                return NativeTypes.numberOf(columnDescriptor.precision());
+
+            case "TIME":
+                return NativeTypes.time(columnDescriptor.precision());
+
+            case "DATETIME":
+                return NativeTypes.datetime(columnDescriptor.precision());
+
+            case "TIMESTAMP":
+                return NativeTypes.timestamp(columnDescriptor.precision());
+
+            default:
+                throw new IllegalArgumentException("Unknown type " + typeName);
+        }
+    }
+
+    /**
+     * Converts given column view to a {@link Column}.
+     *
+     * @param columnOrder Number of the current column.
+     * @param columnDescriptor Descriptor to convert.
+     * @return A {@link Column} object representing the table column 
descriptor.
+     */
+    public static Column convert(int columnOrder, CatalogTableColumnDescriptor 
columnDescriptor) {
+        NativeType type = convertType(columnDescriptor);
+
+        DefaultValue defaultValue = columnDescriptor.defaultValue();
+
+        DefaultValueProvider defaultValueProvider;
+
+        if (defaultValue == null) {
+            defaultValueProvider = DefaultValueProvider.NULL_PROVIDER;
+        } else if (defaultValue.type() == Type.CONSTANT) {
+            ConstantValue constantValue = (ConstantValue) defaultValue;
+            defaultValueProvider = 
DefaultValueProvider.constantProvider(constantValue.value());
+        } else if (defaultValue.type() == Type.FUNCTION_CALL) {
+            FunctionCall functionCall = (FunctionCall) defaultValue;
+            defaultValueProvider = DefaultValueProvider.forValueGenerator(
+                    DefaultValueGenerator.valueOf(functionCall.functionName())
+            );
+        } else {
+            throw new IllegalStateException("Unknown value supplier class " + 
defaultValue.getClass().getName());
+        }
+
+        return new Column(columnOrder, columnDescriptor.name(), type, 
columnDescriptor.nullable(), defaultValueProvider);
+    }
+
+    /**
+     * Converts given table descriptor to a {@link SchemaDescriptor}.
+     *
+     * @param tableDescriptor Descriptor to convert.
+     * @return A {@link SchemaDescriptor} object representing the table 
descriptor.
+     */
+    public static SchemaDescriptor convert(CatalogTableDescriptor 
tableDescriptor) {
+        Set<String> keyColumnsNames = 
Set.copyOf(tableDescriptor.primaryKeyColumns());
+
+        List<Column> keyCols = new ArrayList<>(keyColumnsNames.size());
+        List<Column> valCols = new 
ArrayList<>(tableDescriptor.columns().size() - keyColumnsNames.size());
+
+        int idx = 0;
+
+        for (CatalogTableColumnDescriptor column : tableDescriptor.columns()) {
+            if (keyColumnsNames.contains(column.name())) {
+                keyCols.add(convert(idx, column));
+            } else {
+                valCols.add(convert(idx, column));
+            }
+
+            idx++;
+        }
+
+        return new SchemaDescriptor(
+                tableDescriptor.tableVersion(),
+                keyCols.toArray(Column[]::new),
+                tableDescriptor.colocationColumns().toArray(String[]::new),
+                valCols.toArray(Column[]::new)
+        );
+    }
+
+    private CatalogToSchemaDescriptorConverter() { }
+}
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractSchemaConverterTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/AbstractSchemaConverterTest.java
similarity index 95%
rename from 
modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractSchemaConverterTest.java
rename to 
modules/schema/src/test/java/org/apache/ignite/internal/schema/AbstractSchemaConverterTest.java
index b7c7a5154d..81b7a67055 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/AbstractSchemaConverterTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/AbstractSchemaConverterTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema.configuration;
+package org.apache.ignite.internal.schema;
 
 import static java.math.RoundingMode.HALF_UP;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -35,10 +35,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.schema.DecimalNativeType;
-import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.NativeTypeSpec;
-import org.apache.ignite.internal.schema.NativeTypes;
 import 
org.apache.ignite.internal.schema.testutils.definition.ColumnType.DecimalColumnType;
 import org.apache.ignite.internal.util.ArrayUtils;
 
@@ -184,8 +180,8 @@ public class AbstractSchemaConverterTest {
      * Class represents a default value of particular type.
      */
     protected static class DefaultValueArg {
-        final NativeType type;
-        final Object defaultValue;
+        public final NativeType type;
+        public final Object defaultValue;
 
         /**
          * Constructor.
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
new file mode 100644
index 0000000000..f1404eb2b8
--- /dev/null
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.LongFunction;
+import org.apache.ignite.internal.catalog.CatalogService;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import 
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
+import org.apache.ignite.internal.util.subscription.ListAccumulator;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class CatalogSchemaManagerTest {
+    private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
+    private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = 
".sch-hist-latest";
+
+    private static final int TABLE_ID = 3;
+    private static final String TABLE_NAME = "t";
+
+    private static final long CAUSALITY_TOKEN_1 = 42;
+    private static final long CAUSALITY_TOKEN_2 = 45;
+
+    private static final int CATALOG_VERSION_1 = 10;
+    private static final int CATALOG_VERSION_2 = 11;
+
+    private final AtomicReference<LongFunction<CompletableFuture<?>>> 
onMetastoreRevisionCompleteHolder = new AtomicReference<>();
+
+    private final Consumer<LongFunction<CompletableFuture<?>>> registry = 
onMetastoreRevisionCompleteHolder::set;
+
+    @Mock
+    private CatalogService catalogService;
+
+    private VaultManager vaultManager;
+
+    private MetaStorageManager metaStorageManager;
+
+    private final SimpleInMemoryKeyValueStorage metaStorageKvStorage = new 
SimpleInMemoryKeyValueStorage("test");
+
+    private CatalogSchemaManager schemaManager;
+
+    private EventListener<CatalogEventParameters> tableCreatedListener;
+    private EventListener<CatalogEventParameters> tableAlteredListener;
+
+    private final Exception cause = new Exception("Oops");
+
+    @BeforeEach
+    void setUp() {
+        vaultManager = new VaultManager(new InMemoryVaultService());
+        vaultManager.start();
+
+        metaStorageManager = 
spy(StandaloneMetaStorageManager.create(vaultManager, metaStorageKvStorage));
+        metaStorageManager.start();
+
+        doAnswer(invocation -> {
+            tableCreatedListener = invocation.getArgument(1);
+            return null;
+        }).when(catalogService).listen(eq(CatalogEvent.TABLE_CREATE), any());
+
+        doAnswer(invocation -> {
+            tableAlteredListener = invocation.getArgument(1);
+            return null;
+        }).when(catalogService).listen(eq(CatalogEvent.TABLE_ALTER), any());
+
+        schemaManager = new CatalogSchemaManager(registry, catalogService, 
metaStorageManager);
+        schemaManager.start();
+
+        assertThat("Watches were not deployed", 
metaStorageManager.deployWatches(), willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        schemaManager.stop();
+        metaStorageManager.stop();
+        vaultManager.stop();
+    }
+
+    @Test
+    void savesSchemaOnTableCreation() {
+        createSomeTable();
+
+        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(1);
+
+        assertThat(schemaDescriptor.version(), is(1));
+        assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1"));
+
+        Column k1 = schemaDescriptor.column("k1");
+        assertThat(k1, is(notNullValue()));
+
+        assertThat(k1.name(), is("k1"));
+        assertThat(k1.type().spec(), is(NativeTypeSpec.INT16));
+
+        Column v1 = schemaDescriptor.column("v1");
+        assertThat(v1, is(notNullValue()));
+
+        assertThat(v1.name(), is("v1"));
+        assertThat(v1.type().spec(), is(NativeTypeSpec.INT32));
+    }
+
+    private void createSomeTable() {
+        List<CatalogTableColumnDescriptor> columns = List.of(
+                new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("v1", ColumnType.INT32, 
false, 0, 0, 0, null)
+        );
+        CatalogTableDescriptor tableDescriptor = new 
CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 1, columns, List.of("k1", 
"k2"), null);
+
+        CompletableFuture<Boolean> future = tableCreatedListener()
+                .notify(new CreateTableEventParameters(CAUSALITY_TOKEN_1, 
CATALOG_VERSION_1, tableDescriptor), null);
+
+        assertThat(future, willBe(false));
+
+        completeCausalityToken(CAUSALITY_TOKEN_1);
+    }
+
+    private EventListener<CatalogEventParameters> tableCreatedListener() {
+        return Objects.requireNonNull(tableCreatedListener, 
"tableCreatedListener is not registered with CatalogService");
+    }
+
+    private EventListener<CatalogEventParameters> tableAlteredListener() {
+        return Objects.requireNonNull(tableAlteredListener, 
"tableAlteredListener is not registered with CatalogService");
+    }
+
+    private SchemaDescriptor getSchemaDescriptor(int schemaVersion) {
+        Entry entry = metaStorageKvStorage.get(schemaWithVerHistKey(TABLE_ID, 
schemaVersion).bytes());
+        assertThat(entry, is(notNullValue()));
+
+        byte[] value = entry.value();
+        assertThat(value, is(notNullValue()));
+
+        return SchemaSerializerImpl.INSTANCE.deserialize(value);
+    }
+
+    private static ByteArray schemaWithVerHistKey(int tblId, int ver) {
+        return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
+    }
+
+    @Test
+    void savesSchemaOnColumnAddition() {
+        createSomeTable();
+
+        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAddition());
+
+        AddColumnEventParameters event = new AddColumnEventParameters(
+                CAUSALITY_TOKEN_2,
+                CATALOG_VERSION_2,
+                TABLE_ID,
+                List.of(new CatalogTableColumnDescriptor("v2", 
ColumnType.STRING, false, 0, 0, 0, null))
+        );
+
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
+
+        assertThat(future, willBe(false));
+
+        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
+
+        assertThat(schemaDescriptor.version(), is(2));
+        assertThat(schemaDescriptor.columnNames(), contains("k1", "k2", "v1", 
"v2"));
+
+        Column v2 = schemaDescriptor.column("v2");
+        assertThat(v2, is(notNullValue()));
+
+        assertThat(v2.name(), is("v2"));
+        assertThat(v2.type().spec(), is(NativeTypeSpec.STRING));
+    }
+
+    private static CatalogTableDescriptor tableDescriptorAfterColumnAddition() 
{
+        List<CatalogTableColumnDescriptor> columns = List.of(
+                new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("v1", ColumnType.INT32, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("v2", ColumnType.STRING, 
false, 0, 0, 0, null)
+        );
+
+        return new CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 2, columns, 
List.of("k1", "k2"), null);
+    }
+
+    private void completeCausalityToken(long causalityToken) {
+        
assertThat(onMetastoreRevisionCompleteHolder.get().apply(causalityToken), 
willCompleteSuccessfully());
+    }
+
+    @Test
+    void savesSchemaOnColumnRemoval() {
+        createSomeTable();
+
+        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnRemoval());
+
+        DropColumnEventParameters event = new DropColumnEventParameters(
+                CAUSALITY_TOKEN_2,
+                CATALOG_VERSION_2,
+                TABLE_ID,
+                List.of("v1")
+        );
+
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
+
+        assertThat(future, willBe(false));
+
+        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
+
+        assertThat(schemaDescriptor.version(), is(2));
+        assertThat(schemaDescriptor.columnNames(), contains("k1", "k2"));
+    }
+
+    private static CatalogTableDescriptor tableDescriptorAfterColumnRemoval() {
+        List<CatalogTableColumnDescriptor> columns = List.of(
+                new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, 
false, 0, 0, 0, null)
+        );
+
+        return new CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 2, columns, 
List.of("k1", "k2"), null);
+    }
+
+    @Test
+    void savesSchemaOnColumnAlteration() {
+        createSomeTable();
+
+        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAlteration());
+
+        AlterColumnEventParameters event = new AlterColumnEventParameters(
+                CAUSALITY_TOKEN_2,
+                CATALOG_VERSION_2,
+                TABLE_ID,
+                new CatalogTableColumnDescriptor("v1", ColumnType.INT64, 
false, 0, 0, 0, null)
+        );
+
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
+
+        assertThat(future, willBe(false));
+
+        SchemaDescriptor schemaDescriptor = getSchemaDescriptor(2);
+
+        assertThat(schemaDescriptor.version(), is(2));
+
+        Column v1 = schemaDescriptor.column("v1");
+        assertThat(v1, is(notNullValue()));
+
+        assertThat(v1.name(), is("v1"));
+        assertThat(v1.type().spec(), is(NativeTypeSpec.INT64));
+    }
+
+    private static CatalogTableDescriptor 
tableDescriptorAfterColumnAlteration() {
+        List<CatalogTableColumnDescriptor> columns = List.of(
+                new CatalogTableColumnDescriptor("k1", ColumnType.INT32, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("k2", ColumnType.STRING, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("v1", ColumnType.INT64, 
false, 0, 0, 0, null)
+        );
+
+        return new CatalogTableDescriptor(TABLE_ID, TABLE_NAME, 0, 2, columns, 
List.of("k1", "k2"), null);
+    }
+
+    @Test
+    void propagatesExceptionFromCatalogOnTableCreation() {
+        CompletableFuture<Boolean> future = 
tableCreatedListener().notify(mock(CreateTableEventParameters.class), cause);
+
+        assertThat(future, willThrow(equalTo(cause)));
+    }
+
+    @Test
+    void propagatesExceptionFromCatalogOnColumnAddition() {
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(mock(AddColumnEventParameters.class), cause);
+
+        assertThat(future, willThrow(equalTo(cause)));
+    }
+
+    @Test
+    void propagatesExceptionFromCatalogOnColumnRemoval() {
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(mock(DropColumnEventParameters.class), cause);
+
+        assertThat(future, willThrow(equalTo(cause)));
+    }
+
+    @Test
+    void propagatesExceptionFromCatalogOnColumnAlteration() {
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(mock(AddColumnEventParameters.class), cause);
+
+        assertThat(future, willThrow(equalTo(cause)));
+    }
+
+    @Test
+    void latestSchemaRegistryIsUnavailableUntilSomeSchemaVersionIsProcessed() {
+        assertThat(schemaManager.schemaRegistry(TABLE_ID), is(nullValue()));
+    }
+
+    @Test
+    void latestSchemaRegistryIsAvailable() {
+        createSomeTable();
+
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(TABLE_ID);
+
+        assertThat(schemaRegistry.schema().version(), is(1));
+        assertThat(schemaRegistry.schema(1).version(), is(1));
+    }
+
+    @Test
+    void schemaRegistryByCausalityTokenIsUnavailableTillTokenIsCompleted() {
+        createSomeTable();
+
+        CompletableFuture<SchemaRegistry> future = 
schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+
+        assertThat(future, willTimeoutFast());
+    }
+
+    @Test
+    void schemaRegistryByCausalityTokenIsAvailable() {
+        createSomeTable();
+
+        CompletableFuture<SchemaRegistry> future = 
schemaManager.schemaRegistry(CAUSALITY_TOKEN_1, TABLE_ID);
+        assertThat(future, willCompleteSuccessfully());
+
+        SchemaRegistry schemaRegistry = future.join();
+
+        assertThat(schemaRegistry.schema().version(), is(1));
+        assertThat(schemaRegistry.schema(1).version(), is(1));
+    }
+
+    @Test
+    void previousSchemaVersionsRemainAvailable() {
+        create2TableVersions();
+
+        CompletableFuture<SchemaRegistry> future = 
schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+        assertThat(future, willCompleteSuccessfully());
+
+        SchemaRegistry schemaRegistry = future.join();
+
+        SchemaDescriptor schemaDescriptor1 = schemaRegistry.schema(1);
+        assertThat(schemaDescriptor1.version(), is(1));
+
+        SchemaDescriptor schemaDescriptor2 = schemaRegistry.schema(2);
+        assertThat(schemaDescriptor2.version(), is(2));
+    }
+
+    private void create2TableVersions() {
+        createSomeTable();
+        addSomeColumn();
+    }
+
+    private void addSomeColumn() {
+        when(catalogService.table(TABLE_ID, 
CATALOG_VERSION_2)).thenReturn(tableDescriptorAfterColumnAddition());
+
+        AddColumnEventParameters event = new AddColumnEventParameters(
+                CAUSALITY_TOKEN_2,
+                CATALOG_VERSION_2,
+                TABLE_ID,
+                List.of(new CatalogTableColumnDescriptor("v2", 
ColumnType.STRING, false, 0, 0, 0, null))
+        );
+
+        CompletableFuture<Boolean> future = 
tableAlteredListener().notify(event, null);
+
+        assertThat(future, willBe(false));
+
+        completeCausalityToken(CAUSALITY_TOKEN_2);
+    }
+
+    @Test
+    void waitLatestSchemaReturnsLatestSchema() {
+        create2TableVersions();
+
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(TABLE_ID);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.waitLatestSchema();
+
+        assertThat(schemaDescriptor.version(), is(2));
+    }
+
+    @Test
+    void dropRegistryMakesItUnavailable() {
+        createSomeTable();
+
+        assertThat(schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID), 
willCompleteSuccessfully());
+
+        completeCausalityToken(CAUSALITY_TOKEN_2);
+
+        CompletableFuture<SchemaRegistry> future = 
schemaManager.schemaRegistry(CAUSALITY_TOKEN_2, TABLE_ID);
+        assertThat(future, is(completedFuture()));
+
+        SchemaRegistry schemaRegistry = future.join();
+
+        assertThat(schemaRegistry, is(nullValue()));
+    }
+
+    @Test
+    void dropRegistryRemovesSchemasFromMetastorage() {
+        createSomeTable();
+
+        assertThat(schemaManager.dropRegistry(CAUSALITY_TOKEN_2, TABLE_ID), 
willCompleteSuccessfully());
+
+        completeCausalityToken(CAUSALITY_TOKEN_2);
+
+        assertThatNoSchemasExist(TABLE_ID);
+        assertThatNoLatestSchemaVersionExists(TABLE_ID);
+    }
+
+    private void assertThatNoSchemasExist(int tableId) {
+        CompletableFuture<List<String>> schemaEntryKeysFuture = new 
CompletableFuture<>();
+
+        Publisher<Entry> publisher = 
metaStorageManager.prefix(ByteArray.fromString(tableId + SCHEMA_STORE_PREFIX));
+        publisher.subscribe(
+                new ListAccumulator<Entry, String>(entry -> new 
String(entry.key(), UTF_8))
+                        .toSubscriber(schemaEntryKeysFuture)
+        );
+
+        assertThat(schemaEntryKeysFuture, willBe(empty()));
+    }
+
+    private void assertThatNoLatestSchemaVersionExists(int tableId) {
+        CompletableFuture<Entry> future = 
metaStorageManager.get(latestSchemaVersionKey(tableId));
+
+        assertThat(future, willCompleteSuccessfully());
+
+        Entry entry = future.join();
+
+        if (entry != null) {
+            assertThat(entry.value(), is(nullValue()));
+        }
+    }
+
+    private static ByteArray latestSchemaVersionKey(int tableId) {
+        return ByteArray.fromString(tableId + 
LATEST_SCHEMA_VERSION_STORE_SUFFIX);
+    }
+
+    @Test
+    void loadingPreExistingSchemasWorks() throws Exception {
+        create2TableVersions();
+
+        schemaManager.stop();
+
+        when(catalogService.latestCatalogVersion()).thenReturn(2);
+        
when(catalogService.tables(anyInt())).thenReturn(List.of(tableDescriptorAfterColumnAddition()));
+        doReturn(45L).when(metaStorageManager).appliedRevision();
+
+        schemaManager = new CatalogSchemaManager(registry, catalogService, 
metaStorageManager);
+        schemaManager.start();
+
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(TABLE_ID);
+
+        int prevSchemaVersionNotYetTouched = 1;
+
+        SchemaDescriptor schemaDescriptor = 
schemaRegistry.schema(prevSchemaVersionNotYetTouched);
+        assertThat(schemaDescriptor.version(), 
is(prevSchemaVersionNotYetTouched));
+    }
+}
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
new file mode 100644
index 0000000000..8b58543309
--- /dev/null
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.catalog;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.schema.AbstractSchemaConverterTest;
+import org.apache.ignite.internal.schema.BitmaskNativeType;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.DecimalNativeType;
+import 
org.apache.ignite.internal.schema.DefaultValueProvider.FunctionalValueProvider;
+import org.apache.ignite.internal.schema.DefaultValueProvider.Type;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.NumberNativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TemporalNativeType;
+import org.apache.ignite.internal.schema.VarlenNativeType;
+import 
org.apache.ignite.internal.schema.testutils.definition.DefaultValueGenerators;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify conversion from catalog table descriptors to schema objects.
+ */
+public class CatalogToSchemaDescriptorConverterTest extends 
AbstractSchemaConverterTest {
+    private static final int TEST_LENGTH = 15;
+
+    private static final int TEST_PRECISION = 8;
+
+    private static final int TEST_SCALE = 5;
+
+    @ParameterizedTest
+    @EnumSource(NativeTypeSpec.class)
+    public void convertColumnType(NativeTypeSpec typeSpec) {
+        CatalogTableColumnDescriptor columnDescriptor = 
TestColumnDescriptors.forSpec(typeSpec);
+
+        NativeType type = 
CatalogToSchemaDescriptorConverter.convertType(columnDescriptor);
+
+        if (columnDescriptor.type() == ColumnType.BYTE_ARRAY) {
+            assertThat(type.spec(), is(NativeTypeSpec.BYTES));
+        } else {
+            assertThat(type.spec().name(), 
equalTo(columnDescriptor.type().name()));
+        }
+
+        if (type instanceof VarlenNativeType) {
+            assertThat(((VarlenNativeType) type).length(), 
equalTo(TEST_LENGTH));
+        } else if (type instanceof NumberNativeType) {
+            assertThat(((NumberNativeType) type).precision(), 
equalTo(columnDescriptor.precision()));
+        } else if (type instanceof DecimalNativeType) {
+            assertThat(((DecimalNativeType) type).precision(), 
equalTo(columnDescriptor.precision()));
+            assertThat(((DecimalNativeType) type).scale(), 
equalTo(columnDescriptor.scale()));
+        } else if (type instanceof TemporalNativeType) {
+            assertThat(((TemporalNativeType) type).precision(), 
equalTo(columnDescriptor.precision()));
+        } else if (type instanceof BitmaskNativeType) {
+            assertThat(((BitmaskNativeType) type).bits(), 
equalTo(TEST_LENGTH));
+        } else {
+            assertThat("Unknown type: " + type.getClass(), type.getClass(), 
equalTo(NativeType.class));
+        }
+    }
+
+    @ParameterizedTest
+    @MethodSource("generateTestArguments")
+    public void convertColumnDescriptorConstantDefault(DefaultValueArg arg) {
+        String columnName = arg.type.spec().name();
+        CatalogTableColumnDescriptor columnDescriptor = 
TestColumnDescriptors.forType(arg);
+
+        Column column = CatalogToSchemaDescriptorConverter.convert(0, 
columnDescriptor);
+
+        assertThat(column.name(), equalTo(columnName));
+        assertThat(column.type(), equalTo(arg.type));
+        assertThat(column.nullable(), equalTo(arg.defaultValue == null));
+        assertThat(column.defaultValueProvider().type(), 
equalTo(Type.CONSTANT));
+        assertThat(column.defaultValue(), equalTo(arg.defaultValue));
+    }
+
+    @Test
+    public void convertColumnDescriptorFunctionalDefault() {
+        String columnName = "UUID";
+        String functionName = DefaultValueGenerators.GEN_RANDOM_UUID;
+        DefaultValue defaultValue = DefaultValue.functionCall(functionName);
+
+        CatalogTableColumnDescriptor columnDescriptor = new 
CatalogTableColumnDescriptor(
+                NativeTypeSpec.UUID.name(),
+                NativeTypeSpec.UUID.asColumnType(),
+                false,
+                TEST_LENGTH,
+                TEST_PRECISION,
+                TEST_SCALE,
+                defaultValue
+        );
+
+        Column column = CatalogToSchemaDescriptorConverter.convert(0, 
columnDescriptor);
+
+        assertThat(column.name(), equalTo(columnName));
+        assertThat(column.type(), equalTo(NativeTypes.UUID));
+        assertThat(column.defaultValueProvider().type(), 
equalTo(Type.FUNCTIONAL));
+        assertThat(((FunctionalValueProvider) 
column.defaultValueProvider()).name(), equalTo(functionName));
+    }
+
+    @Test
+    public void convertTableDescriptor() {
+        CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
+                1,
+                "test",
+                0,
+                1,
+                List.of(
+                        new CatalogTableColumnDescriptor("C1", 
ColumnType.INT32, false, 0, 0, 0, null),
+                        new CatalogTableColumnDescriptor("K2", 
ColumnType.INT32, false, 0, 0, 0, null),
+                        new CatalogTableColumnDescriptor("C2", 
ColumnType.INT32, false, 0, 0, 0, null),
+                        new CatalogTableColumnDescriptor("K1", 
ColumnType.INT32, false, 0, 0, 0, null)
+                ),
+                List.of("K1", "K2"),
+                List.of("K2")
+        );
+
+        SchemaDescriptor schema = 
CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
+
+        assertThat(schema.keyColumns().length(), equalTo(2));
+        assertThat(schema.keyColumns().column(0).name(), equalTo("K1"));
+        assertThat(schema.keyColumns().column(1).name(), equalTo("K2"));
+        assertThat(schema.valueColumns().length(), equalTo(2));
+        assertThat(schema.valueColumns().column(0).name(), equalTo("C1"));
+        assertThat(schema.valueColumns().column(1).name(), equalTo("C2"));
+        assertThat(schema.colocationColumns().length, equalTo(1));
+        assertThat(schema.colocationColumns()[0].name(), equalTo("K2"));
+    }
+
+    private static Iterable<DefaultValueArg> generateTestArguments() {
+        var paramList = new ArrayList<DefaultValueArg>();
+
+        for (var entry : DEFAULT_VALUES_TO_TEST.entrySet()) {
+            for (var defaultValue : entry.getValue()) {
+                paramList.add(
+                        new DefaultValueArg(specToType(entry.getKey()), 
adjust(defaultValue))
+                );
+            }
+        }
+        return paramList;
+    }
+
+    private static class TestColumnDescriptors {
+        static CatalogTableColumnDescriptor forSpec(NativeTypeSpec spec) {
+            return new CatalogTableColumnDescriptor(spec.name(), 
spec.asColumnType(), false, TEST_PRECISION, TEST_SCALE, TEST_LENGTH, null);
+        }
+
+        static CatalogTableColumnDescriptor forType(DefaultValueArg arg) {
+            NativeType type = arg.type;
+
+            int length = type instanceof VarlenNativeType ? 
((VarlenNativeType) type).length()
+                    : type instanceof BitmaskNativeType ? ((BitmaskNativeType) 
type).bits()
+                    : 0;
+
+            int precision = type instanceof DecimalNativeType ? 
((DecimalNativeType) type).precision()
+                    : type instanceof NumberNativeType ? ((NumberNativeType) 
type).precision()
+                    : type instanceof TemporalNativeType ? 
((TemporalNativeType) type).precision()
+                    : 0;
+
+            int scale = type instanceof DecimalNativeType ? 
((DecimalNativeType) type).scale() : 0;
+
+            return new CatalogTableColumnDescriptor(
+                    type.spec().name(),
+                    type.spec().asColumnType(),
+                    arg.defaultValue == null,
+                    precision,
+                    scale,
+                    length,
+                    DefaultValue.constant(arg.defaultValue)
+            );
+        }
+    }
+}
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
index 0dd2e8c417..0807c38602 100644
--- 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/ConfigurationToSchemaDescriptorConverterTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import java.util.ArrayList;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.schema.AbstractSchemaConverterTest;
 import org.apache.ignite.internal.schema.BitmaskNativeType;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.DecimalNativeType;
diff --git 
a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
 
b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
index 509f510455..13e8242213 100644
--- 
a/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
+++ 
b/modules/schema/src/testFixtures/java/org/apache/ignite/internal/schema/testutils/SchemaConfigurationConverter.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.schema.configuration.ColumnChange;
 import org.apache.ignite.internal.schema.configuration.ColumnTypeChange;
 import org.apache.ignite.internal.schema.configuration.ColumnTypeView;
 import org.apache.ignite.internal.schema.configuration.ColumnView;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
 import org.apache.ignite.internal.schema.configuration.PrimaryKeyView;
 import org.apache.ignite.internal.schema.configuration.TableChange;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
@@ -412,6 +413,8 @@ public class SchemaConfigurationConverter {
             }
         });
 
+        ((ExtendedTableChange) tblChg).changeSchemaId(1);
+
         tblChg.changePrimaryKey(pkCng -> 
pkCng.changeColumns(tbl.keyColumns().toArray(String[]::new))
                 
.changeColocationColumns(tbl.colocationColumns().toArray(String[]::new)));
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
index aecb8d8eaa..18b64e3644 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
@@ -574,7 +574,15 @@ public class CatalogSqlSchemaManagerTest {
                 columnDescriptors.add(newCol);
             }
 
-            return new CatalogTableDescriptor(id, name, zoneId, 
columnDescriptors, primaryKey, colocationKey);
+            return new CatalogTableDescriptor(
+                    id,
+                    name,
+                    zoneId,
+                    CatalogTableDescriptor.INITIAL_TABLE_VERSION,
+                    columnDescriptors,
+                    primaryKey,
+                    colocationKey
+            );
         }
     }
 


Reply via email to