This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 953d0d4e53 IGNITE-20437 Sql. Support system views in execution (#2666) 953d0d4e53 is described below commit 953d0d4e53dccf12f2b68b19d5a9e25000d05dfd Author: korlov42 <kor...@gridgain.com> AuthorDate: Fri Oct 6 12:03:25 2023 +0300 IGNITE-20437 Sql. Support system views in execution (#2666) --- .../java/org/apache/ignite/lang/ErrorGroups.java | 10 + .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 6 +- modules/sql-engine/build.gradle | 1 + .../internal/sql/engine/SqlQueryProcessor.java | 45 ++- .../exec/ExecutionDependencyResolverImpl.java | 23 +- .../sql/engine/exec/LogicalRelImplementor.java | 45 ++- .../sql/engine/exec/ResolvedDependencies.java | 17 +- .../sql/engine/exec/ScannableDataSource.java} | 20 +- ...vider.java => ScannableDataSourceProvider.java} | 18 +- .../internal/sql/engine/exec/SqlRowHandler.java | 3 +- .../exec/mapping/ExecutionTargetProvider.java | 18 +- .../sql/engine/exec/mapping/FragmentMapper.java | 7 +- .../sql/engine/exec/mapping/FragmentSplitter.java | 48 ++- .../engine/exec/mapping/MappingServiceImpl.java | 27 +- .../sql/engine/exec/rel/DataSourceScanNode.java | 81 +++++ .../ignite/internal/sql/engine/prepare/Cloner.java | 4 +- .../internal/sql/engine/prepare/Fragment.java | 38 ++- .../internal/sql/engine/prepare/QuerySplitter.java | 49 ++- .../exec/ExecutionDependencyResolverSelfTest.java | 2 +- .../sql/engine/exec/ExecutionServiceImplTest.java | 11 +- .../sql/engine/exec/rel/AbstractExecutionTest.java | 63 +++- .../exec/rel/AbstractSetOpExecutionTest.java | 9 +- .../sql/engine/exec/rel/BaseAggregateTest.java | 8 +- .../exec/rel/DataSourceScanNodeSelfTest.java | 345 +++++++++++++++++++++ .../sql/engine/exec/rel/ExchangeExecutionTest.java | 9 +- .../sql/engine/exec/rel/ExecutionTest.java | 8 +- .../rel/HashAggregateSingleGroupExecutionTest.java | 9 +- .../exec/rel/HashIndexSpoolExecutionTest.java | 9 +- 
.../exec/rel/IndexScanNodeExecutionTest.java | 9 +- .../sql/engine/exec/rel/LimitExecutionTest.java | 9 +- .../engine/exec/rel/MergeJoinExecutionTest.java | 8 +- .../exec/rel/NestedLoopJoinExecutionTest.java | 8 +- .../exec/rel/SortedIndexSpoolExecutionTest.java | 8 +- .../exec/rel/TableScanNodeExecutionTest.java | 9 +- .../engine/exec/rel/TableSpoolExecutionTest.java | 9 +- .../sql/engine/framework/TestBuilders.java | 19 +- .../internal/sql/engine/framework/TestNode.java | 2 +- .../internal/systemview/SystemViewManager.java | 21 ++ .../internal/systemview/SystemViewManagerImpl.java | 77 ++++- .../internal/systemview/SystemViewManagerTest.java | 66 ++++ 41 files changed, 1051 insertions(+), 131 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java index 963280289a..ce198c36d4 100755 --- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java @@ -254,6 +254,16 @@ public class ErrorGroups { /** Session closed error. Operation is rejected because SQL session was closed. */ public static final int SESSION_CLOSED_ERR = SQL_ERR_GROUP.registerErrorCode((short) 11); + + /** + * SQL engine was unable to map query on current cluster topology. + * + * <p>This may be due to a variety of reasons, but most probably because of all nodes hosting certain system view + * or a table partition went offline. + * + * <p>See error message for details. + */ + public static final int MAPPING_ERR = SQL_ERR_GROUP.registerErrorCode((short) 12); } /** Meta storage error group. 
*/ diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 97a0019bca..2bbe6f8f67 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -110,6 +110,7 @@ import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.DataStorageModule; import org.apache.ignite.internal.storage.DataStorageModules; +import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; @@ -405,7 +406,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { replicaSvc, hybridClock, catalogManager, - metricManager + metricManager, + new SystemViewManagerImpl(name, catalogManager) ); // Preparing the result map. 
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 9b1cc38fab..d25d7429e4 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -529,8 +529,9 @@ public class IgniteImpl implements Ignite { delayDurationMsSupplier ); - systemViewManager = new SystemViewManagerImpl(catalogManager); + systemViewManager = new SystemViewManagerImpl(name, catalogManager); nodeAttributesCollector.register(systemViewManager); + logicalTopology.addEventListener(systemViewManager); raftMgr.appendEntriesRequestInterceptor(new CheckCatalogVersionOnAppendEntries(catalogManager)); raftMgr.actionRequestInterceptor(new CheckCatalogVersionOnActionRequest(catalogManager)); @@ -589,7 +590,8 @@ public class IgniteImpl implements Ignite { replicaSvc, clock, catalogManager, - metricManager + metricManager, + systemViewManager ); sql = new IgniteSqlImpl(qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker)); diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle index 57beaa5bcb..94b8d4e999 100644 --- a/modules/sql-engine/build.gradle +++ b/modules/sql-engine/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation project(':ignite-catalog') implementation project(':ignite-metrics') implementation project(':ignite-cluster-management') + implementation project(':ignite-system-view') implementation libs.jetbrains.annotations implementation libs.fastutil.core implementation libs.caffeine diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index f80787f964..01fc27d788 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java 
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; +import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; @@ -72,6 +73,8 @@ import org.apache.ignite.internal.sql.engine.prepare.QueryPlan; import org.apache.ignite.internal.sql.engine.property.PropertiesHelper; import org.apache.ignite.internal.sql.engine.property.PropertiesHolder; import org.apache.ignite.internal.sql.engine.schema.CatalogSqlSchemaManager; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.session.Session; import org.apache.ignite.internal.sql.engine.session.SessionId; @@ -82,16 +85,19 @@ import org.apache.ignite.internal.sql.engine.session.SessionProperty; import org.apache.ignite.internal.sql.engine.sql.ParsedResult; import org.apache.ignite.internal.sql.engine.sql.ParserService; import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl; +import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.util.BaseQueryContext; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory; import org.apache.ignite.internal.sql.metrics.SqlClientMetricSource; import org.apache.ignite.internal.storage.DataStorageManager; +import org.apache.ignite.internal.systemview.SystemViewManager; import 
org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.ErrorGroups.Sql; import org.apache.ignite.lang.SchemaNotFoundException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.sql.SqlException; @@ -158,6 +164,10 @@ public class SqlQueryProcessor implements QueryProcessor { private final ReplicaService replicaService; + private final SqlSchemaManager sqlSchemaManager; + + private final SystemViewManager systemViewManager; + private volatile SessionManager sessionManager; private volatile QueryTaskExecutor taskExecutor; @@ -166,8 +176,6 @@ public class SqlQueryProcessor implements QueryProcessor { private volatile PrepareService prepareSvc; - private volatile SqlSchemaManager sqlSchemaManager; - /** Clock. */ private final HybridClock clock; @@ -192,7 +200,8 @@ public class SqlQueryProcessor implements QueryProcessor { ReplicaService replicaService, HybridClock clock, CatalogManager catalogManager, - MetricManager metricManager + MetricManager metricManager, + SystemViewManager systemViewManager ) { this.clusterSrvc = clusterSrvc; this.logicalTopologyService = logicalTopologyService; @@ -204,6 +213,7 @@ public class SqlQueryProcessor implements QueryProcessor { this.clock = clock; this.catalogManager = catalogManager; this.metricManager = metricManager; + this.systemViewManager = systemViewManager; sqlSchemaManager = new CatalogSqlSchemaManager( catalogManager, @@ -250,13 +260,16 @@ public class SqlQueryProcessor implements QueryProcessor { var executableTableRegistry = new ExecutableTableRegistryImpl(tableManager, schemaManager, replicaService, clock, TABLE_CACHE_SIZE); - var dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry); + var dependencyResolver = new 
ExecutionDependencyResolverImpl( + executableTableRegistry, + view -> () -> systemViewManager.scanView(view.name()) + ); var executionTargetProvider = new ExecutionTargetProvider() { @Override - public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, int tableId) { - return tableManager.tableAsync(tableId) - .thenCompose(table -> table.internalTable().primaryReplicas()) + public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) { + return tableManager.tableAsync(table.id()) + .thenCompose(tbl -> tbl.internalTable().primaryReplicas()) .thenApply(replicas -> { List<NodeWithTerm> assignments = replicas.stream() .map(primaryReplica -> new NodeWithTerm(primaryReplica.node().name(), primaryReplica.term())) @@ -265,6 +278,24 @@ public class SqlQueryProcessor implements QueryProcessor { return factory.partitioned(assignments); }); } + + @Override + public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) { + List<String> nodes = systemViewManager.owningNodes(view.name()); + + if (nullOrEmpty(nodes)) { + return CompletableFuture.failedFuture( + new SqlException(Sql.MAPPING_ERR, format("The view with name '{}' could not be found on" + + " any active nodes in the cluster", view.name())) + ); + } + + return CompletableFuture.completedFuture( + view.distribution() == IgniteDistributions.single() + ? 
factory.oneOf(nodes) + : factory.allOf(nodes) + ); + } }; var mappingService = new MappingServiceImpl(nodeName, executionTargetProvider); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java index 90d35e9b35..c1351ca8eb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java @@ -28,10 +28,12 @@ import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle; import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteSender; +import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.trait.DistributionFunction; @@ -45,9 +47,14 @@ import org.apache.ignite.internal.sql.engine.trait.TraitUtils; public class ExecutionDependencyResolverImpl implements ExecutionDependencyResolver { private final ExecutableTableRegistry registry; + private final ScannableDataSourceProvider dataSourceProvider; - public ExecutionDependencyResolverImpl(ExecutableTableRegistry registry) { + public ExecutionDependencyResolverImpl( + ExecutableTableRegistry registry, + 
ScannableDataSourceProvider dataSourceProvider + ) { this.registry = registry; + this.dataSourceProvider = dataSourceProvider; } /** @@ -56,6 +63,7 @@ public class ExecutionDependencyResolverImpl implements ExecutionDependencyResol @Override public CompletableFuture<ResolvedDependencies> resolveDependencies(Iterable<IgniteRel> rels, IgniteSchema schema) { Map<Integer, CompletableFuture<ExecutableTable>> tableMap = new HashMap<>(); + Map<Integer, ScannableDataSource> dataSources = new HashMap<>(); IgniteRelShuttle shuttle = new IgniteRelShuttle() { @Override @@ -103,6 +111,17 @@ public class ExecutionDependencyResolverImpl implements ExecutionDependencyResol return rel; } + @Override + public IgniteRel visit(IgniteSystemViewScan rel) { + IgniteSystemView view = rel.getTable().unwrap(IgniteSystemView.class); + + assert view != null; + + dataSources.put(view.id(), dataSourceProvider.forSystemView(view)); + + return rel; + } + private void resolveDistributionFunction(IgniteDistribution distribution) { DistributionFunction function = distribution.function(); @@ -135,7 +154,7 @@ public class ExecutionDependencyResolverImpl implements ExecutionDependencyResol .map(e -> Map.entry(e.getKey(), e.getValue().join())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - return new ResolvedDependencies(map); + return new ResolvedDependencies(map, dataSources); }); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index 130bea8319..10612cda10 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -44,6 +44,8 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.sql.SqlKind; 
import org.apache.calcite.util.ImmutableBitSet; +import org.apache.ignite.internal.schema.BinaryTupleSchema; +import org.apache.ignite.internal.schema.BinaryTupleSchema.Element; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory; import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable; @@ -52,6 +54,7 @@ import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType; import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup; import org.apache.ignite.internal.sql.engine.exec.rel.AbstractSetOpNode; import org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNode; +import org.apache.ignite.internal.sql.engine.exec.rel.DataSourceScanNode; import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode; import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode; import org.apache.ignite.internal.sql.engine.exec.rel.Inbox; @@ -108,9 +111,12 @@ import org.apache.ignite.internal.sql.engine.rel.set.IgniteMapSetOp; import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceIntersect; import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp; import org.apache.ignite.internal.sql.engine.rule.LogicalScanConverterRule; +import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; +import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; +import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.trait.Destination; import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution; import org.apache.ignite.internal.sql.engine.trait.TraitUtils; @@ -444,7 +450,33 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>> /** {@inheritDoc} */ @Override public Node<RowT> visit(IgniteSystemViewScan rel) { - throw new UnsupportedOperationException("System view scan is not implemented"); + RexNode condition = rel.condition(); + List<RexNode> projects = rel.projects(); + ImmutableBitSet requiredColumns = rel.requiredColumns(); + IgniteDataSource igniteDataSource = rel.getTable().unwrapOrThrow(IgniteDataSource.class); + + BinaryTupleSchema schema = fromTableDescriptor(igniteDataSource.descriptor()); + + ScannableDataSource dataSource = resolvedDependencies.dataSource(igniteDataSource.id()); + + IgniteTypeFactory typeFactory = ctx.getTypeFactory(); + + RelDataType rowType = igniteDataSource.getRowType(typeFactory, requiredColumns); + + Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType); + Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType); + + RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType)); + RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema); + return new DataSourceScanNode<>( + ctx, + rowFactory, + schema, + dataSource, + filters, + prj, + requiredColumns == null ? 
null : requiredColumns.toBitSet() + ); } /** {@inheritDoc} */ @@ -859,4 +891,15 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> public <T extends Node<RowT>> T go(IgniteRel rel) { return (T) visit(rel); } + + private static BinaryTupleSchema fromTableDescriptor(TableDescriptor descriptor) { + Element[] elements = new Element[descriptor.columnsCount()]; + + int idx = 0; + for (ColumnDescriptor column : descriptor) { + elements[idx++] = new Element(column.physicalType(), column.nullable()); + } + + return BinaryTupleSchema.create(elements); + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java index 7dfffbebb6..2371b09f87 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java @@ -27,9 +27,15 @@ public class ResolvedDependencies { private final Map<Integer, ExecutableTable> tableMap; + private final Map<Integer, ScannableDataSource> dataSourceMap; + /** Constructor. */ - public ResolvedDependencies(Map<Integer, ExecutableTable> tableMap) { + public ResolvedDependencies( + Map<Integer, ExecutableTable> tableMap, + Map<Integer, ScannableDataSource> dataSourceMap + ) { this.tableMap = tableMap; + this.dataSourceMap = dataSourceMap; } /** @@ -56,6 +62,15 @@ public class ResolvedDependencies { return executableTable.tableDescriptor(); } + /** Returns data source instance by given id. 
*/ + public ScannableDataSource dataSource(int dataSourceId) { + ScannableDataSource dataSource = dataSourceMap.get(dataSourceId); + + assert dataSource != null : "DataSource does not exist: " + dataSourceId; + + return dataSource; + } + private ExecutableTable getTable(int tableId) { ExecutableTable executableTable = tableMap.get(tableId); assert executableTable != null : "ExecutableTable does not exist: " + tableId; diff --git a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSource.java similarity index 58% copy from modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSource.java index 1c2ef6814d..7453c899fb 100644 --- a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSource.java @@ -15,21 +15,15 @@ * limitations under the License. */ -package org.apache.ignite.internal.systemview; +package org.apache.ignite.internal.sql.engine.exec; -import org.apache.ignite.internal.manager.IgniteComponent; +import java.util.concurrent.Flow.Publisher; +import org.apache.ignite.internal.schema.row.InternalTuple; /** - * The system view manager is responsible for registering system views in the cluster. + * Provides read operations over an abstract data source. */ -public interface SystemViewManager extends IgniteComponent { - /** - * Registers a system view. - * - * <p>Registration of views is completed when the system view manager starts. Therefore, - * it is necessary for other components to register the views before the manager is started. - * - * @param view System view to register. 
- */ - void register(SystemView<?> view); +@FunctionalInterface +public interface ScannableDataSource { + Publisher<InternalTuple> scan(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSourceProvider.java similarity index 58% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSourceProvider.java index 8d9335c314..4449109a61 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSourceProvider.java @@ -15,22 +15,14 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.exec.mapping; +package org.apache.ignite.internal.sql.engine.exec; -import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; /** - * An integration point that helps the mapper to acquire an execution target of particular - * relation from fragment. + * An integration point that helps the execution to scan over arbitrary source of rows. */ @SuppressWarnings("InterfaceMayBeAnnotatedFunctional") -public interface ExecutionTargetProvider { - /** - * Returns an execution target for a table with given id. - * - * @param factory A factory to create target for given table. - * @param tableId A table id to create execution target for. - * @return A future representing the result. 
- */ - CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, int tableId); +public interface ScannableDataSourceProvider { + ScannableDataSource forSystemView(IgniteSystemView view); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java index 6683214d84..03a87a63e3 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.sql.engine.exec; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl.UNSPECIFIED_VALUE_PLACEHOLDER; import java.math.BigDecimal; @@ -176,7 +177,7 @@ public class SqlRowHandler implements RowHandler<RowWrapper> { /** {@inheritDoc} */ @Override public RowWrapper create(InternalTuple tuple) { - assert schemaLen == tuple.elementCount(); + assert schemaLen == tuple.elementCount() : format("schemaLen={}, tupleSize={}", schemaLen, tuple.elementCount()); return new BinaryTupleRowWrapper(rowSchema, tuple); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java index 8d9335c314..fa6f3a343c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java @@ -18,19 +18,29 @@ package org.apache.ignite.internal.sql.engine.exec.mapping; import java.util.concurrent.CompletableFuture; +import 
org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; /** * An integration point that helps the mapper to acquire an execution target of particular * relation from fragment. */ -@SuppressWarnings("InterfaceMayBeAnnotatedFunctional") public interface ExecutionTargetProvider { /** - * Returns an execution target for a table with given id. + * Returns an execution target for a given table. * * @param factory A factory to create target for given table. - * @param tableId A table id to create execution target for. + * @param table A table to create execution target for. * @return A future representing the result. */ - CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, int tableId); + CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table); + + /** + * Returns an execution target for a given view. + * + * @param factory A factory to create target for given table. + * @param view A view to create execution target for. + * @return A future representing the result. 
+ */ + CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java index 202673fa83..86e7ba61a2 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate; import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate; import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp; +import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.sql.engine.trait.TraitUtils; @@ -413,10 +414,10 @@ class FragmentMapper { } private Mapping mapTableScan(long sourceId, ProjectableFilterableTableScan rel) { - IgniteTable igniteTable = rel.getTable().unwrapOrThrow(IgniteTable.class); + IgniteDataSource igniteDataSource = rel.getTable().unwrapOrThrow(IgniteDataSource.class); - ExecutionTarget target = targets.get(igniteTable.id()); - assert target != null : "No colocation group for " + igniteTable.id(); + ExecutionTarget target = targets.get(igniteDataSource.id()); + assert target != null : "No colocation group for " + igniteDataSource.id(); return newMapping(sourceId, target); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java index 42f15dee32..f23ed61d6a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.sql.engine.exec.mapping; -import it.unimi.dsi.fastutil.ints.IntArraySet; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.Deque; @@ -36,9 +36,11 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteSender; +import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.util.Commons; @@ -77,23 +79,54 @@ class FragmentSplitter extends IgniteRelShuttle { return res; } + @Override + public IgniteRel visit(IgniteSystemViewScan rel) { + IgniteSystemView view = rel.getTable().unwrap(IgniteSystemView.class); + + assert view != null; + + if (curr.seenRelations.add(view.id())) { + curr.systemViews.add(view); + } + + return super.visit(rel); + } + @Override public IgniteRel visit(IgniteTableScan rel) { - curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id()); + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); + + assert table != null; + + if 
(curr.seenRelations.add(table.id())) { + curr.tables.add(table); + } return super.visit(rel); } @Override public IgniteRel visit(IgniteIndexScan rel) { - curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id()); + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); + + assert table != null; + + if (curr.seenRelations.add(table.id())) { + curr.tables.add(table); + } return super.visit(rel); } @Override public IgniteRel visit(IgniteTableModify rel) { - curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id()); + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); + + assert table != null; + + if (curr.seenRelations.add(table.id())) { + curr.tables.add(table); + } return super.visit(rel); } @@ -182,8 +215,11 @@ class FragmentSplitter extends IgniteRelShuttle { private IgniteRel root; + private final IntSet seenRelations = new IntOpenHashSet(); + private final List<IgniteReceiver> remotes = new ArrayList<>(); - private final IntSet tableIds = new IntArraySet(); + private final List<IgniteTable> tables = new ArrayList<>(); + private final List<IgniteSystemView> systemViews = new ArrayList<>(); private FragmentProto(long id, boolean correlated, IgniteRel root) { this.id = id; @@ -192,7 +228,7 @@ class FragmentSplitter extends IgniteRelShuttle { } Fragment build() { - return new Fragment(id, correlated, root, remotes, tableIds); + return new Fragment(id, correlated, root, remotes, tables, systemViews); } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java index 8b0c1cfcff..e84f62c928 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java @@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.sql.engine.exec.mapping; import static java.util.concurrent.CompletableFuture.allOf; import static org.apache.ignite.internal.util.CollectionUtils.first; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; -import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; @@ -34,6 +33,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; @@ -43,6 +43,7 @@ import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan; import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver; import org.apache.ignite.internal.sql.engine.rel.IgniteSender; import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.lang.ErrorGroups.Sql; /** * An implementation of {@link MappingService}. 
@@ -76,14 +77,17 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL List<Fragment> fragments0 = Commons.transform(fragments, fragment -> fragment.attach(context.cluster())); - List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets = fragments0.stream() - .flatMap(fragment -> - fragment.tableIds().intStream() - .mapToObj(id -> - targetProvider.forTable(context.targetFactory(), id) - .thenApply(target -> IntObjectPair.of(id, target)) + List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets = + fragments0.stream().flatMap(fragment -> Stream.concat( + fragment.tables().stream() + .map(table -> targetProvider.forTable(context.targetFactory(), table) + .thenApply(target -> IntObjectPair.of(table.id(), target)) + ), + fragment.systemViews().stream() + .map(view -> targetProvider.forSystemView(context.targetFactory(), view) + .thenApply(target -> IntObjectPair.of(view.id(), target)) ) - ) + )) .collect(Collectors.toList()); return allOf(targets.toArray(new CompletableFuture[0])) @@ -155,12 +159,12 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL } if (!lastAttemptSucceed) { - throw new IgniteInternalException(INTERNAL_ERR, "Unable to map query", ex); + throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map query: " + ex.getMessage(), ex); } List<MappedFragment> result = new ArrayList<>(fragmentsToMap.size()); for (Fragment fragment : fragmentsToMap) { - FragmentMapping mapping = mappingByFragmentId.get(fragment.fragmentId()); + FragmentMapping mapping = mappingByFragmentId.get(fragment.fragmentId()); ColocationGroup targetGroup = null; if (!fragment.rootFragment()) { @@ -242,7 +246,8 @@ public class MappingServiceImpl implements MappingService, LogicalTopologyEventL sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(), sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution()); - fragment = new Fragment(fragment.fragmentId(), 
fragment.correlated(), sender, fragment.remotes(), fragment.tableIds()); + fragment = new Fragment(fragment.fragmentId(), fragment.correlated(), sender, + fragment.remotes(), fragment.tables(), fragment.systemViews()); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNode.java new file mode 100644 index 0000000000..24da64c175 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNode.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.exec.rel; + +import java.util.BitSet; +import java.util.concurrent.Flow.Publisher; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.ignite.internal.schema.BinaryTupleSchema; +import org.apache.ignite.internal.schema.row.InternalTuple; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource; +import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple; +import org.apache.ignite.internal.util.subscription.TransformingPublisher; +import org.jetbrains.annotations.Nullable; + +/** + * Execution node for scan over arbitrary {@link ScannableDataSource data source}. + */ +public class DataSourceScanNode<RowT> extends StorageScanNode<RowT> { + + private final ScannableDataSource dataSource; + + private final Function<InternalTuple, RowT> converter; + + /** + * Constructor. + * + * @param ctx Execution context. + * @param rowFactory Row factory. + * @param schema Schema of the tuples returned by data source. + * @param dataSource A data source to scan. + * @param filters Optional filter to filter out rows. + * @param rowTransformer Optional projection function. + * @param requiredColumns Optional set of columns of interest. 
+ */ + public DataSourceScanNode( + ExecutionContext<RowT> ctx, + RowHandler.RowFactory<RowT> rowFactory, + BinaryTupleSchema schema, + ScannableDataSource dataSource, + @Nullable Predicate<RowT> filters, + @Nullable Function<RowT, RowT> rowTransformer, + @Nullable BitSet requiredColumns + ) { + super(ctx, filters, rowTransformer); + + this.dataSource = dataSource; + + if (requiredColumns == null || requiredColumns.cardinality() == schema.elementCount()) { + converter = rowFactory::create; + } else { + int[] mapping = requiredColumns.stream().toArray(); + + converter = tuple -> rowFactory.create(new FieldDeserializingProjectedTuple(schema, tuple, mapping)); + } + } + + /** {@inheritDoc} */ + @Override + protected Publisher<RowT> scan() { + return new TransformingPublisher<>(dataSource.scan(), converter); + } +} + diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java index c6491b6a5a..ed2918bfdb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java @@ -49,8 +49,8 @@ public class Cloner { IgniteRel newRoot = visit(src.root()); - return new Fragment(src.fragmentId(), newRoot, src.correlated(), - remotes, src.serialized(), src.tableIds()); + return new Fragment(src.fragmentId(), src.correlated(), newRoot, src.serialized(), + remotes, src.tables(), src.systemViews()); } finally { remotes = null; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java index 2fa37ec274..6fe68c3d62 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java +++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java @@ -19,14 +19,14 @@ package org.apache.ignite.internal.sql.engine.prepare; import static org.apache.ignite.internal.sql.engine.externalize.RelJsonWriter.toJson; -import it.unimi.dsi.fastutil.ints.IntSet; -import it.unimi.dsi.fastutil.ints.IntSets; import java.util.List; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptUtil; import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteSender; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; import org.apache.ignite.internal.tostring.IgniteToStringExclude; import org.apache.ignite.internal.tostring.S; @@ -45,7 +45,8 @@ public class Fragment { private final String rootSer; private final List<IgniteReceiver> remotes; - private final IntSet tableIds; + private final List<IgniteTable> tables; + private final List<IgniteSystemView> systemViews; private final boolean correlated; @@ -56,21 +57,32 @@ public class Fragment { * @param correlated Whether some correlated variables should be set prior to fragment execution. * @param root Root node of the fragment. * @param remotes Remote sources of the fragment. + * @param tables A list of tables contained in this fragment. + * @param systemViews A list of system views contained in this fragment. 
+ */ - public Fragment(long id, boolean correlated, IgniteRel root, List<IgniteReceiver> remotes, IntSet tableIds) { - this(id, root, correlated, remotes, null, tableIds); + public Fragment(long id, boolean correlated, IgniteRel root, List<IgniteReceiver> remotes, + List<IgniteTable> tables, List<IgniteSystemView> systemViews) { + this(id, correlated, root, null, remotes, tables, systemViews); } /** * Constructor. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 + * + * @param id An identifier of this fragment. + * @param correlated Whether some correlated variables should be set prior to fragment execution. + * @param root Root node of the fragment. + * @param rootSer Serialised representation of a root. Optional. + * @param remotes Remote sources of the fragment. + * @param tables A list of tables contained in this fragment. + * @param systemViews A list of system views contained in this fragment. */ - Fragment(long id, IgniteRel root, boolean correlated, List<IgniteReceiver> remotes, - @Nullable String rootSer, IntSet tableIds) { + Fragment(long id, boolean correlated, IgniteRel root, @Nullable String rootSer, List<IgniteReceiver> remotes, + List<IgniteTable> tables, List<IgniteSystemView> systemViews) { this.id = id; this.root = root; this.remotes = List.copyOf(remotes); - this.tableIds = IntSets.unmodifiable(tableIds); + this.tables = List.copyOf(tables); + this.systemViews = List.copyOf(systemViews); this.rootSer = rootSer != null ? 
rootSer : toJson(root); this.correlated = correlated; } @@ -115,8 +127,12 @@ public class Fragment { return remotes; } - public IntSet tableIds() { - return tableIds; + public List<IgniteTable> tables() { + return tables; + } + + public List<IgniteSystemView> systemViews() { + return systemViews; } public boolean rootFragment() { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java index e0da65870d..7ed65752a1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.sql.engine.prepare; -import it.unimi.dsi.fastutil.ints.IntArraySet; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.Deque; @@ -30,10 +30,12 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan; import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteSender; +import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan; import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange; import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.util.Commons; @@ -130,7 +132,13 @@ public class QuerySplitter extends IgniteRelShuttle { /** {@inheritDoc} */ @Override public IgniteRel 
visit(IgniteIndexScan rel) { - curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id()); + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); + + assert table != null; + + if (curr.seenRelations.add(table.id())) { + curr.tables.add(table); + } return rel.clone(IdGenerator.nextId()); } @@ -138,7 +146,13 @@ public class QuerySplitter extends IgniteRelShuttle { /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteTableScan rel) { - curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id()); + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); + + assert table != null; + + if (curr.seenRelations.add(table.id())) { + curr.tables.add(table); + } return rel.clone(IdGenerator.nextId()); } @@ -146,19 +160,42 @@ public class QuerySplitter extends IgniteRelShuttle { /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteTableModify rel) { - curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id()); + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); + + assert table != null; + + if (curr.seenRelations.add(table.id())) { + curr.tables.add(table); + } return super.visit(rel); } + /** {@inheritDoc} */ + @Override + public IgniteRel visit(IgniteSystemViewScan rel) { + IgniteSystemView view = rel.getTable().unwrap(IgniteSystemView.class); + + assert view != null; + + if (curr.seenRelations.add(view.id())) { + curr.systemViews.add(view); + } + + return rel.clone(IdGenerator.nextId()); + } + private static class FragmentProto { private final long id; private final boolean correlated; private IgniteRel root; + private final IntSet seenRelations = new IntOpenHashSet(); + private final List<IgniteReceiver> remotes = new ArrayList<>(); - private final IntSet tableIds = new IntArraySet(); + private final List<IgniteTable> tables = new ArrayList<>(); + private final List<IgniteSystemView> systemViews = new ArrayList<>(); private FragmentProto(long id, boolean correlated, IgniteRel root) { this.id = id; @@ -167,7 +204,7 @@ 
public class QuerySplitter extends IgniteRelShuttle { } Fragment build() { - return new Fragment(id, correlated, root, remotes, tableIds); + return new Fragment(id, correlated, root, remotes, tables, systemViews); } } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java index d83ece2045..f6ce53c7fb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java @@ -216,7 +216,7 @@ public class ExecutionDependencyResolverSelfTest extends AbstractPlannerTest { } CompletableFuture<ResolvedDependencies> resolveDependencies(String sql) { - ExecutionDependencyResolver resolver = new ExecutionDependencyResolverImpl(registry); + ExecutionDependencyResolver resolver = new ExecutionDependencyResolverImpl(registry, null); IgniteRel rel; try { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index d392e6f101..78bd82e109 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -95,6 +95,8 @@ import org.apache.ignite.internal.sql.engine.schema.CatalogColumnDescriptor; import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor; import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; +import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; import org.apache.ignite.internal.sql.engine.sql.ParsedResult; @@ -606,7 +608,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { NoOpExecutableTableRegistry executableTableRegistry = new NoOpExecutableTableRegistry(); - ExecutionDependencyResolver dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry); + ExecutionDependencyResolver dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry, null); CalciteSchema rootSch = CalciteSchema.createRootSchema(false); rootSch.add(schema.getName(), schema); @@ -616,13 +618,18 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { var targetProvider = new ExecutionTargetProvider() { @Override - public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, int tableId) { + public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) { if (mappingException != null) { return CompletableFuture.failedFuture(mappingException); } return CompletableFuture.completedFuture(factory.allOf(nodeNames)); } + + @Override + public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) { + return CompletableFuture.failedFuture(new AssertionError("Not supported")); + } }; var mappingService = new MappingServiceImpl(nodeName, targetProvider); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java index a78189bd4d..28ffb1549b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java +++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.sql.engine.exec.rel; +import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; + import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -36,6 +38,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; +import org.apache.ignite.internal.schema.BinaryRowConverter; +import org.apache.ignite.internal.schema.BinaryTuple; +import org.apache.ignite.internal.schema.BinaryTupleSchema; import org.apache.ignite.internal.schema.row.InternalTuple; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; @@ -61,11 +67,9 @@ import org.junit.jupiter.api.BeforeEach; * AbstractExecutionTest. 
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public abstract class AbstractExecutionTest extends IgniteAbstractTest { +public abstract class AbstractExecutionTest<T> extends IgniteAbstractTest { public static final Object[][] EMPTY = new Object[0][]; - private Throwable lastE; - private QueryTaskExecutorImpl taskExecutor; @BeforeEach @@ -81,17 +85,15 @@ public abstract class AbstractExecutionTest extends IgniteAbstractTest { @AfterEach public void afterTest() { taskExecutor.stop(); - - if (lastE != null) { - throw new AssertionError(lastE); - } } - protected ExecutionContext<Object[]> executionContext() { + protected abstract RowHandler<T> rowHandler(); + + protected ExecutionContext<T> executionContext() { return executionContext(false); } - protected ExecutionContext<Object[]> executionContext(boolean withDelays) { + protected ExecutionContext<T> executionContext(boolean withDelays) { if (withDelays) { StripedThreadPoolExecutor testExecutor = new IgniteTestStripedThreadPoolExecutor(8, NamedThreadFactory.threadPrefix("fake-test-node", "sqlTestExec"), @@ -120,17 +122,12 @@ public abstract class AbstractExecutionTest extends IgniteAbstractTest { new ClusterNodeImpl("1", "fake-test-node", NetworkAddress.from("127.0.0.1:1111")), "fake-test-node", fragmentDesc, - ArrayRowHandler.INSTANCE, + rowHandler(), Map.of(), TxAttributes.fromTx(new NoOpTransaction("fake-test-node")) ); } - private void handle(Thread t, Throwable ex) { - log.error(ex.getMessage(), ex); - lastE = ex; - } - protected Object[] row(Object... 
fields) { return fields; } @@ -341,7 +338,7 @@ public abstract class AbstractExecutionTest extends IgniteAbstractTest { } } - protected RowHandler.RowFactory<Object[]> rowFactory() { + static RowHandler.RowFactory<Object[]> rowFactory() { return new RowHandler.RowFactory<>() { @Override public RowHandler<Object[]> handler() { @@ -369,4 +366,38 @@ public abstract class AbstractExecutionTest extends IgniteAbstractTest { } }; } + + static TupleFactory tupleFactoryFromSchema(BinaryTupleSchema schema) { + return new BinaryTupleFactory(schema); + } + + @FunctionalInterface + interface TupleFactory { + InternalTuple create(Object... values); + } + + private static class BinaryTupleFactory implements TupleFactory { + private final BinaryTupleSchema schema; + + BinaryTupleFactory(BinaryTupleSchema schema) { + this.schema = schema; + } + + @Override + public InternalTuple create(Object... values) { + if (schema.elementCount() != values.length) { + throw new IllegalArgumentException( + format("Expecting {} elements, but was {}", schema.elementCount(), values.length) + ); + } + + BinaryTupleBuilder builder = new BinaryTupleBuilder(schema.elementCount()); + + for (int i = 0; i < schema.elementCount(); i++) { + BinaryRowConverter.appendValue(builder, schema.element(i), values[i]); + } + + return new BinaryTuple(schema.elementCount(), builder.build()); + } + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java index 8b69f87b4e..08f081ab49 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java @@ -36,13 +36,15 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import 
org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.junit.jupiter.api.Test; /** * Abstract test for set operator (MINUS, INTERSECT) execution. */ -public abstract class AbstractSetOpExecutionTest extends AbstractExecutionTest { +public abstract class AbstractSetOpExecutionTest extends AbstractExecutionTest<Object[]> { private static final int COLUMN_NUN = 2; @@ -162,4 +164,9 @@ public abstract class AbstractSetOpExecutionTest extends AbstractExecutionTest { protected abstract AbstractSetOpNode<Object[]> setOpNodeFactory(ExecutionContext<Object[]> ctx, AggregateType type, int columnCount, boolean all, int inputsCnt); + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java index 00fa3ee1fd..27f06ffe28 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper; import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import 
org.apache.ignite.internal.sql.engine.util.TypeUtils; @@ -57,7 +58,7 @@ import org.junit.jupiter.params.provider.EnumSource; * A test class that defines basic test scenarios to verify the execution of each type of aggregate. */ @SuppressWarnings("resource") -public abstract class BaseAggregateTest extends AbstractExecutionTest { +public abstract class BaseAggregateTest extends AbstractExecutionTest<Object[]> { @ParameterizedTest @EnumSource public void count(TestAggregateType testAgg) { @@ -720,4 +721,9 @@ public abstract class BaseAggregateTest extends AbstractExecutionTest { MAP_REDUCE } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java new file mode 100644 index 0000000000..05645aeb75 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.sql.engine.exec.rel; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.ignite.internal.schema.BinaryTupleSchema; +import org.apache.ignite.internal.schema.BinaryTupleSchema.Element; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.row.InternalTuple; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; +import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource; +import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler; +import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper; +import org.apache.ignite.internal.sql.engine.exec.row.BaseTypeSpec; +import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; +import org.apache.ignite.internal.sql.engine.exec.row.TypeSpec; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.Test; + +/** Tests to verify {@link DataSourceScanNode}. 
*/ +@SuppressWarnings("resource") +public class DataSourceScanNodeSelfTest extends AbstractExecutionTest<RowWrapper> { + private static final RowSchema ROW_SCHEMA = RowSchema.builder() + .addField(NativeTypes.INT32) + .addField(NativeTypes.INT64) + .addField(NativeTypes.INT8) + .addField(NativeTypes.stringOf(64)) + .addField(NativeTypes.UUID) + .build(); + + private static final BinaryTupleSchema TUPLE_SCHEMA = fromRowSchema(ROW_SCHEMA); + + private static final TupleFactory TUPLE_FACTORY = tupleFactoryFromSchema(TUPLE_SCHEMA); + + private static final Object[][] ROWS = { + {111, 11L, (byte) 1, "1111", new UUID(0L, 1L)}, + {222, 22L, (byte) 2, "2222", new UUID(0L, 2L)}, + {333, 33L, (byte) 3, "3333", new UUID(0L, 3L)}, + {444, 44L, (byte) 4, "4444", new UUID(0L, 4L)}, + {555, 55L, (byte) 5, "5555", new UUID(0L, 5L)}, + }; + + @Test + void simpleTest() { + ExecutionContext<RowWrapper> context = executionContext(); + List<RowWrapper> rows = initScanAndGetResults(context, null, null, null); + + assertThat(rows, notNullValue()); + + List<String> expectedRows = List.of( + "Row[111, 11, 1, 1111, 00000000-0000-0000-0000-000000000001]", + "Row[222, 22, 2, 2222, 00000000-0000-0000-0000-000000000002]", + "Row[333, 33, 3, 3333, 00000000-0000-0000-0000-000000000003]", + "Row[444, 44, 4, 4444, 00000000-0000-0000-0000-000000000004]", + "Row[555, 55, 5, 5555, 00000000-0000-0000-0000-000000000005]" + ); + + RowHandler<RowWrapper> handler = context.rowHandler(); + + List<String> actualRows = rows.stream().map(handler::toString).collect(Collectors.toList()); + + assertThat(actualRows, equalTo(expectedRows)); + } + + @Test + void scanWithRequiredFields() { + ExecutionContext<RowWrapper> context = executionContext(); + RowHandler<RowWrapper> handler = context.rowHandler(); + List<RowWrapper> rows = initScanAndGetResults(context, null, null, ImmutableBitSet.of(1, 3, 4).toBitSet()); + + assertThat(rows, notNullValue()); + + List<String> expectedRows = List.of( + "Row[11, 1111, 
00000000-0000-0000-0000-000000000001]", + "Row[22, 2222, 00000000-0000-0000-0000-000000000002]", + "Row[33, 3333, 00000000-0000-0000-0000-000000000003]", + "Row[44, 4444, 00000000-0000-0000-0000-000000000004]", + "Row[55, 5555, 00000000-0000-0000-0000-000000000005]" + ); + + List<String> actualRows = rows.stream().map(handler::toString).collect(Collectors.toList()); + + assertThat(actualRows, equalTo(expectedRows)); + } + + @Test + @SuppressWarnings("DataFlowIssue") + void scanWithProjection() { + ExecutionContext<RowWrapper> context = executionContext(); + RowHandler<RowWrapper> handler = context.rowHandler(); + RowFactory<RowWrapper> factory = handler.factory(ROW_SCHEMA); + + Function<RowWrapper, RowWrapper> doubleFirstColumnProjection = row -> { + int size = handler.columnCount(row); + + Object[] values = new Object[size]; + + values[0] = (Integer) handler.get(0, row) * 2; + + for (int i = 1; i < size; i++) { + values[i] = handler.get(i, row); + } + + return factory.create(values); + }; + + List<RowWrapper> rows = initScanAndGetResults(context, null, doubleFirstColumnProjection, null); + + assertThat(rows, notNullValue()); + + List<String> expectedRows = List.of( + "Row[222, 11, 1, 1111, 00000000-0000-0000-0000-000000000001]", + "Row[444, 22, 2, 2222, 00000000-0000-0000-0000-000000000002]", + "Row[666, 33, 3, 3333, 00000000-0000-0000-0000-000000000003]", + "Row[888, 44, 4, 4444, 00000000-0000-0000-0000-000000000004]", + "Row[1110, 55, 5, 5555, 00000000-0000-0000-0000-000000000005]" + ); + + List<String> actualRows = rows.stream().map(handler::toString).collect(Collectors.toList()); + + assertThat(actualRows, equalTo(expectedRows)); + } + + @Test + @SuppressWarnings("DataFlowIssue") + void scanWithFilter() { + ExecutionContext<RowWrapper> context = executionContext(); + RowHandler<RowWrapper> handler = context.rowHandler(); + + Predicate<RowWrapper> onlyEven = row -> ((Integer) handler.get(0, row)) % 2 == 0; + + List<RowWrapper> rows = 
initScanAndGetResults(context, onlyEven, null, null); + + assertThat(rows, notNullValue()); + + List<String> expectedRows = List.of( + "Row[222, 22, 2, 2222, 00000000-0000-0000-0000-000000000002]", + "Row[444, 44, 4, 4444, 00000000-0000-0000-0000-000000000004]" + ); + + List<String> actualRows = rows.stream().map(handler::toString).collect(Collectors.toList()); + + assertThat(actualRows, equalTo(expectedRows)); + } + + @Test + @SuppressWarnings("DataFlowIssue") + void scanWithAllOptions() { + ExecutionContext<RowWrapper> context = executionContext(); + RowHandler<RowWrapper> handler = context.rowHandler(); + + BitSet requiredFields = ImmutableBitSet.of(1, 3, 4).toBitSet(); + + RowFactory<RowWrapper> factory = handler.factory(project(ROW_SCHEMA, requiredFields.stream().toArray())); + + // predicate matching goes before projection transformation, thus this predicate is valid + Predicate<RowWrapper> onlyEven = row -> ((Long) handler.get(0, row)) % 2 == 0; + + Function<RowWrapper, RowWrapper> doubleFirstColumnProjection = row -> { + int size = handler.columnCount(row); + + Object[] values = new Object[size]; + + values[0] = (Long) handler.get(0, row) * 2; + + for (int i = 1; i < size; i++) { + values[i] = handler.get(i, row); + } + + return factory.create(values); + }; + + List<RowWrapper> rows = initScanAndGetResults(context, onlyEven, doubleFirstColumnProjection, requiredFields); + + assertThat(rows, notNullValue()); + + List<String> expectedRows = List.of( + "Row[44, 2222, 00000000-0000-0000-0000-000000000002]", + "Row[88, 4444, 00000000-0000-0000-0000-000000000004]" + ); + + List<String> actualRows = rows.stream().map(handler::toString).collect(Collectors.toList()); + + assertThat(actualRows, equalTo(expectedRows)); + } + + @SuppressWarnings("DataFlowIssue") + private static List<RowWrapper> initScanAndGetResults( + ExecutionContext<RowWrapper> context, + @Nullable Predicate<RowWrapper> predicate, + @Nullable Function<RowWrapper, RowWrapper> projection, + @Nullable 
BitSet requiredFields + ) { + RowHandler<RowWrapper> handler = context.rowHandler(); + RowFactory<RowWrapper> factory; + if (requiredFields != null) { + factory = handler.factory(project(ROW_SCHEMA, requiredFields.stream().toArray())); + } else { + factory = handler.factory(ROW_SCHEMA); + } + + ScannableDataSource dataSource = new IterableDataSource( + Stream.of(ROWS).map(TUPLE_FACTORY::create).collect(Collectors.toList()) + ); + DataSourceScanNode<RowWrapper> node = new DataSourceScanNode<>( + context, factory, fromRowSchema(ROW_SCHEMA), dataSource, predicate, projection, requiredFields + ); + + DrainAllDownstream<RowWrapper> downstream = new DrainAllDownstream<>(); + + node.onRegister(downstream); + + context.execute(() -> node.request(Integer.MAX_VALUE), node::onError); + + return await(downstream.completion); + } + + static class IterableDataSource implements ScannableDataSource { + private final Iterable<InternalTuple> iterable; + + IterableDataSource(Iterable<InternalTuple> iterable) { + this.iterable = iterable; + } + + @Override + public Publisher<InternalTuple> scan() { + Iterator<InternalTuple> it = iterable.iterator(); + + return new Publisher<InternalTuple>() { + @Override + public void subscribe(Subscriber<? 
super InternalTuple> subscriber) { + Subscription subscription = new Subscription() { + @Override + public void request(long n) { + if (n <= 0) { + subscriber.onError(new IllegalArgumentException()); + + return; + } + + while (n > 0 && it.hasNext()) { + subscriber.onNext(it.next()); + + n--; + } + + if (n > 0) { + subscriber.onComplete(); + } + } + + @Override + public void cancel() { + // NO-OP + } + }; + + subscriber.onSubscribe(subscription); + } + }; + } + } + + static class DrainAllDownstream<T> implements Downstream<T> { + private final List<T> rows = new ArrayList<>(); + private final CompletableFuture<List<T>> completion = new CompletableFuture<>(); + + @Override + public void push(T row) { + rows.add(row); + } + + @Override + public void end() throws Exception { + completion.complete(rows); + } + + @Override + public void onError(Throwable e) { + completion.completeExceptionally(e); + } + } + + @Override + protected RowHandler<RowWrapper> rowHandler() { + return SqlRowHandler.INSTANCE; + } + + private static BinaryTupleSchema fromRowSchema(RowSchema schema) { + Element[] elements = new Element[schema.fields().size()]; + + int idx = 0; + for (TypeSpec spec : schema.fields()) { + assert spec instanceof BaseTypeSpec : spec; + + elements[idx++] = new Element(((BaseTypeSpec) spec).nativeType(), spec.isNullable()); + } + + return BinaryTupleSchema.create(elements); + } + + private static RowSchema project(RowSchema schema, int[] projection) { + RowSchema.Builder builder = RowSchema.builder(); + for (int i : projection) { + builder.addField(schema.fields().get(i)); + } + + return builder.build(); + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java index 2196753738..5a1569f007 100644 --- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java @@ -47,7 +47,9 @@ import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry; import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor; import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.framework.ClusterServiceFactory; import org.apache.ignite.internal.sql.engine.framework.DataProvider; import org.apache.ignite.internal.sql.engine.framework.TestBuilders; @@ -77,7 +79,7 @@ import org.junit.jupiter.params.provider.MethodSource; /** * Tests to verify Outbox to Inbox interoperation. 
*/ -public class ExchangeExecutionTest extends AbstractExecutionTest { +public class ExchangeExecutionTest extends AbstractExecutionTest<Object[]> { private static final String ROOT_NODE_NAME = "N1"; private static final String ANOTHER_NODE_NAME = "N2"; private static final List<String> NODE_NAMES = List.of(ROOT_NODE_NAME, ANOTHER_NODE_NAME); @@ -728,4 +730,9 @@ public class ExchangeExecutionTest extends AbstractExecutionTest { }; } } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java index 8e87011342..3127f68d82 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; @@ -63,7 +64,7 @@ import org.junit.jupiter.params.provider.MethodSource; * ExecutionTest. 
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public class ExecutionTest extends AbstractExecutionTest { +public class ExecutionTest extends AbstractExecutionTest<Object[]> { @Test public void testSimpleExecution() { // SELECT P.ID, P.NAME, PR.NAME AS PROJECT @@ -713,4 +714,9 @@ public class ExecutionTest extends AbstractExecutionTest { return args.stream(); } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java index 04fbd95a34..587f0a7802 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java @@ -37,8 +37,10 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates; import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.MapReduceAgg; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; @@ -51,7 +53,7 @@ import org.junit.jupiter.api.Test; * HashAggregateSingleGroupExecutionTest. 
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest { +public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest<Object[]> { @Test public void mapReduceSum() { @@ -433,4 +435,9 @@ public class HashAggregateSingleGroupExecutionTest extends AbstractExecutionTest return newHashAggNode(ctx, REDUCE, grpSets, hashRowType, reduceAggCall.getReduceCall()); } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java index 4f5ecb5834..129a04bcb8 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java @@ -29,6 +29,8 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; @@ -38,7 +40,7 @@ import org.junit.jupiter.api.Test; /** * Hash index based spool implementation test. 
*/ -public class HashIndexSpoolExecutionTest extends AbstractExecutionTest { +public class HashIndexSpoolExecutionTest extends AbstractExecutionTest<Object[]> { @Test public void testIndexSpool() { ExecutionContext<Object[]> ctx = executionContext(true); @@ -203,4 +205,9 @@ public class HashIndexSpoolExecutionTest extends AbstractExecutionTest { this.expectedResultSize = expectedResultSize; } } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java index 26ca27b2fc..da4490849a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java @@ -40,10 +40,12 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.ScannableTable; import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition; import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation; @@ -58,7 +60,7 @@ import org.junit.jupiter.api.Test; /** * Test {@link 
IndexScanNode} execution. */ -public class IndexScanNodeExecutionTest extends AbstractExecutionTest { +public class IndexScanNodeExecutionTest extends AbstractExecutionTest<Object[]> { /** * Sorted index scan execution. @@ -307,4 +309,9 @@ public class IndexScanNodeExecutionTest extends AbstractExecutionTest { } } } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java index 08de8e9400..19f05c4b2a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java @@ -28,12 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.junit.jupiter.api.Test; /** * Test LimitNode execution. */ -public class LimitExecutionTest extends AbstractExecutionTest { +public class LimitExecutionTest extends AbstractExecutionTest<Object[]> { /** Tests correct results fetched with Limit node. 
*/ @Test public void testLimit() { @@ -214,4 +216,9 @@ public class LimitExecutionTest extends AbstractExecutionTest { }, this::onError); } } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java index 9dd048749d..10b1bed4e7 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java @@ -42,6 +42,7 @@ import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl; import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; @@ -55,7 +56,7 @@ import org.junit.jupiter.params.provider.ValueSource; * MergeJoinExecutionTest; * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public class MergeJoinExecutionTest extends AbstractExecutionTest { +public class MergeJoinExecutionTest extends AbstractExecutionTest<Object[]> { @ParameterizedTest(name = "treat nulls as equals: {0}") @ValueSource(booleans = {true, false}) public void joinEmptyTables(boolean equalNulls) { @@ -538,4 +539,9 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest { private static <T> Set<T> setOf(T... 
items) { return new HashSet<>(Arrays.asList(items)); } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java index d81a03331a..253509cd50 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java @@ -36,6 +36,7 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.junit.jupiter.api.Test; @@ -44,7 +45,7 @@ import org.junit.jupiter.api.Test; * NestedLoopJoinExecutionTest. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public class NestedLoopJoinExecutionTest extends AbstractExecutionTest { +public class NestedLoopJoinExecutionTest extends AbstractExecutionTest<Object[]> { @Test public void joinEmptyTables() { verifyJoin(EMPTY, EMPTY, INNER, EMPTY); @@ -364,4 +365,9 @@ public class NestedLoopJoinExecutionTest extends AbstractExecutionTest { private static <T> Set<T> setOf(T... 
items) { return new HashSet<>(Arrays.asList(items)); } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java index c51b1a21c3..b3afc53ad1 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java @@ -32,6 +32,8 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; @@ -41,7 +43,7 @@ import org.junit.jupiter.api.Test; * TreeIndexSpoolExecutionTest. 
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest { +public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest<Object[]> { @Test public void testIndexSpool() { ExecutionContext<Object[]> ctx = executionContext(); @@ -241,4 +243,8 @@ public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest { } } + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index 391a0aa2bb..d4a25d3eea 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -45,10 +45,12 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl; import org.apache.ignite.internal.sql.engine.exec.TableRowConverter; import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor; import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions; @@ -71,7 +73,7 @@ import 
org.junit.jupiter.api.Test; /** * Tests execution flow of TableScanNode. */ -public class TableScanNodeExecutionTest extends AbstractExecutionTest { +public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> { private final LinkedList<AutoCloseable> closeables = new LinkedList<>(); @@ -230,4 +232,9 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest { }; } } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java index 40bff438ae..da9f5029e5 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java @@ -29,6 +29,8 @@ import java.util.stream.IntStream; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; @@ -38,7 +40,7 @@ import org.junit.jupiter.api.Test; * TableSpoolExecutionTest. 
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 */ -public class TableSpoolExecutionTest extends AbstractExecutionTest { +public class TableSpoolExecutionTest extends AbstractExecutionTest<Object[]> { @Test public void testLazyTableSpool() { checkTableSpool( @@ -147,4 +149,9 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest { } } } + + @Override + protected RowHandler<Object[]> rowHandler() { + return ArrayRowHandler.INSTANCE; + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java index ad16fc39fe..3cb61a0dba 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -21,8 +21,6 @@ import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_N import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -52,6 +50,7 @@ import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex; import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; +import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView; import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.sql.engine.schema.TableDescriptor; import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl; @@ -363,20 +362,11 @@ public class 
TestBuilders { .map(ClusterTableBuilderImpl::build) .collect(Collectors.toMap(TestTable::name, Function.identity())); - Int2ObjectMap<IgniteTable> tablesById = new Int2ObjectOpenHashMap<>(tableByName.size()); - tableByName.forEach((name, table) -> tablesById.put(table.id(), table)); - IgniteSchema schema = new IgniteSchema(DEFAULT_SCHEMA_NAME, SCHEMA_VERSION, tableByName.values()); var schemaManager = new PredefinedSchemaManager(schema); var targetProvider = new ExecutionTargetProvider() { @Override - public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, int tableId) { - IgniteTable table = tablesById.get(tableId); - - if (table == null) { - throw new AssertionError("Table not found: " + tableId); - } - + public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) { Map<String, DataProvider<?>> dataProviders = dataProvidersByTableName.get(table.name()); if (nullOrEmpty(dataProviders)) { @@ -385,6 +375,11 @@ public class TestBuilders { return CompletableFuture.completedFuture(factory.allOf(List.copyOf(dataProviders.keySet()))); } + + @Override + public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) { + return CompletableFuture.failedFuture(new AssertionError("Not supported")); + } }; List<LogicalNode> logicalNodes = nodeNames.stream() diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index 4e7902e2f3..2621a0e35d 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -117,7 +117,7 @@ public class TestNode implements LifecycleAware { mailboxRegistry, messageService )); NoOpExecutableTableRegistry executableTableRegistry = new 
NoOpExecutableTableRegistry(); - ExecutionDependencyResolver dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry); + ExecutionDependencyResolver dependencyResolver = new ExecutionDependencyResolverImpl(executableTableRegistry, null); executionService = registerService(new ExecutionServiceImpl<>( messageService, diff --git a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java index 1c2ef6814d..5ebdfa4121 100644 --- a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java +++ b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal.systemview; +import java.util.List; +import java.util.concurrent.Flow.Publisher; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.schema.row.InternalTuple; /** * The system view manager is responsible for registering system views in the cluster. @@ -32,4 +36,21 @@ public interface SystemViewManager extends IgniteComponent { * @param view System view to register. */ void register(SystemView<?> view); + + /** + * Returns a list of nodes a view with given name can be found on. + * + * @param name Name of view of interest. + * @return A list of nodes owning a view, or empty list if there is no node in the cluster owning this view. + */ + List<String> owningNodes(String name); + + /** + * Opens a cursor over view with a given name and returns publisher emitting rows of the view. + * + * @param name Name of view of interest. + * @return A publisher. + * @throws IgniteInternalException if view with given name is not presented on local node. 
+ */ + Publisher<InternalTuple> scanView(String name) throws IgniteInternalException; } diff --git a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java index 9f7ed77b38..c3d6e55aa9 100644 --- a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java +++ b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java @@ -20,26 +20,35 @@ package org.apache.ignite.internal.systemview; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow.Publisher; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.ignite.internal.catalog.CatalogCommand; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.cluster.management.NodeAttributesProvider; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.schema.row.InternalTuple; import org.apache.ignite.internal.systemview.utils.SystemViewUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import 
org.apache.ignite.lang.ErrorGroups.Common; /** * SQL system views manager implementation. */ -public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesProvider { +public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesProvider, LogicalTopologyEventListener { /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(SystemViewManagerImpl.class); @@ -47,6 +56,8 @@ public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesP public static final String NODE_ATTRIBUTES_LIST_SEPARATOR = ","; + private final String localNodeName; + private final CatalogManager catalogManager; private final Map<String, String> nodeAttributes = new HashMap<>(); @@ -66,8 +77,11 @@ public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesP /** Future which is completed when system views are registered in the catalog. */ private final CompletableFuture<Void> viewsRegistrationFuture = new CompletableFuture<>(); + private volatile Map<String, List<String>> owningNodesByViewName = Map.of(); + /** Creates a system view manager. 
*/ - public SystemViewManagerImpl(CatalogManager catalogManager) { + public SystemViewManagerImpl(String localNodeName, CatalogManager catalogManager) { + this.localNodeName = localNodeName; this.catalogManager = catalogManager; } @@ -113,6 +127,22 @@ public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesP busyLock.block(); } + @Override + public List<String> owningNodes(String name) { + return inBusyLock(busyLock, () -> owningNodesByViewName.getOrDefault(name, List.of())); + } + + @Override + public Publisher<InternalTuple> scanView(String name) { + if (views.get(name) == null) { + throw new IgniteInternalException( + Common.INTERNAL_ERR, + format("View with name '{}' not found on node '{}'", name, localNodeName) + ); + } + + throw new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-20578"); + } /** {@inheritDoc} */ @Override @@ -136,10 +166,53 @@ public class SystemViewManagerImpl implements SystemViewManager, NodeAttributesP return nodeAttributes; } + @Override + public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) { + processNewTopology(newTopology); + } + + @Override + public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) { + processNewTopology(newTopology); + } + + @Override + public void onTopologyLeap(LogicalTopologySnapshot newTopology) { + processNewTopology(newTopology); + } + /** * Returns future which is completed when system views are registered in the catalog. 
*/ public CompletableFuture<Void> completeRegistration() { return viewsRegistrationFuture; } + + private void processNewTopology(LogicalTopologySnapshot topology) { + Map<String, List<String>> owningNodesByViewName = new HashMap<>(); + + for (LogicalNode logicalNode : topology.nodes()) { + String systemViewsNames = logicalNode.systemAttributes().get(NODE_ATTRIBUTES_KEY); + + if (systemViewsNames == null) { + continue; + } + + Arrays.stream(systemViewsNames.split(NODE_ATTRIBUTES_LIST_SEPARATOR)) + .map(String::trim) + .forEach(viewName -> + owningNodesByViewName.computeIfAbsent(viewName, key -> new ArrayList<>()).add(logicalNode.name()) + ); + } + + for (String viewName : owningNodesByViewName.keySet()) { + owningNodesByViewName.compute(viewName, (key, value) -> { + assert value != null; + + return List.copyOf(value); + }); + } + + this.owningNodesByViewName = Map.copyOf(owningNodesByViewName); + } } diff --git a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java index b426faa14b..3c0c8f381f 100644 --- a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java +++ b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java @@ -24,8 +24,11 @@ import static org.apache.ignite.internal.systemview.SystemViewManagerImpl.NODE_A import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.util.CollectionUtils.first; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.empty; 
+import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -36,11 +39,16 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogValidationException; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.schema.NativeType; import org.apache.ignite.internal.schema.NativeTypeSpec; @@ -49,6 +57,9 @@ import org.apache.ignite.internal.schema.SchemaTestUtils; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.util.AsyncCursor; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterNodeImpl; +import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -192,6 +203,34 @@ public class SystemViewManagerTest extends BaseIgniteAbstractTest { viewMgr.stop(); } + @Test + @SuppressWarnings("DataFlowIssue") + void managerDerivesViewPlacementFromLogicalTopologyEvents() { + String viewName = "MY_VIEW"; + + assertThat(viewMgr.owningNodes(viewName), empty()); + + List<String> allNodes = List.of("A", "B", "C"); + + LogicalTopologySnapshot topologySnapshot = 
topologySnapshot(viewName, allNodes, 0, 2); + viewMgr.onNodeJoined(first(topologySnapshot.nodes()), topologySnapshot); + + assertThat(viewMgr.owningNodes(viewName), hasItem("A")); + assertThat(viewMgr.owningNodes(viewName), hasItem("C")); + + topologySnapshot = topologySnapshot(viewName, allNodes, 0, 1); + viewMgr.onNodeLeft(first(topologySnapshot.nodes()), topologySnapshot); + + assertThat(viewMgr.owningNodes(viewName), hasItem("A")); + assertThat(viewMgr.owningNodes(viewName), hasItem("B")); + + topologySnapshot = topologySnapshot(viewName, allNodes, 1, 2); + viewMgr.onTopologyLeap(topologySnapshot); + + assertThat(viewMgr.owningNodes(viewName), hasItem("B")); + assertThat(viewMgr.owningNodes(viewName), hasItem("C")); + } + private static SystemView<?> dummyView(String name) { return dummyView(name, NativeTypes.INT32); } @@ -214,4 +253,31 @@ public class SystemViewManagerTest extends BaseIgniteAbstractTest { }) .build(); } + + private static LogicalTopologySnapshot topologySnapshot(String viewName, List<String> allNodes, int... owningNodes) { + BitSet owningNodesSet = new BitSet(); + + for (int idx : owningNodes) { + owningNodesSet.set(idx); + } + + List<LogicalNode> topology = new ArrayList<>(allNodes.size()); + + for (int i = 0; i < allNodes.size(); i++) { + String name = allNodes.get(i); + + ClusterNode clusterNode = new ClusterNodeImpl(name, name, new NetworkAddress("127.0.0.1", 1010 + i)); + + Map<String, String> systemAttributes; + if (owningNodesSet.get(i)) { + systemAttributes = Map.of(NODE_ATTRIBUTES_KEY, viewName); + } else { + systemAttributes = Map.of(); + } + + topology.add(new LogicalNode(clusterNode, Map.of(), systemAttributes)); + } + + return new LogicalTopologySnapshot(1, topology); + } }