This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 953d0d4e53 IGNITE-20437 Sql. Support system views in execution (#2666)
953d0d4e53 is described below

commit 953d0d4e53dccf12f2b68b19d5a9e25000d05dfd
Author: korlov42 <kor...@gridgain.com>
AuthorDate: Fri Oct 6 12:03:25 2023 +0300

    IGNITE-20437 Sql. Support system views in execution (#2666)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |  10 +
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   6 +-
 modules/sql-engine/build.gradle                    |   1 +
 .../internal/sql/engine/SqlQueryProcessor.java     |  45 ++-
 .../exec/ExecutionDependencyResolverImpl.java      |  23 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |  45 ++-
 .../sql/engine/exec/ResolvedDependencies.java      |  17 +-
 .../sql/engine/exec/ScannableDataSource.java}      |  20 +-
 ...vider.java => ScannableDataSourceProvider.java} |  18 +-
 .../internal/sql/engine/exec/SqlRowHandler.java    |   3 +-
 .../exec/mapping/ExecutionTargetProvider.java      |  18 +-
 .../sql/engine/exec/mapping/FragmentMapper.java    |   7 +-
 .../sql/engine/exec/mapping/FragmentSplitter.java  |  48 ++-
 .../engine/exec/mapping/MappingServiceImpl.java    |  27 +-
 .../sql/engine/exec/rel/DataSourceScanNode.java    |  81 +++++
 .../ignite/internal/sql/engine/prepare/Cloner.java |   4 +-
 .../internal/sql/engine/prepare/Fragment.java      |  38 ++-
 .../internal/sql/engine/prepare/QuerySplitter.java |  49 ++-
 .../exec/ExecutionDependencyResolverSelfTest.java  |   2 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |  11 +-
 .../sql/engine/exec/rel/AbstractExecutionTest.java |  63 +++-
 .../exec/rel/AbstractSetOpExecutionTest.java       |   9 +-
 .../sql/engine/exec/rel/BaseAggregateTest.java     |   8 +-
 .../exec/rel/DataSourceScanNodeSelfTest.java       | 345 +++++++++++++++++++++
 .../sql/engine/exec/rel/ExchangeExecutionTest.java |   9 +-
 .../sql/engine/exec/rel/ExecutionTest.java         |   8 +-
 .../rel/HashAggregateSingleGroupExecutionTest.java |   9 +-
 .../exec/rel/HashIndexSpoolExecutionTest.java      |   9 +-
 .../exec/rel/IndexScanNodeExecutionTest.java       |   9 +-
 .../sql/engine/exec/rel/LimitExecutionTest.java    |   9 +-
 .../engine/exec/rel/MergeJoinExecutionTest.java    |   8 +-
 .../exec/rel/NestedLoopJoinExecutionTest.java      |   8 +-
 .../exec/rel/SortedIndexSpoolExecutionTest.java    |   8 +-
 .../exec/rel/TableScanNodeExecutionTest.java       |   9 +-
 .../engine/exec/rel/TableSpoolExecutionTest.java   |   9 +-
 .../sql/engine/framework/TestBuilders.java         |  19 +-
 .../internal/sql/engine/framework/TestNode.java    |   2 +-
 .../internal/systemview/SystemViewManager.java     |  21 ++
 .../internal/systemview/SystemViewManagerImpl.java |  77 ++++-
 .../internal/systemview/SystemViewManagerTest.java |  66 ++++
 41 files changed, 1051 insertions(+), 131 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 963280289a..ce198c36d4 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -254,6 +254,16 @@ public class ErrorGroups {
 
         /** Session closed error. Operation is rejected because SQL session 
was closed. */
         public static final int SESSION_CLOSED_ERR = 
SQL_ERR_GROUP.registerErrorCode((short) 11);
+
+        /**
+         * SQL engine was unable to map query on current cluster topology.
+         *
+         * <p>This may be due to a variety of reasons, but most probably 
because all nodes hosting a certain system view
+         * or a table partition went offline.
+         *
+         * <p>See error message for details.
+         */
+        public static final int MAPPING_ERR = 
SQL_ERR_GROUP.registerErrorCode((short) 12);
     }
 
     /** Meta storage error group. */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 97a0019bca..2bbe6f8f67 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -110,6 +110,7 @@ import 
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
@@ -405,7 +406,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 replicaSvc,
                 hybridClock,
                 catalogManager,
-                metricManager
+                metricManager,
+                new SystemViewManagerImpl(name, catalogManager)
         );
 
         // Preparing the result map.
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9b1cc38fab..d25d7429e4 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -529,8 +529,9 @@ public class IgniteImpl implements Ignite {
                 delayDurationMsSupplier
         );
 
-        systemViewManager = new SystemViewManagerImpl(catalogManager);
+        systemViewManager = new SystemViewManagerImpl(name, catalogManager);
         nodeAttributesCollector.register(systemViewManager);
+        logicalTopology.addEventListener(systemViewManager);
 
         raftMgr.appendEntriesRequestInterceptor(new 
CheckCatalogVersionOnAppendEntries(catalogManager));
         raftMgr.actionRequestInterceptor(new 
CheckCatalogVersionOnActionRequest(catalogManager));
@@ -589,7 +590,8 @@ public class IgniteImpl implements Ignite {
                 replicaSvc,
                 clock,
                 catalogManager,
-                metricManager
+                metricManager,
+                systemViewManager
         );
 
         sql = new IgniteSqlImpl(qryEngine, new 
IgniteTransactionsImpl(txManager, observableTimestampTracker));
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 57beaa5bcb..94b8d4e999 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -38,6 +38,7 @@ dependencies {
     implementation project(':ignite-catalog')
     implementation project(':ignite-metrics')
     implementation project(':ignite-cluster-management')
+    implementation project(':ignite-system-view')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.caffeine
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index f80787f964..01fc27d788 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
 
@@ -72,6 +73,8 @@ import 
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
 import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
 import org.apache.ignite.internal.sql.engine.schema.CatalogSqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.session.Session;
 import org.apache.ignite.internal.sql.engine.session.SessionId;
@@ -82,16 +85,19 @@ import 
org.apache.ignite.internal.sql.engine.session.SessionProperty;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.sql.metrics.SqlClientMetricSource;
 import org.apache.ignite.internal.storage.DataStorageManager;
+import org.apache.ignite.internal.systemview.SystemViewManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.lang.SchemaNotFoundException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.sql.SqlException;
@@ -158,6 +164,10 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private final ReplicaService replicaService;
 
+    private final SqlSchemaManager sqlSchemaManager;
+
+    private final SystemViewManager systemViewManager;
+
     private volatile SessionManager sessionManager;
 
     private volatile QueryTaskExecutor taskExecutor;
@@ -166,8 +176,6 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private volatile PrepareService prepareSvc;
 
-    private volatile SqlSchemaManager sqlSchemaManager;
-
     /** Clock. */
     private final HybridClock clock;
 
@@ -192,7 +200,8 @@ public class SqlQueryProcessor implements QueryProcessor {
             ReplicaService replicaService,
             HybridClock clock,
             CatalogManager catalogManager,
-            MetricManager metricManager
+            MetricManager metricManager,
+            SystemViewManager systemViewManager
     ) {
         this.clusterSrvc = clusterSrvc;
         this.logicalTopologyService = logicalTopologyService;
@@ -204,6 +213,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.clock = clock;
         this.catalogManager = catalogManager;
         this.metricManager = metricManager;
+        this.systemViewManager = systemViewManager;
 
         sqlSchemaManager = new CatalogSqlSchemaManager(
                 catalogManager,
@@ -250,13 +260,16 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         var executableTableRegistry = new 
ExecutableTableRegistryImpl(tableManager, schemaManager, replicaService, clock, 
TABLE_CACHE_SIZE);
 
-        var dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry);
+        var dependencyResolver = new ExecutionDependencyResolverImpl(
+                executableTableRegistry,
+                view -> () -> systemViewManager.scanView(view.name())
+        );
 
         var executionTargetProvider = new ExecutionTargetProvider() {
             @Override
-            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, int tableId) {
-                return tableManager.tableAsync(tableId)
-                        .thenCompose(table -> 
table.internalTable().primaryReplicas())
+            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
+                return tableManager.tableAsync(table.id())
+                        .thenCompose(tbl -> 
tbl.internalTable().primaryReplicas())
                         .thenApply(replicas -> {
                             List<NodeWithTerm> assignments = replicas.stream()
                                     .map(primaryReplica -> new 
NodeWithTerm(primaryReplica.node().name(), primaryReplica.term()))
@@ -265,6 +278,24 @@ public class SqlQueryProcessor implements QueryProcessor {
                             return factory.partitioned(assignments);
                         });
             }
+
+            @Override
+            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                List<String> nodes = 
systemViewManager.owningNodes(view.name());
+
+                if (nullOrEmpty(nodes)) {
+                    return CompletableFuture.failedFuture(
+                            new SqlException(Sql.MAPPING_ERR, format("The view 
with name '{}' could not be found on"
+                                    + " any active nodes in the cluster", 
view.name()))
+                    );
+                }
+
+                return CompletableFuture.completedFuture(
+                        view.distribution() == IgniteDistributions.single()
+                                ? factory.oneOf(nodes)
+                                : factory.allOf(nodes)
+                );
+            }
         };
 
         var mappingService = new MappingServiceImpl(nodeName, 
executionTargetProvider);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
index 90d35e9b35..c1351ca8eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverImpl.java
@@ -28,10 +28,12 @@ import 
org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.DistributionFunction;
@@ -45,9 +47,14 @@ import 
org.apache.ignite.internal.sql.engine.trait.TraitUtils;
 public class ExecutionDependencyResolverImpl implements 
ExecutionDependencyResolver {
 
     private final ExecutableTableRegistry registry;
+    private final ScannableDataSourceProvider dataSourceProvider;
 
-    public ExecutionDependencyResolverImpl(ExecutableTableRegistry registry) {
+    public ExecutionDependencyResolverImpl(
+            ExecutableTableRegistry registry,
+            ScannableDataSourceProvider dataSourceProvider
+    ) {
         this.registry = registry;
+        this.dataSourceProvider = dataSourceProvider;
     }
 
     /**
@@ -56,6 +63,7 @@ public class ExecutionDependencyResolverImpl implements 
ExecutionDependencyResol
     @Override
     public CompletableFuture<ResolvedDependencies> 
resolveDependencies(Iterable<IgniteRel> rels, IgniteSchema schema) {
         Map<Integer, CompletableFuture<ExecutableTable>> tableMap = new 
HashMap<>();
+        Map<Integer, ScannableDataSource> dataSources = new HashMap<>();
 
         IgniteRelShuttle shuttle = new IgniteRelShuttle() {
             @Override
@@ -103,6 +111,17 @@ public class ExecutionDependencyResolverImpl implements 
ExecutionDependencyResol
                 return rel;
             }
 
+            @Override
+            public IgniteRel visit(IgniteSystemViewScan rel) {
+                IgniteSystemView view = 
rel.getTable().unwrap(IgniteSystemView.class);
+
+                assert view != null;
+
+                dataSources.put(view.id(), 
dataSourceProvider.forSystemView(view));
+
+                return rel;
+            }
+
             private void resolveDistributionFunction(IgniteDistribution 
distribution) {
                 DistributionFunction function = distribution.function();
 
@@ -135,7 +154,7 @@ public class ExecutionDependencyResolverImpl implements 
ExecutionDependencyResol
                             .map(e -> Map.entry(e.getKey(), 
e.getValue().join()))
                             .collect(Collectors.toMap(Entry::getKey, 
Entry::getValue));
 
-                    return new ResolvedDependencies(map);
+                    return new ResolvedDependencies(map, dataSources);
                 });
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 130bea8319..10612cda10 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -44,6 +44,8 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
@@ -52,6 +54,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.rel.AbstractSetOpNode;
 import 
org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNode;
+import org.apache.ignite.internal.sql.engine.exec.rel.DataSourceScanNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.FilterNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.HashAggregateNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
@@ -108,9 +111,12 @@ import 
org.apache.ignite.internal.sql.engine.rel.set.IgniteMapSetOp;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteReduceIntersect;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
 import org.apache.ignite.internal.sql.engine.rule.LogicalScanConverterRule;
+import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
+import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -444,7 +450,33 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteSystemViewScan rel) {
-        throw new UnsupportedOperationException("System view scan is not 
implemented");
+        RexNode condition = rel.condition();
+        List<RexNode> projects = rel.projects();
+        ImmutableBitSet requiredColumns = rel.requiredColumns();
+        IgniteDataSource igniteDataSource = 
rel.getTable().unwrapOrThrow(IgniteDataSource.class);
+
+        BinaryTupleSchema schema = 
fromTableDescriptor(igniteDataSource.descriptor());
+
+        ScannableDataSource dataSource = 
resolvedDependencies.dataSource(igniteDataSource.id());
+
+        IgniteTypeFactory typeFactory = ctx.getTypeFactory();
+
+        RelDataType rowType = igniteDataSource.getRowType(typeFactory, 
requiredColumns);
+
+        Predicate<RowT> filters = condition == null ? null : 
expressionFactory.predicate(condition, rowType);
+        Function<RowT, RowT> prj = projects == null ? null : 
expressionFactory.project(projects, rowType);
+
+        RowSchema rowSchema = 
rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+        RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
+        return new DataSourceScanNode<>(
+                ctx,
+                rowFactory,
+                schema,
+                dataSource,
+                filters,
+                prj,
+                requiredColumns == null ? null : requiredColumns.toBitSet()
+        );
     }
 
     /** {@inheritDoc} */
@@ -859,4 +891,15 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
     public <T extends Node<RowT>> T go(IgniteRel rel) {
         return (T) visit(rel);
     }
+
+    private static BinaryTupleSchema fromTableDescriptor(TableDescriptor 
descriptor) {
+        Element[] elements = new Element[descriptor.columnsCount()];
+
+        int idx = 0;
+        for (ColumnDescriptor column : descriptor) {
+            elements[idx++] = new Element(column.physicalType(), 
column.nullable());
+        }
+
+        return BinaryTupleSchema.create(elements);
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
index 7dfffbebb6..2371b09f87 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ResolvedDependencies.java
@@ -27,9 +27,15 @@ public class ResolvedDependencies {
 
     private final Map<Integer, ExecutableTable> tableMap;
 
+    private final Map<Integer, ScannableDataSource> dataSourceMap;
+
     /** Constructor. */
-    public ResolvedDependencies(Map<Integer, ExecutableTable> tableMap) {
+    public ResolvedDependencies(
+            Map<Integer, ExecutableTable> tableMap,
+            Map<Integer, ScannableDataSource> dataSourceMap
+    ) {
         this.tableMap = tableMap;
+        this.dataSourceMap = dataSourceMap;
     }
 
     /**
@@ -56,6 +62,15 @@ public class ResolvedDependencies {
         return executableTable.tableDescriptor();
     }
 
+    /** Returns data source instance by given id. */
+    public ScannableDataSource dataSource(int dataSourceId) {
+        ScannableDataSource dataSource = dataSourceMap.get(dataSourceId);
+
+        assert dataSource != null : "DataSource does not exist: " + 
dataSourceId;
+
+        return dataSource;
+    }
+
     private ExecutableTable getTable(int tableId) {
         ExecutableTable executableTable = tableMap.get(tableId);
         assert executableTable != null : "ExecutableTable does not exist: " + 
tableId;
diff --git 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSource.java
similarity index 58%
copy from 
modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSource.java
index 1c2ef6814d..7453c899fb 100644
--- 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSource.java
@@ -15,21 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.systemview;
+package org.apache.ignite.internal.sql.engine.exec;
 
-import org.apache.ignite.internal.manager.IgniteComponent;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.schema.row.InternalTuple;
 
 /**
- * The system view manager is responsible for registering system views in the 
cluster.
+ * Provides read operations over an abstract data source.
  */
-public interface SystemViewManager extends IgniteComponent {
-    /**
-     * Registers a system view.
-     *
-     * <p>Registration of views is completed when the system view manager 
starts. Therefore,
-     * it is necessary for other components to register the views before the 
manager is started.
-     *
-     * @param view System view to register.
-     */
-    void register(SystemView<?> view);
+@FunctionalInterface
+public interface ScannableDataSource {
+    Publisher<InternalTuple> scan();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSourceProvider.java
similarity index 58%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSourceProvider.java
index 8d9335c314..4449109a61 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableDataSourceProvider.java
@@ -15,22 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.exec.mapping;
+package org.apache.ignite.internal.sql.engine.exec;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 
 /**
- * An integration point that helps the mapper to acquire an execution target 
of particular
- * relation from fragment.
+ * An integration point that helps the execution to scan over an arbitrary source 
of rows.
  */
 @SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
-public interface ExecutionTargetProvider {
-    /**
-     * Returns an execution target for a table with given id.
-     *
-     * @param factory A factory to create target for given table.
-     * @param tableId A table id to create execution target for.
-     * @return A future representing the result.
-     */
-    CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory 
factory, int tableId);
+public interface ScannableDataSourceProvider {
+    ScannableDataSource forSystemView(IgniteSystemView view);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
index 6683214d84..03a87a63e3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlRowHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl.UNSPECIFIED_VALUE_PLACEHOLDER;
 
 import java.math.BigDecimal;
@@ -176,7 +177,7 @@ public class SqlRowHandler implements 
RowHandler<RowWrapper> {
             /** {@inheritDoc} */
             @Override
             public RowWrapper create(InternalTuple tuple) {
-                assert schemaLen == tuple.elementCount();
+                assert schemaLen == tuple.elementCount() : 
format("schemaLen={}, tupleSize={}", schemaLen, tuple.elementCount());
 
                 return new BinaryTupleRowWrapper(rowSchema, tuple);
             }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
index 8d9335c314..fa6f3a343c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetProvider.java
@@ -18,19 +18,29 @@
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 
 /**
  * An integration point that helps the mapper to acquire an execution target 
of particular
  * relation from fragment.
  */
-@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
 public interface ExecutionTargetProvider {
     /**
-     * Returns an execution target for a table with given id.
+     * Returns an execution target for a given table.
      *
      * @param factory A factory to create target for given table.
-     * @param tableId A table id to create execution target for.
+     * @param table A table to create execution target for.
      * @return A future representing the result.
      */
-    CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory 
factory, int tableId);
+    CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory 
factory, IgniteTable table);
+
+    /**
+     * Returns an execution target for a given view.
+     *
+     * @param factory A factory to create target for given view.
+     * @param view A view to create execution target for.
+     * @return A future representing the result.
+     */
+    CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory 
factory, IgniteSystemView view);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index 202673fa83..86e7ba61a2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
 import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
 import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
@@ -413,10 +414,10 @@ class FragmentMapper {
         }
 
         private Mapping mapTableScan(long sourceId, 
ProjectableFilterableTableScan rel) {
-            IgniteTable igniteTable = 
rel.getTable().unwrapOrThrow(IgniteTable.class);
+            IgniteDataSource igniteDataSource = 
rel.getTable().unwrapOrThrow(IgniteDataSource.class);
 
-            ExecutionTarget target = targets.get(igniteTable.id());
-            assert target != null : "No colocation group for " + 
igniteTable.id();
+            ExecutionTarget target = targets.get(igniteDataSource.id());
+            assert target != null : "No colocation group for " + 
igniteDataSource.id();
 
             return newMapping(sourceId, target);
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
index 42f15dee32..f23ed61d6a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentSplitter.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
-import it.unimi.dsi.fastutil.ints.IntArraySet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -36,9 +36,11 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
@@ -77,23 +79,54 @@ class FragmentSplitter extends IgniteRelShuttle {
         return res;
     }
 
+    @Override
+    public IgniteRel visit(IgniteSystemViewScan rel) {
+        IgniteSystemView view = rel.getTable().unwrap(IgniteSystemView.class);
+
+        assert view != null;
+
+        if (curr.seenRelations.add(view.id())) {
+            curr.systemViews.add(view);
+        }
+
+        return super.visit(rel);
+    }
+
     @Override
     public IgniteRel visit(IgniteTableScan rel) {
-        curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id());
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+        assert table != null;
+
+        if (curr.seenRelations.add(table.id())) {
+            curr.tables.add(table);
+        }
 
         return super.visit(rel);
     }
 
     @Override
     public IgniteRel visit(IgniteIndexScan rel) {
-        curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id());
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+        assert table != null;
+
+        if (curr.seenRelations.add(table.id())) {
+            curr.tables.add(table);
+        }
 
         return super.visit(rel);
     }
 
     @Override
     public IgniteRel visit(IgniteTableModify rel) {
-        curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id());
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+        assert table != null;
+
+        if (curr.seenRelations.add(table.id())) {
+            curr.tables.add(table);
+        }
 
         return super.visit(rel);
     }
@@ -182,8 +215,11 @@ class FragmentSplitter extends IgniteRelShuttle {
 
         private IgniteRel root;
 
+        private final IntSet seenRelations = new IntOpenHashSet();
+
         private final List<IgniteReceiver> remotes = new ArrayList<>();
-        private final IntSet tableIds = new IntArraySet();
+        private final List<IgniteTable> tables = new ArrayList<>();
+        private final List<IgniteSystemView> systemViews = new ArrayList<>();
 
         private FragmentProto(long id, boolean correlated, IgniteRel root) {
             this.id = id;
@@ -192,7 +228,7 @@ class FragmentSplitter extends IgniteRelShuttle {
         }
 
         Fragment build() {
-            return new Fragment(id, correlated, root, remotes, tableIds);
+            return new Fragment(id, correlated, root, remotes, tables, 
systemViews);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 8b0c1cfcff..e84f62c928 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec.mapping;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -34,6 +33,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 
 /**
  * An implementation of {@link MappingService}.
@@ -76,14 +77,17 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
 
         List<Fragment> fragments0 = Commons.transform(fragments, fragment -> 
fragment.attach(context.cluster()));
 
-        List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets = 
fragments0.stream()
-                .flatMap(fragment ->
-                        fragment.tableIds().intStream()
-                                .mapToObj(id ->
-                                        
targetProvider.forTable(context.targetFactory(), id)
-                                                .thenApply(target -> 
IntObjectPair.of(id, target))
+        List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
+                fragments0.stream().flatMap(fragment -> Stream.concat(
+                        fragment.tables().stream()
+                                .map(table -> 
targetProvider.forTable(context.targetFactory(), table)
+                                        .thenApply(target -> 
IntObjectPair.of(table.id(), target))
+                                ),
+                        fragment.systemViews().stream()
+                                .map(view -> 
targetProvider.forSystemView(context.targetFactory(), view)
+                                        .thenApply(target -> 
IntObjectPair.of(view.id(), target))
                                 )
-                )
+                ))
                 .collect(Collectors.toList());
 
         return allOf(targets.toArray(new CompletableFuture[0]))
@@ -155,12 +159,12 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
                     }
 
                     if (!lastAttemptSucceed) {
-                        throw new IgniteInternalException(INTERNAL_ERR, 
"Unable to map query", ex);
+                        throw new IgniteInternalException(Sql.MAPPING_ERR, 
"Unable to map query: " + ex.getMessage(), ex);
                     }
 
                     List<MappedFragment> result = new 
ArrayList<>(fragmentsToMap.size());
                     for (Fragment fragment : fragmentsToMap) {
-                        FragmentMapping mapping =  
mappingByFragmentId.get(fragment.fragmentId());
+                        FragmentMapping mapping = 
mappingByFragmentId.get(fragment.fragmentId());
 
                         ColocationGroup targetGroup = null;
                         if (!fragment.rootFragment()) {
@@ -242,7 +246,8 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
                     sender = new IgniteSender(sender.getCluster(), 
sender.getTraitSet(),
                             sender.getInput(), sender.exchangeId(), 
newTargetId, sender.distribution());
 
-                    fragment = new Fragment(fragment.fragmentId(), 
fragment.correlated(), sender, fragment.remotes(), fragment.tableIds());
+                    fragment = new Fragment(fragment.fragmentId(), 
fragment.correlated(), sender,
+                            fragment.remotes(), fragment.tables(), 
fragment.systemViews());
                 }
             }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNode.java
new file mode 100644
index 0000000000..24da64c175
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.BitSet;
+import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource;
+import 
org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
+import org.apache.ignite.internal.util.subscription.TransformingPublisher;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Execution node for scan over arbitrary {@link ScannableDataSource data 
source}.
+ */
+public class DataSourceScanNode<RowT> extends StorageScanNode<RowT> {
+
+    private final ScannableDataSource dataSource;
+
+    private final Function<InternalTuple, RowT> converter;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Execution context.
+     * @param rowFactory Row factory.
+     * @param schema Schema of the tuples returned by data source.
+     * @param dataSource A data source to scan.
+     * @param filters Optional filter to filter out rows.
+     * @param rowTransformer Optional projection function.
+     * @param requiredColumns Optional set of columns of interest.
+     */
+    public DataSourceScanNode(
+            ExecutionContext<RowT> ctx,
+            RowHandler.RowFactory<RowT> rowFactory,
+            BinaryTupleSchema schema,
+            ScannableDataSource dataSource,
+            @Nullable Predicate<RowT> filters,
+            @Nullable Function<RowT, RowT> rowTransformer,
+            @Nullable BitSet requiredColumns
+    ) {
+        super(ctx, filters, rowTransformer);
+
+        this.dataSource = dataSource;
+
+        if (requiredColumns == null || requiredColumns.cardinality() == 
schema.elementCount()) {
+            converter = rowFactory::create;
+        } else {
+            int[] mapping = requiredColumns.stream().toArray();
+
+            converter = tuple -> rowFactory.create(new 
FieldDeserializingProjectedTuple(schema, tuple, mapping));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Publisher<RowT> scan() {
+        return new TransformingPublisher<>(dataSource.scan(), converter);
+    }
+}
+
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
index c6491b6a5a..ed2918bfdb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Cloner.java
@@ -49,8 +49,8 @@ public class Cloner {
 
             IgniteRel newRoot = visit(src.root());
 
-            return new Fragment(src.fragmentId(), newRoot, src.correlated(),
-                    remotes, src.serialized(), src.tableIds());
+            return new Fragment(src.fragmentId(), src.correlated(), newRoot, 
src.serialized(),
+                    remotes, src.tables(), src.systemViews());
         } finally {
             remotes = null;
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index 2fa37ec274..6fe68c3d62 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.sql.engine.prepare;
 
 import static 
org.apache.ignite.internal.sql.engine.externalize.RelJsonWriter.toJson;
 
-import it.unimi.dsi.fastutil.ints.IntSet;
-import it.unimi.dsi.fastutil.ints.IntSets;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
@@ -45,7 +45,8 @@ public class Fragment {
     private final String rootSer;
 
     private final List<IgniteReceiver> remotes;
-    private final IntSet tableIds;
+    private final List<IgniteTable> tables;
+    private final List<IgniteSystemView> systemViews;
 
     private final boolean correlated;
 
@@ -56,21 +57,32 @@ public class Fragment {
      * @param correlated Whether some correlated variables should be set prior 
to fragment execution.
      * @param root Root node of the fragment.
      * @param remotes Remote sources of the fragment.
+     * @param tables A list of tables contained in this fragment.
+     * @param systemViews A list of system views contained in this fragment.
      */
-    public Fragment(long id, boolean correlated, IgniteRel root, 
List<IgniteReceiver> remotes, IntSet tableIds) {
-        this(id, root, correlated, remotes, null, tableIds);
+    public Fragment(long id, boolean correlated, IgniteRel root, 
List<IgniteReceiver> remotes,
+            List<IgniteTable> tables, List<IgniteSystemView> systemViews) {
+        this(id, correlated, root, null, remotes, tables, systemViews);
     }
 
     /**
      * Constructor.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     *
+     * @param id An identifier of this fragment.
+     * @param correlated Whether some correlated variables should be set prior 
to fragment execution.
+     * @param root Root node of the fragment.
+     * @param rootSer Serialised representation of a root. Optional.
+     * @param remotes Remote sources of the fragment.
+     * @param tables A list of tables contained in this fragment.
+     * @param systemViews A list of system views contained in this fragment.
      */
-    Fragment(long id, IgniteRel root, boolean correlated, List<IgniteReceiver> 
remotes,
-            @Nullable String rootSer, IntSet tableIds) {
+    Fragment(long id, boolean correlated, IgniteRel root, @Nullable String 
rootSer, List<IgniteReceiver> remotes,
+            List<IgniteTable> tables, List<IgniteSystemView> systemViews) {
         this.id = id;
         this.root = root;
         this.remotes = List.copyOf(remotes);
-        this.tableIds = IntSets.unmodifiable(tableIds);
+        this.tables = List.copyOf(tables);
+        this.systemViews = List.copyOf(systemViews);
         this.rootSer = rootSer != null ? rootSer : toJson(root);
         this.correlated = correlated;
     }
@@ -115,8 +127,12 @@ public class Fragment {
         return remotes;
     }
 
-    public IntSet tableIds() {
-        return tableIds;
+    public List<IgniteTable> tables() {
+        return tables;
+    }
+
+    public List<IgniteSystemView> systemViews() {
+        return systemViews;
     }
 
     public boolean rootFragment() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
index e0da65870d..7ed65752a1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/QuerySplitter.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.prepare;
 
-import it.unimi.dsi.fastutil.ints.IntArraySet;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -30,10 +30,12 @@ import 
org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTrimExchange;
 import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 
@@ -130,7 +132,13 @@ public class QuerySplitter extends IgniteRelShuttle {
     /** {@inheritDoc} */
     @Override
     public IgniteRel visit(IgniteIndexScan rel) {
-        curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id());
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+        assert table != null;
+
+        if (curr.seenRelations.add(table.id())) {
+            curr.tables.add(table);
+        }
 
         return rel.clone(IdGenerator.nextId());
     }
@@ -138,7 +146,13 @@ public class QuerySplitter extends IgniteRelShuttle {
     /** {@inheritDoc} */
     @Override
     public IgniteRel visit(IgniteTableScan rel) {
-        curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id());
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+        assert table != null;
+
+        if (curr.seenRelations.add(table.id())) {
+            curr.tables.add(table);
+        }
 
         return rel.clone(IdGenerator.nextId());
     }
@@ -146,19 +160,42 @@ public class QuerySplitter extends IgniteRelShuttle {
     /** {@inheritDoc} */
     @Override
     public IgniteRel visit(IgniteTableModify rel) {
-        curr.tableIds.add(rel.getTable().unwrap(IgniteTable.class).id());
+        IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
+
+        assert table != null;
+
+        if (curr.seenRelations.add(table.id())) {
+            curr.tables.add(table);
+        }
 
         return super.visit(rel);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel visit(IgniteSystemViewScan rel) {
+        IgniteSystemView view = rel.getTable().unwrap(IgniteSystemView.class);
+
+        assert view != null;
+
+        if (curr.seenRelations.add(view.id())) {
+            curr.systemViews.add(view);
+        }
+
+        return rel.clone(IdGenerator.nextId());
+    }
+
     private static class FragmentProto {
         private final long id;
         private final boolean correlated;
 
         private IgniteRel root;
 
+        private final IntSet seenRelations = new IntOpenHashSet();
+
         private final List<IgniteReceiver> remotes = new ArrayList<>();
-        private final IntSet tableIds = new IntArraySet();
+        private final List<IgniteTable> tables = new ArrayList<>();
+        private final List<IgniteSystemView> systemViews = new ArrayList<>();
 
         private FragmentProto(long id, boolean correlated, IgniteRel root) {
             this.id = id;
@@ -167,7 +204,7 @@ public class QuerySplitter extends IgniteRelShuttle {
         }
 
         Fragment build() {
-            return new Fragment(id, correlated, root, remotes, tableIds);
+            return new Fragment(id, correlated, root, remotes, tables, 
systemViews);
         }
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
index d83ece2045..f6ce53c7fb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionDependencyResolverSelfTest.java
@@ -216,7 +216,7 @@ public class ExecutionDependencyResolverSelfTest extends 
AbstractPlannerTest {
         }
 
         CompletableFuture<ResolvedDependencies> resolveDependencies(String 
sql) {
-            ExecutionDependencyResolver resolver = new 
ExecutionDependencyResolverImpl(registry);
+            ExecutionDependencyResolver resolver = new 
ExecutionDependencyResolverImpl(registry, null);
 
             IgniteRel rel;
             try {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index d392e6f101..78bd82e109 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -95,6 +95,8 @@ import 
org.apache.ignite.internal.sql.engine.schema.CatalogColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
@@ -606,7 +608,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         NoOpExecutableTableRegistry executableTableRegistry = new 
NoOpExecutableTableRegistry();
 
-        ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry);
+        ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry, null);
 
         CalciteSchema rootSch = CalciteSchema.createRootSchema(false);
         rootSch.add(schema.getName(), schema);
@@ -616,13 +618,18 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         var targetProvider = new ExecutionTargetProvider() {
             @Override
-            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, int tableId) {
+            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
                 if (mappingException != null) {
                     return CompletableFuture.failedFuture(mappingException);
                 }
 
                 return 
CompletableFuture.completedFuture(factory.allOf(nodeNames));
             }
+
+            @Override
+            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                return CompletableFuture.failedFuture(new AssertionError("Not 
supported"));
+            }
         };
 
         var mappingService = new MappingServiceImpl(nodeName, targetProvider);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index a78189bd4d..28ffb1549b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -36,6 +38,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.row.InternalTuple;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
@@ -61,11 +67,9 @@ import org.junit.jupiter.api.BeforeEach;
  * AbstractExecutionTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public abstract class AbstractExecutionTest extends IgniteAbstractTest {
+public abstract class AbstractExecutionTest<T> extends IgniteAbstractTest {
     public static final Object[][] EMPTY = new Object[0][];
 
-    private Throwable lastE;
-
     private QueryTaskExecutorImpl taskExecutor;
 
     @BeforeEach
@@ -81,17 +85,15 @@ public abstract class AbstractExecutionTest extends 
IgniteAbstractTest {
     @AfterEach
     public void afterTest() {
         taskExecutor.stop();
-
-        if (lastE != null) {
-            throw new AssertionError(lastE);
-        }
     }
 
-    protected ExecutionContext<Object[]> executionContext() {
+    protected abstract RowHandler<T> rowHandler();
+
+    protected ExecutionContext<T> executionContext() {
         return executionContext(false);
     }
 
-    protected ExecutionContext<Object[]> executionContext(boolean withDelays) {
+    protected ExecutionContext<T> executionContext(boolean withDelays) {
         if (withDelays) {
             StripedThreadPoolExecutor testExecutor = new 
IgniteTestStripedThreadPoolExecutor(8,
                     NamedThreadFactory.threadPrefix("fake-test-node", 
"sqlTestExec"),
@@ -120,17 +122,12 @@ public abstract class AbstractExecutionTest extends 
IgniteAbstractTest {
                 new ClusterNodeImpl("1", "fake-test-node", 
NetworkAddress.from("127.0.0.1:1111")),
                 "fake-test-node",
                 fragmentDesc,
-                ArrayRowHandler.INSTANCE,
+                rowHandler(),
                 Map.of(),
                 TxAttributes.fromTx(new NoOpTransaction("fake-test-node"))
         );
     }
 
-    private void handle(Thread t, Throwable ex) {
-        log.error(ex.getMessage(), ex);
-        lastE = ex;
-    }
-
     protected Object[] row(Object... fields) {
         return fields;
     }
@@ -341,7 +338,7 @@ public abstract class AbstractExecutionTest extends 
IgniteAbstractTest {
         }
     }
 
-    protected RowHandler.RowFactory<Object[]> rowFactory() {
+    static RowHandler.RowFactory<Object[]> rowFactory() {
         return new RowHandler.RowFactory<>() {
             @Override
             public RowHandler<Object[]> handler() {
@@ -369,4 +366,38 @@ public abstract class AbstractExecutionTest extends 
IgniteAbstractTest {
             }
         };
     }
+
+    static TupleFactory tupleFactoryFromSchema(BinaryTupleSchema schema) {
+        return new BinaryTupleFactory(schema);
+    }
+
+    @FunctionalInterface
+    interface TupleFactory {
+        InternalTuple create(Object... values);
+    }
+
+    private static class BinaryTupleFactory implements TupleFactory {
+        private final BinaryTupleSchema schema;
+
+        BinaryTupleFactory(BinaryTupleSchema schema) {
+            this.schema = schema;
+        }
+
+        @Override
+        public InternalTuple create(Object... values) {
+            if (schema.elementCount() != values.length) {
+                throw new IllegalArgumentException(
+                        format("Expecting {} elements, but was {}", 
schema.elementCount(), values.length)
+                );
+            }
+
+            BinaryTupleBuilder builder = new 
BinaryTupleBuilder(schema.elementCount());
+
+            for (int i = 0; i < schema.elementCount(); i++) {
+                BinaryRowConverter.appendValue(builder, schema.element(i), 
values[i]);
+            }
+
+            return new BinaryTuple(schema.elementCount(), builder.build());
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java
index 8b69f87b4e..08f081ab49 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpExecutionTest.java
@@ -36,13 +36,15 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.junit.jupiter.api.Test;
 
 /**
  * Abstract test for set operator (MINUS, INTERSECT) execution.
  */
-public abstract class AbstractSetOpExecutionTest extends AbstractExecutionTest 
{
+public abstract class AbstractSetOpExecutionTest extends 
AbstractExecutionTest<Object[]> {
 
     private static final int COLUMN_NUN = 2;
 
@@ -162,4 +164,9 @@ public abstract class AbstractSetOpExecutionTest extends 
AbstractExecutionTest {
 
     protected abstract AbstractSetOpNode<Object[]> 
setOpNodeFactory(ExecutionContext<Object[]> ctx,
             AggregateType type, int columnCount, boolean all, int inputsCnt);
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
index 00fa3ee1fd..27f06ffe28 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/BaseAggregateTest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -57,7 +58,7 @@ import org.junit.jupiter.params.provider.EnumSource;
  * A test class that defines basic test scenarios to verify the execution of 
each type of aggregate.
  */
 @SuppressWarnings("resource")
-public abstract class BaseAggregateTest extends AbstractExecutionTest {
+public abstract class BaseAggregateTest extends 
AbstractExecutionTest<Object[]> {
     @ParameterizedTest
     @EnumSource
     public void count(TestAggregateType testAgg) {
@@ -720,4 +721,9 @@ public abstract class BaseAggregateTest extends 
AbstractExecutionTest {
 
         MAP_REDUCE
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
new file mode 100644
index 0000000000..05645aeb75
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.row.InternalTuple;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.row.BaseTypeSpec;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.exec.row.TypeSpec;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/** Tests to verify {@link DataSourceScanNode}. */
+@SuppressWarnings("resource")
+public class DataSourceScanNodeSelfTest extends 
AbstractExecutionTest<RowWrapper> {
+    private static final RowSchema ROW_SCHEMA = RowSchema.builder()
+            .addField(NativeTypes.INT32)
+            .addField(NativeTypes.INT64)
+            .addField(NativeTypes.INT8)
+            .addField(NativeTypes.stringOf(64))
+            .addField(NativeTypes.UUID)
+            .build();
+
+    private static final BinaryTupleSchema TUPLE_SCHEMA = 
fromRowSchema(ROW_SCHEMA);
+
+    private static final TupleFactory TUPLE_FACTORY = 
tupleFactoryFromSchema(TUPLE_SCHEMA);
+
+    private static final Object[][] ROWS = {
+            {111, 11L, (byte) 1, "1111", new UUID(0L, 1L)},
+            {222, 22L, (byte) 2, "2222", new UUID(0L, 2L)},
+            {333, 33L, (byte) 3, "3333", new UUID(0L, 3L)},
+            {444, 44L, (byte) 4, "4444", new UUID(0L, 4L)},
+            {555, 55L, (byte) 5, "5555", new UUID(0L, 5L)},
+    };
+
+    @Test
+    void simpleTest() {
+        ExecutionContext<RowWrapper> context = executionContext();
+        List<RowWrapper> rows = initScanAndGetResults(context, null, null, 
null);
+
+        assertThat(rows, notNullValue());
+
+        List<String> expectedRows = List.of(
+                "Row[111, 11, 1, 1111, 00000000-0000-0000-0000-000000000001]",
+                "Row[222, 22, 2, 2222, 00000000-0000-0000-0000-000000000002]",
+                "Row[333, 33, 3, 3333, 00000000-0000-0000-0000-000000000003]",
+                "Row[444, 44, 4, 4444, 00000000-0000-0000-0000-000000000004]",
+                "Row[555, 55, 5, 5555, 00000000-0000-0000-0000-000000000005]"
+        );
+
+        RowHandler<RowWrapper> handler = context.rowHandler();
+
+        List<String> actualRows = 
rows.stream().map(handler::toString).collect(Collectors.toList());
+
+        assertThat(actualRows, equalTo(expectedRows));
+    }
+
+    @Test
+    void scanWithRequiredFields() {
+        ExecutionContext<RowWrapper> context = executionContext();
+        RowHandler<RowWrapper> handler = context.rowHandler();
+        List<RowWrapper> rows = initScanAndGetResults(context, null, null, 
ImmutableBitSet.of(1, 3, 4).toBitSet());
+
+        assertThat(rows, notNullValue());
+
+        List<String> expectedRows = List.of(
+                "Row[11, 1111, 00000000-0000-0000-0000-000000000001]",
+                "Row[22, 2222, 00000000-0000-0000-0000-000000000002]",
+                "Row[33, 3333, 00000000-0000-0000-0000-000000000003]",
+                "Row[44, 4444, 00000000-0000-0000-0000-000000000004]",
+                "Row[55, 5555, 00000000-0000-0000-0000-000000000005]"
+        );
+
+        List<String> actualRows = 
rows.stream().map(handler::toString).collect(Collectors.toList());
+
+        assertThat(actualRows, equalTo(expectedRows));
+    }
+
+    @Test
+    @SuppressWarnings("DataFlowIssue")
+    void scanWithProjection() {
+        ExecutionContext<RowWrapper> context = executionContext();
+        RowHandler<RowWrapper> handler = context.rowHandler();
+        RowFactory<RowWrapper> factory = handler.factory(ROW_SCHEMA);
+
+        Function<RowWrapper, RowWrapper> doubleFirstColumnProjection = row -> {
+            int size = handler.columnCount(row);
+
+            Object[] values = new Object[size];
+
+            values[0] = (Integer) handler.get(0, row) * 2;
+
+            for (int i = 1; i < size; i++) {
+                values[i] = handler.get(i, row);
+            }
+
+            return factory.create(values);
+        };
+
+        List<RowWrapper> rows = initScanAndGetResults(context, null, 
doubleFirstColumnProjection, null);
+
+        assertThat(rows, notNullValue());
+
+        List<String> expectedRows = List.of(
+                "Row[222, 11, 1, 1111, 00000000-0000-0000-0000-000000000001]",
+                "Row[444, 22, 2, 2222, 00000000-0000-0000-0000-000000000002]",
+                "Row[666, 33, 3, 3333, 00000000-0000-0000-0000-000000000003]",
+                "Row[888, 44, 4, 4444, 00000000-0000-0000-0000-000000000004]",
+                "Row[1110, 55, 5, 5555, 00000000-0000-0000-0000-000000000005]"
+        );
+
+        List<String> actualRows = 
rows.stream().map(handler::toString).collect(Collectors.toList());
+
+        assertThat(actualRows, equalTo(expectedRows));
+    }
+
+    @Test
+    @SuppressWarnings("DataFlowIssue")
+    void scanWithFilter() {
+        ExecutionContext<RowWrapper> context = executionContext();
+        RowHandler<RowWrapper> handler = context.rowHandler();
+
+        Predicate<RowWrapper> onlyEven = row -> ((Integer) handler.get(0, 
row)) % 2 == 0;
+
+        List<RowWrapper> rows = initScanAndGetResults(context, onlyEven, null, 
null);
+
+        assertThat(rows, notNullValue());
+
+        List<String> expectedRows = List.of(
+                "Row[222, 22, 2, 2222, 00000000-0000-0000-0000-000000000002]",
+                "Row[444, 44, 4, 4444, 00000000-0000-0000-0000-000000000004]"
+        );
+
+        List<String> actualRows = 
rows.stream().map(handler::toString).collect(Collectors.toList());
+
+        assertThat(actualRows, equalTo(expectedRows));
+    }
+
+    @Test
+    @SuppressWarnings("DataFlowIssue")
+    void scanWithAllOptions() {
+        ExecutionContext<RowWrapper> context = executionContext();
+        RowHandler<RowWrapper> handler = context.rowHandler();
+
+        BitSet requiredFields = ImmutableBitSet.of(1, 3, 4).toBitSet();
+
+        RowFactory<RowWrapper> factory = handler.factory(project(ROW_SCHEMA, 
requiredFields.stream().toArray()));
+
+        // predicate matching goes before projection transformation, thus this 
predicate is valid
+        Predicate<RowWrapper> onlyEven = row -> ((Long) handler.get(0, row)) % 
2 == 0;
+
+        Function<RowWrapper, RowWrapper> doubleFirstColumnProjection = row -> {
+            int size = handler.columnCount(row);
+
+            Object[] values = new Object[size];
+
+            values[0] = (Long) handler.get(0, row) * 2;
+
+            for (int i = 1; i < size; i++) {
+                values[i] = handler.get(i, row);
+            }
+
+            return factory.create(values);
+        };
+
+        List<RowWrapper> rows = initScanAndGetResults(context, onlyEven, 
doubleFirstColumnProjection, requiredFields);
+
+        assertThat(rows, notNullValue());
+
+        List<String> expectedRows = List.of(
+                "Row[44, 2222, 00000000-0000-0000-0000-000000000002]",
+                "Row[88, 4444, 00000000-0000-0000-0000-000000000004]"
+        );
+
+        List<String> actualRows = 
rows.stream().map(handler::toString).collect(Collectors.toList());
+
+        assertThat(actualRows, equalTo(expectedRows));
+    }
+
+    @SuppressWarnings("DataFlowIssue")
+    private static List<RowWrapper> initScanAndGetResults(
+            ExecutionContext<RowWrapper> context,
+            @Nullable Predicate<RowWrapper> predicate,
+            @Nullable Function<RowWrapper, RowWrapper> projection,
+            @Nullable BitSet requiredFields
+    ) {
+        RowHandler<RowWrapper> handler = context.rowHandler();
+        RowFactory<RowWrapper> factory;
+        if (requiredFields != null) {
+            factory = handler.factory(project(ROW_SCHEMA, 
requiredFields.stream().toArray()));
+        } else {
+            factory = handler.factory(ROW_SCHEMA);
+        }
+
+        ScannableDataSource dataSource = new IterableDataSource(
+                
Stream.of(ROWS).map(TUPLE_FACTORY::create).collect(Collectors.toList())
+        );
+        DataSourceScanNode<RowWrapper> node = new DataSourceScanNode<>(
+                context, factory, fromRowSchema(ROW_SCHEMA), dataSource, 
predicate, projection, requiredFields
+        );
+
+        DrainAllDownstream<RowWrapper> downstream = new DrainAllDownstream<>();
+
+        node.onRegister(downstream);
+
+        context.execute(() -> node.request(Integer.MAX_VALUE), node::onError);
+
+        return await(downstream.completion);
+    }
+
+    static class IterableDataSource implements ScannableDataSource {
+        private final Iterable<InternalTuple> iterable;
+
+        IterableDataSource(Iterable<InternalTuple> iterable) {
+            this.iterable = iterable;
+        }
+
+        @Override
+        public Publisher<InternalTuple> scan() {
+            Iterator<InternalTuple> it = iterable.iterator();
+
+            return new Publisher<InternalTuple>() {
+                @Override
+                public void subscribe(Subscriber<? super InternalTuple> 
subscriber) {
+                    Subscription subscription = new Subscription() {
+                        @Override
+                        public void request(long n) {
+                            if (n <= 0) {
+                                subscriber.onError(new 
IllegalArgumentException());
+
+                                return;
+                            }
+
+                            while (n > 0 && it.hasNext()) {
+                                subscriber.onNext(it.next());
+
+                                n--;
+                            }
+
+                            if (n > 0) {
+                                subscriber.onComplete();
+                            }
+                        }
+
+                        @Override
+                        public void cancel() {
+                            // NO-OP
+                        }
+                    };
+
+                    subscriber.onSubscribe(subscription);
+                }
+            };
+        }
+    }
+
+    static class DrainAllDownstream<T> implements Downstream<T> {
+        private final List<T> rows = new ArrayList<>();
+        private final CompletableFuture<List<T>> completion = new 
CompletableFuture<>();
+
+        @Override
+        public void push(T row) {
+            rows.add(row);
+        }
+
+        @Override
+        public void end() throws Exception {
+            completion.complete(rows);
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            completion.completeExceptionally(e);
+        }
+    }
+
+    @Override
+    protected RowHandler<RowWrapper> rowHandler() {
+        return SqlRowHandler.INSTANCE;
+    }
+
+    private static BinaryTupleSchema fromRowSchema(RowSchema schema) {
+        Element[] elements = new Element[schema.fields().size()];
+
+        int idx = 0;
+        for (TypeSpec spec : schema.fields()) {
+            assert spec instanceof BaseTypeSpec : spec;
+
+            elements[idx++] = new Element(((BaseTypeSpec) spec).nativeType(), 
spec.isNullable());
+        }
+
+        return BinaryTupleSchema.create(elements);
+    }
+
+    private static RowSchema project(RowSchema schema, int[] projection) {
+        RowSchema.Builder builder = RowSchema.builder();
+        for (int i : projection) {
+            builder.addField(schema.fields().get(i));
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 2196753738..5a1569f007 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -47,7 +47,9 @@ import 
org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
 import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.ClusterServiceFactory;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
@@ -77,7 +79,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 /**
  * Tests to verify Outbox to Inbox interoperation.
  */
-public class ExchangeExecutionTest extends AbstractExecutionTest {
+public class ExchangeExecutionTest extends AbstractExecutionTest<Object[]> {
     private static final String ROOT_NODE_NAME = "N1";
     private static final String ANOTHER_NODE_NAME = "N2";
     private static final List<String> NODE_NAMES = List.of(ROOT_NODE_NAME, 
ANOTHER_NODE_NAME);
@@ -728,4 +730,9 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest {
             };
         }
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 8e87011342..3127f68d82 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -63,7 +64,7 @@ import org.junit.jupiter.params.provider.MethodSource;
  * ExecutionTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class ExecutionTest extends AbstractExecutionTest {
+public class ExecutionTest extends AbstractExecutionTest<Object[]> {
     @Test
     public void testSimpleExecution() {
         // SELECT P.ID, P.NAME, PR.NAME AS PROJECT
@@ -713,4 +714,9 @@ public class ExecutionTest extends AbstractExecutionTest {
 
         return args.stream();
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 04fbd95a34..587f0a7802 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -37,8 +37,10 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates;
 import 
org.apache.ignite.internal.sql.engine.rel.agg.MapReduceAggregates.MapReduceAgg;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -51,7 +53,7 @@ import org.junit.jupiter.api.Test;
  * HashAggregateSingleGroupExecutionTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class HashAggregateSingleGroupExecutionTest extends 
AbstractExecutionTest {
+public class HashAggregateSingleGroupExecutionTest extends 
AbstractExecutionTest<Object[]> {
 
     @Test
     public void mapReduceSum() {
@@ -433,4 +435,9 @@ public class HashAggregateSingleGroupExecutionTest extends 
AbstractExecutionTest
 
         return newHashAggNode(ctx, REDUCE, grpSets, hashRowType, 
reduceAggCall.getReduceCall());
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
index 4f5ecb5834..129a04bcb8 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/HashIndexSpoolExecutionTest.java
@@ -29,6 +29,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -38,7 +40,7 @@ import org.junit.jupiter.api.Test;
 /**
  * Hash index based spool implementation test.
  */
-public class HashIndexSpoolExecutionTest extends AbstractExecutionTest {
+public class HashIndexSpoolExecutionTest extends 
AbstractExecutionTest<Object[]> {
     @Test
     public void testIndexSpool() {
         ExecutionContext<Object[]> ctx = executionContext(true);
@@ -203,4 +205,9 @@ public class HashIndexSpoolExecutionTest extends 
AbstractExecutionTest {
             this.expectedResultSize = expectedResultSize;
         }
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index 26ca27b2fc..da4490849a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -40,10 +40,12 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -58,7 +60,7 @@ import org.junit.jupiter.api.Test;
 /**
  * Test {@link IndexScanNode} execution.
  */
-public class IndexScanNodeExecutionTest extends AbstractExecutionTest {
+public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]> {
 
     /**
      * Sorted index scan execution.
@@ -307,4 +309,9 @@ public class IndexScanNodeExecutionTest extends 
AbstractExecutionTest {
             }
         }
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
index 08de8e9400..19f05c4b2a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
@@ -28,12 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test LimitNode execution.
  */
-public class LimitExecutionTest extends AbstractExecutionTest {
+public class LimitExecutionTest extends AbstractExecutionTest<Object[]> {
     /** Tests correct results fetched with Limit node. */
     @Test
     public void testLimit() {
@@ -214,4 +216,9 @@ public class LimitExecutionTest extends 
AbstractExecutionTest {
             }, this::onError);
         }
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index 9dd048749d..10b1bed4e7 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -42,6 +42,7 @@ import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactoryImpl;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
@@ -55,7 +56,7 @@ import org.junit.jupiter.params.provider.ValueSource;
  * MergeJoinExecutionTest;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class MergeJoinExecutionTest extends AbstractExecutionTest {
+public class MergeJoinExecutionTest extends AbstractExecutionTest<Object[]> {
     @ParameterizedTest(name = "treat nulls as equals: {0}")
     @ValueSource(booleans = {true, false})
     public void joinEmptyTables(boolean equalNulls) {
@@ -538,4 +539,9 @@ public class MergeJoinExecutionTest extends 
AbstractExecutionTest {
     private static <T> Set<T> setOf(T... items) {
         return new HashSet<>(Arrays.asList(items));
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
index d81a03331a..253509cd50 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinExecutionTest.java
@@ -36,6 +36,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.junit.jupiter.api.Test;
@@ -44,7 +45,7 @@ import org.junit.jupiter.api.Test;
  * NestedLoopJoinExecutionTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class NestedLoopJoinExecutionTest extends AbstractExecutionTest {
+public class NestedLoopJoinExecutionTest extends 
AbstractExecutionTest<Object[]> {
     @Test
     public void joinEmptyTables() {
         verifyJoin(EMPTY, EMPTY, INNER, EMPTY);
@@ -364,4 +365,9 @@ public class NestedLoopJoinExecutionTest extends 
AbstractExecutionTest {
     private static <T> Set<T> setOf(T... items) {
         return new HashSet<>(Arrays.asList(items));
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
index c51b1a21c3..b3afc53ad1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SortedIndexSpoolExecutionTest.java
@@ -32,6 +32,8 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -41,7 +43,7 @@ import org.junit.jupiter.api.Test;
  * TreeIndexSpoolExecutionTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class SortedIndexSpoolExecutionTest extends AbstractExecutionTest {
+public class SortedIndexSpoolExecutionTest extends 
AbstractExecutionTest<Object[]> {
     @Test
     public void testIndexSpool() {
         ExecutionContext<Object[]> ctx = executionContext();
@@ -241,4 +243,8 @@ public class SortedIndexSpoolExecutionTest extends 
AbstractExecutionTest {
         }
     }
 
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 391a0aa2bb..d4a25d3eea 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -45,10 +45,12 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl;
 import org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import 
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestTableDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
@@ -71,7 +73,7 @@ import org.junit.jupiter.api.Test;
 /**
  * Tests execution flow of TableScanNode.
  */
-public class TableScanNodeExecutionTest extends AbstractExecutionTest {
+public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]> {
 
     private final LinkedList<AutoCloseable> closeables = new LinkedList<>();
 
@@ -230,4 +232,9 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest {
             };
         }
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
index 40bff438ae..da9f5029e5 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolExecutionTest.java
@@ -29,6 +29,8 @@ import java.util.stream.IntStream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -38,7 +40,7 @@ import org.junit.jupiter.api.Test;
  * TableSpoolExecutionTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-public class TableSpoolExecutionTest extends AbstractExecutionTest {
+public class TableSpoolExecutionTest extends AbstractExecutionTest<Object[]> {
     @Test
     public void testLazyTableSpool() {
         checkTableSpool(
@@ -147,4 +149,9 @@ public class TableSpoolExecutionTest extends 
AbstractExecutionTest {
             }
         }
     }
+
+    @Override
+    protected RowHandler<Object[]> rowHandler() {
+        return ArrayRowHandler.INSTANCE;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index ad16fc39fe..3cb61a0dba 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -21,8 +21,6 @@ import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_N
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -52,6 +50,7 @@ import 
org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
@@ -363,20 +362,11 @@ public class TestBuilders {
                     .map(ClusterTableBuilderImpl::build)
                     .collect(Collectors.toMap(TestTable::name, 
Function.identity()));
 
-            Int2ObjectMap<IgniteTable> tablesById = new 
Int2ObjectOpenHashMap<>(tableByName.size());
-            tableByName.forEach((name, table) -> tablesById.put(table.id(), 
table));
-
             IgniteSchema schema = new IgniteSchema(DEFAULT_SCHEMA_NAME, 
SCHEMA_VERSION, tableByName.values());
             var schemaManager = new PredefinedSchemaManager(schema);
             var targetProvider = new ExecutionTargetProvider() {
                 @Override
-                public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, int tableId) {
-                    IgniteTable table = tablesById.get(tableId);
-
-                    if (table == null) {
-                        throw new AssertionError("Table not found: " + 
tableId);
-                    }
-
+                public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
                     Map<String, DataProvider<?>> dataProviders = 
dataProvidersByTableName.get(table.name());
 
                     if (nullOrEmpty(dataProviders)) {
@@ -385,6 +375,11 @@ public class TestBuilders {
 
                     return 
CompletableFuture.completedFuture(factory.allOf(List.copyOf(dataProviders.keySet())));
                 }
+
+                @Override
+                public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                    return CompletableFuture.failedFuture(new 
AssertionError("Not supported"));
+                }
             };
 
             List<LogicalNode> logicalNodes = nodeNames.stream()
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 4e7902e2f3..2621a0e35d 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -117,7 +117,7 @@ public class TestNode implements LifecycleAware {
                 mailboxRegistry, messageService
         ));
         NoOpExecutableTableRegistry executableTableRegistry = new 
NoOpExecutableTableRegistry();
-        ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry);
+        ExecutionDependencyResolver dependencyResolver = new 
ExecutionDependencyResolverImpl(executableTableRegistry, null);
 
         executionService = registerService(new ExecutionServiceImpl<>(
                 messageService,
diff --git 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
 
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
index 1c2ef6814d..5ebdfa4121 100644
--- 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
+++ 
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManager.java
@@ -17,7 +17,11 @@
 
 package org.apache.ignite.internal.systemview;
 
+import java.util.List;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.schema.row.InternalTuple;
 
 /**
  * The system view manager is responsible for registering system views in the 
cluster.
@@ -32,4 +36,21 @@ public interface SystemViewManager extends IgniteComponent {
      * @param view System view to register.
      */
     void register(SystemView<?> view);
+
+    /**
+     * Returns a list of nodes a view with given name can be found on.
+     *
+     * @param name Name of view of interest.
+     * @return A list of nodes owning a view, or empty list if there is no 
node in the cluster owning this view.
+     */
+    List<String> owningNodes(String name);
+
+    /**
+     * Opens a cursor over view with a given name and returns publisher 
emitting rows of the view.
+     *
+     * @param name Name of view of interest.
+     * @return A publisher.
+     * @throws IgniteInternalException if a view with the given name is not 
present on the local node.
+     */
+    Publisher<InternalTuple> scanView(String name) throws 
IgniteInternalException;
 }
diff --git 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
 
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
index 9f7ed77b38..c3d6e55aa9 100644
--- 
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
+++ 
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
@@ -20,26 +20,35 @@ package org.apache.ignite.internal.systemview;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.cluster.management.NodeAttributesProvider;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.row.InternalTuple;
 import org.apache.ignite.internal.systemview.utils.SystemViewUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ErrorGroups.Common;
 
 /**
  * SQL system views manager implementation.
  */
-public class SystemViewManagerImpl implements SystemViewManager, 
NodeAttributesProvider {
+public class SystemViewManagerImpl implements SystemViewManager, 
NodeAttributesProvider, LogicalTopologyEventListener {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(SystemViewManagerImpl.class);
 
@@ -47,6 +56,8 @@ public class SystemViewManagerImpl implements 
SystemViewManager, NodeAttributesP
 
     public static final String NODE_ATTRIBUTES_LIST_SEPARATOR = ",";
 
+    private final String localNodeName;
+
     private final CatalogManager catalogManager;
 
     private final Map<String, String> nodeAttributes = new HashMap<>();
@@ -66,8 +77,11 @@ public class SystemViewManagerImpl implements 
SystemViewManager, NodeAttributesP
     /** Future which is completed when system views are registered in the 
catalog. */
     private final CompletableFuture<Void> viewsRegistrationFuture = new 
CompletableFuture<>();
 
+    private volatile Map<String, List<String>> owningNodesByViewName = 
Map.of();
+
     /** Creates a system view manager. */
-    public SystemViewManagerImpl(CatalogManager catalogManager) {
+    public SystemViewManagerImpl(String localNodeName, CatalogManager 
catalogManager) {
+        this.localNodeName = localNodeName;
         this.catalogManager = catalogManager;
     }
 
@@ -113,6 +127,22 @@ public class SystemViewManagerImpl implements 
SystemViewManager, NodeAttributesP
         busyLock.block();
     }
 
+    @Override
+    public List<String> owningNodes(String name) {
+        return inBusyLock(busyLock, () -> 
owningNodesByViewName.getOrDefault(name, List.of()));
+    }
+
+    @Override
+    public Publisher<InternalTuple> scanView(String name) {
+        if (views.get(name) == null) {
+            throw new IgniteInternalException(
+                    Common.INTERNAL_ERR,
+                    format("View with name '{}' not found on node '{}'", name, 
localNodeName)
+            );
+        }
+
+        throw new 
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-20578");
+    }
 
     /** {@inheritDoc} */
     @Override
@@ -136,10 +166,53 @@ public class SystemViewManagerImpl implements 
SystemViewManager, NodeAttributesP
         return nodeAttributes;
     }
 
+    @Override
+    public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot 
newTopology) {
+        processNewTopology(newTopology);
+    }
+
+    @Override
+    public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
+        processNewTopology(newTopology);
+    }
+
+    @Override
+    public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+        processNewTopology(newTopology);
+    }
+
     /**
      * Returns future which is completed when system views are registered in 
the catalog.
      */
     public CompletableFuture<Void> completeRegistration() {
         return viewsRegistrationFuture;
     }
+
+    private void processNewTopology(LogicalTopologySnapshot topology) {
+        Map<String, List<String>> owningNodesByViewName = new HashMap<>();
+
+        for (LogicalNode logicalNode : topology.nodes()) {
+            String systemViewsNames = 
logicalNode.systemAttributes().get(NODE_ATTRIBUTES_KEY);
+
+            if (systemViewsNames == null) {
+                continue;
+            }
+
+            
Arrays.stream(systemViewsNames.split(NODE_ATTRIBUTES_LIST_SEPARATOR))
+                    .map(String::trim)
+                    .forEach(viewName ->
+                            owningNodesByViewName.computeIfAbsent(viewName, 
key -> new ArrayList<>()).add(logicalNode.name())
+                    );
+        }
+
+        for (String viewName : owningNodesByViewName.keySet()) {
+            owningNodesByViewName.compute(viewName, (key, value) -> {
+                assert value != null;
+
+                return List.copyOf(value);
+            });
+        }
+
+        this.owningNodesByViewName = Map.copyOf(owningNodesByViewName);
+    }
 }
diff --git 
a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
 
b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
index b426faa14b..3c0c8f381f 100644
--- 
a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
+++ 
b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
@@ -24,8 +24,11 @@ import static 
org.apache.ignite.internal.systemview.SystemViewManagerImpl.NODE_A
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.util.CollectionUtils.first;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,11 +39,16 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.schema.NativeType;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
@@ -49,6 +57,9 @@ import org.apache.ignite.internal.schema.SchemaTestUtils;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -192,6 +203,34 @@ public class SystemViewManagerTest extends 
BaseIgniteAbstractTest {
         viewMgr.stop();
     }
 
+    @Test
+    @SuppressWarnings("DataFlowIssue")
+    void managerDerivesViewPlacementFromLogicalTopologyEvents() {
+        String viewName = "MY_VIEW";
+
+        assertThat(viewMgr.owningNodes(viewName), empty());
+
+        List<String> allNodes = List.of("A", "B", "C");
+
+        LogicalTopologySnapshot topologySnapshot = topologySnapshot(viewName, 
allNodes, 0, 2);
+        viewMgr.onNodeJoined(first(topologySnapshot.nodes()), 
topologySnapshot);
+
+        assertThat(viewMgr.owningNodes(viewName), hasItem("A"));
+        assertThat(viewMgr.owningNodes(viewName), hasItem("C"));
+
+        topologySnapshot = topologySnapshot(viewName, allNodes, 0, 1);
+        viewMgr.onNodeLeft(first(topologySnapshot.nodes()), topologySnapshot);
+
+        assertThat(viewMgr.owningNodes(viewName), hasItem("A"));
+        assertThat(viewMgr.owningNodes(viewName), hasItem("B"));
+
+        topologySnapshot = topologySnapshot(viewName, allNodes, 1, 2);
+        viewMgr.onTopologyLeap(topologySnapshot);
+
+        assertThat(viewMgr.owningNodes(viewName), hasItem("B"));
+        assertThat(viewMgr.owningNodes(viewName), hasItem("C"));
+    }
+
     private static SystemView<?> dummyView(String name) {
         return dummyView(name, NativeTypes.INT32);
     }
@@ -214,4 +253,31 @@ public class SystemViewManagerTest extends 
BaseIgniteAbstractTest {
                 })
                 .build();
     }
+
+    private static LogicalTopologySnapshot topologySnapshot(String viewName, 
List<String> allNodes, int... owningNodes) {
+        BitSet owningNodesSet = new BitSet();
+
+        for (int idx : owningNodes) {
+            owningNodesSet.set(idx);
+        }
+
+        List<LogicalNode> topology = new ArrayList<>(allNodes.size());
+
+        for (int i = 0; i < allNodes.size(); i++) {
+            String name = allNodes.get(i);
+
+            ClusterNode clusterNode = new ClusterNodeImpl(name, name, new 
NetworkAddress("127.0.0.1", 1010 + i));
+
+            Map<String, String> systemAttributes;
+            if (owningNodesSet.get(i)) {
+                systemAttributes = Map.of(NODE_ATTRIBUTES_KEY, viewName);
+            } else {
+                systemAttributes = Map.of();
+            }
+
+            topology.add(new LogicalNode(clusterNode, Map.of(), 
systemAttributes));
+        }
+
+        return new LogicalTopologySnapshot(1, topology);
+    }
 }

Reply via email to