This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/align_by_time_topk in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 838e03e210d69f4abc1187ea50805a5bac35655e Author: Beyyes <[email protected]> AuthorDate: Wed Jan 3 16:46:20 2024 +0800 align by time + order by + limit use TopKNode --- .../plan/planner/LogicalPlanBuilder.java | 34 +++++++++++++++------- .../plan/planner/LogicalPlanVisitor.java | 11 +++---- .../plan/planner/TemplatedLogicalPlan.java | 4 +-- .../planner/distribution/DistributionPlanner.java | 4 +-- .../plan/planner/plan/node/process/TopKNode.java | 6 ++-- 5 files changed, 34 insertions(+), 25 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 0ca934c3282..07b62e5b72f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -129,7 +129,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDC import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K; import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME; import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_VALUE; import static org.apache.iotdb.db.utils.constant.SqlConstant.MAX_TIME; @@ -772,7 +772,7 @@ public class LogicalPlanBuilder { && queryStatement.hasLimit() && queryStatement.getOrderByComponent() != null && !queryStatement.isOrderByBasedOnDevice() - && limitValue <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) { + && limitValue <= LIMIT_VALUE_USE_TOP_K) { TopKNode topKNode = new TopKNode( @@ -1189,10 +1189,6 @@ public class LogicalPlanBuilder { return this; } - if (this.getRoot() instanceof TopKNode) { - return this; - } - this.root = new LimitNode(context.getQueryId().genPlanNodeId(), this.getRoot(), rowLimit); return this; } @@ -1557,10 +1553,7 @@ public class LogicalPlanBuilder { return this; } - public LogicalPlanBuilder planOrderBy( - QueryStatement queryStatement, - Set<Expression> orderByExpressions, - Set<Expression> selectExpression) { + public LogicalPlanBuilder planOrderBy(QueryStatement queryStatement, Analysis analysis) { // only the order by clause having expression needs a sortNode if (!queryStatement.hasOrderByExpression()) { return this; @@ -1571,13 +1564,32 @@ public class LogicalPlanBuilder { return this; } + Set<Expression> orderByExpressions = analysis.getOrderByExpressions(); updateTypeProvider(orderByExpressions); OrderByParameter orderByParameter = new OrderByParameter(queryStatement.getSortItemList()); if (orderByParameter.isEmpty()) { return this; } - this.root = new SortNode(context.getQueryId().genPlanNodeId(), root, orderByParameter); + if (queryStatement.hasLimit() && queryStatement.getRowLimit() <= LIMIT_VALUE_USE_TOP_K) { + long limitValue = + queryStatement.hasOffset() + ? queryStatement.getRowOffset() + queryStatement.getRowLimit() + : queryStatement.getRowLimit(); + TopKNode topKNode = + new TopKNode( + context.getQueryId().genPlanNodeId(), + (int) limitValue, + orderByParameter, + root.getOutputColumnNames()); + topKNode.addChild(this.root); + this.root = topKNode; + analysis.setUseTopKNode(); + } else { + this.root = new SortNode(context.getQueryId().genPlanNodeId(), root, orderByParameter); + } + + Set<Expression> selectExpression = analysis.getSelectExpressions(); if (root.getOutputColumnNames().size() != selectExpression.size()) { this.root = new TransformNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 0b131538942..f62658ce3f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -216,17 +216,18 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte } if (!queryStatement.needPushDownSort()) { - planBuilder = - planBuilder.planOrderBy( - queryStatement, analysis.getOrderByExpressions(), analysis.getSelectExpressions()); + planBuilder = planBuilder.planOrderBy(queryStatement, analysis); } // other upstream node planBuilder = planBuilder .planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder()) - .planOffset(queryStatement.getRowOffset()) - .planLimit(queryStatement.getRowLimit()); + .planOffset(queryStatement.getRowOffset()); + + if (!analysis.isUseTopKNode()) { + planBuilder = planBuilder.planLimit(queryStatement.getRowLimit()); + } // plan select into if (queryStatement.isAlignByDevice()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index 05ef92dbd60..31e430f4fd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@ -182,9 +182,7 @@ public class TemplatedLogicalPlan { } if (!queryStatement.needPushDownSort()) { - planBuilder = - planBuilder.planOrderBy( - queryStatement, analysis.getOrderByExpressions(), analysis.getSelectExpressions()); + planBuilder = planBuilder.planOrderBy(queryStatement, analysis); } // other upstream node diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java index a0ba04cba5f..77ff7261986 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java @@ -53,7 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode.LIMIT_VALUE_USE_TOP_K; public class DistributionPlanner { private final Analysis analysis; @@ -166,7 +166,7 @@ public class DistributionPlanner { // TopKNode will use IdentityNode but not ShuffleSinkNode if (queryStatement.hasLimit() && !queryStatement.isOrderByBasedOnDevice() - && queryStatement.getRowLimit() <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) { + && queryStatement.getRowLimit() <= LIMIT_VALUE_USE_TOP_K) { return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java index 8b5845d0467..b32bbc2bd9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/TopKNode.java @@ -36,10 +36,8 @@ import java.util.Objects; /** TopNode is optimized for `order by time|expression limit N align by device` query. */ public class TopKNode extends MultiChildProcessNode { - // when LIMIT value in `order by time|expression LIMIT N align by device` query is less this - // value, - // use TopKNode, otherwise use MergeSortNode - public static final int LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE = 1000000; + // when LIMIT value is less this value, can use TopKNode + public static final int LIMIT_VALUE_USE_TOP_K = 1000000; private final int topValue;
