This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch meituan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f3467e51a818efcbb5c0b26259aa3976802d13a2
Author: Beyyes <[email protected]>
AuthorDate: Thu Dec 7 15:40:55 2023 +0800

    TopKNode will use IdentityNode but not ShuffleSinkNode
---
 .../execution/operator/process/TopKOperator.java   |  8 ++++----
 .../planner/distribution/DistributionPlanner.java  | 21 +++++++++++++++++----
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index d59067463f9..5672cbe8e9b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -80,7 +80,7 @@ public class TopKOperator implements ProcessOperator {
   // the data of every childOperator is in order
   private final boolean childrenDataInOrder;
 
-  public static int operatorBatchUpperBound = 100000;
+  public static final int OPERATOR_BATCH_UPPER_BOUND = 100000;
 
   public TopKOperator(
       OperatorContext operatorContext,
@@ -101,9 +101,9 @@ public class TopKOperator implements ProcessOperator {
     initResultTsBlock();
 
     deviceBatchStep =
-        operatorBatchUpperBound % topValue == 0
-            ? operatorBatchUpperBound / topValue
-            : operatorBatchUpperBound / topValue + 1;
+        OPERATOR_BATCH_UPPER_BOUND % topValue == 0
+            ? OPERATOR_BATCH_UPPER_BOUND / topValue
+            : OPERATOR_BATCH_UPPER_BOUND / topValue + 1;
     canCallNext = new boolean[deviceOperators.size()];
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index de38440c127..f568e6fb4f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -51,6 +51,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE;
+
 public class DistributionPlanner {
   private final Analysis analysis;
   private final MPPQueryContext context;
@@ -155,10 +157,21 @@ public class DistributionPlanner {
   private boolean needShuffleSinkNode(
       QueryStatement queryStatement, NodeGroupContext nodeGroupContext) {
     OrderByComponent orderByComponent = queryStatement.getOrderByComponent();
-    return nodeGroupContext.isAlignByDevice()
-        && orderByComponent != null
-        && (!orderByComponent.getSortItemList().isEmpty()
-            && (orderByComponent.isBasedOnTime() && !queryStatement.hasOrderByExpression()));
+
+    if (nodeGroupContext.isAlignByDevice() && orderByComponent != null) {
+
+      // TopKNode will use IdentityNode but not ShuffleSinkNode
+      if (queryStatement.hasLimit()
+          && !queryStatement.isOrderByBasedOnDevice()
+          && queryStatement.getRowLimit() <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) {
+        return false;
+      }
+
+      return !orderByComponent.getSortItemList().isEmpty()
+          && (orderByComponent.isBasedOnTime() && !queryStatement.hasOrderByExpression());
+    }
+
+    return false;
   }
 
   public PlanNode optimize(PlanNode rootWithExchange) {
