This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 29292de35a [IGNITE-20504] Sql. Improve test coverage of MappingService 
(#2768)
29292de35a is described below

commit 29292de35a29a435413faa7964caf782f1011d7c
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Nov 8 07:43:26 2023 +0200

    [IGNITE-20504] Sql. Improve test coverage of MappingService (#2768)
---
 .../internal/sql/engine/prepare/MultiStepPlan.java |   3 +-
 .../ignite/internal/sql/engine/prepare/PlanId.java |   3 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   2 +-
 .../engine/exec/mapping/FragmentMappingTest.java   | 361 ++++++++++++++++
 .../sql/engine/exec/mapping/FragmentPrinter.java   | 375 +++++++++++++++++
 .../exec/mapping/MappingServiceImplTest.java       |  17 -
 .../sql/engine/exec/mapping/MappingTestRunner.java | 454 +++++++++++++++++++++
 .../exec/mapping/MappingTestRunnerSelfTest.java    | 157 +++++++
 .../sql/engine/framework/TestBuilders.java         |  66 ++-
 .../internal/sql/engine/framework/TestTable.java   |   8 +-
 .../mapping/_runner_overwrite_results.test         |  30 ++
 .../src/test/resources/mapping/_runner_self.test   |  31 ++
 .../resources/mapping/_runner_strip_results.test   |  21 +
 .../src/test/resources/mapping/correlated.test     | 183 +++++++++
 .../sql-engine/src/test/resources/mapping/dml.test |  91 +++++
 .../src/test/resources/mapping/merge_join.test     |  77 ++++
 .../src/test/resources/mapping/set_ops.test        |  48 +++
 .../src/test/resources/mapping/table_affinity.test |  53 +++
 .../test/resources/mapping/table_functions.test    |   8 +
 .../src/test/resources/mapping/table_identity.test | 111 +++++
 .../resources/mapping/table_identity_single.test   |  99 +++++
 .../src/test/resources/mapping/table_single.test   |  89 ++++
 .../src/test/resources/mapping/union.test          | 105 +++++
 .../src/test/resources/mapping/values.test         |  17 +
 24 files changed, 2379 insertions(+), 30 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
index 558755e1bb..e1978e34e2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
@@ -35,7 +35,8 @@ public class MultiStepPlan implements QueryPlan {
 
     private final IgniteRel root;
 
-    MultiStepPlan(PlanId id, SqlQueryType type, IgniteRel root, 
ResultSetMetadata meta) {
+    /** Constructor. */
+    public MultiStepPlan(PlanId id, SqlQueryType type, IgniteRel root, 
ResultSetMetadata meta) {
         this.id = id;
         this.type = type;
         this.root = root;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
index 112500b461..315ed3115e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
@@ -32,7 +32,8 @@ public class PlanId implements Serializable {
     private final UUID prepareServiceId;
     private final long planNumber;
 
-    PlanId(UUID prepareServiceId, long planNumber) {
+    /** Constructor. */
+    public PlanId(UUID prepareServiceId, long planNumber) {
         this.prepareServiceId = prepareServiceId;
         this.planNumber = planNumber;
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index c8a84e9d13..a9c0f23c87 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -1029,7 +1029,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             );
         }
 
-        return new TestTable(new TableDescriptorImpl(columns, distr), name, 
size, List.of());
+        return new TestTable(1, new TableDescriptorImpl(columns, distr), name, 
size, List.of());
     }
 
     private static class CapturingMailboxRegistry implements MailboxRegistry {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
new file mode 100644
index 0000000000..40a0c1a0c6
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.MappingTestRunner.TestSetup;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.ExecutionTargetProviderBuilder;
+import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
+import org.apache.ignite.internal.sql.engine.framework.TestTable;
+import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
+import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
+import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import 
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.AffinityDistribution;
+import 
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for fragment mapping.
+ *
+ * <p>Test setup:
+ * <pre>
+ *     // Add nodes named N1, N2, and N3 to the cluster.
+ *     addNodes("N1", "N2", "N3");
+ *
+ *     // The first statement adds a table with data located at node N1.
+ *     // The second one adds a table with data located at nodes N2, N3.
+ *     // The catalog is going to contain tables "T1_N1" and "T2_N2N3".
+ *     // Both tables have distribution affinity([0], tableId, zoneId)
+ *     addTable("T1", "N1");
+ *     addTable("T2", "N2", "N3");
+ *
+ *     // Adds table with identity(0) distribution. Can be used to mimic node 
system views.
+ *     addTableIdent("NT1", "N1");
+ *
+ *     // Adds table with single distribution. Can be used to mimic 
cluster-wide system views.
+ *     addTableSingle("CT2", "N1", "N2", "N3");
+ *
+ *     // Sets row count statistics to 100 for all tables named like T1_*
+ *     setRowCount("T1", 100);
+ *
+ *     var runner = new MappingTestRunner(Paths.get("tests_dir"), this::parseQuery);
+ *     // Run test cases.
+ *     runner.runTest(this::initSchema, "file.test");
+ * </pre>
+ *
+ * <p>See {@link MappingTestRunner} for test file format description.
+ *
+ * @see MappingTestRunner
+ */
+public class FragmentMappingTest extends AbstractPlannerTest {
+
+    private static final Path LOCATION = 
Paths.get("src/test/resources/mapping");
+
+    private final TreeSet<String> nodeNames = new TreeSet<>();
+
+    private final TreeMap<String, Pair<IgniteDistribution, List<String>>> 
tables = new TreeMap<>();
+
+    private final Map<String, Integer> tableRows = new HashMap<>();
+
+    private final MappingTestRunner testRunner = new 
MappingTestRunner(LOCATION, this::parseQuery);
+
+    @Test
+    public void testValues() {
+        addNodes("N1", "N2");
+
+        testRunner.runTest(this::initSchema, "values.test");
+    }
+
+    @Test
+    public void testTable() {
+        addNodes("N1", "N2");
+
+        addTable("T1", "N1");
+
+        testRunner.runTest(this::initSchema, "table_affinity.test");
+    }
+
+    @Test
+    public void testDml() {
+        addNodes("N0", "N1", "N2", "N3");
+
+        addTable("T1", "N1");
+        addTable("T2", "N2", "N3");
+
+        testRunner.runTest(this::initSchema, "dml.test");
+    }
+
+    @Test
+    public void testUnion() {
+        addNodes("N1", "N2", "N3");
+
+        addTable("T1", "N1");
+        addTable("T1", "N1", "N2");
+
+        addTable("T2", "N1");
+        addTable("T2", "N1", "N2");
+
+        testRunner.runTest(this::initSchema, "union.test");
+    }
+
+    @Test
+    public void testSetOps() {
+        addNodes("N1", "N2", "N3");
+
+        addTable("T1", "N1");
+        addTable("T2", "N2");
+
+        testRunner.runTest(this::initSchema, "set_ops.test");
+    }
+
+    @Test
+    public void testMergeJoin() {
+        addNodes("N0", "N1", "N2", "N3", "N4");
+
+        addTable("T1", "N1");
+        addTable("T2", "N1");
+        addTable("T2", "N2");
+
+        setRowCount("T1", 200);
+        setRowCount("T2", 100);
+
+        testRunner.runTest(this::initSchema, "merge_join.test");
+    }
+
+    @Test
+    public void testTableIdentity() {
+        addNodes("N0", "N1", "N2");
+
+        addTableIdent("NT1", "N1");
+        addTableIdent("NT2", "N1");
+        addTableIdent("NT2", "N2");
+
+        testRunner.runTest(this::initSchema, "table_identity.test");
+    }
+
+    @Test
+    public void testTableSingleDistribution() {
+        addNodes("N0", "N1", "N2");
+
+        addTableSingle("CT1", "N1");
+        addTableSingle("CT2", "N1");
+        addTableSingle("CT2", "N2");
+
+        testRunner.runTest(this::initSchema, "table_single.test");
+    }
+
+    @Test
+    public void testTableSingleIdentityDistribution() {
+        addNodes("N0", "N1", "N2");
+
+        addTableSingle("CT", "N1");
+        addTableSingle("CT", "N2");
+
+        addTableIdent("NT", "N1");
+        addTableIdent("NT", "N2");
+
+        testRunner.runTest(this::initSchema, "table_identity_single.test");
+    }
+
+    @Test
+    public void testCorrelated() {
+        addNodes("N0", "N1", "N2");
+
+        addTable("T", "N1");
+        addTable("T", "N2");
+
+        addTableSingle("CT", "N1");
+        addTableSingle("CT", "N2");
+
+        addTableIdent("NT", "N1");
+        addTableIdent("NT", "N2");
+
+        testRunner.runTest(this::initSchema, "correlated.test");
+    }
+
+    @Test
+    public void testTableFunctions() {
+        addNodes("N0", "N1", "N2");
+
+        addTable("T", "N1");
+
+        addTableSingle("CT", "N1");
+        addTableSingle("CT", "N2");
+
+        addTableIdent("NT", "N1");
+        addTableIdent("NT", "N2");
+
+        testRunner.runTest(this::initSchema, "table_functions.test");
+    }
+
+    private void addNodes(String node, String... otherNodes) {
+        this.nodeNames.add(node);
+        this.nodeNames.addAll(Arrays.asList(otherNodes));
+    }
+
+    private void addTable(String name, String node, String... otherNodes) {
+        TreeSet<String> nodeNames = new TreeSet<>();
+        nodeNames.add(node);
+        nodeNames.addAll(Arrays.asList(otherNodes));
+
+        String tableName = formatName(name, nodeNames);
+        tables.put(tableName, new Pair<>(IgniteDistributions.affinity(-1, -1, 
-1), new ArrayList<>(nodeNames)));
+    }
+
+    /**
+     * Registers a table with single distribution whose data is located on the given nodes.
+     * The table is stored under the name produced by {@code formatName(name, nodes)}.
+     *
+     * @param name Short table name.
+     * @param nodes Names of the nodes that host the table; at least one is required.
+     * @throws IllegalArgumentException If no nodes are given.
+     */
+    private void addTableSingle(String name, String... nodes) {
+        // addTable and addTableIdent require at least one node by signature; enforce the same here,
+        // otherwise a table named "NAME_" that is mapped to no nodes would be registered silently.
+        if (nodes.length == 0) {
+            throw new IllegalArgumentException("At least one node is required for table " + name);
+        }
+
+        TreeSet<String> nodeNames = new TreeSet<>(Arrays.asList(nodes));
+        String tableName = formatName(name, nodeNames);
+
+        tables.put(tableName, new Pair<>(IgniteDistributions.single(), new ArrayList<>(nodeNames)));
+    }
+
+    private void addTableIdent(String name, String node, String... otherNodes) 
{
+        TreeSet<String> nodeNames = new TreeSet<>();
+        nodeNames.add(node);
+        nodeNames.addAll(Arrays.asList(otherNodes));
+
+        String tableName = formatName(name, nodeNames);
+        tables.put(tableName, new Pair<>(IgniteDistributions.identity(0), new 
ArrayList<>(nodeNames)));
+    }
+
+    private void setRowCount(String tableName, int rowCount) {
+        this.tableRows.put(tableName, rowCount);
+    }
+
+    private IgniteRel parseQuery(IgniteSchema schema, String sqlStmt) {
+        try {
+            PlanningContext ctx = PlanningContext.builder()
+                    .parentContext(baseQueryContext(List.of(schema), null))
+                    .query(sqlStmt)
+                    .build();
+
+            try (IgnitePlanner planner = ctx.planner()) {
+                assertNotNull(planner);
+
+                return physicalPlan(planner, ctx.query());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse a statement: " + 
sqlStmt, e);
+        }
+    }
+
+    private TestSetup initSchema() {
+        ExecutionTargetProviderBuilder executionTargetProviderBuilder = 
TestBuilders.executionTargetProviderBuilder();
+        List<IgniteDataSource> dataSources = new ArrayList<>();
+        int objectId = 1;
+
+        Map<String, List<String>> table2NodeNames = new HashMap<>();
+
+        for (Map.Entry<String, Pair<IgniteDistribution, List<String>>> e : 
tables.entrySet()) {
+            String tableName = e.getKey();
+            String tableShortName = tableName.substring(0, 
tableName.indexOf('_'));
+            // Generate distinct row counts for each table to ensure that the 
optimizer produces the same results.
+            int tableSize = tableRows.getOrDefault(tableShortName, 100 + 
objectId);
+
+            for (String tableNodeName : e.getValue().getSecond()) {
+                if (!nodeNames.contains(tableNodeName)) {
+                    String message = format(
+                            "Expected node {} for table {}. Registered nodes: 
{}",
+                            tableNodeName, tableShortName, nodeNames
+                    );
+                    throw new IllegalArgumentException(message);
+                }
+            }
+
+            IgniteDistribution distribution = e.getValue().getFirst();
+
+            TableBuilder tableBuilder = TestBuilders.table().name(tableName);
+
+            // To mimic node system views, id column is node name alias.
+            if (distribution.function() instanceof IdentityDistribution) {
+                tableBuilder = tableBuilder.addColumn("ID", 
NativeTypes.stringOf(64));
+            } else {
+                // To mimic cluster wide system views
+                tableBuilder = tableBuilder.addColumn("ID", NativeTypes.INT32);
+            }
+
+            IgniteDistribution distributionToUse;
+            if (distribution.function() instanceof AffinityDistribution) {
+                distributionToUse = IgniteDistributions.affinity(0, objectId, 
1);
+            } else {
+                distributionToUse = distribution;
+            }
+
+            TestTable testTable = tableBuilder
+                    .addColumn("C1", NativeTypes.INT32)
+                    .addColumn("C2", NativeTypes.INT32)
+                    .size(tableSize)
+                    .tableId(objectId)
+                    .distribution(distributionToUse)
+                    .build();
+
+            dataSources.add(testTable);
+
+            table2NodeNames.put(tableName, e.getValue().getSecond());
+
+            objectId += 1;
+        }
+
+        executionTargetProviderBuilder.addTables(table2NodeNames);
+
+        IgniteSchema schema = new 
IgniteSchema(CatalogManager.DEFAULT_SCHEMA_NAME, 1, dataSources);
+        ExecutionTargetProvider executionTargetProvider = 
executionTargetProviderBuilder.build();
+        LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
+
+        return new TestSetup(executionTargetProvider, schema, 
logicalTopologySnapshot);
+    }
+
+    private static String formatName(String name, TreeSet<String> nodeNames) {
+        return name + "_" + String.join("", nodeNames);
+    }
+
+    private LogicalTopologySnapshot newLogicalTopology() {
+        List<LogicalNode> logicalNodes = nodeNames.stream()
+                .map(name -> {
+                    NetworkAddress addr = 
NetworkAddress.from("127.0.0.1:10000");
+                    return new LogicalNode(name, name, addr);
+                })
+                .collect(Collectors.toList());
+
+        return new LogicalTopologySnapshot(1, logicalNodes);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
new file mode 100644
index 0000000000..3894b176c8
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.prepare.Fragment;
+import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableFunctionScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import 
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.AffinityDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+
+/**
+ * Converts {@link MappedFragment} to text representation.
+ */
+final class FragmentPrinter extends IgniteRelShuttle {
+
+    /** Prefix that starts the header line of every printed fragment. */
+    static final String FRAGMENT_PREFIX = "Fragment#";
+
+    private final Output output;
+
+    private FragmentPrinter(Output output) {
+        this.output = output;
+    }
+
+    static String fragmentsToString(List<MappedFragment> mappedFragments) {
+        TableDescriptorCollector collector = new TableDescriptorCollector();
+
+        for (MappedFragment mappedFragment : mappedFragments) {
+            Fragment fragment = mappedFragment.fragment();
+            collector.collect(fragment);
+        }
+
+        Output output = new Output(val -> {
+            if (val instanceof IgniteDistribution) {
+                IgniteDistribution distribution = (IgniteDistribution) val;
+                return formatDistribution(distribution, collector);
+            } else {
+                return String.valueOf(val);
+            }
+        });
+
+        for (MappedFragment mappedFragment : mappedFragments) {
+            FragmentPrinter printer = new FragmentPrinter(output);
+            printer.print(mappedFragment);
+
+            output.writeNewline();
+        }
+
+        return output.builder.toString();
+    }
+
+    void print(MappedFragment mappedFragment) {
+        Fragment fragment = mappedFragment.fragment();
+
+        output.setNewLinePadding(0);
+        output.writeFormattedString(FRAGMENT_PREFIX + "{}", 
fragment.fragmentId());
+
+        if (fragment.rootFragment()) {
+            output.writeString(" root");
+        }
+
+        if (fragment.correlated()) {
+            output.writeString(" correlated");
+        }
+
+        output.appendPadding();
+        output.setNewLinePadding(2);
+        output.writeNewline();
+
+        ColocationGroup target = mappedFragment.target();
+        if (target != null) {
+            output.appendPadding();
+            List<String> sortedNodeNames = target.nodeNames()
+                    .stream()
+                    .sorted(Comparator.naturalOrder())
+                    .collect(Collectors.toList());
+
+            output.writeKeyValue("targetNodes", sortedNodeNames.toString());
+            output.writeNewline();
+        }
+
+        output.setNewLinePadding(2);
+
+        output.appendPadding();
+        output.writeKeyValue("executionNodes", 
mappedFragment.nodes().toString());
+        output.writeNewline();
+
+        List<IgniteReceiver> remotes = mappedFragment.fragment().remotes();
+        if (!remotes.isEmpty()) {
+            List<Long> remotesVals = remotes.stream()
+                    .map(IgniteReceiver::sourceFragmentId)
+                    .collect(Collectors.toList());
+
+            output.appendPadding();
+            output.writeKeyValues("remoteFragments", remotesVals);
+            output.writeNewline();
+        }
+
+        if (!fragment.tables().isEmpty()) {
+            List<String> tables = fragment.tables().stream()
+                    .map(IgniteDataSource::name)
+                    .sorted(Comparator.naturalOrder())
+                    .collect(Collectors.toList());
+
+            output.appendPadding();
+            output.writeKeyValue("tables", tables.toString());
+            output.writeNewline();
+        }
+
+        if (!fragment.systemViews().isEmpty()) {
+            List<String> tables = fragment.systemViews().stream()
+                    .map(IgniteDataSource::name)
+                    .sorted(Comparator.naturalOrder())
+                    .collect(Collectors.toList());
+
+            output.appendPadding();
+            output.writeKeyValue("systemViews", tables.toString());
+            output.writeNewline();
+        }
+
+        output.appendPadding();
+        output.writeString("tree:");
+        output.writeNewline();
+
+        printRel(fragment.root());
+    }
+
+    /**
+     * Prints the relational tree rooted at the given node.
+     *
+     * <p>While the tree is printed, the output uses a two-space padding string and a base padding of 2.
+     * The padding settings that were active before the call are restored afterwards, even if the visit fails.
+     *
+     * @param rel Root of the tree to print.
+     */
+    private void printRel(IgniteRel rel) {
+        String prevPaddingStr = output.paddingString;
+        int prevPadding = output.newLinePadding;
+
+        output.setPaddingStr("  ");
+        output.setNewLinePadding(2);
+        try {
+            visit(rel);
+        } finally {
+            // Restore both settings here so the shared output is not left half-configured on failure.
+            output.setPaddingStr(prevPaddingStr);
+            output.setNewLinePadding(prevPadding);
+        }
+    }
+
+    @Override
+    protected IgniteRel processNode(IgniteRel rel) {
+        int prevPadding = output.newLinePadding;
+        output.setNewLinePadding(output.newLinePadding + 1);
+        output.writeNewline();
+        try {
+            return super.processNode(rel);
+        } finally {
+            output.setNewLinePadding(prevPadding);
+        }
+    }
+
+    @Override
+    public IgniteRel visit(IgniteRel rel) {
+        output.appendPadding();
+        output.writeString(rel.getClass().getSimpleName());
+        return super.visit(rel);
+    }
+
+    @Override
+    public IgniteRel visit(IgniteSender rel) {
+        long targetId = rel.targetFragmentId();
+        long exchangeId = rel.exchangeId();
+
+        output.writeFormattedString("(targetFragment={}, exchange={}, 
distribution={})", targetId, exchangeId, rel.distribution());
+        return super.visit(rel);
+    }
+
+    @Override
+    public IgniteRel visit(IgniteReceiver rel) {
+        long sourceFragmentId = rel.sourceFragmentId();
+        long exchangeId = rel.exchangeId();
+
+        output.writeFormattedString("(sourceFragment={}, exchange={}, 
distribution={})", sourceFragmentId, exchangeId, rel.distribution());
+        return super.visit(rel);
+    }
+
+    @Override
+    public IgniteRel visit(IgniteIndexScan rel) {
+        long sourceId = rel.sourceId();
+        String tableName = String.join(".", rel.getTable().getQualifiedName());
+
+        output.writeFormattedString("(name={}, source={}, distribution={})", 
tableName, sourceId, rel.distribution());
+        return super.visit(rel);
+    }
+
+    @Override
+    public IgniteRel visit(IgniteTableScan rel) {
+        long sourceId = rel.sourceId();
+        String tableName = String.join(".", rel.getTable().getQualifiedName());
+
+        output.writeFormattedString("(name={}, source={}, distribution={})", 
tableName, sourceId, rel.distribution());
+        return super.visit(rel);
+    }
+
+    @Override
+    public IgniteRel visit(IgniteTableModify rel) {
+        String tableName = String.join(".", rel.getTable().getQualifiedName());
+
+        // TODO  https://issues.apache.org/jira/browse/IGNITE-20495 there is 
no sourceId on TableModifyNode
+        output.writeFormattedString("(name={}, source={}, distribution={})", 
tableName, "-1", rel.distribution());
+        return super.visit(rel);
+    }
+
+    @Override
+    public IgniteRel visit(IgniteTableFunctionScan rel) {
+        output.writeFormattedString("(source={}, distribution={})", 
rel.sourceId(), rel.distribution());
+        return super.visit(rel);
+    }
+
+    /**
+     * Formats a distribution for text output.
+     *
+     * <p>Affinity distributions are rendered as {@code affinity[table: NAME, columns: [C1,...]]} using the table
+     * registered in the collector under the distribution's table id. Any other distribution is rendered with
+     * its {@code toString()}.
+     *
+     * @param distribution Distribution to format.
+     * @param collector Collector that holds the tables seen in the printed fragments.
+     * @return Text representation of the distribution.
+     * @throws IllegalStateException If an affinity distribution refers to a table id the collector has not seen.
+     */
+    private static String formatDistribution(IgniteDistribution distribution, TableDescriptorCollector collector) {
+        if (distribution.function() instanceof AffinityDistribution) {
+            AffinityDistribution f = (AffinityDistribution) distribution.function();
+            IgniteTable igniteTable = collector.tables.get(f.tableId());
+
+            if (igniteTable == null) {
+                // The message has two placeholders: the id that was not found and the ids that are known.
+                String error = format("Unknown tableId: {}. Existing: {}", f.tableId(), collector.tables.keySet());
+                throw new IllegalStateException(error);
+            }
+
+            TableDescriptor tableDescriptor = igniteTable.descriptor();
+            String colocationColumns = igniteTable.distribution().getKeys().stream()
+                    .map(k -> tableDescriptor.columnDescriptor(k).name())
+                    .collect(Collectors.joining(",", "[", "]"));
+
+            return format("affinity[table: {}, columns: {}]", igniteTable.name(), colocationColumns);
+        } else {
+            return distribution.toString();
+        }
+    }
+
+    /** Collects table descriptors so that table names and column names can be used in text output. */
+    private static class TableDescriptorCollector extends IgniteRelShuttle {
+
+        private final Map<Integer, IgniteTable> tables = new HashMap<>();
+
+        void collect(Fragment fragment) {
+            fragment.root().accept(this);
+        }
+
+        @Override
+        public IgniteRel visit(IgniteIndexScan rel) {
+            IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
+            tables.put(igniteTable.id(), igniteTable);
+            return super.visit(rel);
+        }
+
+        @Override
+        public IgniteRel visit(IgniteTableScan rel) {
+            IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
+            tables.put(igniteTable.id(), igniteTable);
+            return super.visit(rel);
+        }
+
+        @Override
+        public IgniteRel visit(IgniteTableModify rel) {
+            IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
+            tables.put(igniteTable.id(), igniteTable);
+            return super.visit(rel);
+        }
+    }
+
+    private static class Output {
+
+        private final StringBuilder builder = new StringBuilder();
+
+        private int newLinePadding;
+
+        private String paddingString = " ";
+
+        private boolean blankLine;
+
+        private final Function<Object, String> formatter;
+
+        private Output(Function<Object, String> formatter) {
+            this.formatter = formatter;
+        }
+
+        /**
+         * Writes a list of values: {@code name: [v1, v2, ..., vN]}. Each value is converted with the formatter.
+         * An empty list is written as {@code name: []}.
+         *
+         * @param name Property name.
+         * @param values Values to write.
+         */
+        void writeKeyValues(String name, List<?> values) {
+            builder.append(name);
+            builder.append(": [");
+
+            // Emit the separator before every element but the first. Trimming a trailing separator
+            // afterwards would cut into the ": [" prefix when the list is empty.
+            boolean first = true;
+            for (Object value : values) {
+                if (!first) {
+                    builder.append(", ");
+                }
+                builder.append(formatter.apply(value));
+                first = false;
+            }
+
+            builder.append("]");
+        }
+
+        /** Writes string property: {@code name: value}. */
+        void writeKeyValue(String name, String value) {
+            builder.append(name);
+            builder.append(": ");
+            builder.append(value);
+        }
+
+        /** Writes the given string as is. */
+        void writeString(String value) {
+            builder.append(value);
+        }
+
+        /** Writes a formatted string. */
+        void writeFormattedString(String value, Object... params) {
+            List<String> formattedParams = new ArrayList<>();
+            for (Object param : params) {
+                formattedParams.add(formatter.apply(param));
+            }
+
+            builder.append(format(value, formattedParams.toArray()));
+        }
+
+        void setPaddingStr(String val) {
+            this.paddingString = val;
+        }
+
+        void setNewLinePadding(int value) {
+            newLinePadding = value;
+        }
+
+        void writeNewline() {
+            blankLine = true;
+            builder.append(System.lineSeparator());
+        }
+
+        private void appendPadding() {
+            boolean wasBlank = blankLine;
+
+            if (wasBlank && newLinePadding > 0) {
+                builder.append(paddingString.repeat(newLinePadding));
+            }
+
+            if (wasBlank) {
+                blankLine = false;
+            }
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index cdf1dad118..af9d2c9592 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -15,23 +15,6 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.ignite.internal.sql.engine.exec.mapping;
 
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
new file mode 100644
index 0000000000..5c3bde8b95
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.UUID;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.calcite.plan.RelOptUtil;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.prepare.PlanId;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A runner for mapping test cases.
+ *
+ * <p>Test case format:
+ * <pre>
+ *     #&lt;optional multiline description&gt;
+ *     &lt;node_name&gt;
+ *     &lt;SQL statement (single line or multiline)&gt;
+ *     ---
+ *     &lt;expected fragments&gt;
+ *     ---
+ * </pre>
+ *
+ * <p>To bulk update test case results, set system property {@code MAPPING_TESTS_OVERWRITE_RESULTS} to {@code true}.
+ *
+ * <p>As a result, all test cases will have results that match the actual ones.
+ * If there are mismatches, tests still fail so that the differences are reported and can be reviewed.
+ *
+ * <p>To remove all results, set system property {@code MAPPING_TESTS_STRIP_RESULTS} to {@code true}.
+ *
+ * <p>As a result, all test cases' results will be replaced with a dummy result.
+ * If there are mismatches, tests still fail so that the differences are reported and can be reviewed.
+ *
+ * @see FragmentPrinter
+ */
+final class MappingTestRunner {
+
+    private static final String OVERWRITE_RESULTS = 
"MAPPING_TESTS_OVERWRITE_RESULTS";
+
+    private static final String STRIP_RESULTS = "MAPPING_TESTS_STRIP_RESULTS";
+
+    /** Test setup: the execution target provider, the schema and the topology snapshot that test cases of a file are run against. */
+    static final class TestSetup {
+
+        private final ExecutionTargetProvider executionTargetProvider;
+
+        private final IgniteSchema schema;
+
+        private final LogicalTopologySnapshot topologySnapshot;
+
+        TestSetup(ExecutionTargetProvider executionTargetProvider, 
IgniteSchema schema, LogicalTopologySnapshot topologySnapshot) {
+            this.executionTargetProvider = executionTargetProvider;
+            this.schema = schema;
+            this.topologySnapshot = topologySnapshot;
+        }
+    }
+
+    private final Path location;
+
+    private final BiFunction<IgniteSchema, String, IgniteRel> parseValidate;
+
+    private boolean overwriteResults;
+
+    private boolean stripResults;
+
+    private Path outputFile;
+
+    /**
+     * Creates a runner.
+     *
+     * @param location Directory that test file names are resolved against.
+     * @param parseValidate Function that turns a schema and an SQL string into an {@link IgniteRel}.
+     */
+    MappingTestRunner(Path location,
+            BiFunction<IgniteSchema, String, IgniteRel> parseValidate) {
+
+        this.location = location;
+        this.parseValidate = parseValidate;
+    }
+
+    /**
+     * Specifies a location to write results to. If not specified, results are written to the test file itself.
+     * Results are only written when the overwrite or the strip flag is set.
+     *
+     * @param path Location
+     * @return this for chaining.
+     */
+    MappingTestRunner outputFile(Path path) {
+        this.outputFile = path;
+        return this;
+    }
+
+    /**
+     * Loads test cases from the given file, produces a mapping for every statement and reports the results.
+     *
+     * @param init Supplier of the test setup.
+     * @param fileName Name of a test file; it is resolved against the location of this runner.
+     */
+    void runTest(Supplier<TestSetup> init, String fileName) {
+        TestSetup setup = init.get();
+
+        initFlags();
+
+        Path testFile = location.resolve(fileName);
+        List<TestCaseDef> testCases = loadTestCases(testFile);
+
+        runTestCases(testFile, setup.schema, setup.executionTargetProvider, 
setup.topologySnapshot, parseValidate, testCases);
+    }
+
+    /**
+     * Reads the {@code MAPPING_TESTS_OVERWRITE_RESULTS} and {@code MAPPING_TESTS_STRIP_RESULTS} flags (both default to {@code false}).
+     *
+     * @throws IllegalStateException If both flags are set, since they are mutually exclusive.
+     */
+    @TestOnly
+    void initFlags() {
+        overwriteResults = 
IgniteSystemProperties.getBoolean(OVERWRITE_RESULTS, false);
+        stripResults = IgniteSystemProperties.getBoolean(STRIP_RESULTS, false);
+
+        if (overwriteResults && stripResults) {
+            throw new IllegalStateException(format("Both {} and {} have been 
specified", OVERWRITE_RESULTS, STRIP_RESULTS));
+        }
+    }
+
+    /** Builds a plan for every test case, maps it and passes the collected mappings to the result reporter. */
+    private void runTestCases(Path testFile,
+            IgniteSchema schema,
+            ExecutionTargetProvider targetProvider,
+            LogicalTopologySnapshot snapshot,
+            BiFunction<IgniteSchema, String, IgniteRel> parse,
+            List<TestCaseDef> testCases) {
+
+        List<String> mappings = new ArrayList<>();
+
+        for (TestCaseDef testCase : testCases) {
+            IgniteRel root = parse.apply(schema, testCase.sql);
+
+            // A table modification at the root makes the statement a DML, anything else is treated as a query.
+            SqlQueryType queryType = root instanceof IgniteTableModify ? SqlQueryType.DML : SqlQueryType.QUERY;
+
+            ResultSetMetadataImpl metadata = new ResultSetMetadataImpl(Collections.emptyList());
+            PlanId planId = new PlanId(UUID.randomUUID(), 1);
+            MultiStepPlan plan = new MultiStepPlan(planId, queryType, root, metadata);
+
+            mappings.add(produceMapping(testCase.nodeName, targetProvider, snapshot, plan));
+        }
+
+        reportResults(testCases, mappings, testFile);
+    }
+
+    /** Parses the given test file into test case definitions; I/O failures are rethrown as {@link UncheckedIOException}. */
+    static List<TestCaseDef> loadTestCases(Path path) {
+        Parser parser = new Parser();
+
+        try {
+            return parser.parse(path);
+        } catch (IOException ioError) {
+            throw new UncheckedIOException(ioError);
+        }
+    }
+
+    /**
+     * Maps the given plan with a mapping service created for node {@code nodeName} and renders the mapped fragments as text.
+     *
+     * @param nodeName Name of the node the mapping service is created for.
+     * @param targetProvider Execution target provider.
+     * @param snapshot Logical topology the mapping service is notified about before mapping.
+     * @param plan Plan to map.
+     * @return Textual representation of the mapped fragments.
+     * @throws IllegalStateException If mapping fails or produces no fragments.
+     */
+    private String produceMapping(String nodeName,
+            ExecutionTargetProvider targetProvider,
+            LogicalTopologySnapshot snapshot,
+            MultiStepPlan plan) {
+
+        MappingServiceImpl mappingService = new MappingServiceImpl(nodeName,
+                targetProvider,
+                EmptyCacheFactory.INSTANCE,
+                0,
+                Runnable::run
+        );
+        mappingService.onTopologyLeap(snapshot);
+
+        List<MappedFragment> mappedFragments;
+
+        try {
+            mappedFragments = await(mappingService.map(plan));
+        } catch (Exception e) {
+            // Put the plan into the message to make a mapping failure easier to diagnose.
+            StringBuilder sb = new StringBuilder();
+            sb.append(System.lineSeparator());
+            sb.append(RelOptUtil.toString(plan.root()));
+            sb.append(System.lineSeparator());
+
+            // Unwrap CompletionException, but never lose the original exception if it carries no cause.
+            Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
+
+            throw new IllegalStateException("Failed to map a plan: " + sb, cause);
+        }
+
+        if (mappedFragments == null) {
+            throw new IllegalStateException("Mapped fragments must not be null");
+        }
+
+        return FragmentPrinter.fragmentsToString(mappedFragments);
+    }
+
+    /** A single test case read from a test file. */
+    static class TestCaseDef {
+
+        /** Position of the node-name line of this test case in the test file, as recorded by the parser. */
+        final int lineNo;
+
+        /** Optional description: comment lines that precede the node name. */
+        @Nullable
+        final String description;
+
+        /** Name of the node the mapping service is created for. */
+        final String nodeName;
+
+        /** SQL statement text. */
+        final String sql;
+
+        /** Expected result: textual representation of mapped fragments. */
+        final String result;
+
+        TestCaseDef(int lineNo, @Nullable String description, String nodeName, 
String sql, String res) {
+            this.lineNo = lineNo;
+            this.description = description;
+            this.nodeName = nodeName;
+            this.sql = sql;
+            this.result = res;
+        }
+    }
+
+    /**
+     * Compares expected and actual results and, depending on the flags, rewrites the results file.
+     * The file to write is the configured output file or, if none is configured, {@code fileName}.
+     */
+    void reportResults(List<TestCaseDef> testCases, List<String> 
actualResults, Path fileName) {
+        doReportResults(testCases, actualResults, outputFile != null ? 
outputFile : fileName);
+    }
+
+    /**
+     * Renders expected and actual results of all test cases, optionally rewrites {@code outputFile}
+     * (with actual results in overwrite mode, with a dummy result in strip mode) and fails if expected and actual texts differ.
+     */
+    private void doReportResults(List<TestCaseDef> testCases, List<String> actualResults, Path outputFile) {
+        StringBuilder expectedBuf = new StringBuilder();
+        StringBuilder actualBuf = new StringBuilder();
+
+        for (int idx = 0; idx < testCases.size(); idx++) {
+            TestCaseDef testCase = testCases.get(idx);
+            String actualResult = actualResults.get(idx);
+
+            appendTestCaseStr(idx, expectedBuf, testCase, testCase.result.stripTrailing());
+            appendTestCaseStr(idx, actualBuf, testCase, actualResult.stripTrailing());
+        }
+
+        // Decide what (if anything) has to be written; overwrite mode takes precedence over strip mode.
+        String fileContent = null;
+
+        if (overwriteResults) {
+            fileContent = actualBuf.toString();
+        } else if (stripResults) {
+            StringBuilder strippedBuf = new StringBuilder();
+
+            for (int idx = 0; idx < testCases.size(); idx++) {
+                appendTestCaseStr(idx, strippedBuf, testCases.get(idx), "f");
+            }
+
+            fileContent = strippedBuf.toString();
+        }
+
+        if (fileContent != null) {
+            try {
+                Files.writeString(outputFile, fileContent, StandardCharsets.UTF_8);
+            } catch (IOException e) {
+                throw new UncheckedIOException("Failed to update a test file " + outputFile, e);
+            }
+        }
+
+        String expectedText = expectedBuf.toString();
+        String actualText = actualBuf.toString();
+
+        if (!Objects.equals(expectedText, actualText)) {
+            String message = format("There are test failures. Location: ({}:1)", outputFile.toAbsolutePath());
+            assertEquals(expectedText, actualText, message);
+        }
+    }
+
+    /**
+     * Appends the textual form of a test case: an optional description, the node name, the SQL statement,
+     * a delimiter, the given result and a closing delimiter.
+     */
+    private static void appendTestCaseStr(int idx, StringBuilder testCaseStr, TestCaseDef testCaseDef, String result) {
+        String nl = System.lineSeparator();
+
+        // A description, when present, opens the test case; otherwise a blank line separates it from the previous one.
+        if (testCaseDef.description != null) {
+            testCaseStr.append(testCaseDef.description).append(nl);
+        } else if (idx > 0) {
+            testCaseStr.append(nl);
+        }
+
+        testCaseStr.append(testCaseDef.nodeName).append(nl)
+                .append(testCaseDef.sql).append(nl)
+                .append("---").append(nl)
+                .append(result).append(nl)
+                .append("---").append(nl);
+    }
+
+    /** States of the test file parser; each one names the section the next non-blank line is expected to belong to. */
+    enum ParseState {
+        /** Optional description comments followed by a node name. */
+        NODE_NAME,
+        /** SQL statement text, terminated by a delimiter line. */
+        SQL_STMT,
+        /** Expected fragments, terminated by a delimiter line. */
+        FRAGMENTS
+    }
+
+    private static class Parser {
+        private static final String DELIMITER = "---";
+        private static final String COMMENT = "#";
+        private ParseState nextState = ParseState.NODE_NAME;
+        private int testCaseLineNo;
+        private int numFragments;
+
+        private final StringBuilder description = new StringBuilder();
+        private String nodeName;
+        private final StringBuilder sqlStmt = new StringBuilder();
+        private final StringBuilder result = new StringBuilder();
+
+        /** Clears everything accumulated for the current test case and expects a node name (or a description) next. */
+        private void resetState() {
+            description.setLength(0);
+            sqlStmt.setLength(0);
+            result.setLength(0);
+
+            numFragments = 0;
+            testCaseLineNo = -1;
+            nextState = ParseState.NODE_NAME;
+        }
+
+        /**
+         * Parses a test file into a list of test cases.
+         *
+         * <p>Blank lines are skipped. Lines starting with {@code #} that precede a node name are collected into a description;
+         * such lines are rejected inside an SQL statement or a fragments section. A line starting with {@code ---} ends
+         * the current section. The closing delimiter of the last test case may be omitted if its fragments section is not empty.
+         *
+         * @param path Path to a test file.
+         * @return Parsed test cases in the order they appear in the file.
+         * @throws IOException If the file cannot be read.
+         * @throws IllegalStateException If a line is malformed.
+         * @throws IllegalArgumentException If the last test case in the file is incomplete.
+         */
+        List<TestCaseDef> parse(Path path) throws IOException {
+            List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
+            List<TestCaseDef> testCases = new ArrayList<>();
+
+            // 1-based number of the line being processed; after the loop it points past the last line.
+            int lineNo = 1;
+            String fileName = path.toAbsolutePath().toString();
+
+            for (int i = 0; i < lines.size(); i++) {
+                String line = lines.get(i);
+
+                if (line.isBlank()) {
+                    lineNo++;
+                    continue;
+                }
+
+                switch (nextState) {
+                    case NODE_NAME:
+                        // Read optional comments into description field
+                        if (line.startsWith(COMMENT)) {
+                            if (description.length() > 0) {
+                                description.append(System.lineSeparator());
+                            }
+
+                            description.append(line);
+                        } else {
+                            StringTokenizer tokenizer = new StringTokenizer(line.stripLeading());
+
+                            this.nodeName = tokenizer.nextToken();
+                            // Store the 1-based line number (not the 0-based index): error reporting below relies on it
+                            // both for the reported location and for extracting the lines of an incomplete test case.
+                            this.testCaseLineNo = lineNo;
+
+                            if (tokenizer.hasMoreTokens()) {
+                                throw reportError(fileName, lineNo, "Expected mapping <node-name> but got: {}", line);
+                            }
+                            nextState = ParseState.SQL_STMT;
+                        }
+                        break;
+                    case SQL_STMT:
+                        if (line.startsWith(COMMENT)) {
+                            throw reportError(fileName, lineNo, "Comments are not allowed in sql statement text", line);
+                        }
+
+                        if (!line.stripLeading().startsWith(DELIMITER)) {
+                            if (sqlStmt.length() > 0) {
+                                sqlStmt.append(System.lineSeparator());
+                            }
+
+                            // The line is appended as is, so indentation of a multiline statement is preserved.
+                            sqlStmt.append(line);
+                        } else {
+                            nextState = ParseState.FRAGMENTS;
+                        }
+                        break;
+                    case FRAGMENTS:
+                        if (line.startsWith(COMMENT)) {
+                            throw reportError(fileName, lineNo, "Comments are not allowed in fragment text", line);
+                        }
+
+                        if (!line.stripLeading().startsWith(DELIMITER)) {
+                            if (result.length() > 0) {
+                                result.append(System.lineSeparator());
+                            }
+
+                            // Blank lines were skipped above, so re-insert an empty line between consecutive fragments.
+                            if (line.startsWith(FragmentPrinter.FRAGMENT_PREFIX)) {
+                                if (numFragments > 0) {
+                                    result.append(System.lineSeparator());
+                                }
+                                numFragments++;
+                            }
+
+                            result.append(line);
+                        } else {
+                            TestCaseDef newTestCase = newTestCase();
+                            testCases.add(newTestCase);
+
+                            resetState();
+                        }
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected state id: " + nextState);
+                }
+
+                lineNo++;
+            }
+
+            if (nextState == ParseState.FRAGMENTS && !result.toString().isBlank()) {
+                // The last test case is allowed to miss its closing delimiter.
+                TestCaseDef newTestCase = newTestCase();
+                testCases.add(newTestCase);
+            } else if (nextState != ParseState.NODE_NAME) {
+                // The file ended in the middle of a test case: report its lines, starting from the node name.
+                StringBuilder possibleTestCase = new StringBuilder();
+                possibleTestCase.append(System.lineSeparator());
+
+                for (int i = this.testCaseLineNo; i < lineNo; i++) {
+                    String line = lines.get(i - 1);
+                    if (i != this.testCaseLineNo) {
+                        possibleTestCase.append(System.lineSeparator());
+                    }
+                    possibleTestCase.append(line);
+                }
+
+                String error = format("Error at ({}:{}): Invalid test case: {}", fileName, this.testCaseLineNo, possibleTestCase);
+                throw new IllegalArgumentException(error);
+            }
+
+            return testCases;
+        }
+
+        /** Creates a test case definition from the parts accumulated so far; an empty description becomes {@code null}. */
+        private TestCaseDef newTestCase() {
+            String desc = null;
+
+            if (description.length() > 0) {
+                desc = description.toString();
+            }
+
+            return new TestCaseDef(testCaseLineNo, desc, nodeName, sqlStmt.toString(), result.toString().stripTrailing());
+        }
+
+        /** Builds (but does not throw) an exception whose message is prefixed with the {@code (file:line)} location. */
+        private static RuntimeException reportError(String fileName, int lineNo, String message, Object... params) {
+            return new IllegalStateException(format("({}:{}): {}", fileName, lineNo, format(message, params)));
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
new file mode 100644
index 0000000000..e00bc04e01
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.MappingTestRunner.TestCaseDef;
+import 
org.apache.ignite.internal.sql.engine.exec.mapping.MappingTestRunner.TestSetup;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.opentest4j.AssertionFailedError;
+
+/**
+ * Tests for {@link MappingTestRunner}.
+ */
+public class MappingTestRunnerSelfTest extends BaseIgniteAbstractTest {
+
+    private static final Path LOCATION = 
Paths.get("src/test/resources/mapping");
+
+    @TempDir
+    public Path tempDir;
+
+    /** Checks that description, node name, SQL text and fragments of every test case in {@code _runner_self.test} are parsed. */
+    @Test
+    public void testLoadTestCases() {
+        List<TestCaseDef> testCaseDefs = 
MappingTestRunner.loadTestCases(LOCATION.resolve("_runner_self.test"));
+        assertEquals(3, testCaseDefs.size());
+
+        TestCaseDef testCase1 = testCaseDefs.get(0);
+
+        String description = "# Start at node N1" + System.lineSeparator()
+                + "# Query: S1 S1" + System.lineSeparator()
+                + "# Fragments: F1";
+
+        assertEquals(description, testCase1.description);
+        assertEquals("N1", testCase1.nodeName);
+        assertEquals("S1" + System.lineSeparator() +  " S1", testCase1.sql);
+        assertEquals("Fragment#1" + System.lineSeparator() + "F1", 
testCase1.result);
+
+        TestCaseDef testCase2 = testCaseDefs.get(1);
+        assertNull(testCase2.description);
+        assertEquals("N1", testCase2.nodeName);
+        assertEquals("S2", testCase2.sql);
+        assertEquals("Fragment#1" + System.lineSeparator() + "F2", 
testCase2.result);
+
+        TestCaseDef testCase3 = testCaseDefs.get(2);
+        assertNull(testCase3.description);
+        assertEquals("N2", testCase3.nodeName);
+        assertEquals("S3", testCase3.sql);
+        assertEquals(
+                "Fragment#1" + System.lineSeparator() + "F1" + 
System.lineSeparator().repeat(2)
+                + "Fragment#2" + System.lineSeparator() + "F2", 
testCase3.result
+        );
+    }
+
+    /** In overwrite mode a mismatch still fails the check, but the output file is rewritten with the actual results. */
+    @Test
+    @WithSystemProperty(key = "MAPPING_TESTS_OVERWRITE_RESULTS", value = 
"true")
+    public void testOverwriteResults() throws IOException {
+        Path file = tempDir.resolve("test0.test");
+
+        List<TestCaseDef> testCaseDefs = 
MappingTestRunner.loadTestCases(LOCATION.resolve("_runner_self.test"));
+        List<String> actualResults = testCaseDefs.stream()
+                .map(r -> "RRR" + System.lineSeparator() + r.result)
+                .collect(Collectors.toList());
+
+        MappingTestRunner runner = newTestRunner();
+        runner.initFlags();
+
+        assertThrows(AssertionFailedError.class,
+                () -> runner.reportResults(testCaseDefs, actualResults, file));
+
+        String expectedStr = 
Files.readString(LOCATION.resolve("_runner_overwrite_results.test"), 
StandardCharsets.UTF_8);
+        String actualStr = Files.readString(file, StandardCharsets.UTF_8);
+        assertEquals(expectedStr, actualStr);
+    }
+
+    /** In strip mode the output file gets every result replaced with a dummy one; matching results do not fail the check. */
+    @Test
+    @WithSystemProperty(key = "MAPPING_TESTS_STRIP_RESULTS", value = "true")
+    public void testStripResults() throws IOException {
+        Path file = tempDir.resolve("test0.test");
+
+        List<TestCaseDef> testCaseDefs = 
MappingTestRunner.loadTestCases(LOCATION.resolve("_runner_self.test"));
+        List<String> actualResults = testCaseDefs.stream().map(r -> 
r.result).collect(Collectors.toList());
+
+        MappingTestRunner runner = newTestRunner();
+        runner.initFlags();
+
+        runner.reportResults(testCaseDefs, actualResults, file);
+
+        String expectedStr = 
Files.readString(LOCATION.resolve("_runner_strip_results.test"), 
StandardCharsets.UTF_8);
+        String actualStr = Files.readString(file, StandardCharsets.UTF_8);
+        assertEquals(expectedStr, actualStr);
+    }
+
+    /** Running a test with both the overwrite and the strip flags set must be rejected with an {@link IllegalStateException}. */
+    @Test
+    @WithSystemProperty(key = "MAPPING_TESTS_OVERWRITE_RESULTS", value = 
"true")
+    @WithSystemProperty(key = "MAPPING_TESTS_STRIP_RESULTS", value = "true")
+    public void testIncompatibleFlags() {
+        Path file = tempDir.resolve("test0.test");
+
+        // overwriteResults and stripResults flags can not be used together.
+
+        MappingTestRunner runner = newTestRunner().outputFile(file);
+
+        IllegalStateException err = assertThrows(IllegalStateException.class,
+                () -> runner.runTest(() -> {
+                    ExecutionTargetProvider targetProvider = 
Mockito.mock(ExecutionTargetProvider.class);
+                    IgniteSchema schema = new IgniteSchema("T", 1, List.of());
+                    LogicalNode node = new LogicalNode("N1", "N1", new 
NetworkAddress("addr", 1000));
+                    LogicalTopologySnapshot topologySnapshot = new 
LogicalTopologySnapshot(1, List.of(node));
+
+                    return new TestSetup(targetProvider, schema, 
topologySnapshot);
+                }, "_runner_self.test"));
+
+        String error = "Both MAPPING_TESTS_OVERWRITE_RESULTS and 
MAPPING_TESTS_STRIP_RESULTS have been specified";
+        assertThat(err.getMessage(), containsString(error));
+    }
+
+    /** Creates a runner over the test resources directory whose parse function returns a mock instead of parsing SQL. */
+    private static MappingTestRunner newTestRunner() {
+        return new MappingTestRunner(LOCATION, (schema, sqlStmt) -> Mockito.mock(IgniteRel.class));
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 2fc650a84f..200db05a0f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -91,7 +91,6 @@ import 
org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
 import org.apache.ignite.internal.systemview.api.SystemView;
-import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.type.BitmaskNativeType;
 import org.apache.ignite.internal.type.DecimalNativeType;
 import org.apache.ignite.internal.type.NativeType;
@@ -315,6 +314,9 @@ public class TestBuilders {
         /** Sets the size of the table. */
         TableBuilder size(int size);
 
+        /** Sets id for the table. The caller must guarantee that provided id 
is unique. */
+        TableBuilder tableId(int id);
+
         /**
          * Builds a table.
          *
@@ -570,7 +572,7 @@ public class TestBuilders {
             Map<String, TestNode> nodes = nodeNames.stream()
                     .map(name -> {
                         var systemViewManager = new 
SystemViewManagerImpl(name, catalogManager);
-                        var targetProvider = new 
TestNodeExecutionTargetProvider(systemViewManager, owningNodesByTableName);
+                        var targetProvider = new 
TestNodeExecutionTargetProvider(systemViewManager::owningNodes, 
owningNodesByTableName);
                         var mappingService = new MappingServiceImpl(name, 
targetProvider, EmptyCacheFactory.INSTANCE, 0, Runnable::run);
 
                         systemViewManager.register(() -> systemViews);
@@ -631,6 +633,7 @@ public class TestBuilders {
         private String name;
         private IgniteDistribution distribution;
         private int size = 100_000;
+        private Integer tableId;
 
         /** {@inheritDoc} */
         @Override
@@ -709,6 +712,14 @@ public class TestBuilders {
             return this;
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public TableBuilder tableId(int id) {
+            // Kept in a nullable Integer field: when it is left unset, build() falls back to a generated id.
+            this.tableId = id;
+
+            return this;
+        }
+
         /** {@inheritDoc} */
         @Override
         public TestTable build() {
@@ -731,6 +742,7 @@ public class TestBuilders {
                     .collect(Collectors.toList());
 
             return new TestTable(
+                    tableId != null ? tableId : TestTable.ID.incrementAndGet(),
                     tableDescriptor,
                     Objects.requireNonNull(name),
                     size,
@@ -1248,15 +1260,55 @@ public class TestBuilders {
         return newRow;
     }
 
+    /** Returns a new, empty builder for {@link ExecutionTargetProvider}. */
+    public static ExecutionTargetProviderBuilder 
executionTargetProviderBuilder() {
+        return new ExecutionTargetProviderBuilder();
+    }
+
+    /** A builder to create instances of {@link ExecutionTargetProvider}. */
+    public static final class ExecutionTargetProviderBuilder {
+
+        /** Table name to list of node names, accumulated by {@link #addTables(Map)}. */
+        private final Map<String, List<String>> owningNodesByTableName = new 
HashMap<>();
+
+        /** System view name to list of node names; by default returns {@code null} for every view. */
+        private Function<String, List<String>> owningNodesBySystemViewName = 
(n) -> null;
+
+        private ExecutionTargetProviderBuilder() {
+
+        }
+
+        /** Adds entries to the table name to list of nodes mapping; entries for already known tables are replaced. */
+        public ExecutionTargetProviderBuilder addTables(Map<String, 
List<String>> tables) {
+            this.owningNodesByTableName.putAll(tables);
+            return this;
+        }
+
+        /**
+         * Sets a function that resolves nodes of system views. The function accepts a view name and returns a list of nodes
+         * the system view is available at.
+         */
+        public ExecutionTargetProviderBuilder setSystemViews(Function<String, 
List<String>> systemViews) {
+            this.owningNodesBySystemViewName = systemViews;
+            return this;
+        }
+
+        /** Creates an instance of {@link ExecutionTargetProvider} from a snapshot of the table mapping collected so far. */
+        public ExecutionTargetProvider build() {
+            return new 
TestNodeExecutionTargetProvider(owningNodesBySystemViewName, 
Map.copyOf(owningNodesByTableName));
+        }
+    }
+
     private static class TestNodeExecutionTargetProvider implements 
ExecutionTargetProvider {
 
-        final SystemViewManager systemViewManager;
+        final Function<String, List<String>> owningNodesBySystemViewName;
 
         final Map<String, List<String>> owningNodesByTableName;
 
-        private TestNodeExecutionTargetProvider(SystemViewManager 
systemViewManager, Map<String, List<String>> owningNodesByTableName) {
-            this.systemViewManager = systemViewManager;
-            this.owningNodesByTableName = owningNodesByTableName;
+        private TestNodeExecutionTargetProvider(
+                Function<String, List<String>> owningNodesBySystemViewName,
+                Map<String, List<String>> owningNodesByTableName) {
+
+            this.owningNodesBySystemViewName = owningNodesBySystemViewName;
+            this.owningNodesByTableName = Map.copyOf(owningNodesByTableName);
         }
 
         @Override
@@ -1276,7 +1328,7 @@ public class TestBuilders {
 
         @Override
         public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
-            List<String> nodes = systemViewManager.owningNodes(view.name());
+            List<String> nodes = 
owningNodesBySystemViewName.apply(view.name());
 
             if (nullOrEmpty(nodes)) {
                 return CompletableFuture.failedFuture(
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
index ecb30b6cba..50ab6d1f42 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
@@ -36,29 +36,31 @@ public class TestTable extends IgniteTableImpl {
     private static final String DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE =
             "DataProvider is not configured [table={}, node={}]";
 
-    private static final AtomicInteger ID = new AtomicInteger();
+    static final AtomicInteger ID = new AtomicInteger();
 
     private final Map<String, DataProvider<?>> dataProviders;
 
     /** Constructor. */
     public TestTable(
+            int tableId,
             TableDescriptor descriptor,
             String name,
             double rowCnt,
             List<IgniteIndex> indexes
     ) {
-        this(descriptor, name, rowCnt, indexes, Map.of());
+        this(tableId, descriptor, name, rowCnt, indexes, Map.of());
     }
 
     /** Constructor. */
     public TestTable(
+            int tableId,
             TableDescriptor descriptor,
             String name,
             double rowCnt,
             List<IgniteIndex> indexList,
             Map<String, DataProvider<?>> dataProviders
     ) {
-        super(name, ID.incrementAndGet(), 1, descriptor, new 
TestStatistic(rowCnt),
+        super(name, tableId, 1, descriptor, new TestStatistic(rowCnt),
                 
indexList.stream().collect(Collectors.toUnmodifiableMap(IgniteIndex::name, 
Function.identity())));
 
         this.dataProviders = dataProviders;
diff --git 
a/modules/sql-engine/src/test/resources/mapping/_runner_overwrite_results.test 
b/modules/sql-engine/src/test/resources/mapping/_runner_overwrite_results.test
new file mode 100644
index 0000000000..603ee973a0
--- /dev/null
+++ 
b/modules/sql-engine/src/test/resources/mapping/_runner_overwrite_results.test
@@ -0,0 +1,30 @@
+# Start at node N1
+# Query: S1 S1
+# Fragments: F1
+N1
+S1
+ S1
+---
+RRR
+Fragment#1
+F1
+---
+
+N1
+S2
+---
+RRR
+Fragment#1
+F2
+---
+
+N2
+S3
+---
+RRR
+Fragment#1
+F1
+
+Fragment#2
+F2
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/_runner_self.test 
b/modules/sql-engine/src/test/resources/mapping/_runner_self.test
new file mode 100644
index 0000000000..23840e0bd2
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/_runner_self.test
@@ -0,0 +1,31 @@
+# Start at node N1
+# Query: S1 S1
+# Fragments: F1
+
+N1
+S1
+ S1
+---
+Fragment#1
+F1
+---
+
+N1
+S2
+---
+Fragment#1
+F2
+
+---
+
+N2
+S3
+---
+Fragment#1
+F1
+
+Fragment#2
+F2
+
+
+
diff --git 
a/modules/sql-engine/src/test/resources/mapping/_runner_strip_results.test 
b/modules/sql-engine/src/test/resources/mapping/_runner_strip_results.test
new file mode 100644
index 0000000000..cefce85d11
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/_runner_strip_results.test
@@ -0,0 +1,21 @@
+# Start at node N1
+# Query: S1 S1
+# Fragments: F1
+N1
+S1
+ S1
+---
+f
+---
+
+N1
+S2
+---
+f
+---
+
+N2
+S3
+---
+f
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test 
b/modules/sql-engine/src/test/resources/mapping/correlated.test
new file mode 100644
index 0000000000..3919943217
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -0,0 +1,183 @@
+N0
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n1
+---
+Fragment#4 root
+  executionNodes: [N0]
+  remoteFragments: [5]
+  tree:
+    IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T_N1]
+  tree:
+    IgniteSender(targetFragment=5, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T_N1, source=3, distribution=random)
+
+Fragment#5
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tables: [CT_N1]
+  tree:
+    IgniteSender(targetFragment=4, exchange=5, distribution=single)
+      IgniteProject
+        IgniteNestedLoopJoin
+          IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+          IgniteColocatedHashAggregate
+            IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+---
+
+N1
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n1
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tables: [CT_N1]
+  tree:
+    IgniteProject
+      IgniteNestedLoopJoin
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+        IgniteColocatedHashAggregate
+          IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T_N1, source=3, distribution=random)
+---
+
+N0
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n2
+---
+Fragment#4 root
+  executionNodes: [N0]
+  remoteFragments: [5]
+  tree:
+    IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T_N2]
+  tree:
+    IgniteSender(targetFragment=5, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T_N2, source=3, distribution=random)
+
+Fragment#5
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tables: [CT_N1]
+  tree:
+    IgniteSender(targetFragment=4, exchange=5, distribution=single)
+      IgniteProject
+        IgniteNestedLoopJoin
+          IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+          IgniteColocatedHashAggregate
+            IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+---
+
+N1
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n2
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tables: [CT_N1]
+  tree:
+    IgniteProject
+      IgniteNestedLoopJoin
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+        IgniteColocatedHashAggregate
+          IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T_N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T_N2, source=3, distribution=random)
+---
+
+N0
+SELECT t.c1 FROM t_n1 t JOIN table(system_range(1, 50)) as r ON t.id = r.x 
WHERE mod(r.x, 10) = 0
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  tree:
+    IgniteProject
+      IgniteMergeJoin
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+        IgniteSort
+          IgniteFilter
+            IgniteTableFunctionScan(source=-1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [T_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteSort
+        IgniteTableScan(name=PUBLIC.T_N1, source=2, distribution=random)
+---
+
+N0
+SELECT t.c1 FROM ct_n1 t WHERE t.c1 < 5 AND
+EXISTS (SELECT x FROM table(system_range(t.c1, t.c2)) WHERE mod(x, 2) = 0)
+---
+Fragment#2 root
+  executionNodes: [N0]
+  remoteFragments: [3]
+  tree:
+    IgniteReceiver(sourceFragment=3, exchange=3, distribution=single)
+
+Fragment#3
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [CT_N1]
+  tree:
+    IgniteSender(targetFragment=2, exchange=3, distribution=single)
+      IgniteProject
+        IgniteFilter
+          IgniteCorrelatedNestedLoopJoin
+            IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+            IgniteColocatedHashAggregate
+              IgniteProject
+                IgniteFilter
+                  IgniteTableFunctionScan(source=-1, distribution=single)
+---
+
+N0
+SELECT t.c1 FROM t_n1 t WHERE t.c1 < 5 AND
+EXISTS (SELECT x FROM table(system_range(t.c1, t.c2)) WHERE mod(x, 2) = 0)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  tree:
+    IgniteProject
+      IgniteFilter
+        IgniteCorrelatedNestedLoopJoin
+          IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+          IgniteColocatedHashAggregate
+            IgniteProject
+              IgniteFilter
+                IgniteTableFunctionScan(source=-1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [T_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T_N1, source=2, distribution=affinity[table: 
T_N1, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/dml.test 
b/modules/sql-engine/src/test/resources/mapping/dml.test
new file mode 100644
index 0000000000..0209d8379e
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/dml.test
@@ -0,0 +1,91 @@
+N0
+INSERT INTO t1_n1 VALUES(1, 1, 1)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  tree:
+    IgniteProject
+      IgniteColocatedHashAggregate
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [2]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableModify(name=PUBLIC.T1_N1, source=-1, 
distribution=affinity[table: T1_N1, columns: [ID]])
+        IgniteReceiver(sourceFragment=2, exchange=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N0]
+  tree:
+    IgniteSender(targetFragment=1, exchange=2, distribution=affinity[table: 
T1_N1, columns: [ID]])
+      IgniteValues
+---
+
+N1
+UPDATE t1_n1 SET c2 = 1000
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteProject
+      IgniteColocatedHashAggregate
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableModify(name=PUBLIC.T1_N1, source=-1, 
distribution=affinity[table: T1_N1, columns: [ID]])
+        IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N2
+UPDATE t1_n1 SET c2 = 1000
+---
+Fragment#0 root
+  executionNodes: [N2]
+  remoteFragments: [1]
+  tree:
+    IgniteProject
+      IgniteColocatedHashAggregate
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N2]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableModify(name=PUBLIC.T1_N1, source=-1, 
distribution=affinity[table: T1_N1, columns: [ID]])
+        IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N1
+UPDATE t2_n2n3 SET c2 = 1000
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteProject
+      IgniteColocatedHashAggregate
+        IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N2, N3]
+  tables: [T2_N2N3]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableModify(name=PUBLIC.T2_N2N3, source=-1, 
distribution=affinity[table: T2_N2N3, columns: [ID]])
+        IgniteTableScan(name=PUBLIC.T2_N2N3, source=2, 
distribution=affinity[table: T2_N2N3, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/merge_join.test 
b/modules/sql-engine/src/test/resources/mapping/merge_join.test
new file mode 100644
index 0000000000..87a22ca0e1
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/merge_join.test
@@ -0,0 +1,77 @@
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * 
FROM t1_n1 JOIN t2_n1 USING (id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [T1_N1, T2_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteProject
+        IgniteMergeJoin
+          IgniteSort
+            IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+          IgniteSort
+            IgniteTableScan(name=PUBLIC.T2_N1, source=3, 
distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N1
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * 
FROM t1_n1 JOIN t2_n1 USING (id)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1, T2_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteProject
+        IgniteMergeJoin
+          IgniteSort
+            IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+          IgniteSort
+            IgniteTableScan(name=PUBLIC.T2_N1, source=3, 
distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ * 
FROM t1_n1 JOIN t2_n2 USING (id)
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#4
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [5]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteProject
+        IgniteMergeJoin
+          IgniteSort
+            IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+          IgniteReceiver(sourceFragment=5, exchange=5, 
distribution=affinity[table: T2_N2, columns: [ID]])
+
+Fragment#5
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T2_N2]
+  tree:
+    IgniteSender(targetFragment=4, exchange=5, distribution=affinity[table: 
T2_N2, columns: [ID]])
+      IgniteSort
+        IgniteTableScan(name=PUBLIC.T2_N2, source=3, 
distribution=affinity[table: T2_N2, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/set_ops.test 
b/modules/sql-engine/src/test/resources/mapping/set_ops.test
new file mode 100644
index 0000000000..e04d988ff6
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/set_ops.test
@@ -0,0 +1,48 @@
+N1
+SELECT /*+ DISABLE_RULE('MapReduceMinusConverterRule')*/ * FROM (SELECT c2 
FROM t1_n1 EXCEPT SELECT c2 FROM t2_n2)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteColocatedMinus
+      IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [T2_N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.T2_N2, source=3, distribution=random)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T1_N1, source=4, distribution=random)
+---
+
+N1
+SELECT /*+ DISABLE_RULE('ColocatedMinusConverterRule')*/ * FROM (SELECT c2 
FROM t1_n1 EXCEPT SELECT c2 FROM t2_n2)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteReduceMinus
+      IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1, N2]
+  tables: [T1_N1, T2_N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteMapMinus
+        IgniteTableScan(name=PUBLIC.T1_N1, source=2, distribution=random)
+        IgniteTableScan(name=PUBLIC.T2_N2, source=3, distribution=random)
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_affinity.test 
b/modules/sql-engine/src/test/resources/mapping/table_affinity.test
new file mode 100644
index 0000000000..a7f704042a
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_affinity.test
@@ -0,0 +1,53 @@
+N1
+SELECT * FROM t1_n1
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N2
+SELECT * FROM t1_n1
+---
+Fragment#0 root
+  executionNodes: [N2]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N2]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N1
+SELECT * FROM t1_n1 WHERE ID = 0
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_functions.test 
b/modules/sql-engine/src/test/resources/mapping/table_functions.test
new file mode 100644
index 0000000000..4d85ff544f
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_functions.test
@@ -0,0 +1,8 @@
+N0
+SELECT * FROM TABLE(system_range(1, 5))
+---
+Fragment#0 root
+  executionNodes: [N0]
+  tree:
+    IgniteTableFunctionScan(source=-1, distribution=single)
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_identity.test 
b/modules/sql-engine/src/test/resources/mapping/table_identity.test
new file mode 100644
index 0000000000..f2d61df44c
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity.test
@@ -0,0 +1,111 @@
+N0
+SELECT * FROM nt1_n1, nt2_n2
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N0]
+  executionNodes: [N2]
+  tables: [NT2_N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT2_N2, source=3, distribution=identity[0])
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [NT1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
+
+N1
+SELECT * FROM nt1_n1, nt2_n2
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [NT2_N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT2_N2, source=3, distribution=identity[0])
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [NT1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
+
+N0
+SELECT * FROM nt1_n1, nt2_n1
+---
+Fragment#0 root
+  executionNodes: [N0]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [NT2_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT2_N1, source=3, distribution=identity[0])
+
+Fragment#1
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [NT1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
+
+N1
+SELECT * FROM nt1_n1, nt2_n1
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [NT2_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT2_N1, source=3, distribution=identity[0])
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [NT1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
diff --git 
a/modules/sql-engine/src/test/resources/mapping/table_identity_single.test 
b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
new file mode 100644
index 0000000000..12536e4c64
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
@@ -0,0 +1,99 @@
+N0
+SELECT * FROM CT_n1, NT_n1
+---
+Fragment#4 root
+  executionNodes: [N0]
+  remoteFragments: [5]
+  tree:
+    IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [NT_N1]
+  tree:
+    IgniteSender(targetFragment=5, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT_N1, source=3, distribution=identity[0])
+
+Fragment#5
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [2]
+  tables: [CT_N1]
+  tree:
+    IgniteSender(targetFragment=4, exchange=5, distribution=single)
+      IgniteNestedLoopJoin
+        IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+        IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+---
+
+N1
+SELECT * FROM CT_n1, NT_n1
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [2]
+  tables: [CT_N1]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [NT_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT_N1, source=3, distribution=identity[0])
+---
+
+N0
+SELECT * FROM CT_n1, NT_n2
+---
+Fragment#4 root
+  executionNodes: [N0]
+  remoteFragments: [5]
+  tree:
+    IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [NT_N2]
+  tree:
+    IgniteSender(targetFragment=5, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT_N2, source=3, distribution=identity[0])
+
+Fragment#5
+  targetNodes: [N0]
+  executionNodes: [N1]
+  remoteFragments: [2]
+  tables: [CT_N1]
+  tree:
+    IgniteSender(targetFragment=4, exchange=5, distribution=single)
+      IgniteNestedLoopJoin
+        IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+        IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+---
+
+N1
+SELECT * FROM CT_n1, NT_n2
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [2]
+  tables: [CT_N1]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+      IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N2]
+  tables: [NT_N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.NT_N2, source=3, distribution=identity[0])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_single.test 
b/modules/sql-engine/src/test/resources/mapping/table_single.test
new file mode 100644
index 0000000000..b652da3b5a
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_single.test
@@ -0,0 +1,89 @@
+N0
+SELECT * FROM ct1_n1, ct2_n1
+---
+Fragment#3 root
+  executionNodes: [N0]
+  remoteFragments: [4]
+  tree:
+    IgniteReceiver(sourceFragment=4, exchange=4, distribution=single)
+
+Fragment#4
+  targetNodes: [N0]
+  executionNodes: [N1]
+  tables: [CT1_N1, CT2_N1]
+  tree:
+    IgniteSender(targetFragment=3, exchange=4, distribution=single)
+      IgniteNestedLoopJoin
+        IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+        IgniteTableScan(name=PUBLIC.CT2_N1, source=2, distribution=single)
+---
+
+N1
+SELECT * FROM ct1_n1, ct2_n1
+---
+Fragment#0 root
+  executionNodes: [N1]
+  tables: [CT1_N1, CT2_N1]
+  tree:
+    IgniteNestedLoopJoin
+      IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.CT2_N1, source=2, distribution=single)
+---
+
+N0
+SELECT * FROM ct1_n1, ct2_n2
+---
+Fragment#5 root
+  executionNodes: [N0]
+  remoteFragments: [6]
+  tree:
+    IgniteReceiver(sourceFragment=6, exchange=6, distribution=single)
+
+Fragment#4
+  targetNodes: [N2]
+  executionNodes: [N1]
+  tables: [CT1_N1]
+  tree:
+    IgniteSender(targetFragment=6, exchange=4, distribution=single)
+      IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+
+Fragment#6
+  targetNodes: [N0]
+  executionNodes: [N2]
+  remoteFragments: [4]
+  tables: [CT2_N2]
+  tree:
+    IgniteSender(targetFragment=5, exchange=6, distribution=single)
+      IgniteNestedLoopJoin
+        IgniteReceiver(sourceFragment=4, exchange=4, distribution=single)
+        IgniteTableScan(name=PUBLIC.CT2_N2, source=2, distribution=single)
+---
+
+N1
+SELECT * FROM ct1_n1, ct2_n2
+---
+Fragment#5 root
+  executionNodes: [N1]
+  remoteFragments: [6]
+  tree:
+    IgniteReceiver(sourceFragment=6, exchange=6, distribution=single)
+
+Fragment#4
+  targetNodes: [N2]
+  executionNodes: [N1]
+  tables: [CT1_N1]
+  tree:
+    IgniteSender(targetFragment=6, exchange=4, distribution=single)
+      IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+
+Fragment#6
+  targetNodes: [N1]
+  executionNodes: [N2]
+  remoteFragments: [4]
+  tables: [CT2_N2]
+  tree:
+    IgniteSender(targetFragment=5, exchange=6, distribution=single)
+      IgniteNestedLoopJoin
+        IgniteReceiver(sourceFragment=4, exchange=4, distribution=single)
+        IgniteTableScan(name=PUBLIC.CT2_N2, source=2, distribution=single)
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/union.test 
b/modules/sql-engine/src/test/resources/mapping/union.test
new file mode 100644
index 0000000000..98c50a529c
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/union.test
@@ -0,0 +1,105 @@
+N1
+SELECT /*+ DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
+  (SELECT * FROM t1_n1 UNION SELECT * FROM t2_n1)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteReduceHashAggregate
+      IgniteMapHashAggregate
+        IgniteUnionAll
+          IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+          IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T2_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.T2_N1, source=3, 
distribution=affinity[table: T2_N1, columns: [ID]])
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T1_N1, source=4, 
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N1
+SELECT /*+ DISABLE_RULE('MapReduceHashAggregateConverterRule')*/ * FROM
+  (SELECT * FROM t1_n1 UNION SELECT * FROM t2_n1)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1]
+  tables: [T1_N1, T2_N1]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteColocatedHashAggregate
+        IgniteUnionAll
+          IgniteTableScan(name=PUBLIC.T1_N1, source=2, 
distribution=affinity[table: T1_N1, columns: [ID]])
+          IgniteTableScan(name=PUBLIC.T2_N1, source=3, 
distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N1
+SELECT /*+ DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
+  (SELECT * FROM t1_n1n2 UNION SELECT * FROM t2_n1n2)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1, 2]
+  tree:
+    IgniteReduceHashAggregate
+      IgniteMapHashAggregate
+        IgniteUnionAll
+          IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+          IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+  targetNodes: [N1]
+  executionNodes: [N1, N2]
+  tables: [T2_N1N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=2, distribution=single)
+      IgniteTableScan(name=PUBLIC.T2_N1N2, source=3, 
distribution=affinity[table: T2_N1N2, columns: [ID]])
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1, N2]
+  tables: [T1_N1N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteTableScan(name=PUBLIC.T1_N1N2, source=4, 
distribution=affinity[table: T1_N1N2, columns: [ID]])
+---
+
+N1
+SELECT /*+ DISABLE_RULE('MapReduceHashAggregateConverterRule')*/ * FROM
+  (SELECT * FROM t1_n1n2 UNION SELECT * FROM t2_n1n2)
+---
+Fragment#0 root
+  executionNodes: [N1]
+  remoteFragments: [1]
+  tree:
+    IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+  targetNodes: [N1]
+  executionNodes: [N1, N2]
+  tables: [T1_N1N2, T2_N1N2]
+  tree:
+    IgniteSender(targetFragment=0, exchange=1, distribution=single)
+      IgniteColocatedHashAggregate
+        IgniteUnionAll
+          IgniteTableScan(name=PUBLIC.T1_N1N2, source=2, 
distribution=affinity[table: T1_N1N2, columns: [ID]])
+          IgniteTableScan(name=PUBLIC.T2_N1N2, source=3, 
distribution=affinity[table: T2_N1N2, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/values.test 
b/modules/sql-engine/src/test/resources/mapping/values.test
new file mode 100644
index 0000000000..fe3cb632b5
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/values.test
@@ -0,0 +1,17 @@
+N1
+SELECT 1
+---
+Fragment#0 root
+  executionNodes: [N1]
+  tree:
+    IgniteValues
+---
+
+N2
+SELECT 1
+---
+Fragment#0 root
+  executionNodes: [N2]
+  tree:
+    IgniteValues
+---


Reply via email to