This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 4cb2af8f17 IGNITE-20520 Specify schema version when getting table before query execution (#2645) 4cb2af8f17 is described below commit 4cb2af8f17e90b84aa998091d293bbf106e0a12d Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Tue Oct 3 13:44:45 2023 +0400 IGNITE-20520 Specify schema version when getting table before query execution (#2645) --- .../engine/exec/ExecutableTableRegistryImpl.java | 34 +++++++++------------- .../sql/engine/schema/CatalogSqlSchemaManager.java | 19 +++++++++--- .../sql/engine/schema/IgniteSystemViewImpl.java | 5 ++-- .../sql/engine/schema/IgniteTableImpl.java | 2 +- .../exec/ExecutableTableRegistrySelfTest.java | 3 +- .../sql/engine/exec/ExecutionServiceImplTest.java | 7 +---- .../engine/exec/NoOpExecutableTableRegistry.java | 2 +- .../sql/engine/planner/SystemViewPlannerTest.java | 2 +- .../engine/schema/CatalogSqlSchemaManagerTest.java | 2 -- 9 files changed, 38 insertions(+), 38 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java index f2b4d3c3b7..7012d6fcf4 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.sql.engine.exec; import com.github.benmanes.caffeine.cache.Caffeine; -import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; @@ -64,7 +63,7 @@ public class ExecutableTableRegistryImpl implements ExecutableTableRegistry, Sch /** {@inheritDoc} */ @Override public CompletableFuture<ExecutableTable> getTable(int tableId, int tableVersion, TableDescriptor tableDescriptor) { - return tableCache.computeIfAbsent(cacheKey(tableId, tableVersion), (k) -> loadTable(tableId, tableDescriptor)); + return tableCache.computeIfAbsent(cacheKey(tableId, tableVersion), (k) -> loadTable(tableId, tableVersion, tableDescriptor)); } /** {@inheritDoc} */ @@ -73,28 +72,23 @@ public class ExecutableTableRegistryImpl implements ExecutableTableRegistry, Sch tableCache.clear(); } - private CompletableFuture<ExecutableTable> loadTable(int tableId, TableDescriptor tableDescriptor) { - CompletableFuture<Map.Entry<InternalTable, SchemaRegistry>> f = tableManager.tableAsync(tableId) - .thenApply(table -> { - InternalTable internalTable = table.internalTable(); + private CompletableFuture<ExecutableTable> loadTable(int tableId, int tableVersion, TableDescriptor tableDescriptor) { + return tableManager.tableAsync(tableId) + .thenApply((table) -> { SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId); - return Map.entry(internalTable, schemaRegistry); - }); + SchemaDescriptor schemaDescriptor = schemaRegistry.schema(tableVersion); + TableRowConverterFactory converterFactory = requiredColumns -> new TableRowConverterImpl( + schemaRegistry, schemaDescriptor, tableDescriptor, requiredColumns + ); - return f.thenApply((table) -> { - SchemaRegistry schemaRegistry = table.getValue(); - SchemaDescriptor schemaDescriptor = schemaRegistry.schema(); - TableRowConverterFactory converterFactory = requiredColumns -> new TableRowConverterImpl( - schemaRegistry, schemaDescriptor, tableDescriptor, requiredColumns - ); - InternalTable internalTable = table.getKey(); - ScannableTable scannableTable = new ScannableTableImpl(internalTable, converterFactory, tableDescriptor); + InternalTable internalTable = table.internalTable(); + ScannableTable scannableTable = new ScannableTableImpl(internalTable, converterFactory, tableDescriptor); - UpdatableTableImpl updatableTable = new UpdatableTableImpl(tableId, tableDescriptor, internalTable.partitions(), - replicaService, clock, converterFactory.create(null), schemaDescriptor); + UpdatableTableImpl updatableTable = new UpdatableTableImpl(tableId, tableDescriptor, internalTable.partitions(), + replicaService, clock, converterFactory.create(null), schemaDescriptor); - return new ExecutableTableImpl(scannableTable, updatableTable); - }); + return new ExecutableTableImpl(scannableTable, updatableTable); + }); } private static final class ExecutableTableImpl implements ExecutableTable { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java index c8dcfb56da..e10656d9da 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManager.java @@ -98,7 +98,7 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { return catalogManager.catalogReadyFuture(version); } - private static SchemaPlus createSqlSchema(int version, CatalogSchemaDescriptor schemaDescriptor) { + private static SchemaPlus createSqlSchema(int catalogVersion, CatalogSchemaDescriptor schemaDescriptor) { String schemaName = schemaDescriptor.name(); int numTables = schemaDescriptor.tables().length; @@ -140,7 +140,14 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { IgniteStatistic statistic = new IgniteStatistic(() -> 0.0d, descriptor.distribution()); Map<String, IgniteIndex> tableIndexMap = schemaTableIndexes.getOrDefault(tableId, Collections.emptyMap()); - IgniteTable schemaTable = new IgniteTableImpl(tableName, tableId, version, descriptor, statistic, tableIndexMap); + IgniteTable schemaTable = new IgniteTableImpl( + tableName, + tableId, + tableDescriptor.tableVersion(), + descriptor, + statistic, + tableIndexMap + ); schemaDataSources.add(schemaTable); } @@ -150,14 +157,18 @@ public class CatalogSqlSchemaManager implements SqlSchemaManager { String viewName = systemViewDescriptor.name(); TableDescriptor descriptor = createTableDescriptorForSystemView(systemViewDescriptor); - IgniteSystemView schemaTable = new IgniteSystemViewImpl(viewName, viewId, version, descriptor); + IgniteSystemView schemaTable = new IgniteSystemViewImpl( + viewName, + viewId, + descriptor + ); schemaDataSources.add(schemaTable); } // create root schema SchemaPlus rootSchema = Frameworks.createRootSchema(false); - IgniteSchema igniteSchema = new IgniteSchema(schemaName, version, schemaDataSources); + IgniteSchema igniteSchema = new IgniteSchema(schemaName, catalogVersion, schemaDataSources); return rootSchema.add(schemaName, igniteSchema); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSystemViewImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSystemViewImpl.java index 191d465678..07588d23e2 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSystemViewImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSystemViewImpl.java @@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.schema.Statistic; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalSystemViewScan; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.checkerframework.checker.nullness.qual.Nullable; @@ -35,8 +36,8 @@ import org.checkerframework.checker.nullness.qual.Nullable; public class IgniteSystemViewImpl extends AbstractIgniteDataSource implements IgniteSystemView { /** Constructor. */ - public IgniteSystemViewImpl(String name, int id, int version, TableDescriptor desc) { - super(name, id, version, desc, new SystemViewStatistic(desc.distribution())); + public IgniteSystemViewImpl(String name, int id, TableDescriptor desc) { + super(name, id, CatalogTableDescriptor.INITIAL_TABLE_VERSION, desc, new SystemViewStatistic(desc.distribution())); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java index 32349b7e69..83982657ae 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java @@ -35,7 +35,7 @@ public class IgniteTableImpl extends AbstractIgniteDataSource implements IgniteT private final Map<String, IgniteIndex> indexMap; /** Constructor. */ - public IgniteTableImpl(String name, int id, int version, TableDescriptor desc, + public IgniteTableImpl(String name, int id, int version, TableDescriptor desc, Statistic statistic, Map<String, IgniteIndex> indexMap) { super(name, id, version, desc, statistic); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java index 99b56ab6a8..81d27aee05 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistrySelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.when; import java.util.List; @@ -152,7 +153,7 @@ public class ExecutableTableRegistrySelfTest extends BaseIgniteAbstractTest { when(tableManager.tableAsync(tableId)).thenReturn(CompletableFuture.completedFuture(table)); when(schemaManager.schemaRegistry(tableId)).thenReturn(schemaRegistry); - when(schemaRegistry.schema()).thenReturn(schemaDescriptor); + when(schemaRegistry.schema(anyInt())).thenReturn(schemaDescriptor); when(descriptor.iterator()).thenReturn(List.<ColumnDescriptor>of().iterator()); return registry.getTable(tableId, tableVersion, descriptor); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index a86016c7e5..d392e6f101 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -899,12 +899,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { ); } - return new TestTable( - new TableDescriptorImpl(columns, distr), - name, - size, - List.of() - ); + return new TestTable(new TableDescriptorImpl(columns, distr), name, size, List.of()); } private static class CapturingMailboxRegistry implements MailboxRegistry { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java index 04e57b9ad1..cf82bba832 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/NoOpExecutableTableRegistry.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; public final class NoOpExecutableTableRegistry implements ExecutableTableRegistry { /** {@inheritDoc} */ @Override - public CompletableFuture<ExecutableTable> getTable(int tableId, int schemaVersion, TableDescriptor tableDescriptor) { + public CompletableFuture<ExecutableTable> getTable(int tableId, int tableVersion, TableDescriptor tableDescriptor) { return CompletableFuture.completedFuture(new NoOpExecutableTable(tableId)); } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SystemViewPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SystemViewPlannerTest.java index f566c1cf01..06755ea666 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SystemViewPlannerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SystemViewPlannerTest.java @@ -143,7 +143,7 @@ public class SystemViewPlannerTest extends AbstractPlannerTest { int id = SYSTEM_VIEW_ID.incrementAndGet(); TableDescriptorImpl tableDescriptor = new TableDescriptorImpl(columns, IgniteDistributions.single()); - return new IgniteSystemViewImpl(name, id, 1, tableDescriptor); + return new IgniteSystemViewImpl(name, id, tableDescriptor); } private static <T extends RelNode> Predicate<T> hasExpr(Function<T, RexNode> expr, String... expectedExprs) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java index 1b1a71a1b5..2cf970e427 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java @@ -141,7 +141,6 @@ public class CatalogSqlSchemaManagerTest extends BaseIgniteAbstractTest { IgniteTable table = getTable(schema, testTable); assertEquals(testTable.id, table.id()); - assertEquals(schema.version(), table.version()); TableDescriptor descriptor = table.descriptor(); assertEquals(testTable.columns.size(), descriptor.columnsCount(), "column count"); @@ -222,7 +221,6 @@ public class CatalogSqlSchemaManagerTest extends BaseIgniteAbstractTest { IgniteTable table = getTable(schema, testTable); assertEquals(testTable.id, table.id()); - assertEquals(schema.version(), table.version()); ColumnDescriptor c1 = table.descriptor().columnDescriptor("c1"); assertNull(c1.defaultValue());