This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch meituan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f3467e51a818efcbb5c0b26259aa3976802d13a2
Author: Beyyes <[email protected]>
AuthorDate: Thu Dec 7 15:40:55 2023 +0800

    TopKNode will use IdentityNode but not ShuffleSinkNode
---
 .../execution/operator/process/TopKOperator.java    |  8 ++++----
 .../planner/distribution/DistributionPlanner.java   | 21 +++++++++++++++++----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index d59067463f9..5672cbe8e9b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -80,7 +80,7 @@ public class TopKOperator implements ProcessOperator {
   // the data of every childOperator is in order
   private final boolean childrenDataInOrder;
 
-  public static int operatorBatchUpperBound = 100000;
+  public static final int OPERATOR_BATCH_UPPER_BOUND = 100000;
 
   public TopKOperator(
       OperatorContext operatorContext,
@@ -101,9 +101,9 @@ public class TopKOperator implements ProcessOperator {
     initResultTsBlock();
 
     deviceBatchStep =
-        operatorBatchUpperBound % topValue == 0
-            ? operatorBatchUpperBound / topValue
-            : operatorBatchUpperBound / topValue + 1;
+        OPERATOR_BATCH_UPPER_BOUND % topValue == 0
+            ? OPERATOR_BATCH_UPPER_BOUND / topValue
+            : OPERATOR_BATCH_UPPER_BOUND / topValue + 1;
     canCallNext = new boolean[deviceOperators.size()];
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index de38440c127..f568e6fb4f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -51,6 +51,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE;
+
 public class DistributionPlanner {
   private final Analysis analysis;
   private final MPPQueryContext context;
@@ -155,10 +157,21 @@ public class DistributionPlanner {
   private boolean needShuffleSinkNode(
       QueryStatement queryStatement, NodeGroupContext nodeGroupContext) {
     OrderByComponent orderByComponent = queryStatement.getOrderByComponent();
-    return nodeGroupContext.isAlignByDevice()
-        && orderByComponent != null
-        && (!orderByComponent.getSortItemList().isEmpty()
-            && (orderByComponent.isBasedOnTime() && !queryStatement.hasOrderByExpression()));
+
+    if (nodeGroupContext.isAlignByDevice() && orderByComponent != null) {
+
+      // TopKNode will use IdentityNode but not ShuffleSinkNode
+      if (queryStatement.hasLimit()
+          && !queryStatement.isOrderByBasedOnDevice()
+          && queryStatement.getRowLimit() <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) {
+        return false;
+      }
+
+      return !orderByComponent.getSortItemList().isEmpty()
+          && (orderByComponent.isBasedOnTime() && !queryStatement.hasOrderByExpression());
+    }
+
+    return false;
   }
 
   public PlanNode optimize(PlanNode rootWithExchange) {

Reply via email to