This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 81e5476  IGNITE-14077: Distributed schema manager. (#91)
81e5476 is described below

commit 81e54764023d60cd87896a5d550c542584c0fbe5
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed May 19 12:07:34 2021 +0300

    IGNITE-14077: Distributed schema manager. (#91)
---
 .../schemas/table/TableConfigurationSchema.java    |   2 +-
 .../main/java/org/apache/ignite/table/Tuple.java   |  18 +
 .../configuration/internal/rest/JsonConverter.java |   2 +-
 .../org/apache/ignite/internal/manager/Event.java  |   2 +-
 .../ignite/internal/manager/EventParameters.java   |   2 +-
 .../apache/ignite/internal/manager/Producer.java   |  12 +-
 .../runner/app/DynamicTableCreationTest.java       | 162 +++++++
 .../internal/runner/app/TableCreationTest.java     | 190 ++++++++
 .../apache/ignite/internal/app/IgnitionImpl.java   |   4 +-
 modules/schema/pom.xml                             |   7 +-
 .../ignite/internal/schema/SchemaDescriptor.java   |  15 +
 .../ignite/internal/schema/SchemaManager.java      | 303 +++++++++++-
 .../ignite/internal/schema/SchemaRegistry.java}    |   8 +-
 .../ignite/internal/schema/event/SchemaEvent.java} |  16 +-
 .../schema/event/SchemaEventParameters.java}       |  42 +-
 .../SchemaRegistrationConflictException.java}      |  23 +-
 .../schema/registry/SchemaRegistryException.java}  |  25 +-
 .../schema/registry/SchemaRegistryImpl.java        | 147 ++++++
 .../internal/schema/registry/package-info.java}    |   8 +-
 .../internal/schema/SchemaRegistryImplTest.java    | 528 +++++++++++++++++++++
 .../ignite/distributed/ITDistributedTableTest.java |   4 +-
 .../schema/marshaller/TupleMarshaller.java         |   2 +-
 .../ignite/internal/table/AbstractTableView.java   |   9 +-
 .../ignite/internal/table/KVBinaryViewImpl.java    |  11 +-
 .../apache/ignite/internal/table/KVViewImpl.java   |   9 +-
 .../ignite/internal/table/RecordViewImpl.java      |   9 +-
 .../ignite/internal/table/RowChunkAdapter.java     |  16 +
 .../apache/ignite/internal/table/TableImpl.java    |  21 +-
 .../ignite/internal/table/TupleBuilderImpl.java    |  12 +
 .../ignite/internal/table/TupleMarshallerImpl.java |  13 +-
 .../internal/table/distributed/TableManager.java   |  84 +++-
 .../internal/table/event/TableEventParameters.java |  14 +-
 .../table/impl/DummySchemaManagerImpl.java         |   4 +-
 33 files changed, 1565 insertions(+), 159 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
index 6e5ae3e..0f76eb3 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
@@ -43,7 +43,7 @@ public class TableConfigurationSchema {
     /** Count of table partition replicas. */
     @Min(1)
     @Value(hasDefault = true)
-    public int replicas = 0;
+    public int replicas = 1;
 
     /** Columns configuration. */
     @NamedConfigValue
diff --git a/modules/api/src/main/java/org/apache/ignite/table/Tuple.java b/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
index 40fda9d..7b75ebe 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/Tuple.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.table;
 
+import java.util.BitSet;
+import java.util.UUID;
 import org.apache.ignite.binary.BinaryObject;
 
 /**
@@ -97,4 +99,20 @@ public interface Tuple {
      * @return Column value.
      */
     String stringValue(String colName);
+
+    /**
+     * Gets {@code UUID} column value.
+     *
+     * @param colName Column name.
+     * @return Column value.
+     */
+    UUID uuidValue(String colName);
+
+    /**
+     * Gets {@code BitSet} column value.
+     *
+     * @param colName Column name.
+     * @return Column value.
+     */
+    BitSet bitmaskValue(String colName);
 }
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
index 93eb39a..a62d21e 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/internal/rest/JsonConverter.java
@@ -204,7 +204,7 @@ public class JsonConverter {
                     }
                     else {
                         throw new IllegalArgumentException(
-                            "'" + join(path) + "' configuration doesn't have '" + key + "' subconfiguration"
+                            "'" + join(path) + "' configuration doesn't expects '" + key + "' subconfiguration"
                         );
                     }
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
index c245b5b..39d0bbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.manager;
 
 /**
  * The event cas whcih is produced by event producer component.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * @see Producer#onEvent(Event, EventParameters, Throwable)
  */
 public interface Event {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
index 140047d..9a76509 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.manager;
 /**
  * Event parameters.
  * This type passed to the event listener.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * @see Producer#onEvent(Event, EventParameters, Throwable)
  */
 public interface EventParameters {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index c198bba..f9639af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -27,7 +27,7 @@ import java.util.function.BiPredicate;
  */
 public abstract class Producer<T extends Event, P extends EventParameters> {
     /** All listeners. */
-    private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P, Exception>>> listeners = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P, Throwable>>> listeners = new ConcurrentHashMap<>();
 
     /**
      * Registers an event listener.
@@ -37,7 +37,7 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
      * @param evt Event.
      * @param closure Closure.
      */
-    public void listen(T evt, BiPredicate<P, Exception> closure) {
+    public void listen(T evt, BiPredicate<P, Throwable> closure) {
         listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>())
             .offer(closure);
     }
@@ -49,15 +49,15 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
      * @param params Event parameters.
      * @param err Exception when it was happened, or {@code null} otherwise.
      */
-    protected void onEvent(T evt, P params, Exception err) {
-        ConcurrentLinkedQueue<BiPredicate<P, Exception>> queue = listeners.get(evt);
+    protected void onEvent(T evt, P params, Throwable err) {
+        ConcurrentLinkedQueue<BiPredicate<P, Throwable>> queue = listeners.get(evt);
 
         if (queue == null)
             return;
 
-        BiPredicate<P, Exception> closure;
+        BiPredicate<P, Throwable> closure;
 
-        Iterator<BiPredicate<P, Exception>> iter = queue.iterator();
+        Iterator<BiPredicate<P, Throwable>> iter = queue.iterator();
 
         while (iter.hasNext()) {
             closure = iter.next();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
new file mode 100644
index 0000000..13b8217
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Ignition interface tests.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-14389")
+class DynamicTableCreationTest {
+    /** Nodes bootstrap configuration. */
+    private final String[] nodesBootstrapCfg =
+        {
+            "{\n" +
+                "  \"node\": {\n" +
+                "    \"name\":node0,\n" +
+                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+                "  },\n" +
+                "  \"network\": {\n" +
+                "    \"port\":3344,\n" +
+                "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+                "  }\n" +
+                "}",
+
+            "{\n" +
+                "  \"node\": {\n" +
+                "    \"name\":node1,\n" +
+                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+                "  },\n" +
+                "  \"network\": {\n" +
+                "    \"port\":3345,\n" +
+                "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+                "  }\n" +
+                "}",
+
+            "{\n" +
+                "  \"node\": {\n" +
+                "    \"name\":node2,\n" +
+                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+                "  },\n" +
+                "  \"network\": {\n" +
+                "    \"port\":3346,\n" +
+                "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+                "  }\n" +
+                "}",
+        };
+
+    /**
+     * Check dynamic table creation.
+     */
+    @Test
+    void testDynamicSimpleTableCreation() {
+        List<Ignite> clusterNodex = new ArrayList<>();
+
+        for (String nodeBootstrapCfg : nodesBootstrapCfg)
+            clusterNodex.add(IgnitionManager.start(nodeBootstrapCfg));
+
+        assertEquals(3, clusterNodex.size());
+
+        // Create table on node 0.
+        clusterNodex.get(0).tables().createTable("tbl1", tbl -> tbl
+            .changeReplicas(1)
+            .changePartitions(10)
+            .changeColumns(cols -> cols
+                .create("key", c -> c.changeType(t -> t.changeType("INT64")))
+                .create("val", c -> c.changeType(t -> t.changeType("INT64")))
+            )
+            .changeIndices(idxs -> idxs
+                .create("PK", idx -> idx
+                    .changeType("PRIMARY")
+                    .changeColumns(c -> c
+                        .create("key", t -> {
+                        }))
+                    .changeAffinityColumns(new String[] {"key"}))
+            ));
+
+        // Put data on node 1.
+        Table tbl1 = clusterNodex.get(1).tables().table("tbl1");
+        tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val", 111L).build());
+
+        // Get data on node 2.
+        Table tbl2 = clusterNodex.get(2).tables().table("tbl1");
+        assertEquals(111L, tbl2.get(tbl2.tupleBuilder().set("key", 1L).build()));
+    }
+
+    /**
+     * Check dynamic table creation.
+     */
+    @Test
+    void testDynamicTableCreation() {
+        List<Ignite> clusterNodes = new ArrayList<>();
+
+        for (String nodeBootstrapCfg : nodesBootstrapCfg)
+            clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg));
+
+        assertEquals(3, clusterNodes.size());
+
+        // Create table on node 0.
+        clusterNodes.get(0).tables().createTable("tbl1", tbl -> tbl
+            .changeReplicas(1)
+            .changePartitions(10)
+            .changeColumns(cols -> cols
+                .create("key", c -> c.changeType(t -> t.changeType("UUID")))
+                .create("affKey", c -> c.changeType(t -> t.changeType("INT64")))
+                .create("valStr", c -> c.changeType(t -> t.changeType("STRING")))
+                .create("valInt", c -> c.changeType(t -> t.changeType("INT32")))
+                .create("valNullable", c -> c.changeType(t -> t.changeType("INT8")).changeNullable(true))
+            )
+            .changeIndices(idxs -> idxs
+                .create("PK", idx -> idx
+                    .changeType("PRIMARY")
+                    .changeColumns(c -> c
+                        .create("key", t -> {
+                        })
+                        .create("affKey", t -> {
+                        }))
+                    .changeAffinityColumns(new String[] {"affKey"}))
+            ));
+
+        final UUID uuid = UUID.randomUUID();
+
+        // Put data on node 1.
+        Table tbl1 = clusterNodes.get(1).tables().table("tbl1");
+        tbl1.insert(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L)
+            .set("valStr", "String value").set("valInt", 73L).set("valNullable", null).build());
+
+        // Get data on node 2.
+        Table tbl2 = clusterNodes.get(2).tables().table("tbl1");
+        final Tuple val = tbl2.get(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L).build());
+
+        assertEquals("String value", val.value("valStr"));
+        assertEquals(73L, (Long)val.value("valInt"));
+        assertNull(val.value("valNullable"));
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
new file mode 100644
index 0000000..035caad
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableCreationTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Ignition interface tests.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
+class TableCreationTest {
+    /** Nodes bootstrap configuration with preconfigured tables. */
+    private final String[] nodesBootstrapCfg =
+        {
+            "{\n" +
+                "  \"node\": {\n" +
+                "    \"name\":node0,\n" +
+                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+                "  },\n" +
+                "  \"network\": {\n" +
+                "    \"port\":3344,\n" +
+                "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+                "  },\n" +
+                "  \"table\": {\n" +
+                "       \"tables\": {\n" +
+                "           \"tbl1\": {\n" +
+                "               \"partitions\":10,\n" +
+                "               \"replicas\":2,\n" +
+                "               \"columns\": { \n" +
+                "                   \"key\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":UUID\n" +
+                "                       },\n" +
+                "                       \"nullable\":false\n" +
+                "                   },\n" +
+                "                   \"affKey\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":INT64\n" +
+                "                       },\n" +
+                "                       \"nullable\":false\n" +
+                "                   },\n" +
+                "                   \"valString\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":String\n" +
+                "                       },\n" +
+                "                       \"nullable\":false\n" +
+                "                   },\n" +
+                "                   \"valInt\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":INT32\n" +
+                "                       },\n" +
+                "                       \"nullable\":false\n" +
+                "                   },\n" +
+                "                   \"valNullable\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":String\n" +
+                "                       },\n" +
+                "                       \"nullable\":true\n" +
+                "                   }\n" +
+                "               },\n" + /* Columns. */
+                "               \"indices\": {\n" +
+                "                   \"PK\": {\n" +
+                "                       \"type\":PRIMARY,\n" +
+                "                       \"columns\": {\n" +
+                "                           \"key\": {\n" +
+                "                               \"asc\":true\n" +
+                "                           },\n" +
+                "                           \"affKey\": {}\n" +
+                "                       },\n" + /* Columns. */
+                "                       \"affinityColumns\":[ \"affKey\" ]\n" +
+                "                   }\n" +
+                "               }\n" + /* Indices. */
+                "           },\n" + /* Table. */
+                "\n" +
+                "           \"tbl2\": {\n" + // Table minimal configuration.
+                "               \"columns\": { \n" +
+                "                   \"key\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":INT64\n" +
+                "                       },\n" +
+                "                   },\n" +
+                "                   \"val\": {\n" +
+                "                       \"type\": {" +
+                "                           \"type\":INT64\n" +
+                "                       },\n" +
+                "                   }\n" +
+                "               },\n" + /* Columns. */
+                "               \"indices\": {\n" +
+                "                   \"PK\": {\n" +
+                "                       \"type\":PRIMARY,\n" +
+                "                       \"columns\": {\n" +
+                "                           \"key\": {}\n" +
+                "                       },\n" + /* Columns. */
+                "                   }\n" +
+                "               }\n" + /* Indices. */
+                "           }\n" + /* Table. */
+                "       }\n" + /* Tables. */
+                "  }\n" + /* Root. */
+                "}",
+
+            "{\n" +
+                "  \"node\": {\n" +
+                "    \"name\":node1,\n" +
+                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+                "  },\n" +
+                "  \"network\": {\n" +
+                "    \"port\":3345,\n" +
+                "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+                "  }\n" +
+                "}",
+
+            "{\n" +
+                "  \"node\": {\n" +
+                "    \"name\":node2,\n" +
+                "    \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+                "  },\n" +
+                "  \"network\": {\n" +
+                "    \"port\":3346,\n" +
+                "    \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
+                "  }\n" +
+                "}",
+        };
+
+    /**
+     * Check table creation via bootstrap configuration with pre-configured table.
+     */
+    @Test
+    void testInitialSimpleTableConfiguration() {
+        List<Ignite> clusterNodes = new ArrayList<>();
+
+        for (String nodeBootstrapCfg : nodesBootstrapCfg)
+            clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg));
+
+        assertEquals(3, clusterNodes.size());
+
+        clusterNodes.forEach(Assertions::assertNotNull);
+
+        { /* Table 1.*/
+            Table tbl1 = clusterNodes.get(1).tables().table("tbl1");
+            tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val", 111L).build());
+
+            Table tbl2 = clusterNodes.get(2).tables().table("tbl1");
+            assertEquals(111L, tbl2.get(tbl2.tupleBuilder().set("key", 1L).build()));
+        }
+
+        { /* Table 2. */
+            final UUID uuid = UUID.randomUUID();
+
+            // Put data on node 1.
+            Table tbl1 = clusterNodes.get(1).tables().table("tbl1");
+            tbl1.insert(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L)
+                .set("valStr", "String value").set("valInt", 73L).set("valNullable", null).build());
+
+            // Get data on node 2.
+            Table tbl2 = clusterNodes.get(2).tables().table("tbl1");
+            final Tuple val = tbl2.get(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L).build());
+
+            assertEquals("String value", val.value("valStr"));
+            assertEquals(73L, (Long)val.value("valInt"));
+            assertNull(val.value("valNullable"));
+        }
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index d35524b..01db2d3 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -108,7 +108,7 @@ public class IgnitionImpl implements Ignition {
                 locConfigurationMgr.bootstrap(jsonStrBootstrapCfg);
             }
             catch (Exception e) {
-                LOG.warn("Unable to parse user specific configuration, default configuration will be used", e);
+                LOG.warn("Unable to parse user specific configuration, default configuration will be used: " + e.getMessage());
             }
         else if (jsonStrBootstrapCfg != null)
             LOG.warn("User specific configuration will be ignored, cause vault was bootstrapped with pds configuration");
@@ -161,7 +161,7 @@ public class IgnitionImpl implements Ignition {
         // Affinity manager startup.
         AffinityManager affinityMgr = new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr, vaultMgr);
 
-        SchemaManager schemaMgr = new SchemaManager(configurationMgr);
+        SchemaManager schemaMgr = new SchemaManager(configurationMgr, metaStorageMgr, vaultMgr);
 
         // Distributed table manager startup.
         IgniteTables distributedTblMgr = new TableManager(
diff --git a/modules/schema/pom.xml b/modules/schema/pom.xml
index 03acb79..e2043f8 100644
--- a/modules/schema/pom.xml
+++ b/modules/schema/pom.xml
@@ -40,12 +40,17 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-bytecode</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
         </dependency>
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-bytecode</artifactId>
+            <artifactId>ignite-metastorage</artifactId>
         </dependency>
 
         <!-- 3rd party dependencies -->
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
index e0e0dfb..8e1e91d 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
@@ -46,6 +46,19 @@ public class SchemaDescriptor {
      * @param valCols Value columns.
      */
     public SchemaDescriptor(int ver, Column[] keyCols, Column[] valCols) {
+        this(ver, keyCols, null, valCols);
+    }
+
+    /**
+     * @param ver Schema version.
+     * @param keyCols Key columns.
+     * @param affCols Affinity column names.
+     * @param valCols Value columns.
+     */
+    public SchemaDescriptor(int ver, Column[] keyCols, @Nullable String[] affCols, Column[] valCols) {
+        assert keyCols.length > 0 : "No key columns are conigured.";
+        assert valCols.length > 0 : "No value columns are conigured.";
+
         this.ver = ver;
         this.keyCols = new Columns(0, keyCols);
         this.valCols = new Columns(keyCols.length, valCols);
@@ -54,6 +67,8 @@ public class SchemaDescriptor {
 
         Arrays.stream(this.keyCols.columns()).forEach(c -> colMap.put(c.name(), c));
         Arrays.stream(this.valCols.columns()).forEach(c -> colMap.put(c.name(), c));
+
+        //TODO: https://issues.apache.org/jira/browse/IGNITE-14388 Add affinity columns support.
     }
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index da099e4..fbb6c4d 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,60 +17,315 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.event.SchemaEvent;
+import org.apache.ignite.internal.schema.event.SchemaEventParameters;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
+ *
+ * Schemas MUST be registered in ascending version order, incrementing by 
{@code 1} with NO gaps,
+ * otherwise an exception will be thrown. The version numbering starts from 
{@code 1}.
+ * <p>
+ * After a table maintenance process, some of the earliest versions may become 
outdated and can be safely cleaned up
+ * if the process guarantees the table no longer has data of these versions.
+ *
+ * @implSpec The changes in between two arbitrary actual versions MUST NOT be 
lost.
+ * Thus, schema versions can only be removed from the beginning.
+ * @implSpec Initial schema history MAY be registered without the first 
outdated versions
+ * that could be cleaned up earlier.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific 
configuration.*/
+public class SchemaManager extends Producer<SchemaEvent, 
SchemaEventParameters> {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metastorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Schema history item key suffix. */
+    private static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+    /** Configuration manager in order to handle and listen schema specific 
configuration. */
     private final ConfigurationManager configurationMgr;
 
-    /** Schema. */
-    private final SchemaDescriptor schema;
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Vault manager. */
+    private final VaultManager vaultMgr;
+
+    /** Schema registries. */
+    private final Map<UUID, SchemaRegistryImpl> schemaRegs = new 
ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Metastorage manager.
+     * @param vaultMgr Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        VaultManager vaultMgr
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+        this.vaultMgr = vaultMgr;
+
+        metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new 
WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> 
events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = 
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+                    int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+                    if (verPos == -1) {
+                        final UUID tblId = UUID.fromString(keyTail);
+
+                        SchemaRegistry reg = schemaRegistryForTable(tblId);
+
+                        assert reg != null : "Table schema was not initialized 
or table has been dropped: " + tblId;
+
+                        if (evt.oldEntry() == null)
+                            onEvent(SchemaEvent.INITIALIZED, new 
SchemaEventParameters(tblId, reg), null);
+                        else if (evt.newEntry() == null) {
+                            schemaRegs.remove(tblId);
+
+                            onEvent(SchemaEvent.DROPPED, new 
SchemaEventParameters(tblId, null), null);
+                        }
+
+                        return true; // Ignore last table schema version.
+                    }
+                    else {
+                        UUID tblId = UUID.fromString(keyTail.substring(0, 
verPos));
+
+                        SchemaRegistryImpl reg = schemaRegs.get(tblId);
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                        if (reg == null)
+                            schemaRegs.put(tblId, (reg = new 
SchemaRegistryImpl(v -> tableSchema(tblId, v))));
+
+                        if (evt.oldEntry() == null)
+                            
reg.onSchemaRegistered((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+                        else if (evt.newEntry() == null)
+                            
reg.onSchemaDropped(Integer.parseInt(keyTail.substring(verPos + 
INTERNAL_VER_SUFFIX.length())));
+                        else
+                            throw new SchemaRegistryException("Schema of 
concrete version can't be changed.");
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
             }
+        });
+    }
+
+    /**
+     * Creates schema registry for the table with an existing schema or
+     * registers initial schema from configuration.
+     *
+     * @param tblId Table id.
+     * @param tblName Table name.
+     * @return Operation future.
+     */
+    public CompletableFuture<Boolean> initSchemaForTable(final UUID tblId, 
String tblName) {
+        return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+            thenCompose(entry -> {
+                TableConfiguration tblConfig = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+
+                assert entry.empty();
+
+                final int schemaVer = 1;
+
+                final Key lastVerKey = new Key(INTERNAL_PREFIX + tblId);
+                final Key schemaKey = new Key(INTERNAL_PREFIX + tblId + 
INTERNAL_VER_SUFFIX + schemaVer);
+
+                final SchemaDescriptor desc = 
createSchemaDescriptor(schemaVer, tblConfig);
+
+                return metaStorageMgr.invoke(
+                    Conditions.key(lastVerKey).value().eq(entry.value()), // 
Won't rewrite if the version has gone ahead.
+                    List.of(
+                        //TODO: IGNITE-14679 Serialize schema.
+                        Operations.put(schemaKey, ByteUtils.toBytes(desc)),
+                        Operations.put(lastVerKey, 
ByteUtils.longToBytes(schemaVer))
+                    ),
+                    List.of(Operations.noop()));
+            });
+    }
+
+    /**
+     * Return table schema of certain version from history.
+     *
+     * @param tblId Table id.
+     * @param schemaVer Schema version.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor tableSchema(UUID tblId, int schemaVer) {
+        try {
+            return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId + 
INTERNAL_VER_SUFFIX + schemaVer))
+                .thenApply(e -> e.empty() ? null : 
(SchemaDescriptor)ByteUtils.fromBytes(e.value())).get();
+        }
+        catch (InterruptedException | ExecutionException e) {
+            throw new SchemaRegistryException("Can't read schema from vault: 
ver=" + schemaVer, e);
+        }
+    }
+
+    /**
+     * Creates schema descriptor from configuration.
+     *
+     * @param ver Schema version.
+     * @param tblConfig Table config.
+     * @return Schema descriptor.
+     */
+    private SchemaDescriptor createSchemaDescriptor(int ver, 
TableConfiguration tblConfig) {
+        final TableIndexConfiguration pkCfg = 
tblConfig.indices().get(PrimaryIndex.PRIMARY_KEY_INDEX_NAME);
+
+        assert pkCfg != null;
+
+        final Set<String> keyColNames = 
Stream.of(pkCfg.colNames().value()).collect(Collectors.toSet());
+        final NamedListView<ColumnView> cols = tblConfig.columns().value();
+
+        final ArrayList<Column> keyCols = new ArrayList<>(keyColNames.size());
+        final ArrayList<Column> valCols = new ArrayList<>(cols.size() - 
keyColNames.size());
+
+        cols.namedListKeys().stream()
+            .map(cols::get)
+            //TODO: IGNITE-14290 replace with helper class call.
+            .map(col -> new Column(col.name(), createType(col.type()), 
col.nullable()))
+            .forEach(c -> (keyColNames.contains(c.name()) ? keyCols : 
valCols).add(c));
+
+        return new SchemaDescriptor(
+            ver,
+            keyCols.toArray(Column[]::new),
+            pkCfg.affinityColumns().value(),
+            valCols.toArray(Column[]::new)
         );
     }
 
     /**
-     * Gets a current schema for the table specified.
+     * Create type from config.
      *
-     * @param tableId Table id.
-     * @return Schema.
+     * TODO: IGNITE-14290 replace with helper class call.
+     *
+     * @param type Type view.
+     * @return Native type.
      */
-    public SchemaDescriptor schema(UUID tableId) {
-        return schema;
+    private NativeType createType(ColumnTypeView type) {
+        switch (type.type().toLowerCase()) {
+            case "byte":
+                return NativeType.BYTE;
+            case "short":
+                return NativeType.SHORT;
+            case "int":
+                return NativeType.INTEGER;
+            case "long":
+                return NativeType.LONG;
+            case "float":
+                return NativeType.FLOAT;
+            case "double":
+                return NativeType.DOUBLE;
+            case "uuid":
+                return NativeType.UUID;
+            case "bitmask":
+                return Bitmask.of(type.length());
+            case "string":
+                return NativeType.STRING;
+            case "bytes":
+                return NativeType.BYTES;
+
+            default:
+                throw new IllegalStateException("Unsupported column type: " + 
type.type());
+        }
     }
 
     /**
-     * Gets a schema for specific version.
+     * Compares schemas.
      *
+     * @param expected Expected schema.
+     * @param actual Actual schema.
+     * @return {@code True} if schemas are equal, {@code false} otherwise.
+     */
+    public static boolean equalSchemas(SchemaDescriptor expected, 
SchemaDescriptor actual) {
+        if (expected.keyColumns().length() != actual.keyColumns().length() ||
+            expected.valueColumns().length() != actual.valueColumns().length())
+            return false;
+
+        for (int i = 0; i < expected.length(); i++) {
+            if (!expected.column(i).equals(actual.column(i)))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param tableId Table id.
-     * @param ver Schema version.
-     * @return Schema.
+     * @return Schema registry for the table.
      */
-    public SchemaDescriptor schema(UUID tableId, long ver) {
-        assert ver >= 0;
+    private SchemaRegistry schemaRegistryForTable(UUID tableId) {
+        final SchemaRegistry reg = schemaRegs.get(tableId);
+
+        if (reg == null)
+            throw new SchemaRegistryException("No schema was ever registered 
for the table: " + tableId);
+
+        return reg;
+    }
+
+    /**
+     * Unregisters all schemas associated with a table identifier.
+     *
+     * @param tableId Table identifier.
+     * @return Future which completes when all versions of the schema have been 
unregistered.
+     */
+    public CompletableFuture<Boolean> unregisterSchemas(UUID tableId) {
+        CompletableFuture<Void> fut = metaStorageMgr.remove(new 
Key(INTERNAL_PREFIX + tableId));
+
+        String schemaPrefix = INTERNAL_PREFIX + tableId + INTERNAL_VER_SUFFIX;
 
-        assert schema.version() == ver;
+        List<Key> keys = new ArrayList<>();
+        try (Cursor<Entry> cursor = metaStorageMgr.range(new 
Key(schemaPrefix), null)) {
+            cursor.forEach(entry -> keys.add(entry.key()));
+        }
+        catch (Exception e) {
+            LOG.error("Can't remove schemas for the table [tblId=" + tableId + 
']');
+        }
 
-        return schema;
+        return fut.thenCompose(r -> 
metaStorageMgr.removeAll(keys)).thenApply(v -> true);
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
similarity index 85%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
copy to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index d63d71c..8da7278 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
-
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+package org.apache.ignite.internal.schema;
 
 /**
- * Table schema manager interface.
+ * Table schema registry interface.
  */
-public interface TableSchemaView {
+public interface SchemaRegistry {
     /**
      * @return Current schema.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
similarity index 71%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
copy to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
index 140047d..d81e0d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
@@ -15,12 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.schema.event;
+
+import org.apache.ignite.internal.manager.Event;
 
 /**
- * Event parameters.
- * This type passed to the event listener.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * Schema management events.
  */
-public interface EventParameters {
+public enum SchemaEvent implements Event {
+    /** This event is fired when a schema was initialized. */
+    INITIALIZED,
+
+    /** This event is fired when a schema was dropped. */
+    DROPPED
 }
+
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
similarity index 55%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
rename to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
index dd74868..2982afc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
@@ -15,38 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.schema.event;
 
 import java.util.UUID;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 
 /**
- * Schema view implementation.
+ * Schema event parameters. These are properties associated with a 
concrete schema.
  */
-public class TableSchemaViewImpl implements TableSchemaView {
+public class SchemaEventParameters implements EventParameters {
     /** Table identifier. */
     private final UUID tableId;
 
-    /** Schema manager. */
-    private final SchemaManager schemaManager;
+    /** Schema registry. */
+    private final SchemaRegistry reg;
 
     /**
      * @param tableId Table identifier.
-     * @param schemaManager Schema manager.
+     * @param reg Schema registry for the table.
      */
-    public TableSchemaViewImpl(UUID tableId, SchemaManager schemaManager) {
+    public SchemaEventParameters(UUID tableId, SchemaRegistry reg) {
         this.tableId = tableId;
-        this.schemaManager = schemaManager;
+        this.reg = reg;
     }
 
-    /** {@inheritDoc} */
-    @Override public SchemaDescriptor schema() {
-        return schemaManager.schema(tableId);
+    /**
+     * Get the table identifier.
+     *
+     * @return Table id.
+     */
+    public UUID tableId() {
+        return tableId;
     }
 
-    /** {@inheritDoc} */
-    @Override public SchemaDescriptor schema(int ver) {
-        return schemaManager.schema(tableId, ver);
+    /**
+     * Get schema registry for the table.
+     *
+     * @return Schema registry.
+     */
+    public SchemaRegistry schemaRegistry() {
+        return reg;
     }
-}
+}
\ No newline at end of file
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistrationConflictException.java
similarity index 65%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
copy to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistrationConflictException.java
index d63d71c..d9298ed 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistrationConflictException.java
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.schema.registry;
 
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * Table schema manager interface.
+ * Schema registration conflict exception is thrown if
+ * the version of the schema being registered was already registered earlier.
  */
-public interface TableSchemaView {
+public class SchemaRegistrationConflictException extends 
IgniteInternalException {
     /**
-     * @return Current schema.
+     * Constructor.
+     *
+     * @param msg Message.
      */
-    SchemaDescriptor schema();
-
-    /**
-     * @param ver Schema version.
-     * @return Schema of given version.
-     */
-    SchemaDescriptor schema(int ver);
+    public SchemaRegistrationConflictException(String msg) {
+        super(msg);
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryException.java
similarity index 61%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
rename to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryException.java
index d63d71c..efd657a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryException.java
@@ -15,22 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.schema.registry;
 
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * Table schema manager interface.
+ * Schema registration exception.
  */
-public interface TableSchemaView {
+public class SchemaRegistryException extends IgniteInternalException {
     /**
-     * @return Current schema.
+     * Constructor with error message.
+     *
+     * @param msg Message.
      */
-    SchemaDescriptor schema();
+    public SchemaRegistryException(String msg) {
+        super(msg);
+    }
 
     /**
-     * @param ver Schema version.
-     * @return Schema of given version.
+     * Constructor with error message and cause.
+     *
+     * @param cause Cause.
      */
-    SchemaDescriptor schema(int ver);
+    public SchemaRegistryException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
new file mode 100644
index 0000000..41f2340
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.registry;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Caching registry of actual schema descriptors for a table.
+ */
+public class SchemaRegistryImpl implements SchemaRegistry {
+    /** Initial schema version. */
+    public static final int INITIAL_SCHEMA_VERSION = -1;
+
+    /** Cached schemas. */
+    private final ConcurrentSkipListMap<Integer, SchemaDescriptor> schemaCache 
= new ConcurrentSkipListMap<>();
+
+    /** Last registered version. */
+    private volatile int lastVer;
+
+    /** Schema store. */
+    private final Function<Integer, SchemaDescriptor> history;
+
+    /**
+     * Default constructor.
+     *
+     * @param history Schema history.
+     */
+    public SchemaRegistryImpl(Function<Integer, SchemaDescriptor> history) {
+        lastVer = INITIAL_SCHEMA_VERSION;
+        this.history = history;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param history Schema history.
+     */
+    public SchemaRegistryImpl(int initialVer, Function<Integer, 
SchemaDescriptor> history) {
+        lastVer = initialVer;
+        this.history = history;
+    }
+
+    /**
+     * Gets schema descriptor for given version.
+     *
+     * @param ver Schema version to get descriptor for.
+     * @return Schema descriptor.
+     * @throws SchemaRegistryException If no schema found for given version.
+     */
+    @Override public SchemaDescriptor schema(int ver) {
+        SchemaDescriptor desc = schemaCache.get(ver);
+
+        if (desc != null)
+            return desc;
+
+        desc = history.apply(ver);
+
+        if (desc != null) {
+            schemaCache.putIfAbsent(ver, desc);
+
+            return desc;
+        }
+
+        if (lastVer < ver || ver <= 0)
+            throw new SchemaRegistryException("Incorrect schema version 
requested: ver=" + ver);
+        else
+            throw new SchemaRegistryException("Failed to find schema: ver=" + 
ver);
+    }
+
+    /**
+     * Gets schema descriptor for the latest version if initialized.
+     *
+     * @return Schema descriptor if initialized, {@code null} otherwise.
+     * @throws SchemaRegistryException If failed.
+     */
+    @Override public @Nullable SchemaDescriptor schema() {
+        final int lastVer0 = lastVer;
+
+        if (lastVer0 == INITIAL_SCHEMA_VERSION)
+            return null;
+
+       return schema(lastVer0);
+    }
+
+    /**
+     * @return Last known schema version.
+     */
+    public int lastSchemaVersion() {
+        return lastVer;
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param desc Schema descriptor.
+     * @throws SchemaRegistrationConflictException If schema of provided 
version was already registered.
+     * @throws SchemaRegistryException If schema of incorrect version provided.
+     */
+    public void onSchemaRegistered(SchemaDescriptor desc) {
+        if (lastVer == INITIAL_SCHEMA_VERSION) {
+            if (desc.version() != 1)
+                throw new SchemaRegistryException("Try to register schema of 
wrong version: ver=" + desc.version() + ", lastVer=" + lastVer);
+        }
+        else if (desc.version() != lastVer + 1) {
+            if (desc.version() > 0 && desc.version() <= lastVer)
+                throw new SchemaRegistrationConflictException("Schema with 
given version has been already registered: " + desc.version());
+
+            throw new SchemaRegistryException("Try to register schema of wrong 
version: ver=" + desc.version() + ", lastVer=" + lastVer);
+        }
+
+        schemaCache.put(desc.version(), desc);
+
+        lastVer = desc.version();
+    }
+
+    /**
+     * Cleanup given schema version from history.
+     *
+     * @param ver Schema version to remove.
+     * @throws SchemaRegistryException If incorrect schema version provided.
+     */
+    public void onSchemaDropped(int ver) {
+        if (ver >= lastVer || ver <= 0 || schemaCache.keySet().first() < ver)
+            throw new SchemaRegistryException("Incorrect schema version to 
clean up to: " + ver);
+
+        schemaCache.remove(ver);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/package-info.java
similarity index 80%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
copy to 
modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/package-info.java
index c245b5b..b6a1bfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/package-info.java
@@ -15,11 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.manager;
-
 /**
- * The event cas whcih is produced by event producer component.
- * @see Producer#onEvent(Event, EventParameters, Exception)
+ * Schema registry responsible for maintaining versioned schema history.
  */
-public interface Event {
-}
+package org.apache.ignite.internal.schema.registry;
\ No newline at end of file
diff --git 
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
new file mode 100644
index 0000000..fb20183
--- /dev/null
+++ 
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/SchemaRegistryImplTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import 
org.apache.ignite.internal.schema.registry.SchemaRegistrationConflictException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.ignite.internal.schema.NativeType.BYTES;
+import static org.apache.ignite.internal.schema.NativeType.LONG;
+import static org.apache.ignite.internal.schema.NativeType.STRING;
+import static 
org.apache.ignite.internal.schema.registry.SchemaRegistryImpl.INITIAL_SCHEMA_VERSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Schema registry implementation test.
+ */
+public class SchemaRegistryImplTest {
+    /**
+     * Check registration of schema with wrong versions.
+     */
+    @Test
+    public void testWrongSchemaVersionRegistration() {
+        final SchemaDescriptor schemaV0 = new 
SchemaDescriptor(INITIAL_SCHEMA_VERSION,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(0,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+        assertNull(reg.schema());
+
+        // Try to register schema with initial version.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV0));
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        assertNull(reg.schema());
+        assertThrows(SchemaRegistryException.class, () -> 
reg.schema(INITIAL_SCHEMA_VERSION));
+
+        // Try to register schema with version 0.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV1));
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        assertThrows(SchemaRegistryException.class, () -> 
reg.schema(INITIAL_SCHEMA_VERSION));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(0));
+
+        // Try to register schema with version of 2.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV2));
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        assertThrows(SchemaRegistryException.class, () -> 
reg.schema(INITIAL_SCHEMA_VERSION));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(0));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+    }
+
+    /**
+     * Check initial schema registration.
+     */
+    @Test
+    public void testSchemaRegistration() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+        assertNull(reg.schema());
+
+        // Register schema with very first version.
+        reg.onSchemaRegistered(schemaV1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+
+        // Register schema with next version.
+        reg.onSchemaRegistered(schemaV2);
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+
+        // Try to register schema with version of 4.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV4));
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(4));
+    }
+
+    /**
+     * Check duplicate schema registration.
+     */
+    @Test
+    public void testDuplucateSchemaRegistration() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor wrongSchema = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        // Register schema with very first version.
+        reg.onSchemaRegistered(schemaV1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+
+        // Try to register same schema once again.
+        assertThrows(SchemaRegistrationConflictException.class, () -> 
reg.onSchemaRegistered(schemaV1));
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+
+        // Try to register another schema with same version and check nothing 
was registered.
+        assertThrows(SchemaRegistrationConflictException.class, () -> 
reg.onSchemaRegistered(wrongSchema));
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertEquals(1, reg.schema().version());
+
+        assertSameSchema(schemaV1, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+
+        // Register schema with next version.
+        reg.onSchemaRegistered(schemaV2);
+
+        assertEquals(2, reg.lastSchemaVersion());
+
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+    }
+
+    /**
+     * Check schema cleanup.
+     */
+    @Test
+    public void testSchemaCleanup() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null);
+
+        assertEquals(INITIAL_SCHEMA_VERSION, reg.lastSchemaVersion());
+
+        // Fail to cleanup initial schema
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaDropped(INITIAL_SCHEMA_VERSION));
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaDropped(0));
+
+        // Register schema with very first version.
+        reg.onSchemaRegistered(schemaV1);
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertNotNull(reg.schema());
+        assertNotNull(reg.schema(1));
+
+        // Try to remove latest schema.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaDropped(1));
+
+        assertEquals(1, reg.lastSchemaVersion());
+        assertNotNull(reg.schema());
+        assertNotNull(reg.schema(1));
+
+        // Register new schemas with the next two versions.
+        reg.onSchemaRegistered(schemaV2);
+        reg.onSchemaRegistered(schemaV3);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertNotNull(reg.schema(1));
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+
+        // Remove outdated schema 1.
+        reg.onSchemaDropped(1);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+
+        // Remove the already removed schema once again.
+        reg.onSchemaDropped(1);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+
+        // Register new schema with next version.
+        reg.onSchemaRegistered(schemaV4);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+        assertNotNull(reg.schema(4));
+
+        // Remove the already removed schema once again.
+        reg.onSchemaDropped(1);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertSameSchema(schemaV4, reg.schema());
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+        assertSameSchema(schemaV4, reg.schema(4));
+
+        // Out of order remove.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaDropped(3));
+
+        // Correct removal order.
+        reg.onSchemaDropped(2);
+        reg.onSchemaDropped(3);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+        assertSameSchema(schemaV4, reg.schema());
+        assertSameSchema(schemaV4, reg.schema(4));
+
+        // Try to remove latest schema.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaDropped(4));
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertSameSchema(schemaV4, reg.schema(4));
+    }
+
+    /**
+     * Check schema registration with full history.
+     */
+    @Test
+    public void testInitialSchemaWithFullHistory() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, 
schemaV2);
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(2, history::get);
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+
+        // Register schema with duplicate version.
+        assertThrows(SchemaRegistrationConflictException.class, () -> 
reg.onSchemaRegistered(schemaV1));
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+
+        // Register schema with out-of-order version.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV4));
+
+        assertEquals(2, reg.lastSchemaVersion());
+        assertSameSchema(schemaV2, reg.schema());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+
+        // Register schema with next version.
+        reg.onSchemaRegistered(schemaV3);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertSameSchema(schemaV3, reg.schema());
+        assertSameSchema(schemaV1, reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+    }
+
+    /**
+     * Check schema registration with history tail.
+     */
+    @Test
+    public void testInitialSchemaWithTailHistory() {
+        final SchemaDescriptor schemaV1 = new SchemaDescriptor(1,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valBytesCol", BYTES, true)});
+
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valStringCol", STRING, true)});
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV5 = new SchemaDescriptor(5,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {new Column("valStringCol", STRING, true)});
+
+        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, 
schemaV3);
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(3, history::get);
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertSameSchema(schemaV3, reg.schema());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+
+        // Register schema with duplicate version.
+        assertThrows(SchemaRegistrationConflictException.class, () -> 
reg.onSchemaRegistered(schemaV2));
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertSameSchema(schemaV3, reg.schema());
+
+        // Register schema with out-of-order version.
+        assertThrows(SchemaRegistryException.class, () -> 
reg.onSchemaRegistered(schemaV5));
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertSameSchema(schemaV3, reg.schema());
+
+        // Register schema with outdated version.
+        assertThrows(SchemaRegistrationConflictException.class, () -> 
reg.onSchemaRegistered(schemaV1));
+
+        assertEquals(3, reg.lastSchemaVersion());
+        assertSameSchema(schemaV3, reg.schema());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+
+        // Register schema with next version.
+        reg.onSchemaRegistered(schemaV4);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertSameSchema(schemaV4, reg.schema());
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+        assertSameSchema(schemaV4, reg.schema(4));
+    }
+
+    /**
+     * Check schema cleanup for a registry created with an initial schema history.
+     */
+    @Test
+    public void testSchemaWithHistoryCleanup() {
+        final SchemaDescriptor schemaV2 = new SchemaDescriptor(2,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valStringCol", STRING, true)
+            });
+
+        final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
+            new Column[] {new Column("keyLongCol", LONG, true)},
+            new Column[] {
+                new Column("valBytesCol", BYTES, true),
+                new Column("valStringCol", STRING, true)
+            });
+
+        Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, 
schemaV3, schemaV4);
+
+        final SchemaRegistryImpl reg = new SchemaRegistryImpl(4, history::get);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertSameSchema(schemaV4, reg.schema());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
+        assertSameSchema(schemaV2, reg.schema(2));
+        assertSameSchema(schemaV3, reg.schema(3));
+        assertSameSchema(schemaV4, reg.schema(4));
+
+        history.remove(1);
+        reg.onSchemaDropped(1);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertNotNull(reg.schema(2));
+        assertNotNull(reg.schema(3));
+        assertNotNull(reg.schema(4));
+
+        history.remove(2);
+        history.remove(3);
+        reg.onSchemaDropped(2);
+        reg.onSchemaDropped(3);
+
+        assertEquals(4, reg.lastSchemaVersion());
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
+        assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
+        assertNotNull(reg.schema(4));
+    }
+
+    /**
+     * @param history Table schema history.
+     * @return Schema history map.
+     */
+    private Map<Integer, SchemaDescriptor> schemaHistory(SchemaDescriptor... 
history) {
+        return 
Arrays.stream(history).collect(Collectors.toMap(SchemaDescriptor::version, e -> 
e));
+    }
+
+    /**
+     * Validate schemas are equal.
+     *
+     * @param schemaDesc1 Schema descriptor to compare with.
+     * @param schemaDesc2 Schema descriptor to compare.
+     */
+    private void assertSameSchema(SchemaDescriptor schemaDesc1, 
SchemaDescriptor schemaDesc2) {
+        assertEquals(schemaDesc1.version(), schemaDesc2.version(), 
"Descriptors of different versions.");
+
+        assertTrue(SchemaManager.equalSchemas(schemaDesc1, schemaDesc2), 
"Schemas are not equals.");
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index f2e298c..7881e1b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
 import 
org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
@@ -232,7 +232,7 @@ public class ITDistributedTableTest {
             UUID.randomUUID(),
             partMap,
             PARTS
-        ), new TableSchemaView() {
+        ), new SchemaRegistry() {
             @Override public SchemaDescriptor schema() {
                 return SCHEMA;
             }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
index 8eff2ca..e9728b4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
@@ -23,7 +23,7 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Marshaller interface.
+ * Tuple marshaller interface.
  *
  * @apiNote For key tuple marshalling, a {@code marshal(key, null)} method 
call must be used.
  * A {@code marshal(key} may return the same result, but it validates value 
columns even if not specified.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index cceb908..cf23e08 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.lang.IgniteInternalException;
 
 /**
@@ -29,16 +30,16 @@ abstract class AbstractTableView {
     protected final InternalTable tbl;
 
     /** Schema manager. */
-    protected final TableSchemaView schemaMgr;
+    protected final SchemaRegistry schemaReg;
 
     /**
      * Constructor
      * @param tbl Internal table.
-     * @param schemaMgr Schema manager.
+     * @param schemaReg Schema registry.
      */
-    protected AbstractTableView(InternalTable tbl, TableSchemaView schemaMgr) {
+    protected AbstractTableView(InternalTable tbl, SchemaRegistry schemaReg) {
         this.tbl = tbl;
-        this.schemaMgr = schemaMgr;
+        this.schemaReg = schemaReg;
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index c113dc7..474f049 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.KeyValueBinaryView;
 import org.apache.ignite.table.Tuple;
@@ -47,12 +48,12 @@ public class KVBinaryViewImpl extends AbstractTableView 
implements KeyValueBinar
      * Constructor.
      *
      * @param tbl Table storage.
-     * @param schemaMgr Schema manager.
+     * @param schemaReg Schema registry.
      */
-    public KVBinaryViewImpl(InternalTable tbl, TableSchemaView schemaMgr) {
-        super(tbl, schemaMgr);
+    public KVBinaryViewImpl(InternalTable tbl, SchemaRegistry schemaReg) {
+        super(tbl, schemaReg);
 
-        marsh = new TupleMarshallerImpl(schemaMgr);
+        marsh = new TupleMarshallerImpl(schemaReg);
     }
 
     /** {@inheritDoc} */
@@ -285,7 +286,7 @@ public class KVBinaryViewImpl extends AbstractTableView 
implements KeyValueBinar
         if (row == null)
             return null;
 
-        final SchemaDescriptor schema = schemaMgr.schema(row.schemaVersion());
+        final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion());
 
         return new TableRow(schema, new Row(schema, row));
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index dfb6538..6d3d10b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.KVSerializer;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.mapper.KeyMapper;
@@ -40,13 +41,13 @@ public class KVViewImpl<K, V> extends AbstractTableView 
implements KeyValueView<
      * Constructor.
      *
      * @param tbl Table storage.
-     * @param schemaMgr Schema manager.
+     * @param schemaReg Schema registry.
      * @param keyMapper Key class mapper.
      * @param valueMapper Value class mapper.
      */
-    public KVViewImpl(InternalTable tbl, TableSchemaView schemaMgr, 
KeyMapper<K> keyMapper,
+    public KVViewImpl(InternalTable tbl, SchemaRegistry schemaReg, 
KeyMapper<K> keyMapper,
         ValueMapper<V> valueMapper) {
-        super(tbl, schemaMgr);
+        super(tbl, schemaReg);
     }
 
     /** {@inheritDoc} */
@@ -238,7 +239,7 @@ public class KVViewImpl<K, V> extends AbstractTableView 
implements KeyValueView<
         if (row == null)
             return null;
 
-        final SchemaDescriptor rowSchema = 
schemaMgr.schema(row.schemaVersion()); // Get a schema for row.
+        final SchemaDescriptor rowSchema = 
schemaReg.schema(row.schemaVersion()); // Get a schema for row.
 
         return new Row(rowSchema, row);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 0d9ac93..f442bbb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.RecordSerializer;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.mapper.RecordMapper;
@@ -39,11 +40,11 @@ public class RecordViewImpl<R> extends AbstractTableView 
implements RecordView<R
      * Constructor.
      *
      * @param tbl Table.
-     * @param schemaMgr Schema manager.
+     * @param schemaReg Schema registry.
      * @param mapper Record class mapper.
      */
-    public RecordViewImpl(InternalTable tbl, TableSchemaView schemaMgr, 
RecordMapper<R> mapper) {
-        super(tbl, schemaMgr);
+    public RecordViewImpl(InternalTable tbl, SchemaRegistry schemaReg, 
RecordMapper<R> mapper) {
+        super(tbl, schemaReg);
     }
 
     /** {@inheritDoc} */
@@ -264,7 +265,7 @@ public class RecordViewImpl<R> extends AbstractTableView 
implements RecordView<R
         if (row == null)
             return null;
 
-        final SchemaDescriptor rowSchema = 
schemaMgr.schema(row.schemaVersion()); // Get a schema for row.
+        final SchemaDescriptor rowSchema = 
schemaReg.schema(row.schemaVersion()); // Get a schema for row.
 
         return new Row(rowSchema, row);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
index ecc312f..9b0483a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import java.util.BitSet;
+import java.util.UUID;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjects;
 import org.apache.ignite.internal.schema.Column;
@@ -101,4 +103,18 @@ public abstract class RowChunkAdapter implements Tuple {
 
         return row().stringValue(col.schemaIndex());
     }
+
+    /** {@inheritDoc} */
+    @Override public UUID uuidValue(String colName) {
+        Column col = columnByName(colName);
+
+        return row().uuidValue(col.schemaIndex());
+    }
+
+    /** {@inheritDoc} */
+    @Override public BitSet bitmaskValue(String colName) {
+        Column col = columnByName(colName);
+
+        return row().bitmaskValue(col.schemaIndex());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 3fd11de..284be44 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.KeyValueBinaryView;
 import org.apache.ignite.table.KeyValueView;
@@ -49,12 +50,12 @@ public class TableImpl extends AbstractTableView implements Table {
      * Constructor.
      *
      * @param tbl Table.
-     * @param schemaMgr Table schema manager.
+     * @param schemaReg Table schema registry.
      */
-    public TableImpl(InternalTable tbl, TableSchemaView schemaMgr) {
-        super(tbl, schemaMgr);
+    public TableImpl(InternalTable tbl, SchemaRegistry schemaReg) {
+        super(tbl, schemaReg);
 
-        marsh = new TupleMarshallerImpl(schemaMgr);
+        marsh = new TupleMarshallerImpl(schemaReg);
     }
 
     /**
@@ -71,23 +72,23 @@ public class TableImpl extends AbstractTableView implements Table {
      *
      * @return Schema view.
      */
-    public TableSchemaView schemaView() {
-        return schemaMgr;
+    public SchemaRegistry schemaView() {
+        return schemaReg;
     }
 
     /** {@inheritDoc} */
     @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
-        return new RecordViewImpl<>(tbl, schemaMgr, recMapper);
+        return new RecordViewImpl<>(tbl, schemaReg, recMapper);
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
-        return new KVViewImpl<>(tbl, schemaMgr, keyMapper, valMapper);
+        return new KVViewImpl<>(tbl, schemaReg, keyMapper, valMapper);
     }
 
     /** {@inheritDoc} */
     @Override public KeyValueBinaryView kvView() {
-        return new KVBinaryViewImpl(tbl, schemaMgr);
+        return new KVBinaryViewImpl(tbl, schemaReg);
     }
 
     /** {@inheritDoc} */
@@ -327,7 +328,7 @@ public class TableImpl extends AbstractTableView implements Table {
         if (row == null)
             return null;
 
-        final SchemaDescriptor schema = schemaMgr.schema(row.schemaVersion());
+        final SchemaDescriptor schema = schemaReg.schema(row.schemaVersion());
 
         return new TableRow(schema, new Row(schema, row));
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
index 4e0e850..d457077 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.table;
 
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjects;
 import org.apache.ignite.table.Tuple;
@@ -95,4 +97,14 @@ public class TupleBuilderImpl implements TupleBuilder, Tuple {
     @Override public String stringValue(String colName) {
         return value(colName);
     }
+
+    /** {@inheritDoc} */
+    @Override public UUID uuidValue(String colName) {
+        return value(colName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public BitSet bitmaskValue(String colName) {
+        return value(colName);
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index 05c3ed7..f418f3c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -24,25 +24,26 @@ import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.RowAssembler;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.table.Tuple;
 import org.jetbrains.annotations.NotNull;
 
 import static org.apache.ignite.internal.schema.marshaller.MarshallerUtil.getValueSize;
 
 /**
- * Marshaller implementation.
+ * Tuple marshaller implementation.
  */
 public class TupleMarshallerImpl implements TupleMarshaller {
     /** Schema manager. */
-    private final TableSchemaView schemaMgr;
+    private final SchemaRegistry schemaReg;
 
     /**
      * Constructor.
      *
-     * @param schemaMgr Schema manager.
+     * @param schemaReg Schema registry.
      */
-    public TupleMarshallerImpl(TableSchemaView schemaMgr) {
-        this.schemaMgr = schemaMgr;
+    public TupleMarshallerImpl(SchemaRegistry schemaReg) {
+        this.schemaReg = schemaReg;
     }
 
     /** {@inheritDoc} */
@@ -52,7 +53,7 @@ public class TupleMarshallerImpl implements TupleMarshaller {
 
     /** {@inheritDoc} */
     @Override public Row marshal(Tuple keyTuple, Tuple valTuple) {
-        final SchemaDescriptor schema = schemaMgr.schema();
+        final SchemaDescriptor schema = schemaReg.schema();
 
         assert keyTuple instanceof TupleBuilderImpl;
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index a5c1cc0..f2a24e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -36,12 +36,15 @@ import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.affinity.AffinityManager;
 import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.event.SchemaEvent;
+import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableSchemaViewImpl;
 import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.event.TableEvent;
@@ -83,7 +86,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     private final AffinityManager affMgr;
 
     /** Tables. */
-    private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
+    private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
@@ -118,8 +121,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param name Table name.
      * @param tblId Table id.
      * @param assignment Affinity assignment.
+     * @param schemaReg Schema registry for the table.
      */
-    private void createTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
+    private void createTableLocally(
+        String name,
+        UUID tblId,
+        List<List<ClusterNode>> assignment,
+        SchemaRegistry schemaReg
+    ) {
         int partitions = assignment.size();
 
         HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
@@ -135,7 +144,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         onEvent(TableEvent.CREATE, new TableEventParameters(
             tblId,
             name,
-            new TableSchemaViewImpl(tblId, schemaMgr),
+            schemaReg,
             new InternalTableImpl(tblId, partitionMap, partitions)
         ), null);
     }
@@ -199,22 +208,20 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 UUID tblId = new UUID(revision, 0L);
 
                 if (hasMetastorageLocally) {
-                    var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
+                    var key = new Key(INTERNAL_PREFIX + tblId);
                     futs.add(metaStorageMgr.invoke(
                         Conditions.key(key).value().eq(null),
                         Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
-                        Operations.noop()).thenCompose(res ->
-                        affMgr.calculateAssignments(tblId)));
+                        Operations.noop())
+                        .thenCompose(res -> schemaMgr.initSchemaForTable(tblId, tableView.name()))
+                        .thenCompose(res -> affMgr.calculateAssignments(tblId)));
                 }
 
-                affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
-                    if (!tblId.equals(parameters.tableId()))
-                        return false;
+                final CompletableFuture<AffinityEventParameters> affinityReadyFut = new CompletableFuture<>();
+                final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();
 
-                    if (e == null)
-                        createTableLocally(tblName, tblId, parameters.assignment());
-                    else {
+                CompletableFuture.allOf(affinityReadyFut, schemaReadyFut)
+                    .exceptionally(e -> {
                         LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
 
                         onEvent(TableEvent.CREATE, new TableEventParameters(
@@ -223,7 +230,36 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                             null,
                             null
                         ), e);
-                    }
+
+                        return null;
+                    })
+                    .thenRun(() -> createTableLocally(
+                        tblName,
+                        tblId,
+                        affinityReadyFut.join().assignment(),
+                        schemaReadyFut.join().schemaRegistry()
+                    ));
+
+                affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
+
+                    if (e == null)
+                        affinityReadyFut.complete(parameters);
+                    else
+                        affinityReadyFut.completeExceptionally(e);
+
+                    return true;
+                });
+
+                schemaMgr.listen(SchemaEvent.INITIALIZED, (parameters, e) -> {
+                    if (!tblId.equals(parameters.tableId()))
+                        return false;
+
+                    if (e == null)
+                        schemaReadyFut.complete(parameters);
+                    else
+                        schemaReadyFut.completeExceptionally(e);
 
                     return true;
                 });
@@ -240,12 +276,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 UUID tblId = t.internalTable().tableId();
 
                 if (hasMetastorageLocally) {
-                    var key = new Key(INTERNAL_PREFIX + tblId.toString());
-
-                    futs.add(affMgr.removeAssignment(tblId).thenCompose(res ->
-                        metaStorageMgr.invoke(Conditions.key(key).value().ne(null),
-                            Operations.remove(key),
-                            Operations.noop())));
+                    var key = new Key(INTERNAL_PREFIX + tblId);
+
+                    futs.add(affMgr.removeAssignment(tblId)
+                        .thenCompose(res -> schemaMgr.unregisterSchemas(tblId))
+                        .thenCompose(res ->
+                            metaStorageMgr.invoke(Conditions.key(key).value().ne(null),
+                                Operations.remove(key),
+                                Operations.noop())));
                 }
 
                 affMgr.listen(AffinityEvent.REMOVED, (parameters, e) -> {
@@ -304,8 +342,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     @Override public void dropTable(String name) {
         CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
 
-        listen(TableEvent.DROP, new BiPredicate<TableEventParameters, Exception>() {
-            @Override public boolean test(TableEventParameters params, Exception e) {
+        listen(TableEvent.DROP, new BiPredicate<>() {
+            @Override public boolean test(TableEventParameters params, Throwable e) {
                 String tableName = params.tableName();
 
                 if (!name.equals(tableName))
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
index 853617d..4e0a750 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.table.event;
 import java.util.UUID;
 import org.apache.ignite.internal.manager.EventParameters;
 import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 
 /**
  * Table event parameters.
@@ -34,7 +34,7 @@ public class TableEventParameters implements EventParameters {
     private final String tableName;
 
     /** Table schema view. */
-    private final TableSchemaView tableSchemaView;
+    private final SchemaRegistry schemaRegistry;
 
     /** Internal table. */
     private final InternalTable internalTable;
@@ -42,18 +42,18 @@ public class TableEventParameters implements EventParameters {
     /**
      * @param tableId Table identifier.
      * @param tableName Table name.
-     * @param tableSchemaView Table schema view.
+     * @param schemaRegistry Table schema registry.
      * @param internalTable Internal table.
      */
     public TableEventParameters(
         UUID tableId,
         String tableName,
-        TableSchemaView tableSchemaView,
+        SchemaRegistry schemaRegistry,
         InternalTable internalTable
     ) {
         this.tableId = tableId;
         this.tableName = tableName;
-        this.tableSchemaView = tableSchemaView;
+        this.schemaRegistry = schemaRegistry;
         this.internalTable = internalTable;
     }
 
@@ -80,8 +80,8 @@ public class TableEventParameters implements EventParameters {
      *
      * @return Schema descriptor.
      */
-    public TableSchemaView tableSchemaView() {
-        return tableSchemaView;
+    public SchemaRegistry tableSchemaView() {
+        return schemaRegistry;
     }
 
     /**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
index 3b015a3..9ecaec4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummySchemaManagerImpl.java
@@ -18,13 +18,13 @@
 package org.apache.ignite.internal.table.impl;
 
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Dummy schema manager for tests.
  */
-public class DummySchemaManagerImpl implements TableSchemaView {
+public class DummySchemaManagerImpl implements SchemaRegistry {
     /** Schema. */
     private final SchemaDescriptor schema;
 

Reply via email to