This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b8bd6a6266 IGNITE-22757 Sql. Fix excessive memory usage in
schema-related code in SQL (#4180)
b8bd6a6266 is described below
commit b8bd6a62666832096cefcbd778f879e13c0461cf
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Aug 7 16:48:27 2024 +0300
IGNITE-22757 Sql. Fix excessive memory usage in schema-related code in SQL
(#4180)
---
.../descriptors/CatalogTableDescriptor.java | 84 ++++++-
.../ignite/internal/catalog/CatalogTableTest.java | 9 +
.../internal/sql/engine/schema/IgniteIndex.java | 15 +-
.../sql/engine/schema/SqlSchemaManagerImpl.java | 241 +++++++++++++++++----
.../engine/schema/SqlSchemaManagerImplTest.java | 29 +++
5 files changed, 322 insertions(+), 56 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index ab14b290a0..2efed95c55 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -21,18 +21,20 @@ import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUS
import static
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.readList;
import static
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeList;
+import it.unimi.dsi.fastutil.ints.AbstractInt2ObjectMap.BasicEntry;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
import
org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.jetbrains.annotations.Nullable;
@@ -61,7 +63,7 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
private final List<String> colocationColumns;
@IgniteToStringExclude
- private Map<String, CatalogTableColumnDescriptor> columnsMap;
+ private Map<String, Int2ObjectMap.Entry<CatalogTableColumnDescriptor>>
columnsMap;
private long creationToken;
@@ -128,7 +130,14 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
this.zoneId = zoneId;
this.columns = Objects.requireNonNull(columns, "No columns defined.");
this.primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary
key columns.");
- this.columnsMap =
columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name,
Function.identity()));
+
+ Map<String, Int2ObjectMap.Entry<CatalogTableColumnDescriptor>>
columnMap = IgniteUtils.newHashMap(columns.size());
+ for (int i = 0; i < columns.size(); i++) {
+ CatalogTableColumnDescriptor column = columns.get(i);
+ columnMap.put(column.name(), new BasicEntry<>(i, column));
+ }
+
+ this.columnsMap = columnMap;
this.colocationColumns = Objects.requireNonNullElse(colocationCols,
pkCols);
this.schemaVersions = Objects.requireNonNull(schemaVersions, "No
catalog schema versions.");
this.storageProfile = Objects.requireNonNull(storageProfile, "No
storage profile.");
@@ -160,66 +169,129 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
* Returns column descriptor for column with given name.
*/
public CatalogTableColumnDescriptor columnDescriptor(String columnName) {
- return columnsMap.get(columnName);
+ Entry<Integer, CatalogTableColumnDescriptor> column =
columnsMap.get(columnName);
+ if (column != null) {
+ return column.getValue();
+ } else {
+ return null;
+ }
}
+ /**
+ * Returns an identifier of a schema this table descriptor belongs to.
+ */
public int schemaId() {
return schemaId;
}
+ /**
+ * Returns versions of this table descriptor.
+ */
public CatalogTableSchemaVersions schemaVersions() {
return schemaVersions;
}
+ /**
+ * Returns an identifier of a distribution zone this table descriptor
belongs to.
+ */
public int zoneId() {
return zoneId;
}
+ /**
+ * Returns a identifier of the primary key index.
+ */
public int primaryKeyIndexId() {
return pkIndexId;
}
+ /**
+ * Returns a version of this table descriptor.
+ */
public int tableVersion() {
return schemaVersions.latestVersion();
}
+ /**
+ * Returns a list primary key column names.
+ */
public List<String> primaryKeyColumns() {
return primaryKeyColumns;
}
+ /**
+ * Returns a list colocation key column names.
+ */
public List<String> colocationColumns() {
return colocationColumns;
}
+ /**
+ * Returns a list column descriptors for the table.
+ */
public List<CatalogTableColumnDescriptor> columns() {
return columns;
}
+ /**
+ * Returns a column descriptor for column with given name.
+ */
public CatalogTableColumnDescriptor column(String name) {
- return columnsMap.get(name);
+ Entry<Integer, CatalogTableColumnDescriptor> column =
columnsMap.get(name);
+ if (column != null) {
+ return column.getValue();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Returns an index of a column with the given name, or {@code -1} if such
column does not exist.
+ */
+ public int columnIndex(String name) {
+ Entry<Integer, CatalogTableColumnDescriptor> column =
columnsMap.get(name);
+ if (column != null) {
+ return column.getKey();
+ } else {
+ return -1;
+ }
}
+ /**
+ * Returns {@code true} if this the given column is a part of the primary
key.
+ */
public boolean isPrimaryKeyColumn(String name) {
return primaryKeyColumns.contains(name);
}
+ /**
+ * Returns {@code true} if this the given column is a part of collocation
key.
+ */
public boolean isColocationColumn(String name) {
return colocationColumns.contains(name);
}
+ /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(CatalogTableDescriptor.class, this,
super.toString());
}
+ /**
+ * Returns a creation token.
+ */
public long creationToken() {
return creationToken;
}
+ /**
+ * Returns a name of a storage profile.
+ */
public String storageProfile() {
return storageProfile;
}
+ /** {@inheritDoc} */
@Override
public void updateToken(long updateToken) {
super.updateToken(updateToken);
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
index 9974e6280f..826057af1a 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogTableTest.java
@@ -171,6 +171,12 @@ public class CatalogTableTest extends
BaseCatalogManagerTest {
assertNotNull(desc);
// INT32 key
assertThat(desc.precision(), is(DEFAULT_PRECISION));
+
+ int key1ColIndex = schema.table(TABLE_NAME).columnIndex("key1");
+ assertEquals(0, key1ColIndex);
+
+ int key2ColIndex = schema.table(TABLE_NAME).columnIndex("key2");
+ assertEquals(1, key2ColIndex);
}
@Test
@@ -322,6 +328,9 @@ public class CatalogTableTest extends
BaseCatalogManagerTest {
assertEquals(11, column.length());
assertEquals(DEFAULT_PRECISION, column.precision());
assertEquals(DEFAULT_SCALE, column.scale());
+
+ int newColumnIndex =
schema.table(TABLE_NAME).columnIndex(NEW_COLUMN_NAME);
+ assertEquals(6, newColumnIndex);
}
@Test
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
index f21bc22745..763820b4d0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
@@ -38,6 +38,8 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor
import
org.apache.ignite.internal.catalog.descriptors.CatalogIndexColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -156,16 +158,14 @@ public class IgniteIndex {
return IgniteLogicalIndexScan.create(cluster, traitSet, relOptTable,
name, proj, condition, requiredCols);
}
- static RelCollation createIndexCollation(CatalogIndexDescriptor
descriptor, TableDescriptor tableDescriptor) {
+ static RelCollation createIndexCollation(CatalogIndexDescriptor
descriptor, CatalogTableDescriptor tableDescriptor) {
if (descriptor instanceof CatalogSortedIndexDescriptor) {
CatalogSortedIndexDescriptor sortedIndexDescriptor =
(CatalogSortedIndexDescriptor) descriptor;
List<CatalogIndexColumnDescriptor> columns =
sortedIndexDescriptor.columns();
List<RelFieldCollation> fieldCollations = new
ArrayList<>(columns.size());
- for (int i = 0; i < columns.size(); i++) {
- CatalogIndexColumnDescriptor column = columns.get(i);
- ColumnDescriptor columnDesc =
tableDescriptor.columnDescriptor(column.name());
- int fieldIndex = columnDesc.logicalIndex();
+ for (CatalogIndexColumnDescriptor column : columns) {
+ int fieldIndex = tableDescriptor.columnIndex(column.name());
RelFieldCollation fieldCollation;
switch (column.collation()) {
@@ -195,9 +195,10 @@ public class IgniteIndex {
List<RelFieldCollation> fieldCollations = new
ArrayList<>(columns.size());
for (String columnName : columns) {
- ColumnDescriptor columnDesc =
tableDescriptor.columnDescriptor(columnName);
+ CatalogTableColumnDescriptor tableColumn =
tableDescriptor.columnDescriptor(columnName);
+ int fieldIndex =
tableDescriptor.columns().indexOf(tableColumn);
- fieldCollations.add(new
RelFieldCollation(columnDesc.logicalIndex(), Direction.CLUSTERED,
NullDirection.UNSPECIFIED));
+ fieldCollations.add(new RelFieldCollation(fieldIndex,
Direction.CLUSTERED, NullDirection.UNSPECIFIED));
}
return RelCollations.of(fieldCollations);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 4b4dbe60b5..8abb1fdb36 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -30,9 +30,16 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
@@ -53,6 +60,7 @@ import
org.apache.ignite.internal.schema.DefaultValueGenerator;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
@@ -68,13 +76,26 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
private final Cache<Integer, SchemaPlus> schemaCache;
- private final Cache<Long, IgniteTable> tableCache;
+ /**
+ * Table cache by (tableId, tableVersion).
+ * Only data that included in a catalog table descriptor itself is
up-to-date.
+ * Table related information from other object is not reliable.
+ */
+ private final Cache<Long, IgniteTableImpl> tableCache;
+
+ /** Index cache by (indexId, indexStatus). */
+ private final Cache<Long, IgniteIndex> indexCache;
+
+ /** Table cache by (catalogVersion, tableId). Includes all table related
information. */
+ private final Cache<Long, ActualIgniteTable> fullDataTableCache;
/** Constructor. */
public SqlSchemaManagerImpl(CatalogManager catalogManager, CacheFactory
factory, int cacheSize) {
this.catalogManager = catalogManager;
this.schemaCache = factory.create(cacheSize);
this.tableCache = factory.create(cacheSize);
+ this.indexCache = factory.create(cacheSize);
+ this.fullDataTableCache = factory.create(cacheSize);
}
/** {@inheritDoc} */
@@ -108,9 +129,10 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
@Override
public IgniteTable table(int catalogVersion, int tableId) {
- return tableCache.get(tableCacheKey(catalogVersion, tableId), key -> {
+ return fullDataTableCache.get(cacheKey(catalogVersion, tableId), key
-> {
SchemaPlus rootSchema = schemaCache.get(catalogVersion);
+ // Retrieve table from the schema (if it exists).
if (rootSchema != null) {
for (String name : rootSchema.getSubSchemaNames()) {
SchemaPlus subSchema = rootSchema.getSubSchema(name);
@@ -121,7 +143,8 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
assert schema != null : "unknown schema " + subSchema;
- IgniteTable table = schema.tableByIdOpt(tableId);
+ // Schema contains a wrapper for IgniteTable that includes
actual information for a table (indexes, etc).
+ ActualIgniteTable table = (ActualIgniteTable)
schema.tableByIdOpt(tableId);
if (table != null) {
return table;
@@ -129,6 +152,8 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
}
}
+ // Load actual table information from the catalog.
+
Catalog catalog = catalogManager.catalog(catalogVersion);
if (catalog == null) {
@@ -141,17 +166,29 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Table
with given id not found: " + tableId);
}
- return createTable(catalog, tableDescriptor);
+ long tableKey = cacheKey(tableDescriptor.id(),
tableDescriptor.tableVersion());
+
+ IgniteTableImpl igniteTable = tableCache.get(tableKey, (x) -> {
+ TableDescriptor descriptor =
createTableDescriptorForTable(tableDescriptor);
+ return createTableDataOnlyTable(catalog, tableDescriptor,
descriptor);
+ });
+
+ Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
+ tableDescriptor.id(),
+ tableDescriptor.primaryKeyIndexId()
+ );
+
+ return new ActualIgniteTable(igniteTable, tableIndexes);
});
}
- private static long tableCacheKey(int catalogVersion, int tableId) {
- long cacheKey = catalogVersion;
+ private static long cacheKey(int part1, int part2) {
+ long cacheKey = part1;
cacheKey <<= 32;
- return cacheKey | tableId;
+ return cacheKey | part2;
}
- private static SchemaPlus createRootSchema(Catalog catalog) {
+ private SchemaPlus createRootSchema(Catalog catalog) {
SchemaPlus rootSchema = Frameworks.createRootSchema(false);
for (CatalogSchemaDescriptor schemaDescriptor : catalog.schemas()) {
@@ -162,7 +199,7 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
return rootSchema;
}
- private static IgniteSchema createSqlSchema(Catalog catalog,
CatalogSchemaDescriptor schemaDescriptor) {
+ private IgniteSchema createSqlSchema(Catalog catalog,
CatalogSchemaDescriptor schemaDescriptor) {
int catalogVersion = catalog.version();
String schemaName = schemaDescriptor.name();
@@ -171,9 +208,23 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
// Assemble sql-engine.TableDescriptors as they are required by
indexes.
for (CatalogTableDescriptor tableDescriptor :
schemaDescriptor.tables()) {
- schemaDataSources.add(
- createTable(catalog, tableDescriptor)
+ long tableKey = cacheKey(tableDescriptor.id(),
tableDescriptor.tableVersion());
+
+ // Load cached table by (id, version)
+ IgniteTableImpl igniteTable = tableCache.get(tableKey, (k) -> {
+ TableDescriptor descriptor =
createTableDescriptorForTable(tableDescriptor);
+ return createTableDataOnlyTable(catalog, tableDescriptor,
descriptor);
+ });
+
+ // Get actual indices
+ Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
+ tableDescriptor.id(),
+ tableDescriptor.primaryKeyIndexId()
);
+
+ // Store a wrapper for the table that includes actual information
for a table (indexes, etc),
+ // because the cached table entry (id, version) may not include
up-to-date information on indexes.
+ schemaDataSources.add(new ActualIgniteTable(igniteTable,
tableIndexes));
}
for (CatalogSystemViewDescriptor systemViewDescriptor :
schemaDescriptor.systemViews()) {
@@ -195,7 +246,8 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
private static IgniteIndex createSchemaIndex(
CatalogIndexDescriptor indexDescriptor,
- TableDescriptor tableDescriptor,
+ RelCollation outputCollation,
+ IgniteDistribution distribution,
boolean primaryKey
) {
Type type;
@@ -207,24 +259,21 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
throw new IllegalArgumentException("Unexpected index type: " +
indexDescriptor);
}
- RelCollation outputCollation =
IgniteIndex.createIndexCollation(indexDescriptor, tableDescriptor);
return new IgniteIndex(
- indexDescriptor.id(), indexDescriptor.name(), type,
tableDescriptor.distribution(), outputCollation, primaryKey
+ indexDescriptor.id(), indexDescriptor.name(), type,
distribution, outputCollation, primaryKey
);
}
private static TableDescriptor
createTableDescriptorForTable(CatalogTableDescriptor descriptor) {
List<CatalogTableColumnDescriptor> columns = descriptor.columns();
-
List<ColumnDescriptor> colDescriptors = new ArrayList<>(columns.size()
+ 1);
- Object2IntMap<String> columnToIndex = new
Object2IntOpenHashMap<>(columns.size() + 1);
+ Object2IntMap<String> columnToIndex = buildColumnToIndexMap(columns);
for (int i = 0; i < columns.size(); i++) {
CatalogTableColumnDescriptor col = columns.get(i);
boolean key = descriptor.isPrimaryKeyColumn(col.name());
ColumnDescriptor columnDescriptor = createColumnDescriptor(col,
key, i);
- columnToIndex.put(col.name(), i);
colDescriptors.add(columnDescriptor);
}
@@ -236,20 +285,35 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
}
}
- List<Integer> colocationColumns =
descriptor.colocationColumns().stream()
- .map(columnToIndex::getInt)
- .collect(Collectors.toList());
-
// Add virtual column.
ColumnDescriptorImpl partVirtualColumn =
createPartitionVirtualColumn(columns.size());
colDescriptors.add(partVirtualColumn);
- columnToIndex.put(partVirtualColumn.name(),
partVirtualColumn.logicalIndex());
+
+ IgniteDistribution distribution = createDistribution(descriptor,
columnToIndex);
+
+ return new TableDescriptorImpl(colDescriptors, distribution);
+ }
+
+ private static IgniteDistribution
createDistribution(CatalogTableDescriptor descriptor, Object2IntMap<String>
columnToIndex) {
+ List<Integer> colocationColumns =
descriptor.colocationColumns().stream()
+ .map(columnToIndex::getInt)
+ .collect(Collectors.toList());
// TODO Use the actual zone ID after implementing
https://issues.apache.org/jira/browse/IGNITE-18426.
int tableId = descriptor.id();
- IgniteDistribution distribution =
IgniteDistributions.affinity(colocationColumns, tableId, tableId);
- return new TableDescriptorImpl(colDescriptors, distribution);
+ return IgniteDistributions.affinity(colocationColumns, tableId,
tableId);
+ }
+
+ private static Object2IntMap<String>
buildColumnToIndexMap(List<CatalogTableColumnDescriptor> columns) {
+ Object2IntMap<String> columnToIndex = new
Object2IntOpenHashMap<>(columns.size() + 1);
+
+ for (int i = 0; i < columns.size(); i++) {
+ CatalogTableColumnDescriptor col = columns.get(i);
+ columnToIndex.put(col.name(), i);
+ }
+
+ return columnToIndex;
}
private static ColumnDescriptorImpl createPartitionVirtualColumn(int
logicalIndex) {
@@ -365,42 +429,77 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
return columnDescriptor;
}
- private static IgniteTable createTable(
+ private IgniteTableImpl createTableDataOnlyTable(
Catalog catalog,
- CatalogTableDescriptor tableDescriptor
+ CatalogTableDescriptor table,
+ TableDescriptor descriptor
) {
- TableDescriptor descriptor =
createTableDescriptorForTable(tableDescriptor);
+ Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
+ table.id(),
+ table.primaryKeyIndexId()
+ );
+ CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(catalog,
table.zoneId());
+
+ return createTable(table, descriptor, tableIndexes,
zoneDescriptor.partitions());
+ }
+
+ private Map<String, IgniteIndex> getIndexes(Catalog catalog, int tableId,
int primaryKeyIndexId) {
Map<String, IgniteIndex> tableIndexes = new HashMap<>();
- for (CatalogIndexDescriptor indexDescriptor :
catalog.indexes(tableDescriptor.id())) {
+ CatalogTableDescriptor table = catalog.table(tableId);
+ assert table != null;
+
+ for (CatalogIndexDescriptor indexDescriptor :
catalog.indexes(tableId)) {
if (indexDescriptor.status() != AVAILABLE) {
continue;
}
String indexName = indexDescriptor.name();
+ long indexKey = cacheKey(indexDescriptor.id(),
indexDescriptor.status().id());
- IgniteIndex schemaIndex = createSchemaIndex(
- indexDescriptor,
- descriptor,
- indexDescriptor.id() == tableDescriptor.primaryKeyIndexId()
- );
+ IgniteIndex schemaIndex = indexCache.get(indexKey, (x) -> {
+ RelCollation outputCollation =
IgniteIndex.createIndexCollation(indexDescriptor, table);
+ Object2IntMap<String> columnToIndex =
buildColumnToIndexMap(table.columns());
+ IgniteDistribution distribution = createDistribution(table,
columnToIndex);
+
+ return createSchemaIndex(
+ indexDescriptor,
+ outputCollation,
+ distribution,
+ indexDescriptor.id() == primaryKeyIndexId
+ );
+ });
tableIndexes.put(indexName, schemaIndex);
}
- int zoneId = tableDescriptor.zoneId();
+ return tableIndexes;
+ }
+
+ private static CatalogZoneDescriptor getZoneDescriptor(Catalog catalog,
int zoneId) {
CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
assert zoneDescriptor != null : "Zone is not found in schema: " +
zoneId;
- return createTable(tableDescriptor, descriptor, tableIndexes,
zoneDescriptor.partitions());
+ return zoneDescriptor;
}
- private static IgniteTable createTable(
+ private static IgniteTableImpl createTable(
CatalogTableDescriptor catalogTableDescriptor,
TableDescriptor tableDescriptor,
Map<String, IgniteIndex> indexes,
int parititions
) {
+ IgniteIndex primaryIndex = indexes.values().stream()
+ .filter(IgniteIndex::primaryKey)
+ .findFirst()
+ .orElseThrow();
+
+ // We do not need any index other than the primary index,
+ // all other indexes are stored in full table data cache.
+ Map<String, IgniteIndex> primaryKeyOnlyMap =
Map.of(primaryIndex.name(), primaryIndex);
+
+ ImmutableIntList primaryKeyColumns =
primaryIndex.collation().getKeys();
+
int tableId = catalogTableDescriptor.id();
String tableName = catalogTableDescriptor.name();
@@ -408,20 +507,76 @@ public class SqlSchemaManagerImpl implements
SqlSchemaManager {
// Let's fix table statistics keeping in mind IGNITE-19558 issue.
IgniteStatistic statistic = new IgniteStatistic(() -> 0.0d,
tableDescriptor.distribution());
- IgniteIndex primaryIndex = indexes.values().stream()
- .filter(IgniteIndex::primaryKey)
- .findFirst()
- .orElseThrow();
-
return new IgniteTableImpl(
tableName,
tableId,
catalogTableDescriptor.tableVersion(),
tableDescriptor,
- primaryIndex.collation().getKeys(),
+ primaryKeyColumns,
statistic,
- indexes,
+ primaryKeyOnlyMap,
parititions
);
}
+
+ private static class ActualIgniteTable extends AbstractIgniteDataSource
implements IgniteTable {
+
+ /** Cached table by id and version. */
+ private final IgniteTableImpl table;
+
+ /** Index map with up-to-date information. */
+ private final Map<String, IgniteIndex> indexMap;
+
+ ActualIgniteTable(IgniteTableImpl igniteTable, Map<String,
IgniteIndex> indexMap) {
+ super(igniteTable.name(), igniteTable.id(), igniteTable.version(),
igniteTable.descriptor(), igniteTable.getStatistic());
+
+ this.table = igniteTable;
+ this.indexMap = indexMap;
+ }
+
+ @Override
+ protected TableScan toRel(RelOptCluster cluster, RelTraitSet traitSet,
RelOptTable relOptTbl, List<RelHint> hints) {
+ return table.toRel(cluster, traitSet, relOptTbl, hints);
+ }
+
+ @Override
+ public boolean isUpdateAllowed(int colIdx) {
+ return table.isUpdateAllowed(colIdx);
+ }
+
+ @Override
+ public RelDataType rowTypeForInsert(IgniteTypeFactory factory) {
+ return table.rowTypeForInsert(factory);
+ }
+
+ @Override
+ public RelDataType rowTypeForUpdate(IgniteTypeFactory factory) {
+ return table.rowTypeForUpdate(factory);
+ }
+
+ @Override
+ public RelDataType rowTypeForDelete(IgniteTypeFactory factory) {
+ return table.rowTypeForDelete(factory);
+ }
+
+ @Override
+ public ImmutableIntList keyColumns() {
+ return table.keyColumns();
+ }
+
+ @Override
+ public Supplier<PartitionCalculator> partitionCalculator() {
+ return table.partitionCalculator();
+ }
+
+ @Override
+ public Map<String, IgniteIndex> indexes() {
+ return indexMap;
+ }
+
+ @Override
+ public int partitions() {
+ return table.partitions();
+ }
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
index 19cd84524e..d0f8384765 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImplTest.java
@@ -31,12 +31,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -46,6 +48,7 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
@@ -494,6 +497,9 @@ public class SqlSchemaManagerImplTest extends
BaseIgniteAbstractTest {
IgniteIndex index = findIndex(unwrapSchema(schemaPlus), "T1",
"VAL1_IDX");
assertNull(index, "Index should not be available");
+
+ Map<String, ?> indexes = findTableIndexes(versionAfter,
PUBLIC_SCHEMA_NAME, "T1");
+ assertEquals(Set.of("T1_PK"), indexes.keySet());
}
makeIndexAvailable("VAL1_IDX");
@@ -516,6 +522,10 @@ public class SqlSchemaManagerImplTest extends
BaseIgniteAbstractTest {
assertThat(index.collation(), equalTo(RelCollations.of(
new RelFieldCollation(1, Direction.CLUSTERED,
NullDirection.UNSPECIFIED)
)));
+
+ Map<String, ?> indexes = findTableIndexes(versionAfter,
PUBLIC_SCHEMA_NAME, "T1");
+ assertEquals(Set.of("T1_PK", "VAL1_IDX"), indexes.keySet());
+ assertSame(index, indexes.get("VAL1_IDX"), "VAL1_IDX cache entry");
}
}
@@ -545,6 +555,9 @@ public class SqlSchemaManagerImplTest extends
BaseIgniteAbstractTest {
IgniteIndex index2 = findIndex(unwrapSchema(schemaPlus), "T1",
"IDX2");
assertNull(index2);
+
+ Map<String, IgniteIndex> indexes = findTableIndexes(versionAfter,
PUBLIC_SCHEMA_NAME, "T1");
+ assertEquals(Set.of("T1_PK"), indexes.keySet());
}
makeIndexAvailable("IDX1");
@@ -568,6 +581,10 @@ public class SqlSchemaManagerImplTest extends
BaseIgniteAbstractTest {
new RelFieldCollation(1, Direction.ASCENDING,
NullDirection.FIRST),
new RelFieldCollation(2, Direction.ASCENDING,
NullDirection.LAST)
)));
+
+ Map<String, ?> indexes = findTableIndexes(versionAfter,
PUBLIC_SCHEMA_NAME, "T1");
+ assertEquals(Set.of("T1_PK", "IDX1"), indexes.keySet());
+ assertSame(index, indexes.get("IDX1"), "IDX1 cache entry");
}
makeIndexAvailable("IDX2");
@@ -591,9 +608,21 @@ public class SqlSchemaManagerImplTest extends
BaseIgniteAbstractTest {
new RelFieldCollation(1, Direction.DESCENDING,
NullDirection.FIRST),
new RelFieldCollation(2, Direction.DESCENDING,
NullDirection.LAST)
)));
+
+ Map<String, ?> indexes = findTableIndexes(versionAfter,
PUBLIC_SCHEMA_NAME, "T1");
+ assertEquals(Set.of("T1_PK", "IDX1", "IDX2"), indexes.keySet());
+ assertSame(index, indexes.get("IDX2"), "IDX2 cache entry");
}
}
+ private Map<String, IgniteIndex> findTableIndexes(int catalogVersion,
String schemaName, String tableName) {
+ Catalog catalog = catalogManager.catalog(catalogVersion);
+ CatalogTableDescriptor table =
catalog.schema(schemaName).table(tableName);
+
+ IgniteTable igniteTable = sqlSchemaManager.table(catalogVersion,
table.id());
+ return igniteTable.indexes();
+ }
+
private void makeIndexAvailable(String name) {
Map<String, CatalogIndexDescriptor> indices =
catalogManager.indexes(catalogManager.latestCatalogVersion())
.stream().collect(Collectors.toMap(CatalogIndexDescriptor::name,
Function.identity()));