This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6b38b5d5415 Optimize sort properties of JoinNode in case of JOIN USING
6b38b5d5415 is described below
commit 6b38b5d5415848d12a825a52116f39154445224b
Author: Weihao Li <[email protected]>
AuthorDate: Mon May 26 21:18:34 2025 +0800
Optimize sort properties of JoinNode in case of JOIN USING
---
.../distribute/TableDistributedPlanGenerator.java | 79 +++++++++++--
.../plan/relational/analyzer/JoinTest.java | 124 +++++++++++++++++++++
.../planner/assertions/PlanMatchPattern.java | 4 +
3 files changed, 200 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 095ab695ff6..0680de2873c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -84,6 +84,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -104,6 +105,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -231,10 +233,51 @@ public class TableDistributedPlanGenerator
ImmutableSet.copyOf(node.getOutputSymbols()).containsAll(childOrdering.getOrderBy());
}
if (childrenNodes.size() == 1) {
+ PlanNode child = childrenNodes.get(0);
if (containAllSortItem) {
nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
}
- node.setChild(childrenNodes.get(0));
+
+ // Every join implementation except CROSS is currently MergeSortJoin, so the join output keeps its order
+ if (child instanceof JoinNode) {
+ JoinNode joinNode = (JoinNode) child;
+
+ // Only FULL joins are handled here; other join types are handled in visitJoinNode()
+ if (joinNode.getJoinType() == JoinNode.JoinType.FULL
+ && !joinNode.getAsofCriteria().isPresent()) {
+ Map<Symbol, Expression> assignmentsMap =
node.getAssignments().getMap();
+ // If all of these Coalesce expressions appear in the ProjectNode, the ProjectNode is ordered
+ int coalescesSize = joinNode.getCriteria().size();
+
+ // Map each Coalesce expression to its output Symbol. A LinkedHashMap keeps the criteria order,
+ // so this Map does not have to be queried a second time when calling constructOrderingSchema
+ Map<Expression, Symbol> orderedCoalesces = new
LinkedHashMap<>(coalescesSize);
+ for (JoinNode.EquiJoinClause clause : joinNode.getCriteria()) {
+ orderedCoalesces.put(
+ new CoalesceExpression(
+ ImmutableList.of(
+ clause.getLeft().toSymbolReference(),
+ clause.getRight().toSymbolReference())),
+ null);
+ }
+
+ for (Map.Entry<Symbol, Expression> assignment :
assignmentsMap.entrySet()) {
+ if (orderedCoalesces.containsKey(assignment.getValue())) {
+ coalescesSize--;
+ orderedCoalesces.put(assignment.getValue(), assignment.getKey());
+ }
+ }
+
+ // All Coalesces appear in ProjectNode
+ if (coalescesSize == 0) {
+ nodeOrderingMap.put(
+ node.getPlanNodeId(),
+ constructOrderingSchema(new
ArrayList<>(orderedCoalesces.values())));
+ }
+ }
+ }
+
+ node.setChild(child);
return Collections.singletonList(node);
}
@@ -481,14 +524,36 @@ public class TableDistributedPlanGenerator
rightChildrenNodes.size() == 1,
"The size of right children node of JoinNode should be 1");
}
+
+ OrderingScheme leftChildOrdering =
nodeOrderingMap.get(node.getLeftChild().getPlanNodeId());
+ OrderingScheme rightChildOrdering =
nodeOrderingMap.get(node.getRightChild().getPlanNodeId());
+
// For CrossJoinNode, we need to merge children nodes(It's safe for other
JoinNodes here since
// the size of their children is always 1.)
- node.setLeftChild(
- mergeChildrenViaCollectOrMergeSort(
- nodeOrderingMap.get(node.getLeftChild().getPlanNodeId()),
leftChildrenNodes));
- node.setRightChild(
- mergeChildrenViaCollectOrMergeSort(
- nodeOrderingMap.get(node.getRightChild().getPlanNodeId()),
rightChildrenNodes));
+ node.setLeftChild(mergeChildrenViaCollectOrMergeSort(leftChildOrdering,
leftChildrenNodes));
+ node.setRightChild(mergeChildrenViaCollectOrMergeSort(rightChildOrdering,
rightChildrenNodes));
+
+ // Every join implementation except CROSS is currently MergeSortJoin, so the join output keeps its order
+ if (!node.isCrossJoin() && !node.getAsofCriteria().isPresent()) {
+ switch (node.getJoinType()) {
+ case FULL:
+ // For a FULL join, the sort properties are processed in the ProjectNode above this
+ // node.
+ break;
+ case INNER:
+ case LEFT:
+ if (ImmutableSet.copyOf(node.getLeftOutputSymbols())
+ .containsAll(leftChildOrdering.getOrderBy())) {
+ nodeOrderingMap.put(node.getPlanNodeId(), leftChildOrdering);
+ }
+ break;
+ case RIGHT:
+ throw new IllegalStateException(
+ "RIGHT Join should be transformed to LEFT Join in previous
process");
+ default:
+ throw new UnsupportedOperationException("Unsupported Join Type: " +
node.getJoinType());
+ }
+ }
return Collections.singletonList(node);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index 2b0806c857c..eb5091bc878 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -71,8 +71,10 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationTableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.join;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
@@ -635,4 +637,126 @@ public class JoinTest {
+ "ON t1.time = t2.time ORDER BY t1.tag1 OFFSET 3 LIMIT 6",
false);
}
+
+ @Test
+ public void testJoinSortProperties() { // Checks the logical plan and per-fragment plans of chained JOIN USING queries for each join type
+ // FULL JOIN: in the expected logical plan each join is wrapped in a project node
+ PlanTester planTester = new PlanTester();
+ sql =
+ "select * from table1 t1 "
+ + "full join table1 t2 using (time, s1)"
+ + "full join table1 t3 using (time, s1)";
+ logicalQueryPlan = planTester.createPlan(sql);
+ assertPlan(
+ logicalQueryPlan.getRootNode(),
+ output(
+ project(
+ join(
+ sort(
+ project(
+ join(
+ sort(tableScan("testdb.table1")),
+ sort(tableScan("testdb.table1"))))),
+ sort(tableScan("testdb.table1"))))));
+
+ assertPlan(planTester.getFragmentPlan(0), output(project(join(exchange(),
exchange()))));
+
+ // Fragment 1 is the nested join; the sort node above its project in the logical plan has been eliminated
+ assertPlan(planTester.getFragmentPlan(1), project(join(exchange(),
exchange())));
+
+ assertPlan(planTester.getFragmentPlan(2), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(3),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(4),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(5),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(7),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(8),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(9),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(),
exchange(), exchange()));
+
+ // LEFT JOIN: shares its expected plans with INNER JOIN via assertLeftOrInner
+ sql =
+ "select * from table1 t1 "
+ + "left join table1 t2 using (time, s1)"
+ + "left join table1 t3 using (time, s1)";
+ assertLeftOrInner(planTester);
+
+ // INNER JOIN: same expected plans as LEFT JOIN
+ sql =
+ "select * from table1 t1 "
+ + "inner join table1 t2 using (time, s1)"
+ + "inner join table1 t3 using (time, s1)";
+ assertLeftOrInner(planTester);
+
+ // RIGHT JOIN: the expected logical plan has the nested join as the right child
+ sql =
+ "select * from table1 t1 "
+ + "right join table1 t2 using (time, s1)"
+ + "right join table1 t3 using (time, s1)";
+ logicalQueryPlan = planTester.createPlan(sql);
+ assertPlan(
+ logicalQueryPlan.getRootNode(),
+ output(
+ join(
+ sort(tableScan("testdb.table1")),
+ sort(join(sort(tableScan("testdb.table1")),
sort(tableScan("testdb.table1")))))));
+
+ assertPlan(planTester.getFragmentPlan(0), output(join(exchange(),
exchange())));
+
+ assertPlan(planTester.getFragmentPlan(1), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(2),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(3),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(4),
sort(tableScan("testdb.table1")));
+
+ // Fragment 5 is the nested join; the sort node above it in the logical plan has been eliminated
+ assertPlan(planTester.getFragmentPlan(5), join(exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(),
exchange(), exchange())); // NOTE(review): fragments 7-9 are not asserted for the RIGHT JOIN case
+ }
+
+ private void assertLeftOrInner(PlanTester planTester) { // Plans the `sql` set by the caller and checks the logical plan plus fragments 0-10
+ logicalQueryPlan = planTester.createPlan(sql);
+ assertPlan(
+ logicalQueryPlan.getRootNode(),
+ output(
+ join(
+ sort(join(sort(tableScan("testdb.table1")),
sort(tableScan("testdb.table1")))),
+ sort(tableScan("testdb.table1")))));
+
+ assertPlan(planTester.getFragmentPlan(0), output(join(exchange(),
exchange())));
+
+ // Fragment 1 is the nested join; the sort node above it in the logical plan has been eliminated
+ assertPlan(planTester.getFragmentPlan(1), join(exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(2), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(3),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(4),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(5),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(6), mergeSort(exchange(),
exchange(), exchange()));
+
+ assertPlan(planTester.getFragmentPlan(7),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(8),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(9),
sort(tableScan("testdb.table1")));
+
+ assertPlan(planTester.getFragmentPlan(10), mergeSort(exchange(),
exchange(), exchange()));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 93662f6082b..66bccb425a4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -516,6 +516,10 @@ public final class PlanMatchPattern {
return builder.build();
}*/
+ public static PlanMatchPattern join(PlanMatchPattern left, PlanMatchPattern
right) {
+ return node(JoinNode.class, left, right); // delegates to node(...) with JoinNode.class and the two child patterns; no join type or criteria are passed
+ }
+
public static PlanMatchPattern join(
JoinNode.JoinType type, Consumer<JoinMatcher.Builder> handler) {
JoinMatcher.Builder builder = new JoinMatcher.Builder(type);