This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/align_by_time_topk
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 838e03e210d69f4abc1187ea50805a5bac35655e
Author: Beyyes <[email protected]>
AuthorDate: Wed Jan 3 16:46:20 2024 +0800

    align by time + order by + limit use TopKNode
---
 .../plan/planner/LogicalPlanBuilder.java           | 34 +++++++++++++++-------
 .../plan/planner/LogicalPlanVisitor.java           | 11 +++----
 .../plan/planner/TemplatedLogicalPlan.java         |  4 +--
 .../planner/distribution/DistributionPlanner.java  |  4 +--
 .../plan/planner/plan/node/process/TopKNode.java   |  6 ++--
 5 files changed, 34 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 0ca934c3282..07b62e5b72f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -129,7 +129,7 @@ import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDC
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
-import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE;
 import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME;
@@ -772,7 +772,7 @@ public class LogicalPlanBuilder {
         && queryStatement.hasLimit()
         && queryStatement.getOrderByComponent() != null
         && !queryStatement.isOrderByBasedOnDevice()
-        && limitValue <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) {
+        && limitValue <= LIMIT_VALUE_USE_TOP_K) {
 
       TopKNode topKNode =
           new TopKNode(
@@ -1189,10 +1189,6 @@ public class LogicalPlanBuilder {
       return this;
     }
 
-    if (this.getRoot() instanceof TopKNode) {
-      return this;
-    }
-
     this.root = new LimitNode(context.getQueryId().genPlanNodeId(), 
this.getRoot(), rowLimit);
     return this;
   }
@@ -1557,10 +1553,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planOrderBy(
-      QueryStatement queryStatement,
-      Set<Expression> orderByExpressions,
-      Set<Expression> selectExpression) {
+  public LogicalPlanBuilder planOrderBy(QueryStatement queryStatement, 
Analysis analysis) {
     // only the order by clause having expression needs a sortNode
     if (!queryStatement.hasOrderByExpression()) {
       return this;
@@ -1571,13 +1564,32 @@ public class LogicalPlanBuilder {
       return this;
     }
 
+    Set<Expression> orderByExpressions = analysis.getOrderByExpressions();
     updateTypeProvider(orderByExpressions);
     OrderByParameter orderByParameter = new 
OrderByParameter(queryStatement.getSortItemList());
     if (orderByParameter.isEmpty()) {
       return this;
     }
-    this.root = new SortNode(context.getQueryId().genPlanNodeId(), root, 
orderByParameter);
 
+    if (queryStatement.hasLimit() && queryStatement.getRowLimit() <= 
LIMIT_VALUE_USE_TOP_K) {
+      long limitValue =
+          queryStatement.hasOffset()
+              ? queryStatement.getRowOffset() + queryStatement.getRowLimit()
+              : queryStatement.getRowLimit();
+      TopKNode topKNode =
+          new TopKNode(
+              context.getQueryId().genPlanNodeId(),
+              (int) limitValue,
+              orderByParameter,
+              root.getOutputColumnNames());
+      topKNode.addChild(this.root);
+      this.root = topKNode;
+      analysis.setUseTopKNode();
+    } else {
+      this.root = new SortNode(context.getQueryId().genPlanNodeId(), root, 
orderByParameter);
+    }
+
+    Set<Expression> selectExpression = analysis.getSelectExpressions();
     if (root.getOutputColumnNames().size() != selectExpression.size()) {
       this.root =
           new TransformNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 0b131538942..f62658ce3f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -216,17 +216,18 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     }
 
     if (!queryStatement.needPushDownSort()) {
-      planBuilder =
-          planBuilder.planOrderBy(
-              queryStatement, analysis.getOrderByExpressions(), 
analysis.getSelectExpressions());
+      planBuilder = planBuilder.planOrderBy(queryStatement, analysis);
     }
 
     // other upstream node
     planBuilder =
         planBuilder
             .planFill(analysis.getFillDescriptor(), 
queryStatement.getResultTimeOrder())
-            .planOffset(queryStatement.getRowOffset())
-            .planLimit(queryStatement.getRowLimit());
+            .planOffset(queryStatement.getRowOffset());
+
+    if (!analysis.isUseTopKNode()) {
+      planBuilder = planBuilder.planLimit(queryStatement.getRowLimit());
+    }
 
     // plan select into
     if (queryStatement.isAlignByDevice()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
index 05ef92dbd60..31e430f4fd6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -182,9 +182,7 @@ public class TemplatedLogicalPlan {
     }
 
     if (!queryStatement.needPushDownSort()) {
-      planBuilder =
-          planBuilder.planOrderBy(
-              queryStatement, analysis.getOrderByExpressions(), 
analysis.getSelectExpressions());
+      planBuilder = planBuilder.planOrderBy(queryStatement, analysis);
     }
 
     // other upstream node
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index a0ba04cba5f..77ff7261986 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -53,7 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K;
 
 public class DistributionPlanner {
   private final Analysis analysis;
@@ -166,7 +166,7 @@ public class DistributionPlanner {
       // TopKNode will use IdentityNode but not ShuffleSinkNode
       if (queryStatement.hasLimit()
           && !queryStatement.isOrderByBasedOnDevice()
-          && queryStatement.getRowLimit() <= 
LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) {
+          && queryStatement.getRowLimit() <= LIMIT_VALUE_USE_TOP_K) {
         return false;
       }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java
index 8b5845d0467..b32bbc2bd9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java
@@ -36,10 +36,8 @@ import java.util.Objects;
 /** TopNode is optimized for `order by time|expression limit N align by 
device` query. */
 public class TopKNode extends MultiChildProcessNode {
 
-  // when LIMIT value in `order by time|expression LIMIT N align by device` 
query is less this
-  // value,
-  // use TopKNode, otherwise use MergeSortNode
-  public static final int LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE = 1000000;
+  // when the LIMIT value is less than or equal to this value, TopKNode can be used
+  public static final int LIMIT_VALUE_USE_TOP_K = 1000000;
 
   private final int topValue;
 

Reply via email to