This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/TableModelGrammar_0627
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 50723025f818b1f0097e3361c09b4e4e29df057a
Author: Beyyes <[email protected]>
AuthorDate: Tue Jul 2 11:14:22 2024 +0800

    perfect the impl of DistributedPlanGenerator
---
 .../distribute/DistributedPlanGenerator.java       | 169 ++++++++++++++++-----
 .../optimizations/PushPredicateIntoTableScan.java  |   2 +-
 2 files changed, 134 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 2cfced93aae..49f0b1257eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Abst
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
@@ -32,9 +33,14 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,14 +49,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
 
 public class DistributedPlanGenerator
     extends PlanVisitor<List<PlanNode>, DistributedPlanGenerator.PlanContext> {
   private final MPPQueryContext queryContext;
   private final Analysis analysis;
+  Map<PlanNodeId, OrderingScheme> planNodeOrderingSchemeMap = new HashMap<>();
 
   public DistributedPlanGenerator(MPPQueryContext queryContext, Analysis 
analysis) {
     this.queryContext = queryContext;
@@ -79,6 +88,116 @@ public class DistributedPlanGenerator
     return Collections.singletonList(newNode);
   }
 
+  @Override
+  public List<PlanNode> visitOutput(OutputNode outputNode, PlanContext 
context) {
+    context.expectedOrderingScheme =
+        new OrderingScheme(
+            outputNode.getOutputSymbols(),
+            outputNode.getOutputSymbols().stream()
+                .collect(Collectors.toMap(symbol -> symbol, symbol -> 
SortOrder.ASC_NULLS_LAST)));
+
+    List<PlanNode> childrenNodes = outputNode.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      outputNode.setChild(childrenNodes.get(0));
+      return Collections.singletonList(outputNode);
+    }
+
+    return connectViaMergeSort(outputNode, childrenNodes);
+  }
+
+  @Override
+  public List<PlanNode> visitLimit(LimitNode limitNode, PlanContext context) {
+    List<PlanNode> childrenNodes = limitNode.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      limitNode.setChild(childrenNodes.get(0));
+      return Collections.singletonList(limitNode);
+    }
+
+    return connectViaMergeSort(limitNode, childrenNodes);
+  }
+
+  @Override
+  public List<PlanNode> visitOffset(OffsetNode offsetNode, PlanContext 
context) {
+    List<PlanNode> childrenNodes = offsetNode.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      offsetNode.setChild(childrenNodes.get(0));
+      return Collections.singletonList(offsetNode);
+    }
+
+    return connectViaMergeSort(offsetNode, childrenNodes);
+  }
+
+  @Override
+  public List<PlanNode> visitProject(ProjectNode projectNode, PlanContext 
context) {
+    List<PlanNode> childrenNodes = projectNode.getChild().accept(this, 
context);
+    if (childrenNodes.size() == 1) {
+      projectNode.setChild(childrenNodes.get(0));
+      return Collections.singletonList(projectNode);
+    }
+
+    for (Expression expression : 
projectNode.getAssignments().getMap().values()) {
+      if (containsDiffFunction(expression)) {
+        return connectViaMergeSort(projectNode, childrenNodes);
+      }
+    }
+
+    return childrenNodes.stream()
+        .map(
+            child ->
+                new ProjectNode(
+                    queryContext.getQueryId().genPlanNodeId(), child, 
projectNode.getAssignments()))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<PlanNode> visitSort(SortNode sortNode, PlanContext context) {
+    context.expectedOrderingScheme = sortNode.getOrderingScheme();
+
+    List<PlanNode> childrenNodes = sortNode.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      sortNode.setChild(childrenNodes.get(0));
+      return Collections.singletonList(sortNode);
+    }
+
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            queryContext.getQueryId().genPlanNodeId(),
+            sortNode.getOrderingScheme(),
+            sortNode.getOutputSymbols());
+    for (PlanNode child : childrenNodes) {
+      SortNode subSortNode =
+          new SortNode(
+              queryContext.getQueryId().genPlanNodeId(),
+              child,
+              sortNode.getOrderingScheme(),
+              false);
+      mergeSortNode.addChild(subSortNode);
+      planNodeOrderingSchemeMap.put(subSortNode.getPlanNodeId(), 
sortNode.getOrderingScheme());
+    }
+    planNodeOrderingSchemeMap.put(mergeSortNode.getPlanNodeId(), 
sortNode.getOrderingScheme());
+    return Collections.singletonList(mergeSortNode);
+  }
+
+  @Override
+  public List<PlanNode> visitFilter(FilterNode filterNode, PlanContext 
context) {
+    List<PlanNode> childrenNodes = filterNode.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      filterNode.setChild(childrenNodes.get(0));
+      return Collections.singletonList(filterNode);
+    }
+
+    if (containsDiffFunction(filterNode.getPredicate())) {
+      return connectViaMergeSort(filterNode, childrenNodes);
+    }
+
+    return childrenNodes.stream()
+        .map(
+            child ->
+                new FilterNode(
+                    queryContext.getQueryId().genPlanNodeId(), child, 
filterNode.getPredicate()))
+        .collect(Collectors.toList());
+  }
+
   @Override
   public List<PlanNode> visitTableScan(TableScanNode node, PlanContext 
context) {
 
@@ -154,42 +273,19 @@ public class DistributedPlanGenerator
     }
   }
 
-  @Override
-  public List<PlanNode> visitFilter(FilterNode filterNode, PlanContext 
context) {
-    List<PlanNode> childrenNodes = filterNode.getChild().accept(this, context);
-    if (childrenNodes.size() == 1) {
-      filterNode.setChild(childrenNodes.get(0));
-      return Collections.singletonList(filterNode);
-    }
-
-    List<PlanNode> result = new ArrayList<>();
-    for (PlanNode child : childrenNodes) {
-      FilterNode newFilterNode =
-          new FilterNode(
-              queryContext.getQueryId().genPlanNodeId(), child, 
filterNode.getPredicate());
-      result.add(newFilterNode);
-    }
-
-    return result;
-  }
-
-  @Override
-  public List<PlanNode> visitProject(ProjectNode projectNode, PlanContext 
context) {
-    List<PlanNode> childrenNodes = projectNode.getChild().accept(this, 
context);
-    if (childrenNodes.size() == 1) {
-      projectNode.setChild(childrenNodes.get(0));
-      return Collections.singletonList(projectNode);
-    }
-
-    List<PlanNode> result = new ArrayList<>();
-    for (PlanNode child : childrenNodes) {
-      ProjectNode newProjectNode =
-          new ProjectNode(
-              queryContext.getQueryId().genPlanNodeId(), child, 
projectNode.getAssignments());
-      result.add(newProjectNode);
-    }
-
-    return result;
+  private List<PlanNode> connectViaMergeSort(
+      SingleChildProcessNode node, List<PlanNode> childrenNodes) {
+    OrderingScheme childrenOrderingScheme =
+        planNodeOrderingSchemeMap.get(childrenNodes.get(0).getPlanNodeId());
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            queryContext.getQueryId().genPlanNodeId(),
+            childrenOrderingScheme,
+            node.getOutputSymbols());
+    childrenNodes.forEach(mergeSortNode::addChild);
+    node.setChild(mergeSortNode);
+    planNodeOrderingSchemeMap.put(node.getPlanNodeId(), 
childrenOrderingScheme);
+    return Collections.singletonList(node);
   }
 
   // ------------------- schema related interface 
---------------------------------------------
@@ -267,6 +363,7 @@ public class DistributedPlanGenerator
   public static class PlanContext {
     final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
     boolean hasExchangeNode = false;
+    OrderingScheme expectedOrderingScheme;
 
     public PlanContext() {
       this.nodeDistributionMap = new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 58f8130a960..96d92ad66bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -310,7 +310,7 @@ public class PushPredicateIntoTableScan implements 
RelationalPlanOptimizer {
     }
   }
 
-  static boolean containsDiffFunction(Expression expression) {
+  public static boolean containsDiffFunction(Expression expression) {
     if (expression instanceof FunctionCall
         && "diff".equalsIgnoreCase(((FunctionCall) 
expression).getName().toString())) {
       return true;

Reply via email to