This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 63cf2beecb IGNITE-19646 Transform IndexManager to internally work 
against Catalog event types (#2143)
63cf2beecb is described below

commit 63cf2beecb6bbd954ba387a4aed5bd708c92476f
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jun 7 12:29:30 2023 +0300

    IGNITE-19646 Transform IndexManager to internally work against Catalog 
event types (#2143)
---
 modules/catalog/build.gradle                       |   1 +
 .../descriptors/CatalogDescriptorUtils.java        | 191 +++++++++++++++++++++
 .../apache/ignite/internal/index/IndexManager.java | 173 ++++++++++---------
 .../sql/engine/ClusterPerClassIntegrationTest.java |  31 +++-
 .../configuration/SchemaConfigurationUtils.java    |  63 +++++++
 modules/storage-api/build.gradle                   |   2 +
 .../internal/storage/engine/MvTableStorage.java    |   2 +-
 .../storage/index/HashIndexDescriptor.java         | 100 +++--------
 .../internal/storage/index/IndexDescriptor.java    |  33 ++--
 .../storage/index/SortedIndexDescriptor.java       | 110 ++++--------
 .../storage/AbstractMvTableStorageTest.java        |  69 ++------
 .../internal/storage/impl/TestMvTableStorage.java  |   3 +-
 .../index/AbstractHashIndexStorageTest.java        |  11 +-
 .../index/AbstractSortedIndexStorageTest.java      |  11 +-
 modules/storage-page-memory/build.gradle           |   1 +
 .../pagememory/AbstractPageMemoryTableStorage.java |   3 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  49 ++++--
 modules/storage-rocksdb/build.gradle               |   1 +
 .../storage/rocksdb/RocksDbTableStorage.java       |  40 +++--
 .../replicator/PartitionReplicaListener.java       |  36 +++-
 20 files changed, 577 insertions(+), 353 deletions(-)

diff --git a/modules/catalog/build.gradle b/modules/catalog/build.gradle
index 5fb5e02c74..fe4565e235 100644
--- a/modules/catalog/build.gradle
+++ b/modules/catalog/build.gradle
@@ -28,6 +28,7 @@ dependencies {
     implementation project(':ignite-configuration')
     implementation project(':ignite-metastorage-api')
     implementation project(':ignite-vault')
+    implementation project(':ignite-schema')
     implementation libs.jetbrains.annotations
     implementation libs.auto.service.annotations
 
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptorUtils.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptorUtils.java
new file mode 100644
index 0000000000..ccc2e5dd86
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogDescriptorUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.descriptors;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.descriptors.ColumnCollation.ASC_NULLS_LAST;
+import static 
org.apache.ignite.internal.catalog.descriptors.ColumnCollation.DESC_NULLS_FIRST;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.configuration.ColumnTypeView;
+import org.apache.ignite.internal.schema.configuration.ColumnView;
+import 
org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
+import org.apache.ignite.internal.schema.configuration.PrimaryKeyView;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import 
org.apache.ignite.internal.schema.configuration.ValueSerializationHelper;
+import 
org.apache.ignite.internal.schema.configuration.defaultvalue.ColumnDefaultConfigurationSchema;
+import 
org.apache.ignite.internal.schema.configuration.defaultvalue.ColumnDefaultView;
+import 
org.apache.ignite.internal.schema.configuration.defaultvalue.ConstantValueDefaultView;
+import 
org.apache.ignite.internal.schema.configuration.defaultvalue.FunctionCallDefaultView;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
+import org.apache.ignite.internal.schema.configuration.index.IndexColumnView;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
+import 
org.apache.ignite.internal.schema.configuration.index.TableIndexConfigurationSchema;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
+
+/**
+ * Helper class for working with catalog descriptors.
+ */
+// TODO: IGNITE-19499 Get rid of the table configuration
+// TODO: IGNITE-19500 Get rid of the index configuration
+public class CatalogDescriptorUtils {
+    /**
+     * Converts a table configuration to a catalog table descriptor.
+     *
+     * @param config Table configuration.
+     */
+    public static TableDescriptor toTableDescriptor(TableView config) {
+        PrimaryKeyView primaryKeyConfig = config.primaryKey();
+
+        return new TableDescriptor(
+                config.id(),
+                config.name(),
+                
config.columns().stream().map(CatalogDescriptorUtils::toTableColumnDescriptor).collect(toList()),
+                List.of(primaryKeyConfig.columns()),
+                List.of(primaryKeyConfig.colocationColumns())
+        );
+    }
+
+    /**
+     * Converts an index configuration to a catalog index descriptor.
+     *
+     * @param config Index configuration.
+     */
+    public static IndexDescriptor toIndexDescriptor(TableIndexView config) {
+        switch (config.type()) {
+            case TableIndexConfigurationSchema.HASH_INDEX_TYPE:
+                return toHashIndexDescriptor(((HashIndexView) config));
+            case TableIndexConfigurationSchema.SORTED_INDEX_TYPE:
+                return toSortedIndexDescriptor(((SortedIndexView) config));
+            default:
+                throw new IllegalArgumentException("Unknown index type:" + 
config);
+        }
+    }
+
+    /**
+     * Converts a hash index configuration to a catalog hash index descriptor.
+     *
+     * @param config Hash index configuration.
+     */
+    public static HashIndexDescriptor toHashIndexDescriptor(HashIndexView 
config) {
+        return new HashIndexDescriptor(config.id(), config.name(), 
config.tableId(), config.uniq(), List.of(config.columnNames()));
+    }
+
+    /**
+     * Converts a sorted index configuration to a catalog sorted index 
descriptor.
+     *
+     * @param config Sorted index configuration.
+     */
+    public static SortedIndexDescriptor 
toSortedIndexDescriptor(SortedIndexView config) {
+        return new SortedIndexDescriptor(
+                config.id(),
+                config.name(),
+                config.tableId(),
+                config.uniq(),
+                
config.columns().stream().map(CatalogDescriptorUtils::toIndexColumnDescriptor).collect(toList())
+        );
+    }
+
+    /**
+     * Gets the column native type from the catalog table column descriptor.
+     *
+     * @param column Table column descriptor.
+     */
+    public static NativeType getNativeType(TableColumnDescriptor column) {
+        switch (column.type()) {
+            case INT8:
+                return NativeTypes.INT8;
+            case INT16:
+                return NativeTypes.INT16;
+            case INT32:
+                return NativeTypes.INT32;
+            case INT64:
+                return NativeTypes.INT64;
+            case FLOAT:
+                return NativeTypes.FLOAT;
+            case DOUBLE:
+                return NativeTypes.DOUBLE;
+            case DECIMAL:
+                return NativeTypes.decimalOf(column.precision(), 
column.scale());
+            case NUMBER:
+                return NativeTypes.numberOf(column.precision());
+            case DATE:
+                return NativeTypes.DATE;
+            case TIME:
+                return NativeTypes.time(column.precision());
+            case DATETIME:
+                return NativeTypes.datetime(column.precision());
+            case TIMESTAMP:
+                return NativeTypes.timestamp(column.precision());
+            case UUID:
+                return NativeTypes.UUID;
+            case BITMASK:
+                return NativeTypes.bitmaskOf(column.length());
+            case STRING:
+                return NativeTypes.stringOf(column.length());
+            case BYTE_ARRAY:
+                return NativeTypes.blobOf(column.length());
+            default:
+                throw new IllegalArgumentException("Unknown type: " + 
column.type());
+        }
+    }
+
+    private static TableColumnDescriptor toTableColumnDescriptor(ColumnView 
config) {
+        ColumnTypeView typeConfig = config.type();
+
+        NativeType nativeType = 
ConfigurationToSchemaDescriptorConverter.convert(typeConfig);
+
+        return new TableColumnDescriptor(
+                config.name(),
+                nativeType.spec().asColumnType(),
+                config.nullable(),
+                typeConfig.precision(),
+                typeConfig.scale(),
+                typeConfig.length(),
+                toDefaultValue(nativeType, config.defaultValueProvider())
+        );
+    }
+
+    private static DefaultValue toDefaultValue(NativeType columnType, 
ColumnDefaultView config) {
+        switch (config.type()) {
+            case ColumnDefaultConfigurationSchema.NULL_VALUE_TYPE:
+                return DefaultValue.constant(null);
+            case ColumnDefaultConfigurationSchema.CONSTANT_VALUE_TYPE:
+                String defaultValue = ((ConstantValueDefaultView) 
config).defaultValue();
+
+                return 
DefaultValue.constant(ValueSerializationHelper.fromString(defaultValue, 
columnType));
+            case ColumnDefaultConfigurationSchema.FUNCTION_CALL_TYPE:
+                String functionName = ((FunctionCallDefaultView) 
config).functionName();
+
+                return DefaultValue.functionCall(functionName);
+            default:
+                throw new IllegalArgumentException("Unknown default value:" + 
config);
+        }
+    }
+
+    private static IndexColumnDescriptor 
toIndexColumnDescriptor(IndexColumnView config) {
+        //TODO IGNITE-15141: Make null-order configurable.
+        // NULLS FIRST for DESC, NULLS LAST for ASC by default.
+        ColumnCollation collation = config.asc() ? ASC_NULLS_LAST : 
DESC_NULLS_FIRST;
+
+        return new IndexColumnDescriptor(config.name(), collation);
+    }
+}
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 443819e572..07da99ce5b 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.index;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static 
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findTableView;
 import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
 
 import java.util.ArrayList;
@@ -34,6 +36,8 @@ import java.util.function.Function;
 import org.apache.ignite.configuration.NamedListView;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.catalog.descriptors.IndexColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.index.event.IndexEvent;
 import org.apache.ignite.internal.index.event.IndexEventParameters;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -51,9 +55,6 @@ import 
org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesView;
 import org.apache.ignite.internal.schema.configuration.index.HashIndexChange;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
-import org.apache.ignite.internal.schema.configuration.index.IndexColumnView;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
 import org.apache.ignite.internal.schema.configuration.index.TableIndexChange;
 import 
org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
 import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
@@ -70,8 +71,6 @@ import org.apache.ignite.lang.IndexAlreadyExistsException;
 import org.apache.ignite.lang.IndexNotFoundException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableNotFoundException;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * An Ignite component that is responsible for handling index-related commands 
like CREATE or DROP
@@ -338,7 +337,7 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
 
         for (TableIndexView cfg : tablesCfg.indexes().value()) {
             if (targetTableId == null) {
-                TableView tbl = findTableView(cfg.tableId(), tablesView);
+                TableView tbl = findTableView(tablesView, cfg.tableId());
 
                 if (tbl == null || !tableName.equals(tbl.name())) {
                     continue;
@@ -355,17 +354,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         return res;
     }
 
-    @Nullable
-    private static TableView findTableView(int tableId, 
NamedListView<TableView> tablesView) {
-        for (TableView tableView : tablesView) {
-            if (tableView.id() == tableId) {
-                return tableView;
-            }
-        }
-
-        return null;
-    }
-
     private void validateName(String indexName) {
         if (StringUtils.nullOrEmpty(indexName)) {
             throw new IgniteInternalException(
@@ -422,10 +410,12 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
      * @return A future.
      */
     private CompletableFuture<?> 
onIndexCreate(ConfigurationNotificationEvent<TableIndexView> evt) {
-        int tableId = evt.newValue().tableId();
+        TableIndexView indexConfig = evt.newValue();
+
+        int tableId = indexConfig.tableId();
 
         if (!busyLock.enterBusy()) {
-            int idxId = evt.newValue().id();
+            int idxId = indexConfig.id();
 
             fireEvent(IndexEvent.CREATE,
                     new IndexEventParameters(evt.storageRevision(), tableId, 
idxId),
@@ -436,7 +426,16 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         }
 
         try {
-            return createIndexLocally(evt.storageRevision(), tableId, 
evt.newValue(), evt.newValue(TablesView.class));
+            TablesView tablesView = evt.newValue(TablesView.class);
+
+            TableView tableView = findTableView(tablesView.tables(), tableId);
+
+            assert tableView != null : "tableId=" + tableId + ", indexId=" + 
indexConfig.id();
+
+            TableDescriptor tableDescriptor = toTableDescriptor(tableView);
+            org.apache.ignite.internal.catalog.descriptors.IndexDescriptor 
indexDescriptor = toIndexDescriptor(indexConfig);
+
+            return createIndexLocally(evt.storageRevision(), tableDescriptor, 
indexDescriptor);
         } finally {
             busyLock.leaveBusy();
         }
@@ -444,23 +443,21 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
 
     private CompletableFuture<?> createIndexLocally(
             long causalityToken,
-            int tableId,
-            TableIndexView tableIndexView,
-            TablesView tablesView
+            TableDescriptor tableDescriptor,
+            org.apache.ignite.internal.catalog.descriptors.IndexDescriptor 
indexDescriptor
     ) {
-        assert tableIndexView != null;
-
-        int indexId = tableIndexView.id();
+        int tableId = tableDescriptor.id();
+        int indexId = indexDescriptor.id();
 
         LOG.trace("Creating local index: name={}, id={}, tableId={}, token={}",
-                tableIndexView.name(), indexId, tableId, causalityToken);
+                indexDescriptor.name(), indexId, tableId, causalityToken);
 
-        IndexDescriptor descriptor = newDescriptor(tableIndexView);
+        IndexDescriptor eventIndexDescriptor = 
toEventIndexDescriptor(indexDescriptor);
 
-        org.apache.ignite.internal.storage.index.IndexDescriptor 
storageIndexDescriptor = createIndexDescriptor(tablesView, indexId);
+        var storageIndexDescriptor = 
org.apache.ignite.internal.storage.index.IndexDescriptor.create(tableDescriptor,
 indexDescriptor);
 
         CompletableFuture<?> fireEventFuture =
-                fireEvent(IndexEvent.CREATE, new 
IndexEventParameters(causalityToken, tableId, indexId, descriptor));
+                fireEvent(IndexEvent.CREATE, new 
IndexEventParameters(causalityToken, tableId, indexId, eventIndexDescriptor));
 
         CompletableFuture<TableImpl> tableFuture = 
tableManager.tableAsync(causalityToken, tableId);
 
@@ -469,22 +466,24 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         CompletableFuture<?> createIndexFuture = 
tableFuture.thenAcceptBoth(schemaRegistryFuture, (table, schemaRegistry) -> {
             TableRowToIndexKeyConverter tableRowConverter = new 
TableRowToIndexKeyConverter(
                     schemaRegistry,
-                    descriptor.columns().toArray(STRING_EMPTY_ARRAY)
+                    eventIndexDescriptor.columns().toArray(STRING_EMPTY_ARRAY)
             );
 
-            if (descriptor instanceof SortedIndexDescriptor) {
+            if (eventIndexDescriptor instanceof SortedIndexDescriptor) {
                 table.registerSortedIndex(
                         
(org.apache.ignite.internal.storage.index.SortedIndexDescriptor) 
storageIndexDescriptor,
                         tableRowConverter::convert
                 );
             } else {
+                boolean unique = indexDescriptor.unique();
+
                 table.registerHashIndex(
                         (HashIndexDescriptor) storageIndexDescriptor,
-                        tableIndexView.uniq(),
+                        unique,
                         tableRowConverter::convert
                 );
 
-                if (tableIndexView.uniq()) {
+                if (unique) {
                     table.pkId(indexId);
                 }
             }
@@ -493,44 +492,6 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
         return allOf(createIndexFuture, fireEventFuture);
     }
 
-    private IndexDescriptor newDescriptor(TableIndexView indexView) {
-        if (indexView instanceof SortedIndexView) {
-            return convert((SortedIndexView) indexView);
-        } else if (indexView instanceof HashIndexView) {
-            return convert((HashIndexView) indexView);
-        }
-
-        throw new AssertionError("Unknown index type [type=" + (indexView != 
null ? indexView.getClass() : null) + ']');
-    }
-
-    private IndexDescriptor convert(HashIndexView indexView) {
-        return new IndexDescriptor(
-                indexView.name(),
-                Arrays.asList(indexView.columnNames())
-        );
-    }
-
-    private SortedIndexDescriptor convert(SortedIndexView indexView) {
-        int colsCount = indexView.columns().size();
-        var indexedColumns = new ArrayList<String>(colsCount);
-        var collations = new ArrayList<ColumnCollation>(colsCount);
-
-        for (IndexColumnView columnView : indexView.columns()) {
-            //TODO IGNITE-15141: Make null-order configurable.
-            // NULLS FIRST for DESC, NULLS LAST for ASC by default.
-            boolean nullsFirst = !columnView.asc();
-
-            indexedColumns.add(columnView.name());
-            collations.add(ColumnCollation.get(columnView.asc(), nullsFirst));
-        }
-
-        return new SortedIndexDescriptor(
-                indexView.name(),
-                indexedColumns,
-                collations
-        );
-    }
-
     /**
      * This class encapsulates the logic of conversion from table row to a 
particular index key.
      */
@@ -616,30 +577,76 @@ public class IndexManager extends Producer<IndexEvent, 
IndexEventParameters> imp
     }
 
     private class ConfigurationListener implements 
ConfigurationNamedListListener<TableIndexView> {
-        /** {@inheritDoc} */
         @Override
-        public @NotNull CompletableFuture<?> onCreate(@NotNull 
ConfigurationNotificationEvent<TableIndexView> ctx) {
+        public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableIndexView> ctx) {
             return onIndexCreate(ctx);
         }
 
-        /** {@inheritDoc} */
         @Override
-        public @NotNull CompletableFuture<?> onRename(
-                ConfigurationNotificationEvent<TableIndexView> ctx
-        ) {
+        public CompletableFuture<?> 
onRename(ConfigurationNotificationEvent<TableIndexView> ctx) {
             return failedFuture(new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-16196"));
         }
 
-        /** {@inheritDoc} */
         @Override
-        public @NotNull CompletableFuture<?> onDelete(@NotNull 
ConfigurationNotificationEvent<TableIndexView> ctx) {
+        public CompletableFuture<?> 
onDelete(ConfigurationNotificationEvent<TableIndexView> ctx) {
             return onIndexDrop(ctx);
         }
 
-        /** {@inheritDoc} */
         @Override
-        public @NotNull CompletableFuture<?> onUpdate(@NotNull 
ConfigurationNotificationEvent<TableIndexView> ctx) {
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<TableIndexView> ctx) {
             return failedFuture(new IllegalStateException("Should not be 
called"));
         }
     }
+
+    /**
+     * Converts a catalog index descriptor to an event index descriptor.
+     *
+     * @param descriptor Catalog index descriptor.
+     */
+    private static IndexDescriptor 
toEventIndexDescriptor(org.apache.ignite.internal.catalog.descriptors.IndexDescriptor
 descriptor) {
+        if (descriptor instanceof 
org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor) {
+            return 
toEventHashIndexDescriptor(((org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor)
 descriptor));
+        }
+
+        if (descriptor instanceof 
org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor) {
+            return 
toEventSortedIndexDescriptor(((org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor)
 descriptor));
+        }
+
+        throw new IllegalArgumentException("Unknown index type: " + 
descriptor);
+    }
+
+    /**
+     * Converts a catalog hash index descriptor to an event hash index 
descriptor.
+     *
+     * @param descriptor Catalog hash index descriptor.
+     */
+    private static IndexDescriptor toEventHashIndexDescriptor(
+            org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor 
descriptor
+    ) {
+        return new IndexDescriptor(descriptor.name(), descriptor.columns());
+    }
+
+    /**
+     * Converts a catalog sorted index descriptor to an event sorted index 
descriptor.
+     *
+     * @param descriptor Catalog sorted index descriptor.
+     */
+    private static SortedIndexDescriptor toEventSortedIndexDescriptor(
+            
org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor descriptor
+    ) {
+        List<String> columns = new ArrayList<>(descriptor.columns().size());
+        List<ColumnCollation> collations = new 
ArrayList<>(descriptor.columns().size());
+
+        for (IndexColumnDescriptor column : descriptor.columns()) {
+            columns.add(column.name());
+
+            collations.add(toEventCollation(column.collation()));
+        }
+
+        return new SortedIndexDescriptor(descriptor.name(), columns, 
collations);
+    }
+
+    private static ColumnCollation 
toEventCollation(org.apache.ignite.internal.catalog.descriptors.ColumnCollation 
collation) {
+        return ColumnCollation.get(collation.asc(), collation.nullsFirst());
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 5fb5b0118d..74bf72b8cd 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -18,8 +18,9 @@
 package org.apache.ignite.internal.sql.engine;
 
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
 import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
-import static 
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -48,13 +49,17 @@ import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesView;
 import 
org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
@@ -512,11 +517,16 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
                     continue;
                 }
 
-                TablesView tablesView = 
getTablesConfiguration(clusterNode).value();
+                TableView tableView = getTableConfiguration(clusterNode, 
tableName).value();
+                TableIndexView indexView = getIndexConfiguration(clusterNode, 
indexName).value();
 
-                int indexId = 
tablesView.indexes().get(indexName.toUpperCase()).id();
+                TableDescriptor catalogTableDescriptor = 
toTableDescriptor(tableView);
+                IndexDescriptor catalogIndexDescriptor = 
toIndexDescriptor(indexView);
 
-                IndexStorage index = 
internalTable.storage().getOrCreateIndex(partitionId, 
createIndexDescriptor(tablesView, indexId));
+                IndexStorage index = internalTable.storage().getOrCreateIndex(
+                        partitionId,
+                        
org.apache.ignite.internal.storage.index.IndexDescriptor.create(catalogTableDescriptor,
 catalogIndexDescriptor)
+                );
 
                 assertTrue(waitForCondition(() -> index.getNextRowIdToBuild() 
== null, 10, TimeUnit.SECONDS.toMillis(10)));
 
@@ -574,4 +584,15 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
 
         return indexConfig == null ? null : indexConfig.id().value();
     }
+
+    /**
+     * Returns table configuration of the given table at the given node, or 
{@code null} if no such table exists.
+     *
+     * @param node Node.
+     * @param tableName Table name.
+     * @return Table configuration.
+     */
+    private static @Nullable TableConfiguration getTableConfiguration(Ignite 
node, String tableName) {
+        return 
getTablesConfiguration(node).tables().get(tableName.toUpperCase());
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationUtils.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationUtils.java
new file mode 100644
index 0000000000..9562494dce
--- /dev/null
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema.configuration;
+
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for working with schema configuration.
+ */
+public class SchemaConfigurationUtils {
+    /**
+     * Looks up the index configuration by ID, {@code null} if absent.
+     *
+     * @param tablesView Tables configuration.
+     * @param indexId Index ID.
+     */
+    public static @Nullable TableIndexView findIndexView(TablesView tablesView, int indexId) {
+        return tablesView.indexes().stream()
+                .filter(tableIndexView -> indexId == tableIndexView.id())
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Looks up the table configuration by ID, {@code null} if absent.
+     *
+     * @param tablesView Tables configuration.
+     * @param tableId Table ID.
+     */
+    public static @Nullable TableView findTableView(TablesView tablesView, int tableId) {
+        return findTableView(tablesView.tables(), tableId);
+    }
+
+    /**
+     * Looks up the table configuration by ID, {@code null} if absent.
+     *
+     * @param tableListView Table list configuration.
+     * @param tableId Table ID.
+     */
+    public static @Nullable TableView findTableView(NamedListView<? extends TableView> tableListView, int tableId) {
+        return tableListView.stream()
+                .filter(tableView -> tableId == tableView.id())
+                .findFirst()
+                .orElse(null);
+    }
+}
diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index a9db9ba2d6..81555f0db7 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -29,6 +29,7 @@ dependencies {
     implementation project(':ignite-distribution-zones')
     implementation project(':ignite-configuration')
     implementation project(":ignite-core")
+    implementation project(":ignite-catalog")
     implementation libs.jetbrains.annotations
     implementation libs.auto.service.annotations
 
@@ -52,6 +53,7 @@ dependencies {
     testFixturesImplementation project(':ignite-distribution-zones')
     testFixturesImplementation project(':ignite-schema')
     testFixturesImplementation project(':ignite-api')
+    testFixturesImplementation project(':ignite-catalog')
     testFixturesImplementation(testFixtures(project(':ignite-core')))
     testFixturesImplementation(testFixtures(project(':ignite-configuration')))
     testFixturesImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
index b6a2a3b866..edbdc7faec 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java
@@ -287,5 +287,5 @@ public interface MvTableStorage extends ManuallyCloseable {
      * @throws StorageException If the given partition does not exist.
      */
     // TODO: IGNITE-19112 Change or get rid of
-    @Nullable IndexStorage getIndex(int partitionId, UUID indexId);
+    @Nullable IndexStorage getIndex(int partitionId, int indexId);
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
index e93a8e9088..f6265c8fc0 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/HashIndexDescriptor.java
@@ -17,21 +17,12 @@
 
 package org.apache.ignite.internal.storage.index;
 
-import static java.util.stream.Collectors.toUnmodifiableList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.getNativeType;
 
-import java.util.Arrays;
 import java.util.List;
-import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.configuration.ColumnView;
-import org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
-import org.apache.ignite.internal.schema.configuration.TableView;
-import org.apache.ignite.internal.schema.configuration.TablesView;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
-import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Descriptor for creating a Hash Index Storage.
@@ -62,17 +53,6 @@ public class HashIndexDescriptor implements IndexDescriptor {
             this.nullable = nullable;
         }
 
-        /**
-         * Creates a Column Descriptor.
-         *
-         * @param tableColumnView Table column configuration.
-         */
-        HashIndexColumnDescriptor(ColumnView tableColumnView) {
-            this.name = tableColumnView.name();
-            this.type = ConfigurationToSchemaDescriptorConverter.convert(tableColumnView.type());
-            this.nullable = tableColumnView.nullable();
-        }
-
         @Override
         public String name() {
             return name;
@@ -99,13 +79,16 @@ public class HashIndexDescriptor implements IndexDescriptor {
     private final List<HashIndexColumnDescriptor> columns;
 
     /**
-     * Creates an Index Descriptor from a given Table Configuration.
+     * Constructor.
      *
-     * @param indexId Index id.
-     * @param tablesConfig Tables and indexes configuration.
+     * @param table Catalog table descriptor.
+     * @param index Catalog index descriptor.
      */
-    public HashIndexDescriptor(int indexId, TablesView tablesConfig) {
-        this(indexId, extractIndexColumnsConfiguration(indexId, tablesConfig));
+    public HashIndexDescriptor(
+            org.apache.ignite.internal.catalog.descriptors.TableDescriptor table,
+            org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor index
+    ) {
+        this(index.id(), extractIndexColumnsConfiguration(table, index));
     }
 
     /**
@@ -119,52 +102,6 @@ public class HashIndexDescriptor implements IndexDescriptor {
         this.columns = columns;
     }
 
-    private static List<HashIndexColumnDescriptor> extractIndexColumnsConfiguration(int indexId, TablesView tablesConfig) {
-        TableIndexView indexConfig = tablesConfig.indexes().stream()
-                .filter(tableIndexView -> tableIndexView.id() == indexId)
-                .findFirst()
-                .orElse(null);
-
-        if (indexConfig == null) {
-            throw new StorageException(String.format("Index configuration for \"%s\" could not be found", indexId));
-        }
-
-        if (!(indexConfig instanceof HashIndexView)) {
-            throw new StorageException(String.format(
-                    "Index \"%s\" is not configured as a Hash Index. Actual type: %s",
-                    indexConfig.id(), indexConfig.type()
-            ));
-        }
-
-        TableView tableConfig = findTableById(indexConfig.tableId(), tablesConfig.tables());
-
-        if (tableConfig == null) {
-            throw new StorageException(String.format("Table configuration for \"%s\" could not be found", indexConfig.tableId()));
-        }
-
-        String[] indexColumns = ((HashIndexView) indexConfig).columnNames();
-
-        return Arrays.stream(indexColumns)
-                .map(columnName -> {
-                    ColumnView columnView = tableConfig.columns().get(columnName);
-
-                    assert columnView != null : "Incorrect index column configuration. " + columnName + " column does not exist";
-
-                    return new HashIndexColumnDescriptor(columnView);
-                })
-                .collect(toUnmodifiableList());
-    }
-
-    private @Nullable static TableView findTableById(int tableId, NamedListView<? extends TableView> tablesView) {
-        for (TableView table : tablesView) {
-            if (table.id() == tableId) {
-                return table;
-            }
-        }
-
-        return null;
-    }
-
     @Override
     public int id() {
         return id;
@@ -174,4 +111,21 @@ public class HashIndexDescriptor implements IndexDescriptor {
     public List<HashIndexColumnDescriptor> columns() {
         return columns;
     }
+
+    private static List<HashIndexColumnDescriptor> extractIndexColumnsConfiguration(
+            org.apache.ignite.internal.catalog.descriptors.TableDescriptor table,
+            org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor index
+    ) {
+        assert table.id() == index.tableId() : "tableId=" + table.id() + ", indexTableId=" + index.tableId();
+
+        return index.columns().stream()
+                .map(columnName -> {
+                    org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor column = table.column(columnName);
+
+                    assert column != null : columnName;
+
+                    return new HashIndexColumnDescriptor(column.name(), getNativeType(column), column.nullable());
+                })
+                .collect(toList());
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
index 1500e096b6..10b64f4295 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
@@ -18,11 +18,8 @@
 package org.apache.ignite.internal.storage.index;
 
 import java.util.List;
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.configuration.TablesView;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 
 /**
  * Index descriptor.
@@ -59,23 +56,23 @@ public interface IndexDescriptor {
     List<? extends ColumnDescriptor> columns();
 
     /**
-     * Creates an index description based on the configuration.
+     * Creates an index description based on the catalog descriptors.
      *
-     * @param tablesView Tables configuration.
-     * @param indexId Index ID.
+     * @param table Catalog table descriptor.
+     * @param index Catalog index descriptor.
      */
-    static IndexDescriptor createIndexDescriptor(TablesView tablesView, int indexId) {
-        TableIndexView indexView = tablesView.indexes().stream()
-                .filter(tableIndexView -> indexId == tableIndexView.id())
-                .findFirst()
-                .orElse(null);
+    static IndexDescriptor create(
+            TableDescriptor table,
+            org.apache.ignite.internal.catalog.descriptors.IndexDescriptor index
+    ) {
+        if (index instanceof org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor) {
+            return new HashIndexDescriptor(table, (org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor) index);
+        }
 
-        if (indexView instanceof HashIndexView) {
-            return new HashIndexDescriptor(indexId, tablesView);
-        } else if (indexView instanceof SortedIndexView) {
-            return new SortedIndexDescriptor(indexId, tablesView);
-        } else {
-            throw new AssertionError("Unknown type: " + indexView);
+        if (index instanceof org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor) {
+            return new SortedIndexDescriptor(table, ((org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor) index));
         }
+
+        throw new IllegalArgumentException("Unknown type: " + index);
     }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
index 2b28e21e66..aca926364e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
@@ -17,23 +17,16 @@
 
 package org.apache.ignite.internal.storage.index;
 
-import static java.util.stream.Collectors.toUnmodifiableList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.getNativeType;
 
 import java.util.List;
-import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.catalog.descriptors.ColumnCollation;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.schema.NativeType;
-import org.apache.ignite.internal.schema.configuration.ColumnView;
-import org.apache.ignite.internal.schema.configuration.ConfigurationToSchemaDescriptorConverter;
-import org.apache.ignite.internal.schema.configuration.TableView;
-import org.apache.ignite.internal.schema.configuration.TablesView;
-import org.apache.ignite.internal.schema.configuration.index.IndexColumnView;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
-import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Descriptor for creating a Sorted Index Storage.
@@ -68,19 +61,6 @@ public class SortedIndexDescriptor implements IndexDescriptor {
             this.asc = asc;
         }
 
-        /**
-         * Creates a Column Descriptor.
-         *
-         * @param tableColumnView Table column configuration.
-         * @param indexColumnView Index column configuration.
-         */
-        public SortedIndexColumnDescriptor(ColumnView tableColumnView, IndexColumnView indexColumnView) {
-            this.name = tableColumnView.name();
-            this.type = ConfigurationToSchemaDescriptorConverter.convert(tableColumnView.type());
-            this.nullable = tableColumnView.nullable();
-            this.asc = indexColumnView.asc();
-        }
-
         @Override
         public String name() {
             return name;
@@ -116,13 +96,16 @@ public class SortedIndexDescriptor implements IndexDescriptor {
     private final BinaryTupleSchema binaryTupleSchema;
 
     /**
-     * Creates an Index Descriptor from a given Table Configuration.
+     * Constructor.
      *
-     * @param indexId Index ID.
-     * @param tablesConfig Tables configuration.
+     * @param table Catalog table descriptor.
+     * @param index Catalog index descriptor.
      */
-    public SortedIndexDescriptor(int indexId, TablesView tablesConfig) {
-        this(indexId, extractIndexColumnsConfiguration(indexId, tablesConfig));
+    public SortedIndexDescriptor(
+            org.apache.ignite.internal.catalog.descriptors.TableDescriptor table,
+            org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor index
+    ) {
+        this(index.id(), extractIndexColumnsConfiguration(table, index));
     }
 
     /**
@@ -137,44 +120,6 @@ public class SortedIndexDescriptor implements IndexDescriptor {
         this.binaryTupleSchema = createSchema(columns);
     }
 
-    private static List<SortedIndexColumnDescriptor> extractIndexColumnsConfiguration(int indexId, TablesView tablesConfig) {
-        TableIndexView indexConfig = tablesConfig.indexes().stream()
-                .filter(tableIndexView -> tableIndexView.id() == indexId)
-                .findFirst()
-                .orElse(null);
-
-        if (indexConfig == null) {
-            throw new StorageException(String.format("Index configuration for \"%s\" could not be found", indexId));
-        }
-
-        if (!(indexConfig instanceof SortedIndexView)) {
-            throw new StorageException(String.format(
-                    "Index \"%s\" is not configured as a Sorted Index. Actual type: %s",
-                    indexId, indexConfig.type()
-            ));
-        }
-
-        TableView tableConfig = findTableById(indexConfig.tableId(), tablesConfig.tables());
-
-        if (tableConfig == null) {
-            throw new StorageException(String.format("Table configuration for \"%s\" could not be found", indexConfig.tableId()));
-        }
-
-        NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
-
-        return indexColumns.stream()
-                .map(indexColumnView -> {
-                    String columnName = indexColumnView.name();
-
-                    ColumnView columnView = tableConfig.columns().get(columnName);
-
-                    assert columnView != null : "Incorrect index column configuration. " + columnName + " column does not exist";
-
-                    return new SortedIndexColumnDescriptor(columnView, indexColumnView);
-                })
-                .collect(toUnmodifiableList());
-    }
-
     private static BinaryTupleSchema createSchema(List<SortedIndexColumnDescriptor> columns) {
         Element[] elements = columns.stream()
                 .map(columnDescriptor -> new Element(columnDescriptor.type(), columnDescriptor.nullable()))
@@ -183,16 +128,6 @@ public class SortedIndexDescriptor implements IndexDescriptor {
         return BinaryTupleSchema.create(elements);
     }
 
-    private @Nullable static TableView findTableById(int tableId, NamedListView<? extends TableView> tablesView) {
-        for (TableView table : tablesView) {
-            if (table.id() == tableId) {
-                return table;
-            }
-        }
-
-        return null;
-    }
-
     @Override
     public int id() {
         return id;
@@ -209,4 +144,25 @@ public class SortedIndexDescriptor implements IndexDescriptor {
     public BinaryTupleSchema binaryTupleSchema() {
         return binaryTupleSchema;
     }
+
+    private static List<SortedIndexColumnDescriptor> extractIndexColumnsConfiguration(
+            org.apache.ignite.internal.catalog.descriptors.TableDescriptor table,
+            org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor index
+    ) {
+        assert table.id() == index.tableId() : "tableId=" + table.id() + ", indexTableId=" + index.tableId();
+
+        return index.columns().stream()
+                .map(columnDescriptor -> {
+                    String columnName = columnDescriptor.name();
+
+                    TableColumnDescriptor column = table.column(columnName);
+
+                    assert column != null : columnName;
+
+                    ColumnCollation collation = columnDescriptor.collation();
+
+                    return new SortedIndexColumnDescriptor(columnName, getNativeType(column), column.nullable(), collation.asc());
+                })
+                .collect(toList());
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 7981458b24..1fa0b77c7c 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -19,6 +19,10 @@ package org.apache.ignite.internal.storage;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toHashIndexDescriptor;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toSortedIndexDescriptor;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findTableView;
 import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -49,6 +53,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -59,6 +64,8 @@ import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
 import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
@@ -129,8 +136,13 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
 
         TablesView tablesView = tablesConfig.value();
 
-        sortedIdx = new SortedIndexDescriptor(tablesView.indexes().get(SORTED_INDEX_NAME).id(), tablesView);
-        hashIdx = new HashIndexDescriptor(tablesView.indexes().get(HASH_INDEX_NAME).id(), tablesView);
+        SortedIndexView sortedIndexView = (SortedIndexView) tablesView.indexes().get(SORTED_INDEX_NAME);
+        HashIndexView hashIndexView = (HashIndexView) tablesView.indexes().get(HASH_INDEX_NAME);
+
+        TableDescriptor catalogTableDescriptor = toTableDescriptor(findTableView(tablesView, sortedIndexView.tableId()));
+
+        sortedIdx = new SortedIndexDescriptor(catalogTableDescriptor, toSortedIndexDescriptor(sortedIndexView));
+        hashIdx = new HashIndexDescriptor(catalogTableDescriptor, toHashIndexDescriptor(hashIndexView));
     }
 
     @AfterEach
@@ -322,59 +334,6 @@ public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
         assertThat(getAll(storage2.get(tuple)), contains(rowId2));
     }
 
-    /**
-     * Tests that exceptions are thrown if indices are not configured correctly.
-     */
-    @Test
-    public void testMisconfiguredIndices() {
-        Exception e = assertThrows(
-                StorageException.class,
-                () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx)
-        );
-
-        assertThat(e.getMessage(), containsString("Partition ID " + PARTITION_ID + " does not exist"));
-
-        e = assertThrows(
-                StorageException.class,
-                () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx)
-        );
-
-        assertThat(e.getMessage(), containsString("Partition ID " + PARTITION_ID + " does not exist"));
-
-        tableStorage.createMvPartition(PARTITION_ID);
-
-        int invalidId = Integer.MAX_VALUE;
-
-        TablesView tablesView = tableStorage.tablesConfiguration().value();
-
-        e = assertThrows(
-                StorageException.class,
-                () -> new HashIndexDescriptor(invalidId, tablesView)
-        );
-
-        assertThat(e.getMessage(), containsString(String.format("Index configuration for \"%s\" could not be found", invalidId)));
-
-        e = assertThrows(
-                StorageException.class,
-                () -> new HashIndexDescriptor(sortedIdx.id(), tablesView)
-        );
-
-        assertThat(
-                e.getMessage(),
-                containsString(String.format("Index \"%s\" is not configured as a Hash Index. Actual type: SORTED", sortedIdx.id()))
-        );
-
-        e = assertThrows(
-                StorageException.class,
-                () -> new SortedIndexDescriptor(hashIdx.id(), tablesView)
-        );
-
-        assertThat(
-                e.getMessage(),
-                containsString(String.format("Index \"%s\" is not configured as a Sorted Index. Actual type: HASH", hashIdx.id()))
-        );
-    }
-
     @Test
     public void testDestroyPartition() throws Exception {
         assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getPartitionIdOutOfRange()));
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index b9b1f36ae7..d4c455be9d 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.spy;
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Stream;
@@ -285,7 +284,7 @@ public class TestMvTableStorage implements MvTableStorage {
     }
 
     @Override
-    public @Nullable IndexStorage getIndex(int partitionId, UUID indexId) {
+    public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
         if (mvPartitionStorages.get(partitionId) == null) {
             throw new StorageException(createMissingMvPartitionErrorMessage(partitionId));
         }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
index f6ae7e5006..e78b934eab 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.storage.index;
 
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toHashIndexDescriptor;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findTableView;
 import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.addIndex;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.CoreMatchers.is;
@@ -25,7 +28,10 @@ import static org.hamcrest.Matchers.empty;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
 import org.apache.ignite.internal.schema.testutils.definition.index.HashIndexDefinition;
@@ -55,9 +61,12 @@ public abstract class AbstractHashIndexStorageTest extends AbstractIndexStorageT
 
         TablesView tablesView = tablesCfg.value();
 
+        TableIndexView indexView = tablesView.indexes().get(indexDefinition.name());
+        TableView tableView = findTableView(tablesView, indexView.tableId());
+
         return tableStorage.getOrCreateHashIndex(
                 TEST_PARTITION,
-                new HashIndexDescriptor(tablesView.indexes().get(indexDefinition.name()).id(), tablesView)
+                new HashIndexDescriptor(toTableDescriptor(tableView), toHashIndexDescriptor(((HashIndexView) indexView)))
         );
     }
 
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 1c5982167e..4b3707c89c 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -20,6 +20,9 @@ package org.apache.ignite.internal.storage.index;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toSortedIndexDescriptor;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findTableView;
 import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.addIndex;
 import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER;
 import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
@@ -53,7 +56,10 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
 import org.apache.ignite.internal.schema.testutils.builder.SortedIndexDefinitionBuilder;
 import org.apache.ignite.internal.schema.testutils.builder.SortedIndexDefinitionBuilder.SortedIndexColumnBuilder;
@@ -129,9 +135,12 @@ public abstract class AbstractSortedIndexStorageTest extends AbstractIndexStorag
 
         TablesView tablesView = tablesCfg.value();
 
+        TableIndexView indexView = 
tablesView.indexes().get(indexDefinition.name());
+        TableView tableView = findTableView(tablesView, indexView.tableId());
+
         return tableStorage.getOrCreateSortedIndex(
                 TEST_PARTITION,
-                new 
SortedIndexDescriptor(tablesView.indexes().get(indexDefinition.name()).id(), 
tablesView)
+                new SortedIndexDescriptor(toTableDescriptor(tableView), 
toSortedIndexDescriptor((SortedIndexView) indexView))
         );
     }
 
diff --git a/modules/storage-page-memory/build.gradle 
b/modules/storage-page-memory/build.gradle
index 312645428c..2d74bb93fe 100644
--- a/modules/storage-page-memory/build.gradle
+++ b/modules/storage-page-memory/build.gradle
@@ -30,6 +30,7 @@ dependencies {
     implementation project(':ignite-file-io')
     implementation project(':ignite-configuration')
     implementation project(':ignite-transactions')
+    implementation project(':ignite-catalog')
     implementation libs.jetbrains.annotations
     implementation libs.auto.service.annotations
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 3379fdad7a..5b9ac89d30 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -24,7 +24,6 @@ import static 
org.apache.ignite.internal.storage.util.StorageUtils.createMissing
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -319,7 +318,7 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
     }
 
     @Override
-    public @Nullable IndexStorage getIndex(int partitionId, UUID indexId) {
+    public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
         return busy(() -> {
             AbstractPageMemoryMvPartitionStorage partitionStorage = 
mvPartitionStorages.get(partitionId);
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 9a06d9c68e..403436fd45 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findIndexView;
+import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findTableView;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
@@ -32,6 +36,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.pagememory.PageIdAllocator;
 import org.apache.ignite.internal.pagememory.PageMemory;
@@ -42,9 +48,8 @@ import 
org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
 import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesView;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
 import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
@@ -185,23 +190,33 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
                 for (IndexMeta indexMeta : cursor) {
                     int indexId = indexMeta.indexId();
 
-                    TableIndexView indexCfgView = tablesView.indexes().stream()
-                            .filter(tableIndexView -> indexId == 
tableIndexView.id())
-                            .findFirst()
-                            .orElse(null);
+                    TableIndexView indexView = findIndexView(tablesView, 
indexId);
 
-                    if (indexCfgView instanceof HashIndexView) {
-                        hashIndexes.put(
-                                indexId,
-                                createOrRestoreHashIndex(indexMeta, new 
HashIndexDescriptor(indexId, tablesView))
+                    assert indexView != null : indexId;
+
+                    TableView tableView = findTableView(tablesView, 
indexView.tableId());
+
+                    assert tableView != null : "indexId=" + indexId + ", tableId=" + indexView.tableId();
+
+                    TableDescriptor catalogTableDescriptor = 
toTableDescriptor(tableView);
+                    IndexDescriptor catalogIndexDescriptor = 
toIndexDescriptor(indexView);
+
+                    if (catalogIndexDescriptor instanceof 
org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor) {
+                        HashIndexDescriptor storageIndexDescriptor = new 
HashIndexDescriptor(
+                                catalogTableDescriptor,
+                                
(org.apache.ignite.internal.catalog.descriptors.HashIndexDescriptor) 
catalogIndexDescriptor
                         );
-                    } else if (indexCfgView instanceof SortedIndexView) {
-                        sortedIndexes.put(
-                                indexId,
-                                createOrRestoreSortedIndex(indexMeta, new 
SortedIndexDescriptor(indexId, tablesView))
+
+                        hashIndexes.put(indexId, 
createOrRestoreHashIndex(indexMeta, storageIndexDescriptor));
+                    } else if (catalogIndexDescriptor instanceof 
org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor) {
+                        SortedIndexDescriptor storageIndexDescriptor = new 
SortedIndexDescriptor(
+                                catalogTableDescriptor,
+                                
((org.apache.ignite.internal.catalog.descriptors.SortedIndexDescriptor) 
catalogIndexDescriptor)
                         );
+
+                        sortedIndexes.put(indexId, 
createOrRestoreSortedIndex(indexMeta, storageIndexDescriptor));
                     } else {
-                        assert indexCfgView == null;
+                        assert indexView == null;
 
                         //TODO: IGNITE-17626 Drop the index synchronously.
                     }
@@ -994,9 +1009,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     /**
      * Returns a index storage instance or {@code null} if not exists.
      *
-     * @param indexId Index UUID.
+     * @param indexId Index ID.
      */
-    public @Nullable IndexStorage getIndex(UUID indexId) {
+    public @Nullable IndexStorage getIndex(int indexId) {
         return busy(() -> {
             PageMemoryHashIndexStorage hashIndexStorage = 
hashIndexes.get(indexId);
 
diff --git a/modules/storage-rocksdb/build.gradle 
b/modules/storage-rocksdb/build.gradle
index d4ec696d4b..ce43baa2f0 100644
--- a/modules/storage-rocksdb/build.gradle
+++ b/modules/storage-rocksdb/build.gradle
@@ -31,6 +31,7 @@ dependencies {
     implementation project(':ignite-configuration')
     implementation project(':ignite-distribution-zones')
     implementation project(':ignite-transactions')
+    implementation project(':ignite-catalog')
     implementation libs.jetbrains.annotations
     implementation libs.auto.service.annotations
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f75c329215..4896fc8cf8 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -21,6 +21,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toSortedIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findIndexView;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.GC_QUEUE_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.HASH_INDEX_CF_NAME;
 import static 
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.META_CF_NAME;
@@ -40,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -52,7 +54,10 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
@@ -235,7 +240,10 @@ public class RocksDbTableStorage implements MvTableStorage 
{
                 throw new StorageException("Failed to create a directory for the table storage", e);
             }
 
-            List<ColumnFamilyDescriptor> cfDescriptors = 
getExistingCfDescriptors();
+            TablesView tablesView = tablesCfg.value();
+            TableView tableView = tableCfg.value();
+
+            List<ColumnFamilyDescriptor> cfDescriptors = 
getExistingCfDescriptors(tablesView, tableView);
 
             List<ColumnFamilyHandle> cfHandles = new 
ArrayList<>(cfDescriptors.size());
 
@@ -294,7 +302,11 @@ public class RocksDbTableStorage implements MvTableStorage 
{
                 for (Entry<Integer, ColumnFamily> entry : 
sortedIndexColumnFamilyByIndexId.entrySet()) {
                     int indexId = entry.getKey();
 
-                    var indexDescriptor = new SortedIndexDescriptor(indexId, 
tablesCfg.value());
+                    SortedIndexView indexView = (SortedIndexView) 
findIndexView(tablesView, indexId);
+
+                    assert indexView != null : "tableId=" + tableView.id() + ", indexId=" + indexId;
+
+                    var indexDescriptor = new 
SortedIndexDescriptor(toTableDescriptor(tableView), 
toSortedIndexDescriptor(indexView));
 
                     sortedIndices.put(indexId, new 
SortedIndex(entry.getValue(), indexDescriptor, meta));
                 }
@@ -305,7 +317,7 @@ public class RocksDbTableStorage implements MvTableStorage {
             }
 
             MvPartitionStorages<RocksDbMvPartitionStorage> mvPartitionStorages 
=
-                    new MvPartitionStorages<>(tableCfg.value(), 
distributionZoneConfiguration().value());
+                    new MvPartitionStorages<>(tableView, 
distributionZoneConfiguration().value());
 
             for (int partitionId : meta.getPartitionIds()) {
                 // There is no need to wait for futures, since there will be no parallel operations yet.
@@ -574,19 +586,17 @@ public class RocksDbTableStorage implements 
MvTableStorage {
 
     /**
      * Returns a list of CF descriptors present in the RocksDB instance.
-     *
-     * @return List of CF descriptors.
      */
-    private List<ColumnFamilyDescriptor> getExistingCfDescriptors() {
+    private List<ColumnFamilyDescriptor> getExistingCfDescriptors(TablesView 
tablesView, TableView tableView) {
         return getExistingCfNames().stream()
-                .map(this::cfDescriptorFromName)
+                .map(cfName -> cfDescriptorFromName(tablesView, tableView, 
cfName))
                 .collect(toList());
     }
 
     /**
      * Creates a Column Family descriptor for the given Family type (encoded in its name).
      */
-    private ColumnFamilyDescriptor cfDescriptorFromName(String cfName) {
+    private ColumnFamilyDescriptor cfDescriptorFromName(TablesView tablesView, 
TableView tableView, String cfName) {
         switch (ColumnFamilyType.fromCfName(cfName)) {
             case META:
             case GC_QUEUE:
@@ -608,12 +618,18 @@ public class RocksDbTableStorage implements 
MvTableStorage {
                 );
 
             case SORTED_INDEX:
-                var indexDescriptor = new 
SortedIndexDescriptor(sortedIndexId(cfName), tablesCfg.value());
+                int indexId = sortedIndexId(cfName);
+
+                SortedIndexView indexView = (SortedIndexView) 
findIndexView(tablesView, indexId);
+
+                assert indexView != null : "tableId=" + tableView.id() + ", indexId=" + indexId;
+
+                var indexDescriptor = new 
SortedIndexDescriptor(toTableDescriptor(tableView), 
toSortedIndexDescriptor(indexView));
 
                 return sortedIndexCfDescriptor(cfName, indexDescriptor);
 
             default:
-                throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + getTableName() + ']');
+                throw new StorageException("Unidentified column family [name={}, table={}]", cfName, tableView.name());
         }
     }
 
@@ -744,7 +760,7 @@ public class RocksDbTableStorage implements MvTableStorage {
     }
 
     @Override
-    public @Nullable IndexStorage getIndex(int partitionId, UUID indexId) {
+    public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
         return inBusyLock(busyLock, () -> {
             if (mvPartitionStorages.get(partitionId) == null) {
                 throw new 
StorageException(createMissingMvPartitionErrorMessage(partitionId));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 3d95f2fb3f..f8789ffaff 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,7 +22,9 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toIndexDescriptor;
+import static 
org.apache.ignite.internal.catalog.descriptors.CatalogDescriptorUtils.toTableDescriptor;
+import static 
org.apache.ignite.internal.schema.configuration.SchemaConfigurationUtils.findTableView;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.IgniteUtils.filter;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
@@ -58,6 +60,7 @@ import java.util.stream.Collectors;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.raft.Command;
@@ -78,6 +81,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesView;
 import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -2432,11 +2436,20 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             @Override
             public CompletableFuture<?> 
onCreate(ConfigurationNotificationEvent<TableIndexView> ctx) {
                 inBusyLock(() -> {
-                    TableIndexView tableIndexView = ctx.newValue();
+                    TableIndexView indexView = ctx.newValue();
 
-                    if (tableId() == tableIndexView.tableId()) {
-                        
startBuildIndex(createIndexDescriptor(ctx.newValue(TablesView.class), 
tableIndexView.id()));
+                    if (tableId() != indexView.tableId()) {
+                        return;
                     }
+
+                    TableView tableView = 
findTableView(ctx.newValue(TablesView.class), tableId());
+
+                    assert tableView != null : tableId();
+
+                    TableDescriptor catalogTableDescriptor = 
toTableDescriptor(tableView);
+                    
org.apache.ignite.internal.catalog.descriptors.IndexDescriptor 
catalogIndexDescriptor = toIndexDescriptor(indexView);
+
+                    
startBuildIndex(IndexDescriptor.create(catalogTableDescriptor, 
catalogIndexDescriptor));
                 });
 
                 return completedFuture(null);
@@ -2517,8 +2530,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         // Let's try to build an index for the previously created indexes for the table.
         TablesView tablesView = mvTableStorage.tablesConfiguration().value();
 
-        for (int indexId : collectIndexIds(tablesView)) {
-            startBuildIndex(createIndexDescriptor(tablesView, indexId));
+        for (TableIndexView indexView : tablesView.indexes()) {
+            if (indexView.tableId() != tableId()) {
+                continue;
+            }
+
+            TableView tableView = findTableView(tablesView, tableId());
+
+            assert tableView != null : tableId();
+
+            TableDescriptor catalogTableDescriptor = 
toTableDescriptor(tableView);
+            org.apache.ignite.internal.catalog.descriptors.IndexDescriptor 
catalogIndexDescriptor = toIndexDescriptor(indexView);
+
+            startBuildIndex(IndexDescriptor.create(catalogTableDescriptor, 
catalogIndexDescriptor));
         }
     }
 

Reply via email to