This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/topk
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/topk by this push:
     new 9dd4718a05b add impl
9dd4718a05b is described below

commit 9dd4718a05b6f4fdf0b4d8c81735f17609066dc5
Author: Beyyes <[email protected]>
AuthorDate: Thu Jul 18 18:14:37 2024 +0800

    add impl
---
 .../plan/planner/TableOperatorGenerator.java       |  2 +-
 .../plan/relational/planner/QueryPlanner.java      |  6 +-
 .../distribute/DistributedPlanGenerator.java       | 50 +++++++++--
 .../distribute/TableDistributionPlanner.java       |  3 +-
 .../TableModelTypeProviderExtractor.java           |  7 ++
 .../rule/MergeLimitOverProjectWithSort.java        | 98 ++++++++++++++++++++++
 .../planner/iterative/rule/MergeLimitWithSort.java | 60 +++++++++++++
 .../iterative/rule/PruneTableScanColumns.java      |  5 +-
 .../planner/iterative/rule/PruneTopKColumns.java   | 31 +++++++
 .../plan/relational/planner/node/Patterns.java     |  9 +-
 .../plan/relational/planner/node/SortNode.java     | 31 +++++--
 .../relational/planner/node/StreamSortNode.java    | 10 ++-
 .../relational/planner/node/TableScanNode.java     | 30 ++++++-
 .../plan/relational/planner/node/TopKNode.java     |  8 +-
 .../planner/optimizations/OptimizeFactory.java     | 25 ++++--
 .../PushLimitOffsetIntoTableScan.java              | 16 +++-
 .../optimizations/TransformSortToStreamSort.java   |  9 ++
 .../analyzer/LimitOffsetPushDownTest.java          | 58 ++++++++-----
 .../plan/relational/analyzer/SortTest.java         | 84 ++++---------------
 .../plan/relational/analyzer/TestPlanBuilder.java  | 10 ++-
 20 files changed, 418 insertions(+), 134 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 0a6c26634da..54faac3d78d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -653,7 +653,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         dataTypes,
         getComparatorForTable(
             node.getOrderingScheme().getOrderingList(), sortItemIndexList, 
sortItemDataTypeList),
-        node.getCount(),
+        (int) node.getCount(),
         node.isChildrenDataInOrder());
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index a9c094d7d9a..0a49d7c0492 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -279,7 +279,11 @@ public class QueryPlanner {
 
     return subPlan.withNewRoot(
         new SortNode(
-            queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), 
orderingScheme.get(), false));
+            queryIdAllocator.genPlanNodeId(),
+            subPlan.getRoot(),
+            orderingScheme.get(),
+            false,
+            false));
   }
 
   private PlanBuilder offset(PlanBuilder subPlan, Optional<Offset> offset) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index f37c384e4d4..4c4e3cf1935 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
@@ -64,6 +65,7 @@ import static 
org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaValidator.parseDeviceIdArray;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
 import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+import static org.apache.tsfile.utils.Preconditions.checkArgument;
 
 /** This class is used to generate distributed plan for table model. */
 public class DistributedPlanGenerator
@@ -193,10 +195,42 @@ public class DistributedPlanGenerator
     return resultNodeList;
   }
 
+  @Override
+  public List<PlanNode> visitTopK(TopKNode node, PlanContext context) {
+    context.expectedOrderingScheme = node.getOrderingScheme();
+    context.hasSortProperty = true;
+    nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
+
+    checkArgument(
+        node.getChildren().size() == 1, "Size of TopKNode can only be 1 in 
logical plan.");
+    List<PlanNode> childrenNodes = node.getChildren().get(0).accept(this, 
context);
+    if (childrenNodes.size() == 1) {
+      node.setChildren(Collections.singletonList(childrenNodes.get(0)));
+      return Collections.singletonList(node);
+    }
+
+    TopKNode newTopKNode = (TopKNode) node.clone();
+    for (int i = 0; i < childrenNodes.size(); i++) {
+      PlanNode child = childrenNodes.get(i);
+      TopKNode subTopKNode =
+          new TopKNode(
+              queryId.genPlanNodeId(),
+              Collections.singletonList(child),
+              node.getOrderingScheme(),
+              node.getCount(),
+              node.getOutputSymbols(),
+              node.isChildrenDataInOrder());
+      newTopKNode.addChild(subTopKNode);
+    }
+    nodeOrderingMap.put(newTopKNode.getPlanNodeId(), 
newTopKNode.getOrderingScheme());
+
+    return Collections.singletonList(newTopKNode);
+  }
+
   @Override
   public List<PlanNode> visitSort(SortNode node, PlanContext context) {
     context.expectedOrderingScheme = node.getOrderingScheme();
-    context.hasSortNode = true;
+    context.hasSortProperty = true;
     nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
 
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
@@ -211,7 +245,7 @@ public class DistributedPlanGenerator
             queryId.genPlanNodeId(), node.getOrderingScheme(), 
node.getOutputSymbols());
     for (PlanNode child : childrenNodes) {
       SortNode subSortNode =
-          new SortNode(queryId.genPlanNodeId(), child, 
node.getOrderingScheme(), false);
+          new SortNode(queryId.genPlanNodeId(), child, 
node.getOrderingScheme(), false, false);
       mergeSortNode.addChild(subSortNode);
     }
     nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), 
mergeSortNode.getOrderingScheme());
@@ -222,7 +256,7 @@ public class DistributedPlanGenerator
   @Override
   public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext 
context) {
     context.expectedOrderingScheme = node.getOrderingScheme();
-    context.hasSortNode = true;
+    context.hasSortProperty = true;
     nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
 
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
@@ -242,6 +276,7 @@ public class DistributedPlanGenerator
               child,
               node.getOrderingScheme(),
               false,
+              node.isOrderByAllIdsAndTime(),
               node.getStreamCompareKeyEndIndex());
       mergeSortNode.addChild(subSortNode);
     }
@@ -309,7 +344,10 @@ public class DistributedPlanGenerator
                           node.getIdAndAttributeIndexMap(),
                           node.getScanOrder(),
                           node.getTimePredicate().orElse(null),
-                          node.getPushDownPredicate());
+                          node.getPushDownPredicate(),
+                          node.getPushDownLimit(),
+                          node.getPushDownOffset(),
+                          node.isPushLimitToEachDevice());
                   scanNode.setRegionReplicaSet(regionReplicaSet);
                   return scanNode;
                 });
@@ -335,7 +373,7 @@ public class DistributedPlanGenerator
     }
     context.mostUsedDataRegion = mostUsedDataRegion;
 
-    if (!context.hasSortNode) {
+    if (!context.hasSortProperty) {
       return resultTableScanNodeList;
     }
 
@@ -531,7 +569,7 @@ public class DistributedPlanGenerator
   public static class PlanContext {
     final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
     boolean hasExchangeNode = false;
-    boolean hasSortNode = false;
+    boolean hasSortProperty = false;
     OrderingScheme expectedOrderingScheme;
     TRegionReplicaSet mostUsedDataRegion;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 04caab49710..4a0de5ccb8b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -29,7 +29,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import 
org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.PlanOptimizersStatsCollector;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushLimitOffsetIntoTableScan;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SortElimination;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
 
@@ -53,7 +52,7 @@ public class TableDistributionPlanner {
     this.analysis = analysis;
     this.logicalQueryPlan = logicalQueryPlan;
     this.mppQueryContext = mppQueryContext;
-    this.optimizers = Arrays.asList(new PushLimitOffsetIntoTableScan(), new 
SortElimination());
+    this.optimizers = Arrays.asList(new SortElimination());
   }
 
   public DistributedQueryPlan plan() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 08369519559..1309d364398 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 
 import org.apache.tsfile.read.common.type.BooleanType;
@@ -120,6 +121,12 @@ public class TableModelTypeProviderExtractor {
       return null;
     }
 
+    @Override
+    public Void visitTopK(TopKNode node, Void context) {
+      node.getChildren().forEach(c -> c.accept(this, context));
+      return null;
+    }
+
     @Override
     public Void visitStreamSort(StreamSortNode node, Void context) {
       node.getChild().accept(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitOverProjectWithSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitOverProjectWithSort.java
new file mode 100644
index 00000000000..cc78b745b72
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitOverProjectWithSort.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.project;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.sort;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+/**
+ * Transforms:
+ *
+ * <pre>
+ * - Limit (limit = x)
+ *    - Project (identity, narrowing)
+ *       - Sort (order by a, b)
+ * </pre>
+ *
+ * Into:
+ *
+ * <pre>
+ * - Project (identity, narrowing)
+ *    - TopK (limit = x, order by a, b)
+ * </pre>
+ *
+ * Intended for LimitNode without ties only (the with-ties check below is commented out).
+ */
+public class MergeLimitOverProjectWithSort implements Rule<LimitNode> {
+  private static final Capture<ProjectNode> PROJECT = newCapture();
+  private static final Capture<SortNode> SORT = newCapture();
+
+  private static final Pattern<LimitNode> PATTERN =
+      limit()
+          // .matching(limit -> !limit.isWithTies())
+          .with(
+              source()
+                  .matching(
+                      project()
+                          .capturedAs(PROJECT)
+                          .matching(ProjectNode::isIdentity)
+                          .with(source().matching(sort().capturedAs(SORT)))));
+
+  @Override
+  public Pattern<LimitNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(LimitNode parent, Captures captures, Context context) {
+    ProjectNode project = captures.get(PROJECT);
+    SortNode sort = captures.get(SORT);
+
+    if (sort instanceof StreamSortNode) {
+      return Result.empty();
+    }
+
+    return Result.ofPlanNode(
+        project.replaceChildren(
+            ImmutableList.of(
+                new TopKNode(
+                    parent.getPlanNodeId(),
+                    sort.getChildren(),
+                    sort.getOrderingScheme(),
+                    parent.getCount(),
+                    parent.getOutputSymbols(),
+                    sort.isOrderByAllIdsAndTime()))));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitWithSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitWithSort.java
new file mode 100644
index 00000000000..49913558f38
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeLimitWithSort.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.sort;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Capture.newCapture;
+
+public class MergeLimitWithSort implements Rule<LimitNode> {
+  private static final Capture<SortNode> CHILD = newCapture();
+
+  private static final Pattern<LimitNode> PATTERN =
+      limit()
+          // .matching(limit -> !limit.isWithTies())
+          .with(source().matching(sort().capturedAs(CHILD)));
+
+  @Override
+  public Pattern<LimitNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(LimitNode parent, Captures captures, Context context) {
+    SortNode child = captures.get(CHILD);
+
+    if (child instanceof StreamSortNode) {
+      return Result.empty();
+    }
+
+    return Result.ofPlanNode(
+        new TopKNode(
+            parent.getPlanNodeId(),
+            child.getChildren(),
+            child.getOrderingScheme(),
+            parent.getCount(),
+            parent.getOutputSymbols(),
+            child.isOrderByAllIdsAndTime()));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
index d086a685b4c..6afca034475 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
@@ -97,6 +97,9 @@ public class PruneTableScanColumns extends 
ProjectOffPushDownRule<TableScanNode>
             node.getIdAndAttributeIndexMap(),
             node.getScanOrder(),
             node.getTimePredicate().orElse(null),
-            node.getPushDownPredicate()));
+            node.getPushDownPredicate(),
+            node.getPushDownLimit(),
+            node.getPushDownOffset(),
+            node.isPushLimitToEachDevice()));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKColumns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKColumns.java
new file mode 100644
index 00000000000..e95b1efa16b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTopKColumns.java
@@ -0,0 +1,31 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+
+import com.google.common.collect.Streams;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictChildOutputs;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.topK;
+
+public class PruneTopKColumns extends ProjectOffPushDownRule<TopKNode> {
+  public PruneTopKColumns() {
+    super(topK());
+  }
+
+  @Override
+  protected Optional<PlanNode> pushDownProjectOff(
+      Context context, TopKNode topKNode, Set<Symbol> referencedOutputs) {
+    Set<Symbol> prunedTopNInputs =
+        Streams.concat(
+                referencedOutputs.stream(), 
topKNode.getOrderingScheme().getOrderBy().stream())
+            .collect(toImmutableSet());
+
+    return restrictChildOutputs(context.getIdAllocator(), topKNode, 
prunedTopNInputs);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index 489df2f8366..541db20fe00 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -160,14 +160,13 @@ public final class Patterns {
     return typeOf(TableScanNode.class);
   }
 
-  /*public static Pattern<TableWriterNode> tableWriterNode()
-  {
-      return typeOf(TableWriterNode.class);
+  public static Pattern<TopKNode> topK() {
+    return typeOf(TopKNode.class);
   }
 
-  public static Pattern<TopNNode> topN()
+  /*public static Pattern<TableWriterNode> tableWriterNode()
   {
-      return typeOf(TopNNode.class);
+      return typeOf(TableWriterNode.class);
   }
 
   public static Pattern<UnionNode> union()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index 2e85d983fb7..67e9ad4f2aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -34,16 +34,24 @@ import java.util.List;
 public class SortNode extends SingleChildProcessNode {
   protected final OrderingScheme orderingScheme;
   protected final boolean partial;
-
-  public SortNode(PlanNodeId id, PlanNode child, OrderingScheme scheme, 
boolean partial) {
+  // true when ordering by all id columns and time; such a sort node may be eliminated
+  protected boolean orderByAllIdsAndTime;
+
+  public SortNode(
+      PlanNodeId id,
+      PlanNode child,
+      OrderingScheme scheme,
+      boolean partial,
+      boolean orderByAllIdsAndTime) {
     super(id, child);
     this.orderingScheme = scheme;
     this.partial = partial;
+    this.orderByAllIdsAndTime = orderByAllIdsAndTime;
   }
 
   @Override
   public PlanNode clone() {
-    return new SortNode(id, null, orderingScheme, partial);
+    return new SortNode(id, null, orderingScheme, partial, 
orderByAllIdsAndTime);
   }
 
   @Override
@@ -60,21 +68,21 @@ public class SortNode extends SingleChildProcessNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.TABLE_SORT_NODE.serialize(byteBuffer);
     orderingScheme.serialize(byteBuffer);
-    ReadWriteIOUtils.write(partial, byteBuffer);
+    ReadWriteIOUtils.write(orderByAllIdsAndTime, byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
     PlanNodeType.TABLE_SORT_NODE.serialize(stream);
     orderingScheme.serialize(stream);
-    ReadWriteIOUtils.write(partial, stream);
+    ReadWriteIOUtils.write(orderByAllIdsAndTime, stream);
   }
 
   public static SortNode deserialize(ByteBuffer byteBuffer) {
     OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer);
     boolean partial = ReadWriteIOUtils.readBool(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new SortNode(planNodeId, null, orderingScheme, partial);
+    return new SortNode(planNodeId, null, orderingScheme, partial, false);
   }
 
   @Override
@@ -84,7 +92,8 @@ public class SortNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode replaceChildren(List<PlanNode> newChildren) {
-    return new SortNode(id, Iterables.getOnlyElement(newChildren), 
orderingScheme, partial);
+    return new SortNode(
+        id, Iterables.getOnlyElement(newChildren), orderingScheme, partial, 
orderByAllIdsAndTime);
   }
 
   public OrderingScheme getOrderingScheme() {
@@ -95,6 +104,14 @@ public class SortNode extends SingleChildProcessNode {
     return this.partial;
   }
 
+  public boolean isOrderByAllIdsAndTime() {
+    return orderByAllIdsAndTime;
+  }
+
+  public void setOrderByAllIdsAndTime(boolean orderByAllIdsAndTime) {
+    this.orderByAllIdsAndTime = orderByAllIdsAndTime;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
index 7ee9dd322fa..8a35a80bf22 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
@@ -43,8 +43,9 @@ public class StreamSortNode extends SortNode {
       PlanNode child,
       OrderingScheme scheme,
       boolean partial,
+      boolean orderByAllIdsAndTime,
       int streamCompareKeyEndIndex) {
-    super(id, child, scheme, partial);
+    super(id, child, scheme, partial, orderByAllIdsAndTime);
     this.streamCompareKeyEndIndex = streamCompareKeyEndIndex;
   }
 
@@ -59,6 +60,7 @@ public class StreamSortNode extends SortNode {
         Iterables.getOnlyElement(newChildren),
         orderingScheme,
         partial,
+        orderByAllIdsAndTime,
         streamCompareKeyEndIndex);
   }
 
@@ -69,7 +71,8 @@ public class StreamSortNode extends SortNode {
 
   @Override
   public PlanNode clone() {
-    return new StreamSortNode(id, null, orderingScheme, partial, 
streamCompareKeyEndIndex);
+    return new StreamSortNode(
+        id, null, orderingScheme, partial, orderByAllIdsAndTime, 
streamCompareKeyEndIndex);
   }
 
   @Override
@@ -93,7 +96,8 @@ public class StreamSortNode extends SortNode {
     boolean partial = ReadWriteIOUtils.readBool(byteBuffer);
     int streamCompareKeyEndIndex = ReadWriteIOUtils.read(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new StreamSortNode(planNodeId, null, orderingScheme, partial, 
streamCompareKeyEndIndex);
+    return new StreamSortNode(
+        planNodeId, null, orderingScheme, partial, false, 
streamCompareKeyEndIndex);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 98f21b2b9a8..770ab775c05 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -118,7 +118,10 @@ public class TableScanNode extends SourceNode {
       Map<Symbol, Integer> idAndAttributeIndexMap,
       Ordering scanOrder,
       Expression timePredicate,
-      Expression pushDownPredicate) {
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice) {
     super(id);
     this.qualifiedObjectName = qualifiedObjectName;
     this.outputSymbols = outputSymbols;
@@ -128,6 +131,9 @@ public class TableScanNode extends SourceNode {
     this.scanOrder = scanOrder;
     this.timePredicate = timePredicate;
     this.pushDownPredicate = pushDownPredicate;
+    this.pushDownLimit = pushDownLimit;
+    this.pushDownOffset = pushDownOffset;
+    this.pushLimitToEachDevice = pushLimitToEachDevice;
   }
 
   @Override
@@ -154,7 +160,10 @@ public class TableScanNode extends SourceNode {
         idAndAttributeIndexMap,
         scanOrder,
         timePredicate,
-        pushDownPredicate);
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice);
   }
 
   @Override
@@ -214,6 +223,10 @@ public class TableScanNode extends SourceNode {
     } else {
       ReadWriteIOUtils.write(false, byteBuffer);
     }
+
+    ReadWriteIOUtils.write(pushDownLimit, byteBuffer);
+    ReadWriteIOUtils.write(pushDownOffset, byteBuffer);
+    ReadWriteIOUtils.write(pushLimitToEachDevice, byteBuffer);
   }
 
   @Override
@@ -264,6 +277,10 @@ public class TableScanNode extends SourceNode {
     } else {
       ReadWriteIOUtils.write(false, stream);
     }
+
+    ReadWriteIOUtils.write(pushDownLimit, stream);
+    ReadWriteIOUtils.write(pushDownOffset, stream);
+    ReadWriteIOUtils.write(pushLimitToEachDevice, stream);
   }
 
   public static TableScanNode deserialize(ByteBuffer byteBuffer) {
@@ -314,6 +331,10 @@ public class TableScanNode extends SourceNode {
       pushDownPredicate = Expression.deserialize(byteBuffer);
     }
 
+    long pushDownLimit = ReadWriteIOUtils.readLong(byteBuffer);
+    long pushDownOffset = ReadWriteIOUtils.readLong(byteBuffer);
+    boolean pushLimitToEachDevice = ReadWriteIOUtils.readBool(byteBuffer);
+
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
 
     return new TableScanNode(
@@ -325,7 +346,10 @@ public class TableScanNode extends SourceNode {
         idAndAttributeIndexMap,
         scanOrder,
         timePredicate,
-        pushDownPredicate);
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
index 221bcc8c892..4625bc2eae8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
@@ -42,7 +42,7 @@ public class TopKNode extends MultiChildProcessNode {
 
   private final OrderingScheme orderingScheme;
 
-  private final int count;
+  private final long count;
 
   private final List<Symbol> outputSymbols;
 
@@ -51,7 +51,7 @@ public class TopKNode extends MultiChildProcessNode {
   public TopKNode(
       PlanNodeId id,
       OrderingScheme scheme,
-      int count,
+      long count,
       List<Symbol> outputSymbols,
       boolean childrenDataInOrder) {
     super(id);
@@ -65,7 +65,7 @@ public class TopKNode extends MultiChildProcessNode {
       PlanNodeId id,
       List<PlanNode> children,
       OrderingScheme scheme,
-      int count,
+      long count,
       List<Symbol> outputSymbols,
       boolean childrenDataInOrder) {
     super(id, children);
@@ -142,7 +142,7 @@ public class TopKNode extends MultiChildProcessNode {
     return orderingScheme;
   }
 
-  public int getCount() {
+  public long getCount() {
     return count;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
index 72ca95bd0e4..2a66a20b6d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
@@ -18,6 +18,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Iterati
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.InlineProjections;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFilterColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneLimitColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneOffsetColumns;
@@ -25,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Pr
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneProjectColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneSortColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneTableScanColumns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneTopKColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughOffset;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PushLimitThroughProject;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.RemoveRedundantIdentityProjections;
@@ -53,7 +56,8 @@ public class OptimizeFactory {
             new PruneOutputSourceColumns(),
             new PruneProjectColumns(),
             new PruneSortColumns(),
-            new PruneTableScanColumns(plannerContext.getMetadata()));
+            new PruneTableScanColumns(plannerContext.getMetadata()),
+            new PruneTopKColumns());
     IterativeOptimizer columnPruningOptimizer =
         new IterativeOptimizer(plannerContext, new RuleStatsRecorder(), 
columnPruningRules);
 
@@ -64,10 +68,19 @@ public class OptimizeFactory {
             ImmutableSet.of(
                 new InlineProjections(plannerContext), new 
RemoveRedundantIdentityProjections()));
 
-    Set<Rule<?>> limitPushdownRules =
-        ImmutableSet.of(new PushLimitThroughOffset(), new 
PushLimitThroughProject());
     IterativeOptimizer limitPushdownOptimizer =
-        new IterativeOptimizer(plannerContext, new RuleStatsRecorder(), 
limitPushdownRules);
+        new IterativeOptimizer(
+            plannerContext,
+            new RuleStatsRecorder(),
+            ImmutableSet.of(new PushLimitThroughOffset(), new 
PushLimitThroughProject()));
+
+    PlanOptimizer pushLimitOffsetIntoTableScanOptimizer = new 
PushLimitOffsetIntoTableScan();
+
+    IterativeOptimizer topKOptimizer =
+        new IterativeOptimizer(
+            plannerContext,
+            new RuleStatsRecorder(),
+            ImmutableSet.of(new MergeLimitWithSort(), new 
MergeLimitOverProjectWithSort()));
 
     this.planOptimizers =
         ImmutableList.of(
@@ -79,7 +92,9 @@ public class OptimizeFactory {
             columnPruningOptimizer,
             inlineProjectionsOptimizer,
             limitPushdownOptimizer,
-            transformSortToStreamSortOptimizer);
+            pushLimitOffsetIntoTableScanOptimizer,
+            transformSortToStreamSortOptimizer,
+            topKOptimizer);
   }
 
   public List<PlanOptimizer> getPlanOptimizers() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index a11683dde28..ed5faef0f35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
 
@@ -43,7 +44,7 @@ import static 
org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
 /**
  * <b>Optimization phase:</b> Distributed plan planning.
  *
- * <p>The LIMIT OFFSET condition can be pushed down to the SeriesScanNode, 
when the following
+ * <p>The LIMIT OFFSET condition can be pushed down to the TableScanNode, when 
the following
  * conditions are met:
  * <li>Time series query (not aggregation query).
  * <li>The query expressions are all scalar expression.
@@ -87,9 +88,10 @@ public class PushLimitOffsetIntoTableScan implements 
PlanOptimizer {
     @Override
     public PlanNode visitOffset(OffsetNode node, Context context) {
       context.setOffset(node.getCount());
-      if (context.getLimit() > 0) {
-        context.setLimit(context.getLimit() + context.getOffset());
-      }
+      // already handled by rule {@link PushLimitThroughOffset}
+      //      if (context.getLimit() > 0) {
+      //        context.setLimit(context.getLimit() + context.getOffset());
+      //      }
       node.setChild(node.getChild().accept(this, context));
       return node;
     }
@@ -163,6 +165,12 @@ public class PushLimitOffsetIntoTableScan implements 
PlanOptimizer {
       return node;
     }
 
+    @Override
+    public PlanNode visitTopK(TopKNode node, Context context) {
+      throw new IllegalStateException(
+          "TopKNode must be appeared after PushLimitOffsetIntoTableScan");
+    }
+
     @Override
     public PlanNode visitStreamSort(StreamSortNode node, Context context) {
       return visitSort(node, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
index 9ea378df087..b9c7fe2d81a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -91,11 +91,20 @@ public class TransformSortToStreamSort implements 
PlanOptimizer {
       }
 
       if (streamSortIndex > 0) {
+        boolean orderByAllIdsAndTime = true;
+        for (Map.Entry<Symbol, ColumnSchema> entry : 
tableColumnSchema.entrySet()) {
+          if (entry.getValue().getColumnCategory() == TsTableColumnCategory.ID
+              && !orderingScheme.getOrderings().containsKey(entry.getKey())) {
+            orderByAllIdsAndTime = false;
+          }
+        }
+
         return new StreamSortNode(
             queryContext.getQueryId().genPlanNodeId(),
             child,
             node.getOrderingScheme(),
             node.isPartial(),
+            orderByAllIdsAndTime,
             streamSortIndex);
       }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
index c612a697540..b636257ce0c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -27,10 +27,12 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
-import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 
 import org.junit.Test;
 
@@ -65,7 +67,6 @@ public class LimitOffsetPushDownTest {
   TableScanNode tableScanNode;
 
   // without sort operation, limit can be pushed into TableScan, 
pushLimitToEachDevice==false
-  // Output - Project - Limit - Offset - Collect - TableScan
   @Test
   public void noOrderByTest() {
     sql = "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 offset 
5 limit 10";
@@ -75,8 +76,11 @@ public class LimitOffsetPushDownTest {
         new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
+    // LogicalPlan: `Output - Project - Offset - Limit - TableScan`
+    assertTrue(getChildrenNode(rootNode, 3) instanceof LimitNode);
     assertTrue(getChildrenNode(rootNode, 4) instanceof TableScanNode);
 
+    // DistributePlan: `Output - Project - Offset - Limit - Collect - 
TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -102,7 +106,6 @@ public class LimitOffsetPushDownTest {
   }
 
   // order by all tags, limit can be pushed into TableScan, 
pushLimitToEachDevice==false
-  // Output - Limit - Offset - Project - MergeSort -  Project - TableScan
   @Test
   public void orderByAllTagsTest() {
     sql =
@@ -113,8 +116,10 @@ public class LimitOffsetPushDownTest {
         new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
+    // LogicalPlan: `Output - Offset - Limit - Project - StreamSort -  Project 
- TableScan`
     assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
 
+    // DistributePlan: `Output - Offset - Limit - Project - MergeSort -  
Project - TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -140,7 +145,6 @@ public class LimitOffsetPushDownTest {
   }
 
   // order by some tags, limit can be pushed into TableScan, 
pushLimitToEachDevice==true
-  // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
   @Test
   public void orderBySomeTagsTest() {
     sql =
@@ -151,8 +155,12 @@ public class LimitOffsetPushDownTest {
         new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
+    // LogicalPlan: `Output - Offset - Limit - Project - StreamSort - Project 
- TableScan`
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
     assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
 
+    // DistributePlan: `Output - Offset - Limit - Project - MergeSort - 
StreamSort - Project -
+    // TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -160,7 +168,7 @@ public class LimitOffsetPushDownTest {
         (MergeSortNode)
             
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
5);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
     tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
@@ -178,7 +186,6 @@ public class LimitOffsetPushDownTest {
   }
 
   // order by time, limit can be pushed into TableScan, 
pushLimitToEachDevice==true
-  // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
   @Test
   public void orderByTimeTest() {
     sql =
@@ -188,19 +195,22 @@ public class LimitOffsetPushDownTest {
     logicalQueryPlan =
         new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
             .plan(actualAnalysis);
+    // LogicalPlan: `Output - Offset - Project - TopK - Project - TableScan`
     rootNode = logicalQueryPlan.getRootNode();
-    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(rootNode, 5) instanceof TableScanNode);
 
+    // DistributePlan-1 `Identity - Output - Offset - Project - TopK - 
{Exchange + TopK + Exchange}`
+    // DistributePlan-2 `Identity - TopK - Project - TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
5);
-    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
-    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+    TopKNode topKNode =
+        (TopKNode) 
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
4);
+    assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(topKNode.getChildren().get(1) instanceof TopKNode);
+    assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) 
getChildrenNode(topKNode.getChildren().get(1), 2);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(DESC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 15 && 
tableScanNode.getPushDownOffset() == 0);
@@ -216,7 +226,6 @@ public class LimitOffsetPushDownTest {
   }
 
   // order by others, limit can not be pushed into TableScan
-  // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
   @Test
   public void orderByOthersTest() {
     sql =
@@ -226,19 +235,22 @@ public class LimitOffsetPushDownTest {
     logicalQueryPlan =
         new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
             .plan(actualAnalysis);
+    // LogicalPlan: `Output - Offset - Project - TopK - Project - TableScan`
     rootNode = logicalQueryPlan.getRootNode();
-    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(rootNode, 5) instanceof TableScanNode);
 
+    // DistributePlan-1 `Identity - Output - Offset - Project - TopK - 
{Exchange + TopK + Exchange}`
+    // DistributePlan-2 `Identity - TopK - Project - TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
5);
-    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
-    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+    TopKNode topKNode =
+        (TopKNode) 
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
4);
+    assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(topKNode.getChildren().get(1) instanceof TopKNode);
+    assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) 
getChildrenNode(topKNode.getChildren().get(1), 2);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(ASC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index 2242ebb74ca..d66b2c4899e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
 
 import org.junit.Test;
 
@@ -82,34 +83,6 @@ public class SortTest {
   DistributedQueryPlan distributedQueryPlan;
   TableScanNode tableScanNode;
 
-  /*
-   * order by time, others, some_ids
-   *
-   * IdentitySinkNode-33
-   *   └──OutputNode-8
-   *       └──LimitNode-7
-   *           └──OffsetNode-6
-   *             └──ProjectNode
-   *               └──MergeSortNode-25
-   *                   ├──ExchangeNode-29: 
[SourceAddress:192.0.12.1/test_query.2.0/31]
-   *                   ├──SortNode-27
-   *                   │   └──ProjectNode-23
-   *                   │           └──FilterNode-17
-   *                   │               └──TableScanNode-14
-   *                   └──ExchangeNode-30: 
[SourceAddress:192.0.10.1/test_query.3.0/32]
-   *
-   * IdentitySinkNode-31
-   *   └──SortNode-26
-   *       └──ProjectNode-22
-   *               └──FilterNode-16
-   *                   └──TableScanNode-13
-   *
-   * IdentitySinkNode-31
-   *   └──SortNode-26
-   *           └──ProjectNode-19
-   *               └──FilterNode-16
-   *                   └──TableScanNode-13
-   */
   @Test
   public void timeOthersSomeIDColumnSortTest() {
     sql =
@@ -122,32 +95,22 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
+    // LogicalPlan: `Output - Offset - Project - TopK - Project - FilterNode - 
TableScan`
     assertTrue(rootNode instanceof OutputNode);
     assertTrue(getChildrenNode(rootNode, 1) instanceof OffsetNode);
-    assertTrue(getChildrenNode(rootNode, 2) instanceof LimitNode);
-    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    // TODO(beyyes) merge parent and child ProjectNode into one, parent 
contains all children
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof TopKNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 5) instanceof FilterNode);
+    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(rootNode, 6);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // Output - Limit - Offset -Project - MergeSort - Sort - Project - Filter 
- TableScan
+    // DistributePlan-1 `Identity - Output - Offset - Project - TopK - 
{Exchange + TopK + Exchange}`
+    // DistributePlan-2 `Identity - TopK - Project - Filter - TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -158,25 +121,12 @@ public class SortTest {
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
     assertTrue(getChildrenNode(outputNode, 1) instanceof OffsetNode);
-    assertTrue(getChildrenNode(outputNode, 2) instanceof LimitNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
-    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof TopKNode);
+    TopKNode topKNode = (TopKNode) getChildrenNode(outputNode, 3);
+    assertTrue(topKNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(topKNode.getChildren().get(1) instanceof TopKNode);
+    assertTrue(topKNode.getChildren().get(2) instanceof ExchangeNode);
+    SortNode sortNode = (SortNode) topKNode.getChildren().get(1);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
     tableScanNode =
@@ -194,7 +144,7 @@ public class SortTest {
     assertEquals(DESC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
-    // IdentitySink - Sort - Project - Filter - TableScan
+    // IdentitySink - TopK - Project - Filter - TableScan
     assertTrue(
         distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
     assertTrue(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
index bcf776321f0..4101d092efa 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
@@ -81,7 +81,10 @@ public class TestPlanBuilder {
       Map<Symbol, Integer> idAndAttributeIndexMap,
       Ordering scanOrder,
       Expression timePredicate,
-      Expression pushDownPredicate) {
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice) {
     this.root =
         new TableScanNode(
             new PlanNodeId(id),
@@ -92,7 +95,10 @@ public class TestPlanBuilder {
             idAndAttributeIndexMap,
             scanOrder,
             timePredicate,
-            pushDownPredicate);
+            pushDownPredicate,
+            pushDownLimit,
+            pushDownOffset,
+            pushLimitToEachDevice);
     return this;
   }
 }

Reply via email to