This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 037ef649ad6 Fix sort elimination when table join self in TableModel
037ef649ad6 is described below
commit 037ef649ad6b78eb76e6ea536c5a0fe4a84d3fc6
Author: Weihao Li <[email protected]>
AuthorDate: Wed Jun 25 08:47:45 2025 +0800
Fix sort elimination when table join self in TableModel
---
.../distribute/TableDistributedPlanGenerator.java | 7 ++-
.../optimizations/TransformSortToStreamSort.java | 33 +++++++++++---
.../plan/relational/analyzer/JoinTest.java | 52 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7f43fc7446b..e8f56070d07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -1457,6 +1457,7 @@ public class TableDistributedPlanGenerator
final Optional<OrderingScheme> newOrderingScheme =
tableScanOrderingSchema(
analysis.getTableColumnSchema(deviceTableScanNode.getQualifiedObjectName()),
+ deviceTableScanNode.getAssignments(),
newOrderingSymbols,
newSortOrders,
lastIsTimeRelated,
@@ -1489,6 +1490,7 @@ public class TableDistributedPlanGenerator
private Optional<OrderingScheme> tableScanOrderingSchema(
Map<Symbol, ColumnSchema> tableColumnSchema,
+ Map<Symbol, ColumnSchema> nodeColumnSchema,
List<Symbol> newOrderingSymbols,
List<SortOrder> newSortOrders,
boolean lastIsTimeRelated,
@@ -1513,7 +1515,10 @@ public class TableDistributedPlanGenerator
.boxed()
.collect(Collectors.toMap(newOrderingSymbols::get,
newSortOrders::get)));
if (isOrderByAllIdsAndTime(
- tableColumnSchema, orderingScheme, size - 2)) { // all id columns
included
+ tableColumnSchema,
+ nodeColumnSchema,
+ orderingScheme,
+ size - 2)) { // all id columns included
return Optional.of(
new OrderingScheme(
newOrderingSymbols,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index 02626c18e20..38d3d287853 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -107,7 +107,11 @@ public class TransformSortToStreamSort implements
PlanOptimizer {
if (streamSortIndex >= 0) {
boolean orderByAllIdsAndTime =
- isOrderByAllIdsAndTime(tableColumnSchema, orderingScheme,
streamSortIndex);
+ isOrderByAllIdsAndTime(
+ tableColumnSchema,
+ deviceTableScanNode.getAssignments(),
+ orderingScheme,
+ streamSortIndex);
return new StreamSortNode(
queryContext.getQueryId().genPlanNodeId(),
@@ -152,18 +156,33 @@ public class TransformSortToStreamSort implements
PlanOptimizer {
}
}
+ /**
+ * @param tableColumnSchema The ColumnSchema of the original table, but the
symbol names may have been rewritten
+ * by Join
+ * @param nodeColumnSchema The ColumnSchema of the current node, after
column pruning has been applied
+ */
public static boolean isOrderByAllIdsAndTime(
Map<Symbol, ColumnSchema> tableColumnSchema,
+ Map<Symbol, ColumnSchema> nodeColumnSchema,
OrderingScheme orderingScheme,
int streamSortIndex) {
- for (Map.Entry<Symbol, ColumnSchema> entry : tableColumnSchema.entrySet())
{
- if (entry.getValue().getColumnCategory() == TsTableColumnCategory.TAG
- && !orderingScheme.getOrderings().containsKey(entry.getKey())) {
- return false;
+ int tagCount = 0;
+ for (ColumnSchema columnSchema : tableColumnSchema.values()) {
+ if (columnSchema.getColumnCategory() == TsTableColumnCategory.TAG) {
+ tagCount++;
}
}
- return orderingScheme.getOrderings().size() == streamSortIndex + 1
- || isTimeColumn(orderingScheme.getOrderBy().get(streamSortIndex + 1),
tableColumnSchema);
+
+ for (Symbol orderBy : orderingScheme.getOrderBy()) {
+ ColumnSchema columnSchema = nodeColumnSchema.get(orderBy);
+ if (columnSchema != null && columnSchema.getColumnCategory() ==
TsTableColumnCategory.TAG) {
+ tagCount--;
+ }
+ }
+ return tagCount == 0
+ && (orderingScheme.getOrderings().size() == streamSortIndex + 1
+ || isTimeColumn(
+ orderingScheme.getOrderBy().get(streamSortIndex + 1),
tableColumnSchema));
}
private static class Context {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index eb5091bc878..df5ab9b1829 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -759,4 +759,56 @@ public class JoinTest {
assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(),
exchange(), exchange()));
}
+
+ @Test
+ public void joinSortEliminationTest() {
+ PlanTester planTester = new PlanTester();
+ sql = "select * from table1 t1 " + "left join table1 t2 using (tag1, tag2,
tag3, time)";
+ logicalQueryPlan = planTester.createPlan(sql);
+ assertPlan(
+ logicalQueryPlan.getRootNode(),
+ output(join(sort(tableScan("testdb.table1")),
sort(tableScan("testdb.table1")))));
+
+ assertPlan(planTester.getFragmentPlan(0), output(join(exchange(),
exchange())));
+
+ assertPlan(planTester.getFragmentPlan(1), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(2), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(3), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(4), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(5), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(6), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(7), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(8), tableScan("testdb.table1"));
+
+ sql = "select * from table1 t1 " + "full join table1 t2 using (tag1, tag2,
tag3, time)";
+ logicalQueryPlan = planTester.createPlan(sql);
+ assertPlan(
+ logicalQueryPlan.getRootNode(),
+ output(project(join(sort(tableScan("testdb.table1")),
sort(tableScan("testdb.table1"))))));
+
+ assertPlan(planTester.getFragmentPlan(0), output(project(join(exchange(),
exchange()))));
+
+ assertPlan(planTester.getFragmentPlan(1), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(2), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(3), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(4), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(5), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(6), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(7), tableScan("testdb.table1"));
+
+ assertPlan(planTester.getFragmentPlan(8), tableScan("testdb.table1"));
+ }
}