This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fee7473e864 IGNITE-25600 Sql. FragmentMappingTest need to have a
possibility to test cases with mapOnBackup=false (#6010)
fee7473e864 is described below
commit fee7473e864b970b1a83dfb6629f6ed454a3f9ab
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Thu Jun 12 13:25:16 2025 +0300
IGNITE-25600 Sql. FragmentMappingTest need to have a possibility to test
cases with mapOnBackup=false (#6010)
---
.../sql/engine/ItSqlMultiStatementTest.java | 32 ---------
.../engine/exec/mapping/FragmentMappingTest.java | 12 +++-
.../sql/engine/exec/mapping/MappingTestRunner.java | 38 +++++++---
.../sql/engine/framework/TestBuilders.java | 12 +++-
.../resources/mapping/test_backup_mapping.test | 83 ++++++++++++++++++++++
5 files changed, 133 insertions(+), 44 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
index d428837d381..794ba820e95 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
-import static org.apache.ignite.internal.sql.engine.util.QueryChecker.matches;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
@@ -30,7 +29,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Iterator;
import java.util.List;
@@ -38,7 +36,6 @@ import java.util.NoSuchElementException;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -88,35 +85,6 @@ public class ItSqlMultiStatementTest extends
BaseSqlMultiStatementTest {
.check();
}
- // TODO: https://issues.apache.org/jira/browse/IGNITE-25600 move into
FragmentMappingTest
- @Test
- void testExplainMappingWithDifferentTx() {
- assertTrue(initialNodes() > 1);
-
- sql("CREATE ZONE Z1 (partitions 10, replicas " + initialNodes() + ")
storage profiles ['default']");
- sql("CREATE TABLE t(id INT PRIMARY KEY, col1 INT) ZONE Z1");
-
- InternalTransaction tx = (InternalTransaction) igniteTx().begin(new
TransactionOptions().readOnly(true));
-
- // rely on fact that all partition replicas will be present on every
node
- // expect single node here, like: exchangeSourceNodes: {1=[node_name]}
- assertQuery(tx, "EXPLAIN MAPPING FOR SELECT * FROM t")
- .matches(matches(".*exchangeSourceNodes:
\\{1=\\[[^,]*\\]\\}.*"))
- .check();
-
- tx.commit();
-
- tx = (InternalTransaction) igniteTx().begin(new
TransactionOptions().readOnly(false));
-
- // expect multiple nodes, like: exchangeSourceNodes: {1=[node_name1,
node_name2]}
- // sequential call the same query also check mapping cache correctness
- assertQuery(tx, "EXPLAIN MAPPING FOR SELECT * FROM t")
- .matches(matches(".*exchangeSourceNodes:
\\{1=\\[.*,+.*\\]\\}.*"))
- .check();
-
- tx.commit();
- }
-
/** Checks single statement execution using multi-statement API. */
@Test
void singleStatementQuery() {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
index fb8d61a3cc8..3450a98f612 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMappingTest.java
@@ -28,8 +28,10 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -71,6 +73,10 @@ import org.junit.jupiter.api.Test;
* addTable("T1", "N1");
* addTable("T2", "N2", "N3");
*
+ * // Near semantic defines table T1 with primary partitions on nodes N0
and N1
+ * // together with backups placed on N1 and N2.
+ * addTable("T1", List.of(List.of("N0", "N1"), List.of("N1", "N2")));
+ *
* // Adds table with identity(0) distribution. Can be used to mimic node
system views.
* addTableIdent("NT1", "N1");
*
@@ -268,6 +274,8 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
addTable("T2", List.of(List.of("N0", "N1", "N2")));
addTable("T3", List.of(List.of("N1", "N2")));
+ addTable("T4", List.of(List.of("N0", "N1"), List.of("N2", "N1")));
+
testRunner.runTest(this::initSchema, "test_backup_mapping.test");
}
@@ -297,7 +305,7 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
String tableName = name;
for (List<String> partitionNodes : assignments) {
- tableName = formatName(tableName, new TreeSet<>(partitionNodes));
+ tableName = formatName(tableName, new
LinkedHashSet<>(partitionNodes));
}
tables.put(
@@ -438,7 +446,7 @@ public class FragmentMappingTest extends
AbstractPlannerTest {
}
}
- private static String formatName(String name, TreeSet<String> nodeNames) {
+ private static String formatName(String name, Set<String> nodeNames) {
return name + "_" + String.join("", nodeNames);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
index f9a30d8733a..2dd412de50d 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingTestRunner.java
@@ -62,8 +62,8 @@ import org.jetbrains.annotations.TestOnly;
* <pre>
* #<optional multiline description>
* <node_name>
- * ---
* <SQL statement (single line or multiline)>
+ * [READ_FROM_PRIMARY] - optional pragma, used where primary reads are
only possible.
* ---
* <expected fragments>
* ---
@@ -177,7 +177,8 @@ final class MappingTestRunner {
MultiStepPlan multiStepPlan = new MultiStepPlan(new
PlanId(UUID.randomUUID(), 1), sqlQueryType, rel,
resultSetMetadata, parameterMetadata,
schema.catalogVersion(), null);
- String actualText = produceMapping(testDef.nodeName,
executionDistributionProvider, snapshot, multiStepPlan);
+ String actualText =
+ produceMapping(testDef.nodeName,
executionDistributionProvider, snapshot, multiStepPlan, testDef.primaryRead);
actualResults.add(actualText);
}
@@ -198,7 +199,8 @@ final class MappingTestRunner {
String nodeName,
ExecutionDistributionProvider executionDistributionProvider,
LogicalTopologySnapshot snapshot,
- MultiStepPlan plan
+ MultiStepPlan plan,
+ boolean readFromPrimaryOnly
) {
PartitionPruner partitionPruner = new PartitionPrunerImpl();
@@ -216,7 +218,8 @@ final class MappingTestRunner {
List<MappedFragment> mappedFragments;
try {
- mappedFragments = await(mappingService.map(plan,
MappingParameters.EMPTY));
+ mappedFragments = await(mappingService.map(plan,
readFromPrimaryOnly
+ ? MappingParameters.EMPTY :
MappingParameters.MAP_ON_BACKUPS));
} catch (Exception e) {
String explanation = System.lineSeparator()
+ RelOptUtil.toString(plan.getRel())
@@ -247,12 +250,15 @@ final class MappingTestRunner {
final String result;
- TestCaseDef(int lineNo, @Nullable String description, String nodeName,
String sql, String res) {
+ final boolean primaryRead;
+
+ TestCaseDef(int lineNo, @Nullable String description, String nodeName,
String sql, String res, boolean primaryRead) {
this.lineNo = lineNo;
this.description = description;
this.nodeName = nodeName;
this.sql = sql;
this.result = res;
+ this.primaryRead = primaryRead;
}
}
@@ -313,6 +319,7 @@ final class MappingTestRunner {
.append(System.lineSeparator())
.append(testCaseDef.sql)
.append(System.lineSeparator())
+ .append(testCaseDef.primaryRead ? (Parser.PRIMARY_READ +
System.lineSeparator()) : "")
.append("---")
.append(System.lineSeparator())
.append(result)
@@ -324,12 +331,14 @@ final class MappingTestRunner {
enum ParseState {
NODE_NAME,
SQL_STMT,
+ READ_FROM_PRIMARY,
FRAGMENTS
}
private static class Parser {
private static final String DELIMITER = "---";
private static final String COMMENT = "#";
+ private static final String PRIMARY_READ = "READ_FROM_PRIMARY";
private ParseState nextState = ParseState.NODE_NAME;
private int testCaseLineNo;
private int numFragments;
@@ -338,6 +347,7 @@ final class MappingTestRunner {
private String nodeName;
private final StringBuilder sqlStmt = new StringBuilder();
private final StringBuilder result = new StringBuilder();
+ private boolean primaryRead;
private void resetState() {
nextState = ParseState.NODE_NAME;
@@ -346,6 +356,7 @@ final class MappingTestRunner {
sqlStmt.setLength(0);
result.setLength(0);
description.setLength(0);
+ primaryRead = false;
}
List<TestCaseDef> parse(Path path) throws IOException {
@@ -389,15 +400,24 @@ final class MappingTestRunner {
throw reportError(fileName, lineNo, "Comments are
not allowed in sql statement text", line);
}
- if (!line.stripLeading().startsWith(DELIMITER)) {
+ String line0 = line.stripLeading();
+
+ if (line0.startsWith(DELIMITER)) {
+ nextState = ParseState.FRAGMENTS;
+ } else if (line0.startsWith(PRIMARY_READ)) {
+ nextState = ParseState.READ_FROM_PRIMARY;
+ } else {
if (sqlStmt.length() > 0) {
sqlStmt.append(System.lineSeparator());
}
sqlStmt.append(line);
- } else {
- nextState = ParseState.FRAGMENTS;
}
+
+ break;
+ case READ_FROM_PRIMARY:
+ primaryRead = true;
+ nextState = ParseState.FRAGMENTS;
break;
case FRAGMENTS:
if (line.startsWith(COMMENT)) {
@@ -458,7 +478,7 @@ final class MappingTestRunner {
String res = result.toString().stripTrailing();
String desc = description.length() > 0 ? description.toString() :
null;
- return new TestCaseDef(testCaseLineNo, desc, nodeName, sql, res);
+ return new TestCaseDef(testCaseLineNo, desc, nodeName, sql, res,
primaryRead);
}
private static RuntimeException reportError(String fileName, int
lineNo, String message, Object... params) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 7e718d20783..69b8d65de44 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -1779,7 +1779,17 @@ public class TestBuilders {
throw new AssertionError("Assignments are not
configured for table " + tableName);
}
- return assignments;
+ if (includeBackups) {
+ return assignments;
+ } else {
+ List<List<String>> primaryAssignments = new
ArrayList<>();
+
+ for (List<String> assign : assignments) {
+ primaryAssignments.add(List.of(assign.get(0)));
+ }
+
+ return primaryAssignments;
+ }
};
return new TestExecutionDistributionProvider(
diff --git
a/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
b/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
index a714bae97d1..35d93aba95f 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_backup_mapping.test
@@ -1,3 +1,44 @@
+# simple select and root is colocated with first partition
+N0
+SELECT * FROM t4_n0n1_n2n1
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N0, N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N0, N1]
+ tables: [T4_N0N1_N2N1]
+ partitions: {N0=[0:2], N1=[1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T4_N0N1_N2N1, source=2, partitions=2,
distribution=affinity[table: T4_N0N1_N2N1, columns: [ID]])
+---
+# simple select and root is colocated with first partition
+N0
+SELECT * FROM t4_n0n1_n2n1
+READ_FROM_PRIMARY
+---
+Fragment#0 root
+ executionNodes: [N0]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N0, N2]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N0]
+ executionNodes: [N0, N2]
+ tables: [T4_N0N1_N2N1]
+ partitions: {N0=[0:2], N2=[1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T4_N0N1_N2N1, source=2, partitions=2,
distribution=affinity[table: T4_N0N1_N2N1, columns: [ID]])
+---
# root of the query is colocated with first partition
N0
SELECT * FROM t1_n0n1_n1n2
@@ -38,6 +79,27 @@ Fragment#1
Sender(targetFragment=0, exchange=1, distribution=single)
TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
---
+# root of the query is colocated with both partitions
+N1
+SELECT * FROM t1_n0n1_n1n2
+READ_FROM_PRIMARY
+---
+Fragment#0 root
+ executionNodes: [N1]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N0, N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N1]
+ executionNodes: [N0, N1]
+ tables: [T1_N0N1_N1N2]
+ partitions: {N0=[0:2], N1=[1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
# root of the query is colocated with second partition
N2
SELECT * FROM t1_n0n1_n1n2
@@ -58,6 +120,27 @@ Fragment#1
Sender(targetFragment=0, exchange=1, distribution=single)
TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
---
+# root of the query is colocated with second partition
+N2
+SELECT * FROM t1_n0n1_n1n2
+READ_FROM_PRIMARY
+---
+Fragment#0 root
+ executionNodes: [N2]
+ remoteFragments: [1]
+ exchangeSourceNodes: {1=[N0, N1]}
+ tree:
+ Receiver(sourceFragment=1, exchange=1, distribution=single)
+
+Fragment#1
+ targetNodes: [N2]
+ executionNodes: [N0, N1]
+ tables: [T1_N0N1_N1N2]
+ partitions: {N0=[0:2], N1=[1:2]}
+ tree:
+ Sender(targetFragment=0, exchange=1, distribution=single)
+ TableScan(name=PUBLIC.T1_N0N1_N1N2, source=2, partitions=2,
distribution=affinity[table: T1_N0N1_N1N2, columns: [ID]])
+---
# although root is colocated with one of the partitions, algorithm prefers to
colocate join stage
N0
SELECT * FROM t2_n0n1n2 as t1, t3_n1n2 as t2 WHERE t1.id = t2.id AND t1.c1 = 5