This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/TableModelGrammar_0627 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 50723025f818b1f0097e3361c09b4e4e29df057a Author: Beyyes <[email protected]> AuthorDate: Tue Jul 2 11:14:22 2024 +0800 perfect the impl of DistributedPlanGenerator --- .../distribute/DistributedPlanGenerator.java | 169 ++++++++++++++++----- .../optimizations/PushPredicateIntoTableScan.java | 2 +- 2 files changed, 134 insertions(+), 37 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java index 2cfced93aae..49f0b1257eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Abst import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; @@ -32,9 +33,14 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import java.util.ArrayList; import java.util.Collections; @@ -43,14 +49,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static com.google.common.collect.ImmutableList.toImmutableList; import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction; public class DistributedPlanGenerator extends PlanVisitor<List<PlanNode>, DistributedPlanGenerator.PlanContext> { private final MPPQueryContext queryContext; private final Analysis analysis; + Map<PlanNodeId, OrderingScheme> planNodeOrderingSchemeMap = new HashMap<>(); public DistributedPlanGenerator(MPPQueryContext queryContext, Analysis analysis) { this.queryContext = queryContext; @@ -79,6 +88,116 @@ public class DistributedPlanGenerator return Collections.singletonList(newNode); } + @Override + public List<PlanNode> visitOutput(OutputNode outputNode, PlanContext context) { + context.expectedOrderingScheme = + new OrderingScheme( + outputNode.getOutputSymbols(), + outputNode.getOutputSymbols().stream() + .collect(Collectors.toMap(symbol -> symbol, symbol -> SortOrder.ASC_NULLS_LAST))); + + List<PlanNode> childrenNodes 
= outputNode.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + outputNode.setChild(childrenNodes.get(0)); + return Collections.singletonList(outputNode); + } + + return connectViaMergeSort(outputNode, childrenNodes); + } + + @Override + public List<PlanNode> visitLimit(LimitNode limitNode, PlanContext context) { + List<PlanNode> childrenNodes = limitNode.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + limitNode.setChild(childrenNodes.get(0)); + return Collections.singletonList(limitNode); + } + + return connectViaMergeSort(limitNode, childrenNodes); + } + + @Override + public List<PlanNode> visitOffset(OffsetNode offsetNode, PlanContext context) { + List<PlanNode> childrenNodes = offsetNode.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + offsetNode.setChild(childrenNodes.get(0)); + return Collections.singletonList(offsetNode); + } + + return connectViaMergeSort(offsetNode, childrenNodes); + } + + @Override + public List<PlanNode> visitProject(ProjectNode projectNode, PlanContext context) { + List<PlanNode> childrenNodes = projectNode.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + projectNode.setChild(childrenNodes.get(0)); + return Collections.singletonList(projectNode); + } + + for (Expression expression : projectNode.getAssignments().getMap().values()) { + if (containsDiffFunction(expression)) { + return connectViaMergeSort(projectNode, childrenNodes); + } + } + + return childrenNodes.stream() + .map( + child -> + new ProjectNode( + queryContext.getQueryId().genPlanNodeId(), child, projectNode.getAssignments())) + .collect(Collectors.toList()); + } + + @Override + public List<PlanNode> visitSort(SortNode sortNode, PlanContext context) { + context.expectedOrderingScheme = sortNode.getOrderingScheme(); + + List<PlanNode> childrenNodes = sortNode.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + sortNode.setChild(childrenNodes.get(0)); + return 
Collections.singletonList(sortNode); + } + + MergeSortNode mergeSortNode = + new MergeSortNode( + queryContext.getQueryId().genPlanNodeId(), + sortNode.getOrderingScheme(), + sortNode.getOutputSymbols()); + for (PlanNode child : childrenNodes) { + SortNode subSortNode = + new SortNode( + queryContext.getQueryId().genPlanNodeId(), + child, + sortNode.getOrderingScheme(), + false); + mergeSortNode.addChild(subSortNode); + planNodeOrderingSchemeMap.put(subSortNode.getPlanNodeId(), sortNode.getOrderingScheme()); + } + planNodeOrderingSchemeMap.put(mergeSortNode.getPlanNodeId(), sortNode.getOrderingScheme()); + return Collections.singletonList(mergeSortNode); + } + + @Override + public List<PlanNode> visitFilter(FilterNode filterNode, PlanContext context) { + List<PlanNode> childrenNodes = filterNode.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + filterNode.setChild(childrenNodes.get(0)); + return Collections.singletonList(filterNode); + } + + if (containsDiffFunction(filterNode.getPredicate())) { + return connectViaMergeSort(filterNode, childrenNodes); + } + + return childrenNodes.stream() + .map( + child -> + new FilterNode( + queryContext.getQueryId().genPlanNodeId(), child, filterNode.getPredicate())) + .collect(Collectors.toList()); + } + @Override public List<PlanNode> visitTableScan(TableScanNode node, PlanContext context) { @@ -154,42 +273,19 @@ public class DistributedPlanGenerator } } - @Override - public List<PlanNode> visitFilter(FilterNode filterNode, PlanContext context) { - List<PlanNode> childrenNodes = filterNode.getChild().accept(this, context); - if (childrenNodes.size() == 1) { - filterNode.setChild(childrenNodes.get(0)); - return Collections.singletonList(filterNode); - } - - List<PlanNode> result = new ArrayList<>(); - for (PlanNode child : childrenNodes) { - FilterNode newFilterNode = - new FilterNode( - queryContext.getQueryId().genPlanNodeId(), child, filterNode.getPredicate()); - result.add(newFilterNode); - } - - return 
result; - } - - @Override - public List<PlanNode> visitProject(ProjectNode projectNode, PlanContext context) { - List<PlanNode> childrenNodes = projectNode.getChild().accept(this, context); - if (childrenNodes.size() == 1) { - projectNode.setChild(childrenNodes.get(0)); - return Collections.singletonList(projectNode); - } - - List<PlanNode> result = new ArrayList<>(); - for (PlanNode child : childrenNodes) { - ProjectNode newProjectNode = - new ProjectNode( - queryContext.getQueryId().genPlanNodeId(), child, projectNode.getAssignments()); - result.add(newProjectNode); - } - - return result; + private List<PlanNode> connectViaMergeSort( + SingleChildProcessNode node, List<PlanNode> childrenNodes) { + OrderingScheme childrenOrderingScheme = + planNodeOrderingSchemeMap.get(childrenNodes.get(0).getPlanNodeId()); + MergeSortNode mergeSortNode = + new MergeSortNode( + queryContext.getQueryId().genPlanNodeId(), + childrenOrderingScheme, + node.getOutputSymbols()); + childrenNodes.forEach(mergeSortNode::addChild); + node.setChild(mergeSortNode); + planNodeOrderingSchemeMap.put(node.getPlanNodeId(), childrenOrderingScheme); + return Collections.singletonList(node); } // ------------------- schema related interface --------------------------------------------- @@ -267,6 +363,7 @@ public class DistributedPlanGenerator public static class PlanContext { final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; boolean hasExchangeNode = false; + OrderingScheme expectedOrderingScheme; public PlanContext() { this.nodeDistributionMap = new HashMap<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 58f8130a960..96d92ad66bf 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -310,7 +310,7 @@ public class PushPredicateIntoTableScan implements RelationalPlanOptimizer { } } - static boolean containsDiffFunction(Expression expression) { + public static boolean containsDiffFunction(Expression expression) { if (expression instanceof FunctionCall && "diff".equalsIgnoreCase(((FunctionCall) expression).getName().toString())) { return true;
