This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b8bd6a6266 IGNITE-22757 Sql. Fix excessive memory usage in 
schema-related code in SQL (#4180)
b8bd6a6266 is described below

commit b8bd6a62666832096cefcbd778f879e13c0461cf
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Aug 7 16:48:27 2024 +0300

    IGNITE-22757 Sql. Fix excessive memory usage in schema-related code in SQL 
(#4180)
---
 .../descriptors/CatalogTableDescriptor.java        |  84 ++++++-
 .../ignite/internal/catalog/CatalogTableTest.java  |   9 +
 .../internal/sql/engine/schema/IgniteIndex.java    |  15 +-
 .../sql/engine/schema/SqlSchemaManagerImpl.java    | 241 +++++++++++++++++----
 .../engine/schema/SqlSchemaManagerImplTest.java    |  29 +++
 5 files changed, 322 insertions(+), 56 deletions(-)

diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index ab14b290a0..2efed95c55 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -21,18 +21,20 @@ import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUS
 import static 
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.readList;
 import static 
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeList;
 
+import it.unimi.dsi.fastutil.ints.AbstractInt2ObjectMap.BasicEntry;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
 import 
org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.jetbrains.annotations.Nullable;
@@ -61,7 +63,7 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
     private final List<String> colocationColumns;
 
     @IgniteToStringExclude
-    private Map<String, CatalogTableColumnDescriptor> columnsMap;
+    private Map<String, Int2ObjectMap.Entry<CatalogTableColumnDescriptor>> 
columnsMap;
 
     private long creationToken;
 
@@ -128,7 +130,14 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
         this.zoneId = zoneId;
         this.columns = Objects.requireNonNull(columns, "No columns defined.");
         this.primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary 
key columns.");
-        this.columnsMap = 
columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name, 
Function.identity()));
+
+        Map<String, Int2ObjectMap.Entry<CatalogTableColumnDescriptor>> 
columnMap = IgniteUtils.newHashMap(columns.size());
+        for (int i = 0; i < columns.size(); i++) {
+            CatalogTableColumnDescriptor column = columns.get(i);
+            columnMap.put(column.name(), new BasicEntry<>(i, column));
+        }
+
+        this.columnsMap = columnMap;
         this.colocationColumns = Objects.requireNonNullElse(colocationCols, 
pkCols);
         this.schemaVersions =  Objects.requireNonNull(schemaVersions, "No 
catalog schema versions.");
         this.storageProfile = Objects.requireNonNull(storageProfile, "No 
storage profile.");
@@ -160,66 +169,129 @@ public class CatalogTableDescriptor extends 
CatalogObjectDescriptor {
      * Returns column descriptor for column with given name.
      */
     public CatalogTableColumnDescriptor columnDescriptor(String columnName) {
-        return columnsMap.get(columnName);
+        Entry<Integer, CatalogTableColumnDescriptor> column = 
columnsMap.get(columnName);
+        if (column != null) {
+            return column.getValue();
+        } else {
+            return null;
+        }
     }
 
+    /**
+     * Returns an identifier of a schema this table descriptor belongs to.
+     */
     public int schemaId() {
         return schemaId;
     }
 
+    /**
+     * Returns versions of this table descriptor.
+     */
     public CatalogTableSchemaVersions schemaVersions() {
         return schemaVersions;
     }
 
+    /**
+     * Returns an identifier of a distribution zone this table descriptor 
belongs to.
+     */
     public int zoneId() {
         return zoneId;
     }
 
+    /**
+     * Returns an identifier of the primary key index.
+     */
     public int primaryKeyIndexId() {
         return pkIndexId;
     }
 
+    /**
+     * Returns a version of this table descriptor.
+     */
     public int tableVersion() {
         return schemaVersions.latestVersion();
     }
 
+    /**
+     * Returns a list of primary key column names.
+     */
     public List<String> primaryKeyColumns() {
         return primaryKeyColumns;
     }
 
+    /**
+     * Returns a list of colocation key column names.
+     */
     public List<String> colocationColumns() {
         return colocationColumns;
     }
 
+    /**
+     * Returns a list of column descriptors for the table.
+     */
     public List<CatalogTableColumnDescriptor> columns() {
         return columns;
     }
 
+    /**
+     * Returns a column descriptor for column with given name.
+     */
     public CatalogTableColumnDescriptor column(String name) {
-        return columnsMap.get(name);
+        Entry<Integer, CatalogTableColumnDescriptor> column = 
columnsMap.get(name);
+        if (column != null) {
+            return column.getValue();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns an index of a column with the given name, or {@code -1} if such 
column does not exist.
+     */
+    public int columnIndex(String name) {
+        Entry<Integer, CatalogTableColumnDescriptor> column = 
columnsMap.get(name);
+        if (column != null) {
+            return column.getKey();
+        } else {
+            return -1;
+        }
     }
 
+    /**
+     * Returns {@code true} if the given column is a part of the primary 
key.
+     */
     public boolean isPrimaryKeyColumn(String name) {
         return primaryKeyColumns.contains(name);
     }
 
+    /**
+     * Returns {@code true} if the given column is a part of the colocation 
key.
+     */
     public boolean isColocationColumn(String name) {
         return colocationColumns.contains(name);
     }
 
+    /** {@inheritDoc} */
     @Override
     public String toString() {
         return S.toString(CatalogTableDescriptor.class, this, 
super.toString());
     }
 
+    /**
+     * Returns a creation token.
+     */
     public long creationToken() {
         return creationToken;
     }
 
+    /**
+     * Returns a name of a storage profile.
+     */
     public String storageProfile() {
         return storageProfile;
     }
 
+    /** {@inheritDoc} */
     @Override
     public void updateToken(long updateToken) {
         super.updateToken(updateToken);
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
index 9974e6280f..826057af1a 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
@@ -171,6 +171,12 @@ public class CatalogTableTest extends 
BaseCatalogManagerTest {
         assertNotNull(desc);
         // INT32 key
         assertThat(desc.precision(), is(DEFAULT_PRECISION));
+
+        int key1ColIndex = schema.table(TABLE_NAME).columnIndex("key1");
+        assertEquals(0, key1ColIndex);
+
+        int key2ColIndex = schema.table(TABLE_NAME).columnIndex("key2");
+        assertEquals(1, key2ColIndex);
     }
 
     @Test
@@ -322,6 +328,9 @@ public class CatalogTableTest extends 
BaseCatalogManagerTest {
         assertEquals(11, column.length());
         assertEquals(DEFAULT_PRECISION, column.precision());
         assertEquals(DEFAULT_SCALE, column.scale());
+
+        int newColumnIndex = 
schema.table(TABLE_NAME).columnIndex(NEW_COLUMN_NAME);
+        assertEquals(6, newColumnIndex);
     }
 
     @Test
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
index f21bc22745..763820b4d0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
@@ -38,6 +38,8 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import 
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -156,16 +158,14 @@ public class IgniteIndex {
         return IgniteLogicalIndexScan.create(cluster, traitSet, relOptTable, 
name, proj, condition, requiredCols);
     }
 
-    static RelCollation createIndexCollation(CatalogIndexDescriptor 
descriptor, TableDescriptor tableDescriptor) {
+    static RelCollation createIndexCollation(CatalogIndexDescriptor 
descriptor, CatalogTableDescriptor tableDescriptor) {
         if (descriptor instanceof CatalogSortedIndexDescriptor) {
             CatalogSortedIndexDescriptor sortedIndexDescriptor = 
(CatalogSortedIndexDescriptor) descriptor;
             List<CatalogIndexColumnDescriptor> columns = 
sortedIndexDescriptor.columns();
             List<RelFieldCollation> fieldCollations = new 
ArrayList<>(columns.size());
 
-            for (int i = 0; i < columns.size(); i++) {
-                CatalogIndexColumnDescriptor column = columns.get(i);
-                ColumnDescriptor columnDesc = 
tableDescriptor.columnDescriptor(column.name());
-                int fieldIndex = columnDesc.logicalIndex();
+            for (CatalogIndexColumnDescriptor column : columns) {
+                int fieldIndex = tableDescriptor.columnIndex(column.name());
 
                 RelFieldCollation fieldCollation;
                 switch (column.collation()) {
@@ -195,9 +195,10 @@ public class IgniteIndex {
             List<RelFieldCollation> fieldCollations = new 
ArrayList<>(columns.size());
 
             for (String columnName : columns) {
-                ColumnDescriptor columnDesc = 
tableDescriptor.columnDescriptor(columnName);
+                CatalogTableColumnDescriptor tableColumn = 
tableDescriptor.columnDescriptor(columnName);
+                int fieldIndex = 
tableDescriptor.columns().indexOf(tableColumn);
 
-                fieldCollations.add(new 
RelFieldCollation(columnDesc.logicalIndex(), Direction.CLUSTERED, 
NullDirection.UNSPECIFIED));
+                fieldCollations.add(new RelFieldCollation(fieldIndex, 
Direction.CLUSTERED, NullDirection.UNSPECIFIED));
             }
 
             return RelCollations.of(fieldCollations);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 4b4dbe60b5..8abb1fdb36 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -30,9 +30,16 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -53,6 +60,7 @@ import 
org.apache.ignite.internal.schema.DefaultValueGenerator;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
@@ -68,13 +76,26 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
     private final Cache<Integer, SchemaPlus> schemaCache;
 
-    private final Cache<Long, IgniteTable> tableCache;
+    /**
+     * Table cache by (tableId, tableVersion).
+     * Only data that is included in a catalog table descriptor itself is 
up-to-date.
+     * Table-related information from other objects is not reliable.
+     */
+    private final Cache<Long, IgniteTableImpl> tableCache;
+
+    /** Index cache by (indexId, indexStatus). */
+    private final Cache<Long, IgniteIndex> indexCache;
+
+    /** Table cache by (catalogVersion, tableId). Includes all table related 
information. */
+    private final Cache<Long, ActualIgniteTable> fullDataTableCache;
 
     /** Constructor. */
     public SqlSchemaManagerImpl(CatalogManager catalogManager, CacheFactory 
factory, int cacheSize) {
         this.catalogManager = catalogManager;
         this.schemaCache = factory.create(cacheSize);
         this.tableCache = factory.create(cacheSize);
+        this.indexCache = factory.create(cacheSize);
+        this.fullDataTableCache = factory.create(cacheSize);
     }
 
     /** {@inheritDoc} */
@@ -108,9 +129,10 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
     @Override
     public IgniteTable table(int catalogVersion, int tableId) {
-        return tableCache.get(tableCacheKey(catalogVersion, tableId), key -> {
+        return fullDataTableCache.get(cacheKey(catalogVersion, tableId), key 
-> {
             SchemaPlus rootSchema = schemaCache.get(catalogVersion);
 
+            // Retrieve table from the schema (if it exists).
             if (rootSchema != null) {
                 for (String name : rootSchema.getSubSchemaNames()) {
                     SchemaPlus subSchema = rootSchema.getSubSchema(name);
@@ -121,7 +143,8 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
                     assert schema != null : "unknown schema " + subSchema;
 
-                    IgniteTable table = schema.tableByIdOpt(tableId);
+                    // Schema contains a wrapper for IgniteTable that includes 
actual information for a table (indexes, etc).
+                    ActualIgniteTable table = (ActualIgniteTable) 
schema.tableByIdOpt(tableId);
 
                     if (table != null) {
                         return table;
@@ -129,6 +152,8 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 }
             }
 
+            // Load actual table information from the catalog.
+
             Catalog catalog = catalogManager.catalog(catalogVersion);
 
             if (catalog == null) {
@@ -141,17 +166,29 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
                 throw new IgniteInternalException(Common.INTERNAL_ERR, "Table 
with given id not found: " + tableId);
             }
 
-            return createTable(catalog, tableDescriptor);
+            long tableKey = cacheKey(tableDescriptor.id(), 
tableDescriptor.tableVersion());
+
+            IgniteTableImpl igniteTable = tableCache.get(tableKey, (x) -> {
+                TableDescriptor descriptor = 
createTableDescriptorForTable(tableDescriptor);
+                return createTableDataOnlyTable(catalog, tableDescriptor, 
descriptor);
+            });
+
+            Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
+                    tableDescriptor.id(),
+                    tableDescriptor.primaryKeyIndexId()
+            );
+
+            return new ActualIgniteTable(igniteTable, tableIndexes);
         });
     }
 
-    private static long tableCacheKey(int catalogVersion, int tableId) {
-        long cacheKey = catalogVersion;
+    private static long cacheKey(int part1, int part2) {
+        long cacheKey = part1;
         cacheKey <<= 32;
-        return cacheKey | tableId;
+        return cacheKey | part2;
     }
 
-    private static SchemaPlus createRootSchema(Catalog catalog) {
+    private SchemaPlus createRootSchema(Catalog catalog) {
         SchemaPlus rootSchema = Frameworks.createRootSchema(false);
 
         for (CatalogSchemaDescriptor schemaDescriptor : catalog.schemas()) {
@@ -162,7 +199,7 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         return rootSchema;
     }
 
-    private static IgniteSchema createSqlSchema(Catalog catalog, 
CatalogSchemaDescriptor schemaDescriptor) {
+    private IgniteSchema createSqlSchema(Catalog catalog, 
CatalogSchemaDescriptor schemaDescriptor) {
         int catalogVersion = catalog.version();
         String schemaName = schemaDescriptor.name();
 
@@ -171,9 +208,23 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
         // Assemble sql-engine.TableDescriptors as they are required by 
indexes.
         for (CatalogTableDescriptor tableDescriptor : 
schemaDescriptor.tables()) {
-            schemaDataSources.add(
-                    createTable(catalog, tableDescriptor)
+            long tableKey = cacheKey(tableDescriptor.id(), 
tableDescriptor.tableVersion());
+
+            // Load cached table by (id, version)
+            IgniteTableImpl igniteTable = tableCache.get(tableKey, (k) -> {
+                TableDescriptor descriptor = 
createTableDescriptorForTable(tableDescriptor);
+                return createTableDataOnlyTable(catalog, tableDescriptor, 
descriptor);
+            });
+
+            // Get actual indices
+            Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
+                    tableDescriptor.id(),
+                    tableDescriptor.primaryKeyIndexId()
             );
+
+            // Store a wrapper for the table that includes actual information 
for a table (indexes, etc),
+            // because the cached table entry (id, version) may not include 
up-to-date information on indexes.
+            schemaDataSources.add(new ActualIgniteTable(igniteTable, 
tableIndexes));
         }
 
         for (CatalogSystemViewDescriptor systemViewDescriptor : 
schemaDescriptor.systemViews()) {
@@ -195,7 +246,8 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
 
     private static IgniteIndex createSchemaIndex(
             CatalogIndexDescriptor indexDescriptor,
-            TableDescriptor tableDescriptor,
+            RelCollation outputCollation,
+            IgniteDistribution distribution,
             boolean primaryKey
     ) {
         Type type;
@@ -207,24 +259,21 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
             throw new IllegalArgumentException("Unexpected index type: " + 
indexDescriptor);
         }
 
-        RelCollation outputCollation = 
IgniteIndex.createIndexCollation(indexDescriptor, tableDescriptor);
         return new IgniteIndex(
-                indexDescriptor.id(), indexDescriptor.name(), type, 
tableDescriptor.distribution(), outputCollation, primaryKey
+                indexDescriptor.id(), indexDescriptor.name(), type, 
distribution, outputCollation, primaryKey
         );
     }
 
     private static TableDescriptor 
createTableDescriptorForTable(CatalogTableDescriptor descriptor) {
         List<CatalogTableColumnDescriptor> columns = descriptor.columns();
-
         List<ColumnDescriptor> colDescriptors = new ArrayList<>(columns.size() 
+ 1);
-        Object2IntMap<String> columnToIndex = new 
Object2IntOpenHashMap<>(columns.size() + 1);
+        Object2IntMap<String> columnToIndex = buildColumnToIndexMap(columns);
 
         for (int i = 0; i < columns.size(); i++) {
             CatalogTableColumnDescriptor col = columns.get(i);
             boolean key = descriptor.isPrimaryKeyColumn(col.name());
             ColumnDescriptor columnDescriptor = createColumnDescriptor(col, 
key, i);
 
-            columnToIndex.put(col.name(), i);
             colDescriptors.add(columnDescriptor);
         }
 
@@ -236,20 +285,35 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
             }
         }
 
-        List<Integer> colocationColumns = 
descriptor.colocationColumns().stream()
-                .map(columnToIndex::getInt)
-                .collect(Collectors.toList());
-
         // Add virtual column.
         ColumnDescriptorImpl partVirtualColumn = 
createPartitionVirtualColumn(columns.size());
         colDescriptors.add(partVirtualColumn);
-        columnToIndex.put(partVirtualColumn.name(), 
partVirtualColumn.logicalIndex());
+
+        IgniteDistribution distribution = createDistribution(descriptor, 
columnToIndex);
+
+        return new TableDescriptorImpl(colDescriptors, distribution);
+    }
+
+    private static IgniteDistribution 
createDistribution(CatalogTableDescriptor descriptor, Object2IntMap<String> 
columnToIndex) {
+        List<Integer> colocationColumns = 
descriptor.colocationColumns().stream()
+                .map(columnToIndex::getInt)
+                .collect(Collectors.toList());
 
         // TODO Use the actual zone ID after implementing 
https://issues.apache.org/jira/browse/IGNITE-18426.
         int tableId = descriptor.id();
-        IgniteDistribution distribution = 
IgniteDistributions.affinity(colocationColumns, tableId, tableId);
 
-        return new TableDescriptorImpl(colDescriptors, distribution);
+        return IgniteDistributions.affinity(colocationColumns, tableId, 
tableId);
+    }
+
+    private static Object2IntMap<String> 
buildColumnToIndexMap(List<CatalogTableColumnDescriptor> columns) {
+        Object2IntMap<String> columnToIndex = new 
Object2IntOpenHashMap<>(columns.size() + 1);
+
+        for (int i = 0; i < columns.size(); i++) {
+            CatalogTableColumnDescriptor col = columns.get(i);
+            columnToIndex.put(col.name(), i);
+        }
+
+        return columnToIndex;
     }
 
     private static ColumnDescriptorImpl createPartitionVirtualColumn(int 
logicalIndex) {
@@ -365,42 +429,77 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         return columnDescriptor;
     }
 
-    private static IgniteTable createTable(
+    private IgniteTableImpl createTableDataOnlyTable(
             Catalog catalog,
-            CatalogTableDescriptor tableDescriptor
+            CatalogTableDescriptor table,
+            TableDescriptor descriptor
     ) {
-        TableDescriptor descriptor = 
createTableDescriptorForTable(tableDescriptor);
+        Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
+                table.id(),
+                table.primaryKeyIndexId()
+        );
 
+        CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(catalog, 
table.zoneId());
+
+        return createTable(table, descriptor, tableIndexes, 
zoneDescriptor.partitions());
+    }
+
+    private Map<String, IgniteIndex> getIndexes(Catalog catalog, int tableId, 
int primaryKeyIndexId) {
         Map<String, IgniteIndex> tableIndexes = new HashMap<>();
-        for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableDescriptor.id())) {
+        CatalogTableDescriptor table = catalog.table(tableId);
+        assert table != null;
+
+        for (CatalogIndexDescriptor indexDescriptor : 
catalog.indexes(tableId)) {
             if (indexDescriptor.status() != AVAILABLE) {
                 continue;
             }
 
             String indexName = indexDescriptor.name();
+            long indexKey = cacheKey(indexDescriptor.id(), 
indexDescriptor.status().id());
 
-            IgniteIndex schemaIndex = createSchemaIndex(
-                    indexDescriptor,
-                    descriptor,
-                    indexDescriptor.id() == tableDescriptor.primaryKeyIndexId()
-            );
+            IgniteIndex schemaIndex = indexCache.get(indexKey, (x) -> {
+                RelCollation outputCollation = 
IgniteIndex.createIndexCollation(indexDescriptor, table);
+                Object2IntMap<String> columnToIndex = 
buildColumnToIndexMap(table.columns());
+                IgniteDistribution distribution = createDistribution(table, 
columnToIndex);
+
+                return createSchemaIndex(
+                        indexDescriptor,
+                        outputCollation,
+                        distribution,
+                        indexDescriptor.id() == primaryKeyIndexId
+                );
+            });
 
             tableIndexes.put(indexName, schemaIndex);
         }
 
-        int zoneId = tableDescriptor.zoneId();
+        return tableIndexes;
+    }
+
+    private static CatalogZoneDescriptor getZoneDescriptor(Catalog catalog, 
int zoneId) {
         CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
         assert zoneDescriptor != null : "Zone is not found in schema: " + 
zoneId;
 
-        return createTable(tableDescriptor, descriptor, tableIndexes, 
zoneDescriptor.partitions());
+        return zoneDescriptor;
     }
 
-    private static IgniteTable createTable(
+    private static IgniteTableImpl createTable(
             CatalogTableDescriptor catalogTableDescriptor,
             TableDescriptor tableDescriptor,
             Map<String, IgniteIndex> indexes,
             int parititions
     ) {
+        IgniteIndex primaryIndex = indexes.values().stream()
+                .filter(IgniteIndex::primaryKey)
+                .findFirst()
+                .orElseThrow();
+
+        // We do not need any index other than the primary index,
+        // all other indexes are stored in full table data cache.
+        Map<String, IgniteIndex> primaryKeyOnlyMap = 
Map.of(primaryIndex.name(), primaryIndex);
+
+        ImmutableIntList primaryKeyColumns = 
primaryIndex.collation().getKeys();
+
         int tableId = catalogTableDescriptor.id();
         String tableName = catalogTableDescriptor.name();
 
@@ -408,20 +507,76 @@ public class SqlSchemaManagerImpl implements 
SqlSchemaManager {
         // Let's fix table statistics keeping in mind IGNITE-19558 issue.
         IgniteStatistic statistic = new IgniteStatistic(() -> 0.0d, 
tableDescriptor.distribution());
 
-        IgniteIndex primaryIndex = indexes.values().stream()
-                .filter(IgniteIndex::primaryKey)
-                .findFirst()
-                .orElseThrow();
-
         return new IgniteTableImpl(
                 tableName,
                 tableId,
                 catalogTableDescriptor.tableVersion(),
                 tableDescriptor,
-                primaryIndex.collation().getKeys(),
+                primaryKeyColumns,
                 statistic,
-                indexes,
+                primaryKeyOnlyMap,
                 parititions
         );
     }
+
+    private static class ActualIgniteTable extends AbstractIgniteDataSource 
implements IgniteTable {
+
+        /** Cached table by id and version. */
+        private final IgniteTableImpl table;
+
+        /** Index map with up-to-date information. */
+        private final Map<String, IgniteIndex> indexMap;
+
+        ActualIgniteTable(IgniteTableImpl igniteTable, Map<String, 
IgniteIndex> indexMap) {
+            super(igniteTable.name(), igniteTable.id(), igniteTable.version(), 
igniteTable.descriptor(), igniteTable.getStatistic());
+
+            this.table = igniteTable;
+            this.indexMap = indexMap;
+        }
+
+        @Override
+        protected TableScan toRel(RelOptCluster cluster, RelTraitSet traitSet, 
RelOptTable relOptTbl, List<RelHint> hints) {
+            return table.toRel(cluster, traitSet, relOptTbl, hints);
+        }
+
+        @Override
+        public boolean isUpdateAllowed(int colIdx) {
+            return table.isUpdateAllowed(colIdx);
+        }
+
+        @Override
+        public RelDataType rowTypeForInsert(IgniteTypeFactory factory) {
+            return table.rowTypeForInsert(factory);
+        }
+
+        @Override
+        public RelDataType rowTypeForUpdate(IgniteTypeFactory factory) {
+            return table.rowTypeForUpdate(factory);
+        }
+
+        @Override
+        public RelDataType rowTypeForDelete(IgniteTypeFactory factory) {
+            return table.rowTypeForDelete(factory);
+        }
+
+        @Override
+        public ImmutableIntList keyColumns() {
+            return table.keyColumns();
+        }
+
+        @Override
+        public Supplier<PartitionCalculator> partitionCalculator() {
+            return table.partitionCalculator();
+        }
+
+        @Override
+        public Map<String, IgniteIndex> indexes() {
+            return indexMap;
+        }
+
+        @Override
+        public int partitions() {
+            return table.partitions();
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
index 19cd84524e..d0f8384765 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
@@ -31,12 +31,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -46,6 +48,7 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
@@ -494,6 +497,9 @@ public class SqlSchemaManagerImplTest extends 
BaseIgniteAbstractTest {
 
             IgniteIndex index = findIndex(unwrapSchema(schemaPlus), "T1", 
"VAL1_IDX");
             assertNull(index, "Index should not be available");
+
+            Map<String, ?> indexes = findTableIndexes(versionAfter, 
PUBLIC_SCHEMA_NAME, "T1");
+            assertEquals(Set.of("T1_PK"), indexes.keySet());
         }
 
         makeIndexAvailable("VAL1_IDX");
@@ -516,6 +522,10 @@ public class SqlSchemaManagerImplTest extends 
BaseIgniteAbstractTest {
             assertThat(index.collation(), equalTo(RelCollations.of(
                     new RelFieldCollation(1, Direction.CLUSTERED, 
NullDirection.UNSPECIFIED)
             )));
+
+            Map<String, ?> indexes = findTableIndexes(versionAfter, 
PUBLIC_SCHEMA_NAME, "T1");
+            assertEquals(Set.of("T1_PK", "VAL1_IDX"), indexes.keySet());
+            assertSame(index, indexes.get("VAL1_IDX"), "VAL1_IDX cache entry");
         }
     }
 
@@ -545,6 +555,9 @@ public class SqlSchemaManagerImplTest extends 
BaseIgniteAbstractTest {
 
             IgniteIndex index2 = findIndex(unwrapSchema(schemaPlus), "T1", 
"IDX2");
             assertNull(index2);
+
+            Map<String, IgniteIndex> indexes = findTableIndexes(versionAfter, 
PUBLIC_SCHEMA_NAME, "T1");
+            assertEquals(Set.of("T1_PK"), indexes.keySet());
         }
 
         makeIndexAvailable("IDX1");
@@ -568,6 +581,10 @@ public class SqlSchemaManagerImplTest extends 
BaseIgniteAbstractTest {
                     new RelFieldCollation(1, Direction.ASCENDING, 
NullDirection.FIRST),
                     new RelFieldCollation(2, Direction.ASCENDING, 
NullDirection.LAST)
             )));
+
+            Map<String, ?> indexes = findTableIndexes(versionAfter, 
PUBLIC_SCHEMA_NAME, "T1");
+            assertEquals(Set.of("T1_PK", "IDX1"), indexes.keySet());
+            assertSame(index, indexes.get("IDX1"), "IDX1 cache entry");
         }
 
         makeIndexAvailable("IDX2");
@@ -591,9 +608,21 @@ public class SqlSchemaManagerImplTest extends 
BaseIgniteAbstractTest {
                     new RelFieldCollation(1, Direction.DESCENDING, 
NullDirection.FIRST),
                     new RelFieldCollation(2, Direction.DESCENDING, 
NullDirection.LAST)
             )));
+
+            Map<String, ?> indexes = findTableIndexes(versionAfter, 
PUBLIC_SCHEMA_NAME, "T1");
+            assertEquals(Set.of("T1_PK", "IDX1", "IDX2"), indexes.keySet());
+            assertSame(index, indexes.get("IDX2"), "IDX2 cache entry");
         }
     }
 
+    private Map<String, IgniteIndex> findTableIndexes(int catalogVersion, 
String schemaName, String tableName) {
+        Catalog catalog = catalogManager.catalog(catalogVersion);
+        CatalogTableDescriptor table = 
catalog.schema(schemaName).table(tableName);
+
+        IgniteTable igniteTable = sqlSchemaManager.table(catalogVersion, 
table.id());
+        return igniteTable.indexes();
+    }
+
     private void makeIndexAvailable(String name) {
         Map<String, CatalogIndexDescriptor> indices = 
catalogManager.indexes(catalogManager.latestCatalogVersion())
                 
.stream().collect(Collectors.toMap(CatalogIndexDescriptor::name, 
Function.identity()));


Reply via email to