This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 29292de35a [IGNITE-20504] Sql. Improve test coverage of MappingService
(#2768)
29292de35a is described below
commit 29292de35a29a435413faa7964caf782f1011d7c
Author: Max Zhuravkov <[email protected]>
AuthorDate: Wed Nov 8 07:43:26 2023 +0200
[IGNITE-20504] Sql. Improve test coverage of MappingService (#2768)
---
.../internal/sql/engine/prepare/MultiStepPlan.java | 3 +-
.../ignite/internal/sql/engine/prepare/PlanId.java | 3 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 2 +-
.../engine/exec/mapping/FragmentMappingTest.java | 361 ++++++++++++++++
.../sql/engine/exec/mapping/FragmentPrinter.java | 375 +++++++++++++++++
.../exec/mapping/MappingServiceImplTest.java | 17 -
.../sql/engine/exec/mapping/MappingTestRunner.java | 454 +++++++++++++++++++++
.../exec/mapping/MappingTestRunnerSelfTest.java | 157 +++++++
.../sql/engine/framework/TestBuilders.java | 66 ++-
.../internal/sql/engine/framework/TestTable.java | 8 +-
.../mapping/_runner_overwrite_results.test | 30 ++
.../src/test/resources/mapping/_runner_self.test | 31 ++
.../resources/mapping/_runner_strip_results.test | 21 +
.../src/test/resources/mapping/correlated.test | 183 +++++++++
.../sql-engine/src/test/resources/mapping/dml.test | 91 +++++
.../src/test/resources/mapping/merge_join.test | 77 ++++
.../src/test/resources/mapping/set_ops.test | 48 +++
.../src/test/resources/mapping/table_affinity.test | 53 +++
.../test/resources/mapping/table_functions.test | 8 +
.../src/test/resources/mapping/table_identity.test | 111 +++++
.../resources/mapping/table_identity_single.test | 99 +++++
.../src/test/resources/mapping/table_single.test | 89 ++++
.../src/test/resources/mapping/union.test | 105 +++++
.../src/test/resources/mapping/values.test | 17 +
24 files changed, 2379 insertions(+), 30 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
index 558755e1bb..e1978e34e2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
@@ -35,7 +35,8 @@ public class MultiStepPlan implements QueryPlan {
private final IgniteRel root;
- MultiStepPlan(PlanId id, SqlQueryType type, IgniteRel root,
ResultSetMetadata meta) {
+ /** Constructor. */
+ public MultiStepPlan(PlanId id, SqlQueryType type, IgniteRel root,
ResultSetMetadata meta) {
this.id = id;
this.type = type;
this.root = root;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
index 112500b461..315ed3115e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlanId.java
@@ -32,7 +32,8 @@ public class PlanId implements Serializable {
private final UUID prepareServiceId;
private final long planNumber;
- PlanId(UUID prepareServiceId, long planNumber) {
+ /** Constructor. */
+ public PlanId(UUID prepareServiceId, long planNumber) {
this.prepareServiceId = prepareServiceId;
this.planNumber = planNumber;
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index c8a84e9d13..a9c0f23c87 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -1029,7 +1029,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
);
}
- return new TestTable(new TableDescriptorImpl(columns, distr), name,
size, List.of());
+ return new TestTable(1, new TableDescriptorImpl(columns, distr), name,
size, List.of());
}
private static class CapturingMailboxRegistry implements MailboxRegistry {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
new file mode 100644
index 0000000000..40a0c1a0c6
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.MappingTestRunner.TestSetup;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.ExecutionTargetProviderBuilder;
+import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
+import org.apache.ignite.internal.sql.engine.framework.TestTable;
+import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
+import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
+import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.AffinityDistribution;
+import
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.IdentityDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for fragment mapping.
+ *
+ * <p>Test setup:
+ * <pre>
+ * // Adds nodes named N1, N2, and N3 to the cluster.
+ * addNodes("N1", "N2", "N3");
+ *
+ * // The first statement adds a table with data located at node N1.
+ * // The second one adds a table with data located at nodes N2, N3.
+ * // The catalog is going to contain tables "T1_N1" and "T2_N2N3".
+ * // Both tables have distribution affinity([0], tableId, zoneId)
+ * addTable("T1", "N1");
+ * addTable("T2", "N2", "N3");
+ *
+ * // Adds table with identity(0) distribution. Can be used to mimic node
system views.
+ * addTableIdent("NT1", "N1");
+ *
+ * // Adds table with single distribution. Can be used to mimic
cluster-wide system views.
+ * addTableSingle("CT2", "N1", "N2", "N3");
+ *
+ * // Sets row count statistics to 100 for all tables named like T1_*
+ * setRowCount("T1", 100);
+ *
+ * var runner = new MappingTestRunner(Paths.get("tests_dir"), this::parseQuery);
+ * // Run test cases.
+ * runner.runTest(this::initSchema, "file.test");
+ * </pre>
+ *
+ * <p>See {@link MappingTestRunner} for test file format description.
+ *
+ * @see MappingTestRunner
+ */
+public class FragmentMappingTest extends AbstractPlannerTest {
+
+ private static final Path LOCATION =
Paths.get("src/test/resources/mapping");
+
+ private final TreeSet<String> nodeNames = new TreeSet<>();
+
+ private final TreeMap<String, Pair<IgniteDistribution, List<String>>>
tables = new TreeMap<>();
+
+ private final Map<String, Integer> tableRows = new HashMap<>();
+
+ private final MappingTestRunner testRunner = new
MappingTestRunner(LOCATION, this::parseQuery);
+
+ @Test
+ public void testValues() {
+ addNodes("N1", "N2");
+
+ testRunner.runTest(this::initSchema, "values.test");
+ }
+
+ @Test
+ public void testTable() {
+ addNodes("N1", "N2");
+
+ addTable("T1", "N1");
+
+ testRunner.runTest(this::initSchema, "table_affinity.test");
+ }
+
+ @Test
+ public void testDml() {
+ addNodes("N0", "N1", "N2", "N3");
+
+ addTable("T1", "N1");
+ addTable("T2", "N2", "N3");
+
+ testRunner.runTest(this::initSchema, "dml.test");
+ }
+
+ @Test
+ public void testUnion() {
+ addNodes("N1", "N2", "N3");
+
+ addTable("T1", "N1");
+ addTable("T1", "N1", "N2");
+
+ addTable("T2", "N1");
+ addTable("T2", "N1", "N2");
+
+ testRunner.runTest(this::initSchema, "union.test");
+ }
+
+ @Test
+ public void testSetOps() {
+ addNodes("N1", "N2", "N3");
+
+ addTable("T1", "N1");
+ addTable("T2", "N2");
+
+ testRunner.runTest(this::initSchema, "set_ops.test");
+ }
+
+ @Test
+ public void testMergeJoin() {
+ addNodes("N0", "N1", "N2", "N3", "N4");
+
+ addTable("T1", "N1");
+ addTable("T2", "N1");
+ addTable("T2", "N2");
+
+ setRowCount("T1", 200);
+ setRowCount("T2", 100);
+
+ testRunner.runTest(this::initSchema, "merge_join.test");
+ }
+
+ @Test
+ public void testTableIdentity() {
+ addNodes("N0", "N1", "N2");
+
+ addTableIdent("NT1", "N1");
+ addTableIdent("NT2", "N1");
+ addTableIdent("NT2", "N2");
+
+ testRunner.runTest(this::initSchema, "table_identity.test");
+ }
+
+ @Test
+ public void testTableSingleDistribution() {
+ addNodes("N0", "N1", "N2");
+
+ addTableSingle("CT1", "N1");
+ addTableSingle("CT2", "N1");
+ addTableSingle("CT2", "N2");
+
+ testRunner.runTest(this::initSchema, "table_single.test");
+ }
+
+ @Test
+ public void testTableSingleIdentityDistribution() {
+ addNodes("N0", "N1", "N2");
+
+ addTableSingle("CT", "N1");
+ addTableSingle("CT", "N2");
+
+ addTableIdent("NT", "N1");
+ addTableIdent("NT", "N2");
+
+ testRunner.runTest(this::initSchema, "table_identity_single.test");
+ }
+
+ @Test
+ public void testCorrelated() {
+ addNodes("N0", "N1", "N2");
+
+ addTable("T", "N1");
+ addTable("T", "N2");
+
+ addTableSingle("CT", "N1");
+ addTableSingle("CT", "N2");
+
+ addTableIdent("NT", "N1");
+ addTableIdent("NT", "N2");
+
+ testRunner.runTest(this::initSchema, "correlated.test");
+ }
+
+ @Test
+ public void testTableFunctions() {
+ addNodes("N0", "N1", "N2");
+
+ addTable("T", "N1");
+
+ addTableSingle("CT", "N1");
+ addTableSingle("CT", "N2");
+
+ addTableIdent("NT", "N1");
+ addTableIdent("NT", "N2");
+
+ testRunner.runTest(this::initSchema, "table_functions.test");
+ }
+
+    /** Registers the given node names; they later form the logical topology and are the only nodes tables may refer to. */
+    private void addNodes(String node, String... otherNodes) {
+        nodeNames.add(node);
+
+        for (String otherNode : otherNodes) {
+            nodeNames.add(otherNode);
+        }
+    }
+
+    /**
+     * Registers a table with affinity distribution whose data is located on the given nodes.
+     *
+     * <p>The distribution stored here is only a placeholder: {@code initSchema} replaces it with an affinity distribution
+     * that carries the actual table id.
+     *
+     * @param name Short table name; the registered name is {@code <name>_<sorted node names>}.
+     * @param node First node that hosts the table.
+     * @param otherNodes Remaining nodes that host the table.
+     */
+    private void addTable(String name, String node, String... otherNodes) {
+        registerTable(name, IgniteDistributions.affinity(-1, -1, -1), sortedNodes(node, otherNodes));
+    }
+
+    /**
+     * Registers a table with single distribution located on the given nodes.
+     *
+     * @param name Short table name; the registered name is {@code <name>_<sorted node names>}.
+     * @param nodes Nodes that host the table.
+     */
+    private void addTableSingle(String name, String... nodes) {
+        registerTable(name, IgniteDistributions.single(), new TreeSet<>(Arrays.asList(nodes)));
+    }
+
+    /**
+     * Registers a table with {@code identity(0)} distribution located on the given nodes.
+     *
+     * @param name Short table name; the registered name is {@code <name>_<sorted node names>}.
+     * @param node First node that hosts the table.
+     * @param otherNodes Remaining nodes that host the table.
+     */
+    private void addTableIdent(String name, String node, String... otherNodes) {
+        registerTable(name, IgniteDistributions.identity(0), sortedNodes(node, otherNodes));
+    }
+
+    /** Collects the mandatory node and the optional ones into a sorted set without duplicates. */
+    private static TreeSet<String> sortedNodes(String node, String... otherNodes) {
+        TreeSet<String> result = new TreeSet<>();
+        result.add(node);
+        result.addAll(Arrays.asList(otherNodes));
+
+        return result;
+    }
+
+    /** Stores the table under its generated name together with its distribution and the nodes that host it. */
+    private void registerTable(String name, IgniteDistribution distribution, TreeSet<String> tableNodes) {
+        String tableName = formatName(name, tableNodes);
+
+        tables.put(tableName, new Pair<>(distribution, new ArrayList<>(tableNodes)));
+    }
+
+ /**
+ * Sets the row count used as the size of tables registered under the given short name.
+ *
+ * <p>{@code initSchema} looks the value up by the part of the generated table name that precedes the first
+ * {@code '_'}, so the same value applies to every table registered with this short name.
+ *
+ * @param tableName Short table name, as passed to {@code addTable}, {@code addTableSingle} or {@code addTableIdent}.
+ * @param rowCount Row count to use instead of the generated default.
+ */
+ private void setRowCount(String tableName, int rowCount) {
+ this.tableRows.put(tableName, rowCount);
+ }
+
+ /**
+ * Plans the given SQL statement against the given schema.
+ *
+ * @param schema Schema the planning context is built for.
+ * @param sqlStmt SQL statement text.
+ * @return Result of {@code physicalPlan} for the statement.
+ * @throws RuntimeException If any exception is thrown while planning; the original exception is kept as the cause.
+ */
+ private IgniteRel parseQuery(IgniteSchema schema, String sqlStmt) {
+ try {
+ PlanningContext ctx = PlanningContext.builder()
+ .parentContext(baseQueryContext(List.of(schema), null))
+ .query(sqlStmt)
+ .build();
+
+ // The planner is closed as soon as the physical plan has been produced.
+ try (IgnitePlanner planner = ctx.planner()) {
+ assertNotNull(planner);
+
+ return physicalPlan(planner, ctx.query());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse a statement: " +
sqlStmt, e);
+ }
+ }
+
+ /**
+ * Builds the test setup from the nodes, tables and row counts registered so far.
+ *
+ * <p>Creates one {@link TestTable} per registered table (columns {@code ID}, {@code C1}, {@code C2}), tells the
+ * execution target provider which nodes host each table, and creates a topology snapshot from the registered nodes.
+ *
+ * @return Setup consisting of the execution target provider, the schema and the topology snapshot.
+ * @throws IllegalArgumentException If a table refers to a node that was not registered.
+ */
+ private TestSetup initSchema() {
+ ExecutionTargetProviderBuilder executionTargetProviderBuilder =
TestBuilders.executionTargetProviderBuilder();
+ List<IgniteDataSource> dataSources = new ArrayList<>();
+ // Incremented once per table; used as the table id and as part of the default row count.
+ int objectId = 1;
+
+ Map<String, List<String>> table2NodeNames = new HashMap<>();
+
+ for (Map.Entry<String, Pair<IgniteDistribution, List<String>>> e :
tables.entrySet()) {
+ String tableName = e.getKey();
+ // Registered names have the form <name>_<nodes> (see formatName); row counts are keyed by the part before the first '_'.
+ String tableShortName = tableName.substring(0,
tableName.indexOf('_'));
+ // Generate distinct row counts for each table to ensure that the
optimizer produces the same results.
+ int tableSize = tableRows.getOrDefault(tableShortName, 100 +
objectId);
+
+ // Every node a table is located on must have been registered as a cluster node.
+ for (String tableNodeName : e.getValue().getSecond()) {
+ if (!nodeNames.contains(tableNodeName)) {
+ String message = format(
+ "Expected node {} for table {}. Registered nodes:
{}",
+ tableNodeName, tableShortName, nodeNames
+ );
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ IgniteDistribution distribution = e.getValue().getFirst();
+
+ TableBuilder tableBuilder = TestBuilders.table().name(tableName);
+
+ // To mimic node system views, id column is node name alias.
+ if (distribution.function() instanceof IdentityDistribution) {
+ tableBuilder = tableBuilder.addColumn("ID",
NativeTypes.stringOf(64));
+ } else {
+ // To mimic cluster wide system views
+ tableBuilder = tableBuilder.addColumn("ID", NativeTypes.INT32);
+ }
+
+ // The affinity distribution stored at registration time is a placeholder; rebuild it with this table's id.
+ IgniteDistribution distributionToUse;
+ if (distribution.function() instanceof AffinityDistribution) {
+ distributionToUse = IgniteDistributions.affinity(0, objectId,
1);
+ } else {
+ distributionToUse = distribution;
+ }
+
+ TestTable testTable = tableBuilder
+ .addColumn("C1", NativeTypes.INT32)
+ .addColumn("C2", NativeTypes.INT32)
+ .size(tableSize)
+ .tableId(objectId)
+ .distribution(distributionToUse)
+ .build();
+
+ dataSources.add(testTable);
+
+ table2NodeNames.put(tableName, e.getValue().getSecond());
+
+ objectId += 1;
+ }
+
+ executionTargetProviderBuilder.addTables(table2NodeNames);
+
+ IgniteSchema schema = new
IgniteSchema(CatalogManager.DEFAULT_SCHEMA_NAME, 1, dataSources);
+ ExecutionTargetProvider executionTargetProvider =
executionTargetProviderBuilder.build();
+ LogicalTopologySnapshot logicalTopologySnapshot = newLogicalTopology();
+
+ return new TestSetup(executionTargetProvider, schema,
logicalTopologySnapshot);
+ }
+
+    /** Builds the registered table name: the short name, an underscore, then the node names concatenated in set order. */
+    private static String formatName(String name, TreeSet<String> nodeNames) {
+        StringBuilder result = new StringBuilder().append(name).append("_");
+
+        for (String nodeName : nodeNames) {
+            result.append(nodeName);
+        }
+
+        return result.toString();
+    }
+
+    /**
+     * Creates a topology snapshot with one logical node per registered node name. Each node receives its name for both
+     * leading constructor arguments, and all nodes share the same fixed address.
+     */
+    private LogicalTopologySnapshot newLogicalTopology() {
+        List<LogicalNode> logicalNodes = new ArrayList<>();
+
+        for (String nodeName : nodeNames) {
+            NetworkAddress addr = NetworkAddress.from("127.0.0.1:10000");
+
+            logicalNodes.add(new LogicalNode(nodeName, nodeName, addr));
+        }
+
+        return new LogicalTopologySnapshot(1, logicalNodes);
+    }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
new file mode 100644
index 0000000000..3894b176c8
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentPrinter.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.sql.engine.prepare.Fragment;
+import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableFunctionScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
+import
org.apache.ignite.internal.sql.engine.trait.DistributionFunction.AffinityDistribution;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+
+/**
+ * Converts {@link MappedFragment} to text representation.
+ */
+final class FragmentPrinter extends IgniteRelShuttle {
+
+    /** Prefix written before the fragment id on the header line of every printed fragment. */
+    static final String FRAGMENT_PREFIX = "Fragment#";
+
+ private final Output output;
+
+ private FragmentPrinter(Output output) {
+ this.output = output;
+ }
+
+    /**
+     * Converts the given mapped fragments to text.
+     *
+     * @param mappedFragments Fragments to print, in output order.
+     * @return Text representation of all fragments; a line separator is written after each of them.
+     */
+    static String fragmentsToString(List<MappedFragment> mappedFragments) {
+        // First pass: remember the tables used by the fragments, they are needed to print affinity distributions.
+        TableDescriptorCollector tableCollector = new TableDescriptorCollector();
+        mappedFragments.forEach(mapped -> tableCollector.collect(mapped.fragment()));
+
+        Function<Object, String> valueFormatter = value -> value instanceof IgniteDistribution
+                ? formatDistribution((IgniteDistribution) value, tableCollector)
+                : String.valueOf(value);
+
+        Output out = new Output(valueFormatter);
+
+        // Second pass: print every fragment with its own printer into the shared output.
+        for (MappedFragment mapped : mappedFragments) {
+            new FragmentPrinter(out).print(mapped);
+
+            out.writeNewline();
+        }
+
+        return out.builder.toString();
+    }
+
+ /**
+ * Writes the text representation of a single mapped fragment.
+ *
+ * <p>The output consists of a header line (fragment id, followed by {@code root} and/or {@code correlated} when they
+ * apply), the target nodes (only when the fragment has a target), the execution nodes, the remote fragments, tables
+ * and system views (each only when non-empty), and finally the relational tree of the fragment.
+ *
+ * @param mappedFragment Fragment to print.
+ */
+ void print(MappedFragment mappedFragment) {
+ Fragment fragment = mappedFragment.fragment();
+
+ // The header line is written without padding.
+ output.setNewLinePadding(0);
+ output.writeFormattedString(FRAGMENT_PREFIX + "{}",
fragment.fragmentId());
+
+ if (fragment.rootFragment()) {
+ output.writeString(" root");
+ }
+
+ if (fragment.correlated()) {
+ output.writeString(" correlated");
+ }
+
+ output.appendPadding();
+ output.setNewLinePadding(2);
+ output.writeNewline();
+
+ ColocationGroup target = mappedFragment.target();
+ if (target != null) {
+ output.appendPadding();
+ // Node names are sorted before they are written.
+ List<String> sortedNodeNames = target.nodeNames()
+ .stream()
+ .sorted(Comparator.naturalOrder())
+ .collect(Collectors.toList());
+
+ output.writeKeyValue("targetNodes", sortedNodeNames.toString());
+ output.writeNewline();
+ }
+
+ output.setNewLinePadding(2);
+
+ output.appendPadding();
+ output.writeKeyValue("executionNodes",
mappedFragment.nodes().toString());
+ output.writeNewline();
+
+ List<IgniteReceiver> remotes = mappedFragment.fragment().remotes();
+ if (!remotes.isEmpty()) {
+ List<Long> remotesVals = remotes.stream()
+ .map(IgniteReceiver::sourceFragmentId)
+ .collect(Collectors.toList());
+
+ output.appendPadding();
+ output.writeKeyValues("remoteFragments", remotesVals);
+ output.writeNewline();
+ }
+
+ if (!fragment.tables().isEmpty()) {
+ List<String> tables = fragment.tables().stream()
+ .map(IgniteDataSource::name)
+ .sorted(Comparator.naturalOrder())
+ .collect(Collectors.toList());
+
+ output.appendPadding();
+ output.writeKeyValue("tables", tables.toString());
+ output.writeNewline();
+ }
+
+ if (!fragment.systemViews().isEmpty()) {
+ List<String> tables = fragment.systemViews().stream()
+ .map(IgniteDataSource::name)
+ .sorted(Comparator.naturalOrder())
+ .collect(Collectors.toList());
+
+ output.appendPadding();
+ output.writeKeyValue("systemViews", tables.toString());
+ output.writeNewline();
+ }
+
+ output.appendPadding();
+ output.writeString("tree:");
+ output.writeNewline();
+
+ printRel(fragment.root());
+ }
+
+    /**
+     * Prints the relational tree rooted at the given node.
+     *
+     * <p>The padding string and the new-line padding of the output are changed for the duration of the traversal and
+     * restored afterwards, also when the traversal fails.
+     *
+     * @param rel Root of the tree to print.
+     */
+    private void printRel(IgniteRel rel) {
+        String prevPaddingStr = output.paddingString;
+        int prevPadding = output.newLinePadding;
+
+        output.setPaddingStr(" ");
+        output.setNewLinePadding(2);
+        try {
+            visit(rel);
+        } finally {
+            // Restore both settings together so that a failing visitor cannot leave the output half-restored.
+            output.setPaddingStr(prevPaddingStr);
+            output.setNewLinePadding(prevPadding);
+        }
+    }
+
+ @Override
+ protected IgniteRel processNode(IgniteRel rel) {
+ int prevPadding = output.newLinePadding;
+ output.setNewLinePadding(output.newLinePadding + 1);
+ output.writeNewline();
+ try {
+ return super.processNode(rel);
+ } finally {
+ output.setNewLinePadding(prevPadding);
+ }
+ }
+
+ @Override
+ public IgniteRel visit(IgniteRel rel) {
+ output.appendPadding();
+ output.writeString(rel.getClass().getSimpleName());
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteSender rel) {
+ long targetId = rel.targetFragmentId();
+ long exchangeId = rel.exchangeId();
+
+ output.writeFormattedString("(targetFragment={}, exchange={},
distribution={})", targetId, exchangeId, rel.distribution());
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteReceiver rel) {
+ long sourceFragmentId = rel.sourceFragmentId();
+ long exchangeId = rel.exchangeId();
+
+ output.writeFormattedString("(sourceFragment={}, exchange={},
distribution={})", sourceFragmentId, exchangeId, rel.distribution());
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteIndexScan rel) {
+ long sourceId = rel.sourceId();
+ String tableName = String.join(".", rel.getTable().getQualifiedName());
+
+ output.writeFormattedString("(name={}, source={}, distribution={})",
tableName, sourceId, rel.distribution());
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteTableScan rel) {
+ long sourceId = rel.sourceId();
+ String tableName = String.join(".", rel.getTable().getQualifiedName());
+
+ output.writeFormattedString("(name={}, source={}, distribution={})",
tableName, sourceId, rel.distribution());
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteTableModify rel) {
+ String tableName = String.join(".", rel.getTable().getQualifiedName());
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20495 there is
no sourceId on TableModifyNode
+ output.writeFormattedString("(name={}, source={}, distribution={})",
tableName, "-1", rel.distribution());
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteTableFunctionScan rel) {
+ output.writeFormattedString("(source={}, distribution={})",
rel.sourceId(), rel.distribution());
+ return super.visit(rel);
+ }
+
+    /**
+     * Formats a distribution for the text output.
+     *
+     * <p>A distribution with an affinity function is printed as {@code affinity[table: <name>, columns: [<c1>,<c2>]]},
+     * using the name and the distribution key columns of the table collected under the function's table id. Any other
+     * distribution is printed using its {@code toString()}.
+     *
+     * @param distribution Distribution to format.
+     * @param collector Collector holding the tables referenced by the printed fragments.
+     * @return Text representation of the distribution.
+     * @throws IllegalStateException If the affinity function refers to a table id that has not been collected.
+     */
+    private static String formatDistribution(IgniteDistribution distribution, TableDescriptorCollector collector) {
+        if (!(distribution.function() instanceof AffinityDistribution)) {
+            return distribution.toString();
+        }
+
+        AffinityDistribution f = (AffinityDistribution) distribution.function();
+        IgniteTable igniteTable = collector.tables.get(f.tableId());
+
+        if (igniteTable == null) {
+            // The message has two placeholders: the id that was not found and the ids that are known.
+            String error = format("Unknown tableId: {}. Existing: {}", f.tableId(), collector.tables.keySet());
+            throw new IllegalStateException(error);
+        }
+
+        TableDescriptor tableDescriptor = igniteTable.descriptor();
+        String colocationColumns = igniteTable.distribution().getKeys().stream()
+                .map(k -> tableDescriptor.columnDescriptor(k).name())
+                .collect(Collectors.joining(",", "[", "]"));
+
+        return format("affinity[table: {}, columns: {}]", igniteTable.name(), colocationColumns);
+    }
+
+ /** Collects the tables referenced by fragments so that their table and column names can be used in the text output. */
+ private static class TableDescriptorCollector extends IgniteRelShuttle {
+
+ private final Map<Integer, IgniteTable> tables = new HashMap<>();
+
+ void collect(Fragment fragment) {
+ fragment.root().accept(this);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteIndexScan rel) {
+ IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
+ tables.put(igniteTable.id(), igniteTable);
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteTableScan rel) {
+ IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
+ tables.put(igniteTable.id(), igniteTable);
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteTableModify rel) {
+ IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
+ tables.put(igniteTable.id(), igniteTable);
+ return super.visit(rel);
+ }
+ }
+
+ private static class Output {
+
+ private final StringBuilder builder = new StringBuilder();
+
+ private int newLinePadding;
+
+ private String paddingString = " ";
+
+ private boolean blankLine;
+
+ private final Function<Object, String> formatter;
+
+ private Output(Function<Object, String> formatter) {
+ this.formatter = formatter;
+ }
+
+        /**
+         * Writes a list of values: {@code name: [v1, v2, ..., vN]}. Every value is converted with the formatter of this
+         * output. An empty list is written as {@code name: []}.
+         *
+         * @param name Property name.
+         * @param values Values to write.
+         */
+        void writeKeyValues(String name, List<?> values) {
+            builder.append(name);
+            builder.append(": [");
+
+            // Write the separator before every element but the first one, so there is nothing to trim afterwards
+            // (trimming an empty list would cut into the already written ": [").
+            boolean first = true;
+            for (Object value : values) {
+                if (!first) {
+                    builder.append(", ");
+                }
+
+                builder.append(formatter.apply(value));
+                first = false;
+            }
+
+            builder.append("]");
+        }
+
+ /** Writes string property: {@code name: value}. */
+ void writeKeyValue(String name, String value) {
+ builder.append(name);
+ builder.append(": ");
+ builder.append(value);
+ }
+
+ /** Writes the given string as is. */
+ void writeString(String value) {
+ builder.append(value);
+ }
+
+ /** Writes a formatted string. */
+ void writeFormattedString(String value, Object... params) {
+ List<String> formattedParams = new ArrayList<>();
+ for (Object param : params) {
+ formattedParams.add(formatter.apply(param));
+ }
+
+ builder.append(format(value, formattedParams.toArray()));
+ }
+
+ void setPaddingStr(String val) {
+ this.paddingString = val;
+ }
+
+ void setNewLinePadding(int value) {
+ newLinePadding = value;
+ }
+
+ void writeNewline() {
+ blankLine = true;
+ builder.append(System.lineSeparator());
+ }
+
+        /**
+         * Writes the padding of the current line if nothing has been written since the last line break, otherwise does
+         * nothing. The padding is the padding string repeated {@code newLinePadding} times.
+         */
+        private void appendPadding() {
+            if (!blankLine) {
+                return;
+            }
+
+            if (newLinePadding > 0) {
+                builder.append(paddingString.repeat(newLinePadding));
+            }
+
+            blankLine = false;
+        }
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index cdf1dad118..af9d2c9592 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -15,23 +15,6 @@
* limitations under the License.
*/
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.ignite.internal.sql.engine.exec.mapping;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
new file mode 100644
index 0000000000..5c3bde8b95
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringTokenizer;
+import java.util.UUID;
+import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.calcite.plan.RelOptUtil;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.prepare.PlanId;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * A runner for mapping test cases.
+ *
+ * <p>Test case format (blank lines are ignored):
+ * <pre>
+ * #<optional multiline description>
+ * <node_name>
+ * <SQL statement (single line or multiline)>
+ * ---
+ * <expected fragments>
+ * ---
+ * </pre>
+ *
+ * <p>To bulk update test case results, set the system property {@code MAPPING_TESTS_OVERWRITE_RESULTS} to {@code true}.
+ *
+ * <p>As a result, the expected results of all test cases are replaced with the actual ones.
+ * If there are mismatches, the tests still fail so that the differences are reported and can be reviewed.
+ *
+ * <p>To remove all results, set the system property {@code MAPPING_TESTS_STRIP_RESULTS} to {@code true}.
+ *
+ * <p>As a result, the results of all test cases are replaced with a dummy result.
+ * If there are mismatches, the tests still fail so that the differences are reported and can be reviewed.
+ *
+ * @see FragmentPrinter
+ */
+final class MappingTestRunner {
+
+ /** Name of the system property that makes the runner write actual results to the output file. */
+ private static final String OVERWRITE_RESULTS =
"MAPPING_TESTS_OVERWRITE_RESULTS";
+
+ /** Name of the system property that makes the runner write a dummy result for every test case. */
+ private static final String STRIP_RESULTS = "MAPPING_TESTS_STRIP_RESULTS";
+
+ /** Immutable bundle of the collaborators a test run needs: a target provider, a schema and a topology snapshot. */
+ static final class TestSetup {
+
+     /** Schema handed to the statement parser together with the SQL text. */
+     private final IgniteSchema schema;
+
+     /** Provider of execution targets passed to the mapping service. */
+     private final ExecutionTargetProvider executionTargetProvider;
+
+     /** Topology snapshot the mapping service is notified about before mapping. */
+     private final LogicalTopologySnapshot topologySnapshot;
+
+     TestSetup(ExecutionTargetProvider targetProvider, IgniteSchema igniteSchema, LogicalTopologySnapshot snapshot) {
+         this.schema = igniteSchema;
+         this.executionTargetProvider = targetProvider;
+         this.topologySnapshot = snapshot;
+     }
+ }
+
+ /** Directory that test file names are resolved against. */
+ private final Path location;
+
+ /** Converts a schema and an SQL string into the relational tree that gets mapped. */
+ private final BiFunction<IgniteSchema, String, IgniteRel> parseValidate;
+
+ /** When set, actual results are written to the output file. Initialized by initFlags(). */
+ private boolean overwriteResults;
+
+ /** When set, a dummy result is written for every test case. Initialized by initFlags(). */
+ private boolean stripResults;
+
+ /** Optional file to write results to; when null the test file itself is used. */
+ private Path outputFile;
+
+ /**
+  * Creates a runner.
+  *
+  * @param location Directory that test file names are resolved against.
+  * @param parseValidate Function that turns a schema and an SQL string into a relational tree.
+  */
+ MappingTestRunner(Path location, BiFunction<IgniteSchema, String, IgniteRel> parseValidate) {
+     this.parseValidate = parseValidate;
+     this.location = location;
+ }
+
+ /**
+ * Specifies a file to write results to. If not specified, results are written
+ * to the test file that is being processed.
+ *
+ * @param path File to write results to.
+ * @return this for chaining.
+ */
+ MappingTestRunner outputFile(Path path) {
+ this.outputFile = path;
+ return this;
+ }
+
+ /**
+  * Loads all test cases from the given file, maps each statement and reports the results.
+  *
+  * @param init Supplier of the test setup; it is invoked before the flags are read.
+  * @param fileName Name of the test file, resolved against the runner's location.
+  */
+ void runTest(Supplier<TestSetup> init, String fileName) {
+     TestSetup testSetup = init.get();
+
+     initFlags();
+
+     Path pathToTestFile = location.resolve(fileName);
+
+     runTestCases(
+             pathToTestFile,
+             testSetup.schema,
+             testSetup.executionTargetProvider,
+             testSetup.topologySnapshot,
+             parseValidate,
+             loadTestCases(pathToTestFile)
+     );
+ }
+
+ /**
+  * Reads the overwrite/strip flags from system properties into the runner's fields.
+  *
+  * @throws IllegalStateException If both flags are enabled at the same time.
+  */
+ @TestOnly
+ void initFlags() {
+     boolean overwrite = IgniteSystemProperties.getBoolean(OVERWRITE_RESULTS, false);
+     boolean strip = IgniteSystemProperties.getBoolean(STRIP_RESULTS, false);
+
+     // The fields are assigned before validation, exactly as the flags were read.
+     this.overwriteResults = overwrite;
+     this.stripResults = strip;
+
+     if (overwrite && strip) {
+         String error = format("Both {} and {} have been specified", OVERWRITE_RESULTS, STRIP_RESULTS);
+
+         throw new IllegalStateException(error);
+     }
+ }
+
+ /**
+  * Builds a plan for every test case, maps it and passes the collected textual mappings to the reporter.
+  *
+  * <p>A statement whose root is an {@link IgniteTableModify} is treated as DML, everything else as a query.
+  */
+ private void runTestCases(Path testFile,
+         IgniteSchema schema,
+         ExecutionTargetProvider targetProvider,
+         LogicalTopologySnapshot snapshot,
+         BiFunction<IgniteSchema, String, IgniteRel> parse,
+         List<TestCaseDef> testCases) {
+
+     List<String> mappings = new ArrayList<>();
+
+     for (TestCaseDef testCase : testCases) {
+         IgniteRel root = parse.apply(schema, testCase.sql);
+         SqlQueryType queryType = root instanceof IgniteTableModify ? SqlQueryType.DML : SqlQueryType.QUERY;
+
+         ResultSetMetadataImpl metadata = new ResultSetMetadataImpl(Collections.emptyList());
+         PlanId planId = new PlanId(UUID.randomUUID(), 1);
+         MultiStepPlan plan = new MultiStepPlan(planId, queryType, root, metadata);
+
+         mappings.add(produceMapping(testCase.nodeName, targetProvider, snapshot, plan));
+     }
+
+     reportResults(testCases, mappings, testFile);
+ }
+
+ /**
+  * Parses the given test file into test case definitions.
+  *
+  * @param path Test file to read.
+  * @return Test cases in file order.
+  * @throws UncheckedIOException If the file can not be read.
+  */
+ static List<TestCaseDef> loadTestCases(Path path) {
+     try {
+         return new Parser().parse(path);
+     } catch (IOException ioError) {
+         throw new UncheckedIOException(ioError);
+     }
+ }
+
+ /**
+  * Maps the given plan with a freshly created mapping service and renders the mapped fragments as text.
+  *
+  * @param nodeName Name of the node the mapping service is created for.
+  * @param targetProvider Provider of execution targets.
+  * @param snapshot Topology snapshot the service is notified about before mapping.
+  * @param plan Plan to map.
+  * @return Textual representation of the mapped fragments.
+  * @throws IllegalStateException If mapping fails or yields no fragments.
+  */
+ private String produceMapping(String nodeName,
+         ExecutionTargetProvider targetProvider,
+         LogicalTopologySnapshot snapshot,
+         MultiStepPlan plan) {
+
+     MappingServiceImpl mappingService = new MappingServiceImpl(nodeName,
+             targetProvider,
+             EmptyCacheFactory.INSTANCE,
+             0,
+             Runnable::run
+     );
+     mappingService.onTopologyLeap(snapshot);
+
+     List<MappedFragment> mappedFragments;
+
+     try {
+         mappedFragments = await(mappingService.map(plan));
+     } catch (Exception e) {
+         // Include the plan in the message so that a failure can be analyzed without a debugger.
+         StringBuilder sb = new StringBuilder();
+         sb.append(System.lineSeparator());
+         sb.append(RelOptUtil.toString(plan.root()));
+         sb.append(System.lineSeparator());
+
+         // Unwrap the async wrapper, but never lose the original exception if it carries no cause.
+         Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
+
+         throw new IllegalStateException("Failed to map a plan: " + sb, cause);
+     }
+
+     if (mappedFragments == null) {
+         throw new IllegalStateException("Mapped fragments must not be null. Node: " + nodeName);
+     }
+
+     return FragmentPrinter.fragmentsToString(mappedFragments);
+ }
+
+ /** A single test case as read from a test file. */
+ static class TestCaseDef {
+
+     /** Position of the node name line in the test file, as recorded by the parser. */
+     final int lineNo;
+
+     /** Comment lines preceding the test case, or {@code null} if there are none. */
+     @Nullable
+     final String description;
+
+     /** Name of the node the statement is mapped from. */
+     final String nodeName;
+
+     /** SQL statement text. */
+     final String sql;
+
+     /** Expected textual representation of the mapped fragments. */
+     final String result;
+
+     TestCaseDef(int lineNo, @Nullable String description, String nodeName, String sql, String result) {
+         this.lineNo = lineNo;
+         this.nodeName = nodeName;
+         this.description = description;
+         this.sql = sql;
+         this.result = result;
+     }
+ }
+
+ /**
+  * Compares expected and actual results and, depending on the flags, rewrites the output file.
+  *
+  * @param testCases Test case definitions.
+  * @param actualResults Actual results, one per test case, in the same order.
+  * @param fileName File to write to when no explicit output file has been configured.
+  */
+ void reportResults(List<TestCaseDef> testCases, List<String> actualResults, Path fileName) {
+     Path target = fileName;
+
+     if (outputFile != null) {
+         target = outputFile;
+     }
+
+     doReportResults(testCases, actualResults, target);
+ }
+
+ /**
+  * Renders expected and actual test cases to text, optionally rewrites the output file
+  * (actual results in overwrite mode, dummy results in strip mode) and fails if the two texts differ.
+  */
+ private void doReportResults(List<TestCaseDef> testCases, List<String> actualResults, Path outputFile) {
+     StringBuilder expectedText = new StringBuilder();
+     StringBuilder actualText = new StringBuilder();
+
+     int idx = 0;
+     for (TestCaseDef testCase : testCases) {
+         String actualResult = actualResults.get(idx);
+
+         appendTestCaseStr(idx, expectedText, testCase, testCase.result.stripTrailing());
+         appendTestCaseStr(idx, actualText, testCase, actualResult.stripTrailing());
+
+         idx++;
+     }
+
+     // Decide what, if anything, has to be written to the output file.
+     String contentToWrite = null;
+
+     if (overwriteResults) {
+         contentToWrite = actualText.toString();
+     } else if (stripResults) {
+         StringBuilder strippedText = new StringBuilder();
+
+         int pos = 0;
+         for (TestCaseDef testCase : testCases) {
+             appendTestCaseStr(pos++, strippedText, testCase, "f");
+         }
+
+         contentToWrite = strippedText.toString();
+     }
+
+     if (contentToWrite != null) {
+         try {
+             Files.writeString(outputFile, contentToWrite, StandardCharsets.UTF_8);
+         } catch (IOException e) {
+             throw new UncheckedIOException("Failed to update a test file " + outputFile, e);
+         }
+     }
+
+     // Mismatches are reported even when the file has just been rewritten.
+     String expectedStr = expectedText.toString();
+     String actualStr = actualText.toString();
+
+     if (!Objects.equals(expectedStr, actualStr)) {
+         String message = format("There are test failures. Location: ({}:1)", outputFile.toAbsolutePath());
+         assertEquals(expectedStr, actualStr, message);
+     }
+ }
+
+ /**
+  * Appends the textual form of a test case with the given result to the buffer.
+  *
+  * <p>The description is written when present; otherwise a blank line separates the test case
+  * from the previous one (nothing is written before the very first test case).
+  */
+ private static void appendTestCaseStr(int idx, StringBuilder testCaseStr, TestCaseDef testCaseDef, String result) {
+     String eol = System.lineSeparator();
+
+     if (testCaseDef.description != null) {
+         testCaseStr.append(testCaseDef.description).append(eol);
+     } else if (idx > 0) {
+         testCaseStr.append(eol);
+     }
+
+     testCaseStr
+             .append(testCaseDef.nodeName).append(eol)
+             .append(testCaseDef.sql).append(eol)
+             .append("---").append(eol)
+             .append(result).append(eol)
+             .append("---").append(eol);
+ }
+
+ /** Sections of a test case, in the order the parser moves through them. */
+ enum ParseState {
+ // Optional comment lines followed by the node name line.
+ NODE_NAME,
+ // Lines of the SQL statement, up to a delimiter line.
+ SQL_STMT,
+ // Lines of the expected fragments, up to a delimiter line.
+ FRAGMENTS
+ }
+
+ /**
+  * Line-oriented state machine that reads a mapping test file and produces {@link TestCaseDef}s.
+  *
+  * <p>Blank lines are ignored in every state. A test case consists of optional {@code #} comment lines
+  * (collected as the description), a single-token node name line, SQL statement lines terminated by a
+  * {@code ---} line, and fragment lines terminated by a {@code ---} line or by the end of the file.
+  */
+ private static class Parser {
+     private static final String DELIMITER = "---";
+     private static final String COMMENT = "#";
+
+     /** Section the next non-blank line is expected to belong to. */
+     private ParseState nextState = ParseState.NODE_NAME;
+
+     /** 1-based number of the line that holds the node name of the test case being parsed. */
+     private int testCaseLineNo;
+
+     /** Number of fragment headers seen so far in the current test case. */
+     private int numFragments;
+
+     private final StringBuilder description = new StringBuilder();
+     private String nodeName;
+     private final StringBuilder sqlStmt = new StringBuilder();
+     private final StringBuilder result = new StringBuilder();
+
+     /** Prepares the parser for the next test case. */
+     private void resetState() {
+         nextState = ParseState.NODE_NAME;
+         testCaseLineNo = -1;
+         numFragments = 0;
+         sqlStmt.setLength(0);
+         result.setLength(0);
+         description.setLength(0);
+     }
+
+     /**
+      * Parses the given file.
+      *
+      * @param path File to parse.
+      * @return Test cases in file order.
+      * @throws IOException If the file can not be read.
+      * @throws IllegalStateException If a line is not allowed at its position.
+      * @throws IllegalArgumentException If the file ends in the middle of a test case.
+      */
+     List<TestCaseDef> parse(Path path) throws IOException {
+         List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
+         List<TestCaseDef> testCases = new ArrayList<>();
+
+         // 1-based number of the line being processed.
+         int lineNo = 1;
+         String fileName = path.toAbsolutePath().toString();
+
+         for (String line : lines) {
+             if (line.isBlank()) {
+                 lineNo++;
+                 continue;
+             }
+
+             switch (nextState) {
+                 case NODE_NAME:
+                     // Read optional comments into description field
+                     if (line.startsWith(COMMENT)) {
+                         if (description.length() > 0) {
+                             description.append(System.lineSeparator());
+                         }
+
+                         description.append(line);
+                     } else {
+                         StringTokenizer tokenizer = new StringTokenizer(line.stripLeading());
+
+                         this.nodeName = tokenizer.nextToken();
+                         // Keep the 1-based line number: it is used both in error messages
+                         // and to locate the test case text when the file ends prematurely.
+                         this.testCaseLineNo = lineNo;
+
+                         if (tokenizer.hasMoreTokens()) {
+                             throw reportError(fileName, lineNo, "Expected mapping <node-name> but got: {}", line);
+                         }
+                         nextState = ParseState.SQL_STMT;
+                     }
+                     break;
+                 case SQL_STMT:
+                     if (line.startsWith(COMMENT)) {
+                         throw reportError(fileName, lineNo, "Comments are not allowed in sql statement text: {}", line);
+                     }
+
+                     if (!line.stripLeading().startsWith(DELIMITER)) {
+                         if (sqlStmt.length() > 0) {
+                             sqlStmt.append(System.lineSeparator());
+                         }
+
+                         sqlStmt.append(line);
+                     } else {
+                         nextState = ParseState.FRAGMENTS;
+                     }
+                     break;
+                 case FRAGMENTS:
+                     if (line.startsWith(COMMENT)) {
+                         throw reportError(fileName, lineNo, "Comments are not allowed in fragment text: {}", line);
+                     }
+
+                     if (!line.stripLeading().startsWith(DELIMITER)) {
+                         if (result.length() > 0) {
+                             result.append(System.lineSeparator());
+                         }
+
+                         // Blank lines are skipped on input, so the separator between
+                         // fragments is restored here.
+                         if (line.startsWith(FragmentPrinter.FRAGMENT_PREFIX)) {
+                             if (numFragments > 0) {
+                                 result.append(System.lineSeparator());
+                             }
+                             numFragments++;
+                         }
+
+                         result.append(line);
+                     } else {
+                         testCases.add(newTestCase());
+
+                         resetState();
+                     }
+                     break;
+                 default:
+                     throw new IllegalStateException("Unexpected state id: " + nextState);
+             }
+
+             lineNo++;
+         }
+
+         if (nextState == ParseState.FRAGMENTS && !result.toString().isBlank()) {
+             // The closing delimiter of the last test case is optional.
+             testCases.add(newTestCase());
+         } else if (nextState != ParseState.NODE_NAME) {
+             // The file ended in the middle of a test case: show everything from its node name line on.
+             // testCaseLineNo is 1-based whereas the list of lines is 0-based.
+             List<String> tail = lines.subList(this.testCaseLineNo - 1, lines.size());
+             String possibleTestCase = System.lineSeparator() + String.join(System.lineSeparator(), tail);
+
+             String error = format("Error at ({}:{}): Invalid test case: {}", fileName, this.testCaseLineNo, possibleTestCase);
+             throw new IllegalArgumentException(error);
+         }
+
+         return testCases;
+     }
+
+     /** Creates a test case definition from the currently accumulated state. */
+     private TestCaseDef newTestCase() {
+         String sql = sqlStmt.toString();
+         String res = result.toString().stripTrailing();
+         String desc = description.length() > 0 ? description.toString() : null;
+
+         return new TestCaseDef(testCaseLineNo, desc, nodeName, sql, res);
+     }
+
+     /** Builds (but does not throw) an exception whose message is prefixed with {@code (file:line)}. */
+     private static RuntimeException reportError(String fileName, int lineNo, String message, Object... params) {
+         String formatted = format(message, params);
+         String fullErrorMessage = format("({}:{}): {}", fileName, lineNo, formatted);
+
+         return new IllegalStateException(fullErrorMessage);
+     }
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
new file mode 100644
index 0000000000..e00bc04e01
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunnerSelfTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.MappingTestRunner.TestCaseDef;
+import
org.apache.ignite.internal.sql.engine.exec.mapping.MappingTestRunner.TestSetup;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.opentest4j.AssertionFailedError;
+
+/**
+ * Tests for {@link MappingTestRunner}.
+ */
+public class MappingTestRunnerSelfTest extends BaseIgniteAbstractTest {
+
+ /** Directory with the test files used by this test. */
+ private static final Path LOCATION =
Paths.get("src/test/resources/mapping");
+
+ /** Temporary directory that receives files written by the runner. */
+ @TempDir
+ public Path tempDir;
+
+ /** Checks that descriptions, node names, statements and fragments are read from a test file. */
+ @Test
+ public void testLoadTestCases() {
+     String eol = System.lineSeparator();
+
+     List<TestCaseDef> loaded = MappingTestRunner.loadTestCases(LOCATION.resolve("_runner_self.test"));
+     assertEquals(3, loaded.size());
+
+     // First case: multi-line description and multi-line statement.
+     TestCaseDef first = loaded.get(0);
+     String expectedDescription = "# Start at node N1" + eol
+             + "# Query: S1 S1" + eol
+             + "# Fragments: F1";
+
+     assertEquals(expectedDescription, first.description);
+     assertEquals("N1", first.nodeName);
+     assertEquals("S1" + eol + " S1", first.sql);
+     assertEquals("Fragment#1" + eol + "F1", first.result);
+
+     // Second case: no description, trailing blank line in the result is dropped.
+     TestCaseDef second = loaded.get(1);
+     assertNull(second.description);
+     assertEquals("N1", second.nodeName);
+     assertEquals("S2", second.sql);
+     assertEquals("Fragment#1" + eol + "F2", second.result);
+
+     // Third case: two fragments and no closing delimiter.
+     TestCaseDef third = loaded.get(2);
+     assertNull(third.description);
+     assertEquals("N2", third.nodeName);
+     assertEquals("S3", third.sql);
+
+     String twoFragments = "Fragment#1" + eol + "F1" + eol.repeat(2)
+             + "Fragment#2" + eol + "F2";
+     assertEquals(twoFragments, third.result);
+ }
+
+ /** Checks that in overwrite mode actual results are written to the file and mismatches are still reported. */
+ @Test
+ @WithSystemProperty(key = "MAPPING_TESTS_OVERWRITE_RESULTS", value = "true")
+ public void testOverwriteResults() throws IOException {
+     Path target = tempDir.resolve("test0.test");
+     String prefix = "RRR" + System.lineSeparator();
+
+     List<TestCaseDef> loaded = MappingTestRunner.loadTestCases(LOCATION.resolve("_runner_self.test"));
+     List<String> modifiedResults = loaded.stream()
+             .map(testCase -> prefix + testCase.result)
+             .collect(Collectors.toList());
+
+     MappingTestRunner testRunner = newTestRunner();
+     testRunner.initFlags();
+
+     assertThrows(AssertionFailedError.class, () -> testRunner.reportResults(loaded, modifiedResults, target));
+
+     Path expectedFile = LOCATION.resolve("_runner_overwrite_results.test");
+
+     assertEquals(
+             Files.readString(expectedFile, StandardCharsets.UTF_8),
+             Files.readString(target, StandardCharsets.UTF_8)
+     );
+ }
+
+ /** Checks that in strip mode every result in the written file is replaced with a dummy one. */
+ @Test
+ @WithSystemProperty(key = "MAPPING_TESTS_STRIP_RESULTS", value = "true")
+ public void testStripResults() throws IOException {
+     Path target = tempDir.resolve("test0.test");
+
+     List<TestCaseDef> loaded = MappingTestRunner.loadTestCases(LOCATION.resolve("_runner_self.test"));
+     List<String> sameResults = loaded.stream()
+             .map(testCase -> testCase.result)
+             .collect(Collectors.toList());
+
+     MappingTestRunner testRunner = newTestRunner();
+     testRunner.initFlags();
+
+     // Actual results equal the expected ones, so no failure is reported here.
+     testRunner.reportResults(loaded, sameResults, target);
+
+     Path expectedFile = LOCATION.resolve("_runner_strip_results.test");
+
+     assertEquals(
+             Files.readString(expectedFile, StandardCharsets.UTF_8),
+             Files.readString(target, StandardCharsets.UTF_8)
+     );
+ }
+
+ /** Checks that enabling the overwrite and strip flags together is rejected. */
+ @Test
+ @WithSystemProperty(key = "MAPPING_TESTS_OVERWRITE_RESULTS", value = "true")
+ @WithSystemProperty(key = "MAPPING_TESTS_STRIP_RESULTS", value = "true")
+ public void testIncompatibleFlags() {
+     // overwriteResults and stripResults flags can not be used together.
+     MappingTestRunner testRunner = newTestRunner().outputFile(tempDir.resolve("test0.test"));
+
+     IllegalStateException thrown = assertThrows(
+             IllegalStateException.class,
+             () -> testRunner.runTest(
+                     () -> {
+                         LogicalNode logicalNode = new LogicalNode("N1", "N1", new NetworkAddress("addr", 1000));
+                         LogicalTopologySnapshot snapshot = new LogicalTopologySnapshot(1, List.of(logicalNode));
+                         IgniteSchema igniteSchema = new IgniteSchema("T", 1, List.of());
+                         ExecutionTargetProvider provider = Mockito.mock(ExecutionTargetProvider.class);
+
+                         return new TestSetup(provider, igniteSchema, snapshot);
+                     },
+                     "_runner_self.test"
+             )
+     );
+
+     String expectedMessage = "Both MAPPING_TESTS_OVERWRITE_RESULTS and MAPPING_TESTS_STRIP_RESULTS have been specified";
+     assertThat(thrown.getMessage(), containsString(expectedMessage));
+ }
+
+ /** Creates a runner over the test resources whose parser returns a mocked relational node. */
+ private static MappingTestRunner newTestRunner() {
+     return new MappingTestRunner(LOCATION, (schema, sqlStmt) -> Mockito.mock(IgniteRel.class));
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 2fc650a84f..200db05a0f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -91,7 +91,6 @@ import
org.apache.ignite.internal.sql.engine.util.EmptyCacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemView;
-import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.type.BitmaskNativeType;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
@@ -315,6 +314,9 @@ public class TestBuilders {
/** Sets the size of the table. */
TableBuilder size(int size);
+ /** Sets id for the table. The caller must guarantee that provided id
is unique. */
+ TableBuilder tableId(int id);
+
/**
* Builds a table.
*
@@ -570,7 +572,7 @@ public class TestBuilders {
Map<String, TestNode> nodes = nodeNames.stream()
.map(name -> {
var systemViewManager = new
SystemViewManagerImpl(name, catalogManager);
- var targetProvider = new
TestNodeExecutionTargetProvider(systemViewManager, owningNodesByTableName);
+ var targetProvider = new
TestNodeExecutionTargetProvider(systemViewManager::owningNodes,
owningNodesByTableName);
var mappingService = new MappingServiceImpl(name,
targetProvider, EmptyCacheFactory.INSTANCE, 0, Runnable::run);
systemViewManager.register(() -> systemViews);
@@ -631,6 +633,7 @@ public class TestBuilders {
private String name;
private IgniteDistribution distribution;
private int size = 100_000;
+ private Integer tableId;
/** {@inheritDoc} */
@Override
@@ -709,6 +712,14 @@ public class TestBuilders {
return this;
}
+ /** {@inheritDoc} */
+ @Override
+ public TableBuilder tableId(int id) {
+ this.tableId = id;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override
public TestTable build() {
@@ -731,6 +742,7 @@ public class TestBuilders {
.collect(Collectors.toList());
return new TestTable(
+ tableId != null ? tableId : TestTable.ID.incrementAndGet(),
tableDescriptor,
Objects.requireNonNull(name),
size,
@@ -1248,15 +1260,55 @@ public class TestBuilders {
return newRow;
}
+ /**
+ * Returns a new, empty builder for {@link ExecutionTargetProvider}.
+ *
+ * @return Builder with no tables registered.
+ */
+ public static ExecutionTargetProviderBuilder
executionTargetProviderBuilder() {
+ return new ExecutionTargetProviderBuilder();
+ }
+
+ /** A builder to create instances of {@link ExecutionTargetProvider}. */
+ public static final class ExecutionTargetProviderBuilder {
+
+ private final Map<String, List<String>> owningNodesByTableName = new
HashMap<>();
+
+ private Function<String, List<String>> owningNodesBySystemViewName =
(n) -> null;
+
+ private ExecutionTargetProviderBuilder() {
+
+ }
+
+ /** Adds tables to list of nodes mapping. */
+ public ExecutionTargetProviderBuilder addTables(Map<String,
List<String>> tables) {
+ this.owningNodesByTableName.putAll(tables);
+ return this;
+ }
+
+ /**
+ * Sets a function that returns system views. Function accepts a view
name and returns a list of nodes
+ * a system view is available at.
+ */
+ public ExecutionTargetProviderBuilder setSystemViews(Function<String,
List<String>> systemViews) {
+ this.owningNodesBySystemViewName = systemViews;
+ return this;
+ }
+
+ /** Creates an instance of {@link ExecutionTargetProvider}. */
+ public ExecutionTargetProvider build() {
+ return new
TestNodeExecutionTargetProvider(owningNodesBySystemViewName,
Map.copyOf(owningNodesByTableName));
+ }
+ }
+
private static class TestNodeExecutionTargetProvider implements
ExecutionTargetProvider {
- final SystemViewManager systemViewManager;
+ final Function<String, List<String>> owningNodesBySystemViewName;
final Map<String, List<String>> owningNodesByTableName;
- private TestNodeExecutionTargetProvider(SystemViewManager
systemViewManager, Map<String, List<String>> owningNodesByTableName) {
- this.systemViewManager = systemViewManager;
- this.owningNodesByTableName = owningNodesByTableName;
+ private TestNodeExecutionTargetProvider(
+ Function<String, List<String>> owningNodesBySystemViewName,
+ Map<String, List<String>> owningNodesByTableName) {
+
+ this.owningNodesBySystemViewName = owningNodesBySystemViewName;
+ this.owningNodesByTableName = Map.copyOf(owningNodesByTableName);
}
@Override
@@ -1276,7 +1328,7 @@ public class TestBuilders {
@Override
public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
- List<String> nodes = systemViewManager.owningNodes(view.name());
+ List<String> nodes =
owningNodesBySystemViewName.apply(view.name());
if (nullOrEmpty(nodes)) {
return CompletableFuture.failedFuture(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
index ecb30b6cba..50ab6d1f42 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestTable.java
@@ -36,29 +36,31 @@ public class TestTable extends IgniteTableImpl {
private static final String DATA_PROVIDER_NOT_CONFIGURED_MESSAGE_TEMPLATE =
"DataProvider is not configured [table={}, node={}]";
- private static final AtomicInteger ID = new AtomicInteger();
+ static final AtomicInteger ID = new AtomicInteger();
private final Map<String, DataProvider<?>> dataProviders;
/** Constructor. */
public TestTable(
+ int tableId,
TableDescriptor descriptor,
String name,
double rowCnt,
List<IgniteIndex> indexes
) {
- this(descriptor, name, rowCnt, indexes, Map.of());
+ this(tableId, descriptor, name, rowCnt, indexes, Map.of());
}
/** Constructor. */
public TestTable(
+ int tableId,
TableDescriptor descriptor,
String name,
double rowCnt,
List<IgniteIndex> indexList,
Map<String, DataProvider<?>> dataProviders
) {
- super(name, ID.incrementAndGet(), 1, descriptor, new
TestStatistic(rowCnt),
+ super(name, tableId, 1, descriptor, new TestStatistic(rowCnt),
indexList.stream().collect(Collectors.toUnmodifiableMap(IgniteIndex::name,
Function.identity())));
this.dataProviders = dataProviders;
diff --git
a/modules/sql-engine/src/test/resources/mapping/_runner_overwrite_results.test
b/modules/sql-engine/src/test/resources/mapping/_runner_overwrite_results.test
new file mode 100644
index 0000000000..603ee973a0
--- /dev/null
+++
b/modules/sql-engine/src/test/resources/mapping/_runner_overwrite_results.test
@@ -0,0 +1,30 @@
+# Start at node N1
+# Query: S1 S1
+# Fragments: F1
+N1
+S1
+ S1
+---
+RRR
+Fragment#1
+F1
+---
+
+N1
+S2
+---
+RRR
+Fragment#1
+F2
+---
+
+N2
+S3
+---
+RRR
+Fragment#1
+F1
+
+Fragment#2
+F2
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/_runner_self.test
b/modules/sql-engine/src/test/resources/mapping/_runner_self.test
new file mode 100644
index 0000000000..23840e0bd2
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/_runner_self.test
@@ -0,0 +1,31 @@
+# Start at node N1
+# Query: S1 S1
+# Fragments: F1
+
+N1
+S1
+ S1
+---
+Fragment#1
+F1
+---
+
+N1
+S2
+---
+Fragment#1
+F2
+
+---
+
+N2
+S3
+---
+Fragment#1
+F1
+
+Fragment#2
+F2
+
+
+
diff --git
a/modules/sql-engine/src/test/resources/mapping/_runner_strip_results.test
b/modules/sql-engine/src/test/resources/mapping/_runner_strip_results.test
new file mode 100644
index 0000000000..cefce85d11
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/_runner_strip_results.test
@@ -0,0 +1,21 @@
+# Start at node N1
+# Query: S1 S1
+# Fragments: F1
+N1
+S1
+ S1
+---
+f
+---
+
+N1
+S2
+---
+f
+---
+
+N2
+S3
+---
+f
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test
b/modules/sql-engine/src/test/resources/mapping/correlated.test
new file mode 100644
index 0000000000..3919943217
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -0,0 +1,183 @@
+N0
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n1
+---
+Fragment#4 root
+ executionNodes: [N0]
+ remoteFragments: [5]
+ tree:
+ IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T_N1]
+ tree:
+ IgniteSender(targetFragment=5, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T_N1, source=3, distribution=random)
+
+Fragment#5
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tables: [CT_N1]
+ tree:
+ IgniteSender(targetFragment=4, exchange=5, distribution=single)
+ IgniteProject
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+---
+
+N1
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n1
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tables: [CT_N1]
+ tree:
+ IgniteProject
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T_N1, source=3, distribution=random)
+---
+
+N0
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n2
+---
+Fragment#4 root
+ executionNodes: [N0]
+ remoteFragments: [5]
+ tree:
+ IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T_N2]
+ tree:
+ IgniteSender(targetFragment=5, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T_N2, source=3, distribution=random)
+
+Fragment#5
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tables: [CT_N1]
+ tree:
+ IgniteSender(targetFragment=4, exchange=5, distribution=single)
+ IgniteProject
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+---
+
+N1
+SELECT (SELECT count(*) FROM ct_n1) FROM t_n2
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tables: [CT_N1]
+ tree:
+ IgniteProject
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteTableScan(name=PUBLIC.CT_N1, source=2, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T_N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T_N2, source=3, distribution=random)
+---
+
+N0
+SELECT t.c1 FROM t_n1 t JOIN table(system_range(1, 50)) as r ON t.id = r.x
WHERE mod(r.x, 10) = 0
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ tree:
+ IgniteProject
+ IgniteMergeJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteSort
+ IgniteFilter
+ IgniteTableFunctionScan(source=-1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [T_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T_N1, source=2, distribution=random)
+---
+
+N0
+SELECT t.c1 FROM ct_n1 t WHERE t.c1 < 5 AND
+EXISTS (SELECT x FROM table(system_range(t.c1, t.c2)) WHERE mod(x, 2) = 0)
+---
+Fragment#2 root
+ executionNodes: [N0]
+ remoteFragments: [3]
+ tree:
+ IgniteReceiver(sourceFragment=3, exchange=3, distribution=single)
+
+Fragment#3
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [CT_N1]
+ tree:
+ IgniteSender(targetFragment=2, exchange=3, distribution=single)
+ IgniteProject
+ IgniteFilter
+ IgniteCorrelatedNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteProject
+ IgniteFilter
+ IgniteTableFunctionScan(source=-1, distribution=single)
+---
+
+N0
+SELECT t.c1 FROM t_n1 t WHERE t.c1 < 5 AND
+EXISTS (SELECT x FROM table(system_range(t.c1, t.c2)) WHERE mod(x, 2) = 0)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ tree:
+ IgniteProject
+ IgniteFilter
+ IgniteCorrelatedNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteProject
+ IgniteFilter
+ IgniteTableFunctionScan(source=-1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [T_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T_N1, source=2, distribution=affinity[table:
T_N1, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/dml.test
b/modules/sql-engine/src/test/resources/mapping/dml.test
new file mode 100644
index 0000000000..0209d8379e
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/dml.test
@@ -0,0 +1,91 @@
+N0
+INSERT INTO t1_n1 VALUES(1, 1, 1)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ tree:
+ IgniteProject
+ IgniteColocatedHashAggregate
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [2]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableModify(name=PUBLIC.T1_N1, source=-1,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteReceiver(sourceFragment=2, exchange=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N0]
+ tree:
+ IgniteSender(targetFragment=1, exchange=2, distribution=affinity[table:
T1_N1, columns: [ID]])
+ IgniteValues
+---
+
+N1
+UPDATE t1_n1 SET c2 = 1000
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteProject
+ IgniteColocatedHashAggregate
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableModify(name=PUBLIC.T1_N1, source=-1,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N2
+UPDATE t1_n1 SET c2 = 1000
+---
+Fragment#0 root
+ executionNodes: [N2]
+ remoteFragments: [1]
+ tree:
+ IgniteProject
+ IgniteColocatedHashAggregate
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N2]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableModify(name=PUBLIC.T1_N1, source=-1,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N1
+UPDATE t2_n2n3 SET c2 = 1000
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteProject
+ IgniteColocatedHashAggregate
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N2, N3]
+ tables: [T2_N2N3]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableModify(name=PUBLIC.T2_N2N3, source=-1,
distribution=affinity[table: T2_N2N3, columns: [ID]])
+ IgniteTableScan(name=PUBLIC.T2_N2N3, source=2,
distribution=affinity[table: T2_N2N3, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/merge_join.test
b/modules/sql-engine/src/test/resources/mapping/merge_join.test
new file mode 100644
index 0000000000..87a22ca0e1
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/merge_join.test
@@ -0,0 +1,77 @@
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ *
FROM t1_n1 JOIN t2_n1 USING (id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [T1_N1, T2_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteProject
+ IgniteMergeJoin
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T2_N1, source=3,
distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N1
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ *
FROM t1_n1 JOIN t2_n1 USING (id)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1, T2_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteProject
+ IgniteMergeJoin
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T2_N1, source=3,
distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N0
+SELECT /*+ ENFORCE_JOIN_ORDER, DISABLE_RULE('NestedLoopJoinConverter') */ *
FROM t1_n1 JOIN t2_n2 USING (id)
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#4
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [5]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteProject
+ IgniteMergeJoin
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteReceiver(sourceFragment=5, exchange=5,
distribution=affinity[table: T2_N2, columns: [ID]])
+
+Fragment#5
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T2_N2]
+ tree:
+ IgniteSender(targetFragment=4, exchange=5, distribution=affinity[table:
T2_N2, columns: [ID]])
+ IgniteSort
+ IgniteTableScan(name=PUBLIC.T2_N2, source=3,
distribution=affinity[table: T2_N2, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/set_ops.test
b/modules/sql-engine/src/test/resources/mapping/set_ops.test
new file mode 100644
index 0000000000..e04d988ff6
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/set_ops.test
@@ -0,0 +1,48 @@
+N1
+SELECT /*+ DISABLE_RULE('MapReduceMinusConverterRule')*/ * FROM (SELECT c2
FROM t1_n1 EXCEPT SELECT c2 FROM t2_n2)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteColocatedMinus
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [T2_N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.T2_N2, source=3, distribution=random)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T1_N1, source=4, distribution=random)
+---
+
+N1
+SELECT /*+ DISABLE_RULE('ColocatedMinusConverterRule')*/ * FROM (SELECT c2
FROM t1_n1 EXCEPT SELECT c2 FROM t2_n2)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteReduceMinus
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1, N2]
+ tables: [T1_N1, T2_N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteMapMinus
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2, distribution=random)
+ IgniteTableScan(name=PUBLIC.T2_N2, source=3, distribution=random)
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_affinity.test
b/modules/sql-engine/src/test/resources/mapping/table_affinity.test
new file mode 100644
index 0000000000..a7f704042a
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_affinity.test
@@ -0,0 +1,53 @@
+N1
+SELECT * FROM t1_n1
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N2
+SELECT * FROM t1_n1
+---
+Fragment#0 root
+ executionNodes: [N2]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N2]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N1
+SELECT * FROM t1_n1 WHERE ID = 0
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_functions.test
b/modules/sql-engine/src/test/resources/mapping/table_functions.test
new file mode 100644
index 0000000000..4d85ff544f
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_functions.test
@@ -0,0 +1,8 @@
+N0
+SELECT * FROM TABLE(system_range(1, 5))
+---
+Fragment#0 root
+ executionNodes: [N0]
+ tree:
+ IgniteTableFunctionScan(source=-1, distribution=single)
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_identity.test
b/modules/sql-engine/src/test/resources/mapping/table_identity.test
new file mode 100644
index 0000000000..f2d61df44c
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity.test
@@ -0,0 +1,111 @@
+N0
+SELECT * FROM nt1_n1, nt2_n2
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N0]
+ executionNodes: [N2]
+ tables: [NT2_N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT2_N2, source=3, distribution=identity[0])
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [NT1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
+
+N1
+SELECT * FROM nt1_n1, nt2_n2
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [NT2_N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT2_N2, source=3, distribution=identity[0])
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [NT1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
+
+N0
+SELECT * FROM nt1_n1, nt2_n1
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [NT2_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT2_N1, source=3, distribution=identity[0])
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [NT1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
+
+N1
+SELECT * FROM nt1_n1, nt2_n1
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [NT2_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT2_N1, source=3, distribution=identity[0])
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [NT1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT1_N1, source=4, distribution=identity[0])
+---
diff --git
a/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
new file mode 100644
index 0000000000..12536e4c64
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_identity_single.test
@@ -0,0 +1,99 @@
+N0
+SELECT * FROM CT_n1, NT_n1
+---
+Fragment#4 root
+ executionNodes: [N0]
+ remoteFragments: [5]
+ tree:
+ IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [NT_N1]
+ tree:
+ IgniteSender(targetFragment=5, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT_N1, source=3, distribution=identity[0])
+
+Fragment#5
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [2]
+ tables: [CT_N1]
+ tree:
+ IgniteSender(targetFragment=4, exchange=5, distribution=single)
+ IgniteNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+---
+
+N1
+SELECT * FROM CT_n1, NT_n1
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [2]
+ tables: [CT_N1]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [NT_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT_N1, source=3, distribution=identity[0])
+---
+
+N0
+SELECT * FROM CT_n1, NT_n2
+---
+Fragment#4 root
+ executionNodes: [N0]
+ remoteFragments: [5]
+ tree:
+ IgniteReceiver(sourceFragment=5, exchange=5, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [NT_N2]
+ tree:
+ IgniteSender(targetFragment=5, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT_N2, source=3, distribution=identity[0])
+
+Fragment#5
+ targetNodes: [N0]
+ executionNodes: [N1]
+ remoteFragments: [2]
+ tables: [CT_N1]
+ tree:
+ IgniteSender(targetFragment=4, exchange=5, distribution=single)
+ IgniteNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+---
+
+N1
+SELECT * FROM CT_n1, NT_n2
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [2]
+ tables: [CT_N1]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT_N1, source=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N2]
+ tables: [NT_N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.NT_N2, source=3, distribution=identity[0])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/table_single.test
b/modules/sql-engine/src/test/resources/mapping/table_single.test
new file mode 100644
index 0000000000..b652da3b5a
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/table_single.test
@@ -0,0 +1,89 @@
+N0
+SELECT * FROM ct1_n1, ct2_n1
+---
+Fragment#3 root
+ executionNodes: [N0]
+ remoteFragments: [4]
+ tree:
+ IgniteReceiver(sourceFragment=4, exchange=4, distribution=single)
+
+Fragment#4
+ targetNodes: [N0]
+ executionNodes: [N1]
+ tables: [CT1_N1, CT2_N1]
+ tree:
+ IgniteSender(targetFragment=3, exchange=4, distribution=single)
+ IgniteNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.CT2_N1, source=2, distribution=single)
+---
+
+N1
+SELECT * FROM ct1_n1, ct2_n1
+---
+Fragment#0 root
+ executionNodes: [N1]
+ tables: [CT1_N1, CT2_N1]
+ tree:
+ IgniteNestedLoopJoin
+ IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.CT2_N1, source=2, distribution=single)
+---
+
+N0
+SELECT * FROM ct1_n1, ct2_n2
+---
+Fragment#5 root
+ executionNodes: [N0]
+ remoteFragments: [6]
+ tree:
+ IgniteReceiver(sourceFragment=6, exchange=6, distribution=single)
+
+Fragment#4
+ targetNodes: [N2]
+ executionNodes: [N1]
+ tables: [CT1_N1]
+ tree:
+ IgniteSender(targetFragment=6, exchange=4, distribution=single)
+ IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+
+Fragment#6
+ targetNodes: [N0]
+ executionNodes: [N2]
+ remoteFragments: [4]
+ tables: [CT2_N2]
+ tree:
+ IgniteSender(targetFragment=5, exchange=6, distribution=single)
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=4, exchange=4, distribution=single)
+ IgniteTableScan(name=PUBLIC.CT2_N2, source=2, distribution=single)
+---
+
+N1
+SELECT * FROM ct1_n1, ct2_n2
+---
+Fragment#5 root
+ executionNodes: [N1]
+ remoteFragments: [6]
+ tree:
+ IgniteReceiver(sourceFragment=6, exchange=6, distribution=single)
+
+Fragment#4
+ targetNodes: [N2]
+ executionNodes: [N1]
+ tables: [CT1_N1]
+ tree:
+ IgniteSender(targetFragment=6, exchange=4, distribution=single)
+ IgniteTableScan(name=PUBLIC.CT1_N1, source=1, distribution=single)
+
+Fragment#6
+ targetNodes: [N1]
+ executionNodes: [N2]
+ remoteFragments: [4]
+ tables: [CT2_N2]
+ tree:
+ IgniteSender(targetFragment=5, exchange=6, distribution=single)
+ IgniteNestedLoopJoin
+ IgniteReceiver(sourceFragment=4, exchange=4, distribution=single)
+ IgniteTableScan(name=PUBLIC.CT2_N2, source=2, distribution=single)
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/union.test
b/modules/sql-engine/src/test/resources/mapping/union.test
new file mode 100644
index 0000000000..98c50a529c
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/union.test
@@ -0,0 +1,105 @@
+N1
+SELECT /*+ DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
+ (SELECT * FROM t1_n1 UNION SELECT * FROM t2_n1)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteReduceHashAggregate
+ IgniteMapHashAggregate
+ IgniteUnionAll
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T2_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.T2_N1, source=3,
distribution=affinity[table: T2_N1, columns: [ID]])
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T1_N1, source=4,
distribution=affinity[table: T1_N1, columns: [ID]])
+---
+
+N1
+SELECT /*+ DISABLE_RULE('MapReduceHashAggregateConverterRule')*/ * FROM
+ (SELECT * FROM t1_n1 UNION SELECT * FROM t2_n1)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1]
+ tables: [T1_N1, T2_N1]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteUnionAll
+ IgniteTableScan(name=PUBLIC.T1_N1, source=2,
distribution=affinity[table: T1_N1, columns: [ID]])
+ IgniteTableScan(name=PUBLIC.T2_N1, source=3,
distribution=affinity[table: T2_N1, columns: [ID]])
+---
+
+N1
+SELECT /*+ DISABLE_RULE('ColocatedHashAggregateConverterRule')*/ * FROM
+ (SELECT * FROM t1_n1n2 UNION SELECT * FROM t2_n1n2)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1, 2]
+ tree:
+ IgniteReduceHashAggregate
+ IgniteMapHashAggregate
+ IgniteUnionAll
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+ IgniteReceiver(sourceFragment=2, exchange=2, distribution=single)
+
+Fragment#2
+ targetNodes: [N1]
+ executionNodes: [N1, N2]
+ tables: [T2_N1N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=2, distribution=single)
+ IgniteTableScan(name=PUBLIC.T2_N1N2, source=3,
distribution=affinity[table: T2_N1N2, columns: [ID]])
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1, N2]
+ tables: [T1_N1N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteTableScan(name=PUBLIC.T1_N1N2, source=4,
distribution=affinity[table: T1_N1N2, columns: [ID]])
+---
+
+N1
+SELECT /*+ DISABLE_RULE('MapReduceHashAggregateConverterRule')*/ * FROM
+ (SELECT * FROM t1_n1n2 UNION SELECT * FROM t2_n1n2)
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ tree:
+ IgniteReceiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N1, N2]
+ tables: [T1_N1N2, T2_N1N2]
+ tree:
+ IgniteSender(targetFragment=0, exchange=1, distribution=single)
+ IgniteColocatedHashAggregate
+ IgniteUnionAll
+ IgniteTableScan(name=PUBLIC.T1_N1N2, source=2,
distribution=affinity[table: T1_N1N2, columns: [ID]])
+ IgniteTableScan(name=PUBLIC.T2_N1N2, source=3,
distribution=affinity[table: T2_N1N2, columns: [ID]])
+---
diff --git a/modules/sql-engine/src/test/resources/mapping/values.test
b/modules/sql-engine/src/test/resources/mapping/values.test
new file mode 100644
index 0000000000..fe3cb632b5
--- /dev/null
+++ b/modules/sql-engine/src/test/resources/mapping/values.test
@@ -0,0 +1,17 @@
+N1
+SELECT 1
+---
+Fragment#0 root
+ executionNodes: [N1]
+ tree:
+ IgniteValues
+---
+
+N2
+SELECT 1
+---
+Fragment#0 root
+ executionNodes: [N2]
+ tree:
+ IgniteValues
+---