This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this 
push:
     new f09cfe11625 Add LimitOffsetPushDown optimize rule
f09cfe11625 is described below

commit f09cfe1162553c9515586c7b5f1c01720ae9e9db
Author: Beyyes <[email protected]>
AuthorDate: Tue Jul 9 19:48:52 2024 +0800

    Add LimitOffsetPushDown optimize rule
---
 .../plan/relational/analyzer/Analysis.java         |  15 ++
 .../plan/relational/planner/QueryPlanner.java      |  14 +-
 .../plan/relational/planner/RelationPlanner.java   |  16 +-
 .../plan/relational/planner/TranslationMap.java    |  27 +-
 .../distribute/TableDistributionPlanner.java       |  10 +
 .../plan/relational/planner/node/CollectNode.java  |   5 +-
 .../relational/planner/node/TableScanNode.java     |  12 +
 .../planner/optimizations/LimitOffsetPushDown.java | 268 ++++++++++++++++++++
 .../optimizations/PushPredicateIntoTableScan.java  |  30 ++-
 .../planner/optimizations/SimplifyExpressions.java |   1 +
 .../analyzer/LimitOffsetPushDownTest.java          | 273 +++++++++++++++++++++
 .../plan/relational/analyzer/SortTest.java         |  34 ++-
 .../plan/relational/analyzer/TestPlanBuilder.java  |  98 ++++++++
 13 files changed, 743 insertions(+), 60 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 25a8e6edc7f..f913ddbf98e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -30,8 +30,10 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySour
 import 
org.apache.iotdb.db.queryengine.plan.execution.memory.TableModelStatementMemorySourceContext;
 import 
org.apache.iotdb.db.queryengine.plan.execution.memory.TableModelStatementMemorySourceVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import org.apache.iotdb.db.queryengine.plan.relational.security.Identity;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllColumns;
@@ -68,6 +70,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -154,6 +157,9 @@ public class Analysis implements IAnalysis {
 
   private final Set<NodeRef<Relation>> aliasedRelations = new 
LinkedHashSet<>();
 
+  private final Map<QualifiedObjectName, Map<Symbol, ColumnSchema>> 
tableColumnSchemas =
+      new HashMap<>();
+
   private DataPartition dataPartition;
 
   // only be used in write plan and won't be used in query
@@ -566,6 +572,15 @@ public class Analysis implements IAnalysis {
     return aliasedRelations.contains(NodeRef.of(relation));
   }
 
+  public void addTableSchema(
+      QualifiedObjectName qualifiedObjectName, Map<Symbol, ColumnSchema> 
tableColumnSchema) {
+    tableColumnSchemas.put(qualifiedObjectName, tableColumnSchema);
+  }
+
+  public Map<Symbol, ColumnSchema> getTableColumnSchema(QualifiedObjectName 
qualifiedObjectName) {
+    return tableColumnSchemas.get(qualifiedObjectName);
+  }
+
   public boolean hasValueFilter() {
     return hasValueFilter;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index 60e2a46371d..8456c03a52f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -91,7 +91,7 @@ public class QueryPlanner {
             .collect(toImmutableList());
 
     List<Expression> orderBy = analysis.getOrderByExpressions(query);
-    if (orderBy.size() > 0) {
+    if (!orderBy.isEmpty()) {
       builder =
           builder.appendProjections(
               Iterables.concat(orderBy, outputs), symbolAllocator, 
queryContext);
@@ -134,7 +134,8 @@ public class QueryPlanner {
         // Add projections for aggregations required by ORDER BY. After this 
step, grouping keys and
         // translated
         // aggregations are visible.
-        List<Expression> orderByAggregates = 
analysis.getOrderByAggregates(node.getOrderBy().get());
+        List<Expression> orderByAggregates =
+            analysis.getOrderByAggregates(node.getOrderBy().orElse(null));
         builder = builder.appendProjections(orderByAggregates, 
symbolAllocator, queryContext);
       }
 
@@ -146,16 +147,15 @@ public class QueryPlanner {
       // The new scope is the composite of the fields from the FROM and SELECT 
clause (local nested
       // scopes). Fields from the bottom of
       // the scope stack need to be placed first to match the expected layout 
for nested scopes.
-      List<Symbol> newFields = new ArrayList<>();
-      newFields.addAll(builder.getTranslations().getFieldSymbolsList());
+      List<Symbol> newFields = new 
ArrayList<>(builder.getTranslations().getFieldSymbolsList());
 
       outputs.stream().map(builder::translate).forEach(newFields::add);
 
-      builder = builder.withScope(analysis.getScope(node.getOrderBy().get()), 
newFields);
+      builder = 
builder.withScope(analysis.getScope(node.getOrderBy().orElse(null)), newFields);
     }
 
     List<Expression> orderBy = analysis.getOrderByExpressions(node);
-    if (orderBy.size() > 0) {
+    if (!orderBy.isEmpty()) {
       builder =
           builder.appendProjections(
               Iterables.concat(orderBy, outputs), symbolAllocator, 
queryContext);
@@ -220,7 +220,7 @@ public class QueryPlanner {
     if (node.getFrom().isPresent()) {
       RelationPlan relationPlan =
           new RelationPlanner(analysis, symbolAllocator, queryContext, 
session, recursiveSubqueries)
-              .process(node.getFrom().get(), null);
+              .process(node.getFrom().orElse(null), null);
       return newPlanBuilder(relationPlan, analysis);
     } else {
       throw new IllegalStateException("From clause must not by empty");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index d85fcc9056c..8b974707298 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -125,16 +125,19 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
       throw new IllegalStateException("Table " + table.getName() + " has no 
prefix!");
     }
 
+    QualifiedObjectName qualifiedObjectName =
+        new QualifiedObjectName(
+            
qualifiedName.getPrefix().map(QualifiedName::toString).orElse(null),
+            qualifiedName.getSuffix());
+    Map<Symbol, ColumnSchema> tableColumnSchema = symbolToColumnSchema.build();
+    analysis.addTableSchema(qualifiedObjectName, tableColumnSchema);
     TableScanNode tableScanNode =
         new TableScanNode(
             idAllocator.genPlanNodeId(),
-            new QualifiedObjectName(
-                
qualifiedName.getPrefix().map(QualifiedName::toString).orElse(null),
-                qualifiedName.getSuffix()),
+            qualifiedObjectName,
             outputSymbols,
-            symbolToColumnSchema.build(),
+            tableColumnSchema,
             idAndAttributeIndexMap);
-
     return new RelationPlan(tableScanNode, scope, outputSymbols);
 
     // Collection<Field> fields = 
analysis.getMaterializedViewStorageTableFields(node);
@@ -156,7 +159,6 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     throw new IllegalStateException("Unsupported node type: " + 
node.getClass().getName());
   }
 
-  // ================================ Implemented later 
=====================================
   @Override
   protected RelationPlan visitTableSubquery(TableSubquery node, Void context) {
     RelationPlan plan = process(node.getQuery(), context);
@@ -164,6 +166,8 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     return new RelationPlan(plan.getRoot(), analysis.getScope(node), 
plan.getFieldMappings());
   }
 
+  // ================================ Implemented later 
=====================================
+
   @Override
   protected RelationPlan visitValues(Values node, Void context) {
     throw new IllegalStateException("Values is not supported in current 
version.");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
index ea1a78a5573..aa29710a20b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TranslationMap.java
@@ -226,18 +226,16 @@ public class TranslationMap {
           public Expression rewriteFieldReference(
               FieldReference node, Void context, ExpressionTreeRewriter<Void> 
treeRewriter) {
             Optional<SymbolReference> mapped = tryGetMapping(node);
-            if (mapped.isPresent()) {
-              return mapped.get();
-            }
-
-            return getSymbolForColumn(node)
-                .map(Symbol::toSymbolReference)
-                .orElseThrow(
-                    () ->
-                        new IllegalStateException(
-                            format(
-                                "No symbol mapping for node '%s' (%s)",
-                                node, node.getFieldIndex())));
+            return mapped.orElseGet(
+                () ->
+                    getSymbolForColumn(node)
+                        .map(Symbol::toSymbolReference)
+                        .orElseThrow(
+                            () ->
+                                new IllegalStateException(
+                                    format(
+                                        "No symbol mapping for node '%s' (%s)",
+                                        node, node.getFieldIndex()))));
           }
 
           @Override
@@ -266,7 +264,7 @@ public class TranslationMap {
 
             List<Expression> newArguments =
                 node.getArguments().stream()
-                    .map(argument -> rewrite(argument))
+                    .map(TranslationMap.this::rewrite)
                     .collect(Collectors.toList());
             return new FunctionCall(node.getName(), node.isDistinct(), 
newArguments);
           }
@@ -323,8 +321,7 @@ public class TranslationMap {
 
   private static void verifyAstExpression(Expression astExpression) {
     verify(
-        AstUtil.preOrder(astExpression)
-            .noneMatch(expression -> expression instanceof SymbolReference),
+        
AstUtil.preOrder(astExpression).noneMatch(SymbolReference.class::isInstance),
         "symbol references are not allowed");
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 70a87f06dec..b47eb51db94 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -27,6 +27,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LimitOffsetPushDown;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TablePlanOptimizer;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
 
 import java.util.Collections;
@@ -42,12 +44,14 @@ public class TableDistributionPlanner {
   private final Analysis analysis;
   private final LogicalQueryPlan logicalQueryPlan;
   private final MPPQueryContext mppQueryContext;
+  private final List<TablePlanOptimizer> optimizers;
 
   public TableDistributionPlanner(
       Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext 
mppQueryContext) {
     this.analysis = analysis;
     this.logicalQueryPlan = logicalQueryPlan;
     this.mppQueryContext = mppQueryContext;
+    this.optimizers = Collections.singletonList(new LimitOffsetPushDown());
   }
 
   public DistributedQueryPlan plan() {
@@ -59,6 +63,12 @@ public class TableDistributionPlanner {
             .genResult(logicalQueryPlan.getRootNode(), planContext);
     checkArgument(distributedPlanResult.size() == 1, "Root node must return 
only one");
 
+    // distribute plan optimize rule
+    this.optimizers.forEach(
+        optimizer ->
+            optimizer.optimize(
+                distributedPlanResult.get(0), analysis, null, null, 
mppQueryContext));
+
     // add exchange node for distributed plan
     PlanNode outputNodeWithExchange =
         new AddExchangeNodes(mppQueryContext)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
index 0dc0a4cb570..7920216428d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
@@ -33,7 +33,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-/** CollectNode output the content of children. */
+/**
+ * CollectNode outputs the content of its children. Normally it outputs the children one by one,
+ * but in some cases, while some children are blocked, it may output the content of other children.
+ */
 public class CollectNode extends MultiChildProcessNode {
 
   public CollectNode(PlanNodeId id) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index c4283bb654a..9207bfe5512 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -377,14 +377,26 @@ public class TableScanNode extends SourceNode {
     return this.pushDownLimit;
   }
 
+  public void setPushDownLimit(long pushDownLimit) {
+    this.pushDownLimit = pushDownLimit;
+  }
+
   public long getPushDownOffset() {
     return this.pushDownOffset;
   }
 
+  public void setPushDownOffset(long pushDownOffset) {
+    this.pushDownOffset = pushDownOffset;
+  }
+
   public Expression getPushDownPredicate() {
     return this.pushDownPredicate;
   }
 
+  public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+    this.pushLimitToEachDevice = pushLimitToEachDevice;
+  }
+
   public void setPushDownPredicate(@Nullable Expression pushDownPredicate) {
     this.pushDownPredicate = pushDownPredicate;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
new file mode 100644
index 00000000000..4bf01ac1f72
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
+import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+
+/**
+ * <b>Optimization phase:</b> Distributed plan planning.
+ *
+ * <p>The LIMIT OFFSET condition can be pushed down to the TableScanNode when the following
+ * conditions are met:
+ * <li>Time series query (not aggregation query).
+ * <li>The query expressions are all scalar expressions.
+ * <li>Order by all IDs: limit is pushed down with pushLimitToEachDevice == false.
+ * <li>Order by some IDs or order by time: limit is pushed down with pushLimitToEachDevice == true.
+ */
+public class LimitOffsetPushDown implements TablePlanOptimizer {
+
+  @Override
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
+    if (!(analysis.getStatement() instanceof Query)) {
+      return planNode;
+    }
+
+    return planNode.accept(new Rewriter(), new Context(analysis));
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+
+    @Override
+    public PlanNode visitPlan(PlanNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitOutput(OutputNode node, Context context) {
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitLimit(LimitNode node, Context context) {
+      context.setLimit(node.getCount());
+      node.setChild(node.getChild().accept(this, context));
+      return node;
+    }
+
+    @Override
+    public PlanNode visitOffset(OffsetNode node, Context context) {
+      context.setOffset(node.getCount());
+      if (context.getLimit() > 0) {
+        context.setLimit(context.getLimit() + context.getOffset());
+      }
+      node.setChild(node.getChild().accept(this, context));
+      return node;
+    }
+
+    @Override
+    public PlanNode visitCollect(CollectNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitProject(ProjectNode node, Context context) {
+      for (Expression expression : node.getAssignments().getMap().values()) {
+        if (containsDiffFunction(expression)) {
+          context.setEnablePushDown(false);
+          return node;
+        }
+      }
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public PlanNode visitSort(SortNode node, Context context) {
+      Context newContext =
+          new Context(
+              context.analysis,
+              context.getLimit(),
+              context.getOffset(),
+              context.isEnablePushDown(),
+              context.canPushLimitToEachDevice());
+      PlanNode child = node.getChild().accept(this, newContext);
+      if (!newContext.isEnablePushDown()) {
+        return node;
+      }
+
+      OrderingScheme orderingScheme = node.getOrderingScheme();
+      TableScanNode tableScanNode = newContext.getTableScanNode();
+      Map<Symbol, ColumnSchema> tableColumnSchema =
+          
context.getAnalysis().getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+      Set<Symbol> sortSymbols = new HashSet<>();
+      for (Symbol orderBy : orderingScheme.getOrderBy()) {
+        if (TIMESTAMP_STR.equalsIgnoreCase(orderBy.getName())) {
+          break;
+        }
+
+        // order by measurement or expression, can not push down limit
+        if (!tableColumnSchema.containsKey(orderBy)
+            || tableColumnSchema.get(orderBy).getColumnCategory()
+                == TsTableColumnCategory.MEASUREMENT) {
+          tableScanNode.setPushDownLimit(0);
+          tableScanNode.setPushDownOffset(0);
+          return node;
+        }
+
+        sortSymbols.add(orderBy);
+      }
+
+      boolean pushLimitToEachDevice = false;
+      for (Map.Entry<Symbol, ColumnSchema> entry : 
tableColumnSchema.entrySet()) {
+        if (entry.getValue().getColumnCategory() == TsTableColumnCategory.ID
+            && !sortSymbols.contains(entry.getKey())) {
+          pushLimitToEachDevice = true;
+          break;
+        }
+      }
+      tableScanNode.setPushLimitToEachDevice(pushLimitToEachDevice);
+      node.setChild(child);
+      return node;
+    }
+
+    @Override
+    public PlanNode visitFilter(FilterNode node, Context context) {
+      // If there is still a FilterNode here, it means that there are read 
filter conditions that
+      // cannot be pushed
+      // down to TableScan.
+      context.setEnablePushDown(false);
+      return node;
+    }
+
+    @Override
+    public PlanNode visitTableScan(TableScanNode node, Context context) {
+      context.setTableScanNode(node);
+      if (context.getLimit() > 0) {
+        node.setPushDownLimit(context.getLimit());
+      }
+      // TODO only one data region, pushDownOffset can be set
+      //      if (context.getOffset() > 0) {
+      //        node.setPushDownOffset(context.getOffset());
+      //      }
+      if (context.canPushLimitToEachDevice()) {
+        node.setPushLimitToEachDevice(true);
+      }
+      return node;
+    }
+  }
+
+  private static class Context {
+    private final Analysis analysis;
+    private long limit;
+    private long offset;
+    private boolean enablePushDown = true;
+    private boolean pushLimitToEachDevice = false;
+    private TableScanNode tableScanNode;
+
+    public Context(Analysis analysis) {
+      this.analysis = analysis;
+    }
+
+    public Context(
+        Analysis analysis,
+        long limit,
+        long offset,
+        boolean enablePushDown,
+        boolean pushLimitToEachDevice) {
+      this.analysis = analysis;
+      this.limit = limit;
+      this.offset = offset;
+      this.enablePushDown = enablePushDown;
+      this.pushLimitToEachDevice = pushLimitToEachDevice;
+    }
+
+    public Analysis getAnalysis() {
+      return analysis;
+    }
+
+    public long getLimit() {
+      return limit;
+    }
+
+    public void setLimit(long limit) {
+      this.limit = limit;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    public void setOffset(long offset) {
+      this.offset = offset;
+    }
+
+    public boolean isEnablePushDown() {
+      return enablePushDown;
+    }
+
+    public void setEnablePushDown(boolean enablePushDown) {
+      this.enablePushDown = enablePushDown;
+    }
+
+    public boolean canPushLimitToEachDevice() {
+      return pushLimitToEachDevice;
+    }
+
+    public void setPushLimitToEachDevice(boolean pushLimitToEachDevice) {
+      this.pushLimitToEachDevice = pushLimitToEachDevice;
+    }
+
+    public TableScanNode getTableScanNode() {
+      return tableScanNode;
+    }
+
+    public void setTableScanNode(TableScanNode tableScanNode) {
+      this.tableScanNode = tableScanNode;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 98b10ab23ce..63d55214779 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -55,21 +55,25 @@ import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTim
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.extractGlobalTimeFilter;
 
 /**
- * After the optimized rule {@link SimplifyExpressions} finished, predicate 
expression in FilterNode
- * has been transformed to conjunctive normal forms(CNF).
+ * <b>Optimization phase:</b> Logical plan planning.
  *
- * <p>1. In this class, we examine each expression in CNFs, determine how to 
use it, in metadata
- * query, or pushed down into ScanOperators, or it can only be used in 
FilterNode above with
- * TableScanNode.
- * <li>For metadata query expressions, it will be used in {@code 
tableIndexScan} method to generate
- *     the deviceEntries and DataPartition used for TableScanNode.
- * <li>For expressions which can be pushed into TableScanNode, we will execute 
{@code
- *     extractGlobalTimeFilter}, to extract the timePredicate and 
pushDownValuePredicate.
- * <li>Expression which can not be pushed down into TableScanNode, will be 
used in the FilterNode
- *     above of TableScanNode.
+ * <p>After the optimized rule {@link SimplifyExpressions} finished, predicate 
expression in
+ * FilterNode has been transformed to conjunctive normal forms(CNF).
  *
- *     <p>Notice that, when aggregation, multi-table, join are introduced, 
this optimization rule
- *     need to be adapted.
+ * <p>In this class, we examine each expression in CNFs, determine how to use 
it, in metadata query,
+ * or pushed down into ScanOperators, or it can only be used in FilterNode 
above with TableScanNode.
+ *
+ * <ul>
+ *   <li>For metadata query expressions, it will be used in {@code 
tableIndexScan} method to
+ *       generate the deviceEntries and DataPartition used for TableScanNode.
+ *   <li>For expressions which can be pushed into TableScanNode, we will 
execute {@code
+ *       extractGlobalTimeFilter}, to extract the timePredicate and 
pushDownValuePredicate.
+ *   <li>Expression which can not be pushed down into TableScanNode, will be 
used in the FilterNode
+ *       above of TableScanNode.
+ * </ul>
+ *
+ * <p>Notice that when aggregation, multi-table queries, or joins are introduced, this optimization
+ * rule needs to be adapted.
  */
 public class PushPredicateIntoTableScan implements TablePlanOptimizer {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
index e6bf1ee22c2..2b0256ee2d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExtractCommonPredicatesExpressionRewriter.extractCommonPredicates;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.NormalizeOrExpressionRewriter.normalizeOrExpression;
 
+/** <b>Optimization phase:</b> Logical plan planning. */
 public class SimplifyExpressions implements TablePlanOptimizer {
 
   @Override
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
new file mode 100644
index 00000000000..3cdaa96bb16
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SimplifyExpressions;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.TablePlanOptimizer;
+
+import org.junit.Test;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
+import static 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.DESC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LimitOffsetPushDownTest {
+  QueryId queryId = new QueryId("test_query");
+  SessionInfo sessionInfo =
+      new SessionInfo(
+          1L,
+          "iotdb-user",
+          ZoneId.systemDefault(),
+          IoTDBConstant.ClientVersion.V_1_0,
+          "db",
+          IClientSession.SqlDialect.TABLE);
+  Metadata metadata = new TestMatadata();
+  String sql;
+  Analysis actualAnalysis;
+  MPPQueryContext context;
+  LogicalPlanner logicalPlanner;
+  LogicalQueryPlan logicalQueryPlan;
+  PlanNode rootNode;
+  TableDistributionPlanner distributionPlanner;
+  DistributedQueryPlan distributedQueryPlan;
+  TableScanNode tableScanNode;
+  List<TablePlanOptimizer> planOptimizerList =
+      Arrays.asList(
+          new SimplifyExpressions(),
+          new PruneUnUsedColumns(),
+          new PushPredicateIntoTableScan(),
+          new RemoveRedundantIdentityProjections());
+
+  // No ORDER BY: the limit is pushed into TableScan with pushLimitToEachDevice == false.
+  // For "offset 5 limit 10" the scan is expected to carry limit 15 and offset 0.
+  // Expected plan shape: Output - Project - Limit - Offset - Collect - TableScan
+  @Test
+  public void noOrderByTest() {
+    sql = "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 offset 5 limit 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+    // In the logical plan the TableScan sits four first-child steps below the root.
+    assertTrue(getChildrenNode(rootNode, 4) instanceof TableScanNode);
+
+    distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    CollectNode collectNode =
+        (CollectNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 5);
+    assertTrue(collectNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(collectNode.getChildren().get(1) instanceof TableScanNode);
+    assertTrue(collectNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) collectNode.getChildren().get(1);
+    assertEquals(4, tableScanNode.getDeviceEntries().size());
+    assertEquals(ASC, tableScanNode.getScanOrder());
+    // Separate assertions so a failure reports which value is wrong and what it was.
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertFalse(tableScanNode.isPushLimitToEachDevice());
+
+    // The scan in the second fragment must carry the same pushed-down limit and offset.
+    tableScanNode =
+        (TableScanNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 1);
+    assertEquals(2, tableScanNode.getDeviceEntries().size());
+    assertEquals(ASC, tableScanNode.getScanOrder());
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertFalse(tableScanNode.isPushLimitToEachDevice());
+  }
+
+  // ORDER BY covering all tags: the limit is pushed into TableScan with
+  // pushLimitToEachDevice == false.
+  // For "offset 5 limit 10" the scan is expected to carry limit 15 and offset 0.
+  // Expected plan shape: Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+  @Test
+  public void orderByAllTagsTest() {
+    sql =
+        "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by tag2 desc, tag1 asc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+    // In the logical plan the TableScan sits six first-child steps below the root.
+    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+    distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    MergeSortNode mergeSortNode =
+        (MergeSortNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+    assertEquals(4, tableScanNode.getDeviceEntries().size());
+    assertEquals(DESC, tableScanNode.getScanOrder());
+    // Separate assertions so a failure reports which value is wrong and what it was.
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertFalse(tableScanNode.isPushLimitToEachDevice());
+
+    // The scan in the second fragment must carry the same pushed-down limit and offset.
+    tableScanNode =
+        (TableScanNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 4);
+    assertEquals(2, tableScanNode.getDeviceEntries().size());
+    assertEquals(DESC, tableScanNode.getScanOrder());
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertFalse(tableScanNode.isPushLimitToEachDevice());
+  }
+
+  // ORDER BY covering only some tags: the limit is pushed into TableScan with
+  // pushLimitToEachDevice == true.
+  // For "offset 5 limit 10" the scan is expected to carry limit 15 and offset 0.
+  // Expected plan shape: Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+  @Test
+  public void orderBySomeTagsTest() {
+    sql =
+        "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by tag2 desc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+    // In the logical plan the TableScan sits six first-child steps below the root.
+    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+    distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    MergeSortNode mergeSortNode =
+        (MergeSortNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+    assertEquals(4, tableScanNode.getDeviceEntries().size());
+    assertEquals(DESC, tableScanNode.getScanOrder());
+    // Separate assertions so a failure reports which value is wrong and what it was.
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertTrue(tableScanNode.isPushLimitToEachDevice());
+
+    // The scan in the second fragment must carry the same pushed-down limit and offset.
+    tableScanNode =
+        (TableScanNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 4);
+    assertEquals(2, tableScanNode.getDeviceEntries().size());
+    assertEquals(DESC, tableScanNode.getScanOrder());
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertTrue(tableScanNode.isPushLimitToEachDevice());
+  }
+
+  // ORDER BY starting with time: the limit is pushed into TableScan with
+  // pushLimitToEachDevice == true.
+  // For "offset 5 limit 10" the scan is expected to carry limit 15 and offset 0.
+  // Expected plan shape: Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+  @Test
+  public void orderByTimeTest() {
+    sql =
+        "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by time desc, tag2 asc, s1+s3 asc offset 5 limit 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+    // In the logical plan the TableScan sits six first-child steps below the root.
+    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+    distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    MergeSortNode mergeSortNode =
+        (MergeSortNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+    assertEquals(4, tableScanNode.getDeviceEntries().size());
+    assertEquals(DESC, tableScanNode.getScanOrder());
+    // Separate assertions so a failure reports which value is wrong and what it was.
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertTrue(tableScanNode.isPushLimitToEachDevice());
+
+    // The scan in the second fragment must carry the same pushed-down limit and offset.
+    tableScanNode =
+        (TableScanNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 4);
+    assertEquals(2, tableScanNode.getDeviceEntries().size());
+    assertEquals(DESC, tableScanNode.getScanOrder());
+    assertEquals(15, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+    assertTrue(tableScanNode.isPushLimitToEachDevice());
+  }
+
+  // ORDER BY starting with a non-time, non-tag column (s1): the limit cannot be pushed into
+  // TableScan, so the scan is expected to carry limit 0 and offset 0.
+  // Expected plan shape: Output - Limit - Offset - MergeSort - Sort - Project - Project - TableScan
+  @Test
+  public void orderByOthersTest() {
+    sql =
+        "SELECT time, tag3, cast(s2 AS double) FROM table1 where s1>1 order by s1 desc, tag2 desc, attr1 desc, tag3 desc, time desc, s1+s3 asc offset 5 limit 10";
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+    // In the logical plan the TableScan sits six first-child steps below the root.
+    assertTrue(getChildrenNode(rootNode, 6) instanceof TableScanNode);
+
+    distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    MergeSortNode mergeSortNode =
+        (MergeSortNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 4);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+    tableScanNode = (TableScanNode) getChildrenNode(mergeSortNode.getChildren().get(1), 3);
+    assertEquals(4, tableScanNode.getDeviceEntries().size());
+    assertEquals(ASC, tableScanNode.getScanOrder());
+    // Separate assertions so a failure reports which value is wrong and what it was.
+    assertEquals(0, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+
+    // The scan in the second fragment must also have nothing pushed down.
+    tableScanNode =
+        (TableScanNode)
+            getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 4);
+    assertEquals(2, tableScanNode.getDeviceEntries().size());
+    assertEquals(ASC, tableScanNode.getScanOrder());
+    assertEquals(0, tableScanNode.getPushDownLimit());
+    assertEquals(0, tableScanNode.getPushDownOffset());
+  }
+
+  /**
+   * Walks down the plan tree from {@code root}, following the first child at every level.
+   *
+   * @param root node to start from
+   * @param idx number of levels to descend; zero or a negative value returns {@code root} itself
+   * @return the node reached after {@code idx} first-child steps
+   */
+  private PlanNode getChildrenNode(PlanNode root, int idx) {
+    PlanNode current = root;
+    int remaining = idx;
+    while (remaining > 0) {
+      current = current.getChildren().get(0);
+      remaining--;
+    }
+    return current;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index b5a71999a3e..aa539c1cf19 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -122,7 +122,7 @@ public class SortTest {
   @Test
   public void timeOthersSomeIDColumnSortTest() {
     sql =
-        "SELECT time, tag3, tag1, cast(s2 as double), s2+s3, attr1 FROM table1 
"
+        "SELECT time, tag3, substring(tag1, 1), cast(s2 as double), s2+s3, 
attr1 FROM table1 "
             + "where s1>1 and s1+s3>0 and cast(s1 as double)>1.0 order by time 
desc, s1+s2 asc, tag2 asc, tag1 desc offset 5 limit 10";
     context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
     actualAnalysis = analyzeSQL(sql, metadata);
@@ -131,8 +131,6 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - SortNode - ProjectNode - 
ProjectNode - FilterNode -
-    // TableScanNode
     assertTrue(rootNode instanceof OutputNode);
     assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
     assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
@@ -141,7 +139,7 @@ public class SortTest {
             instanceof SortNode);
     SortNode sortNode =
         (SortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
-    // TODO(beyyes) if these two ProjectNode can be merged into one?
+    // TODO(beyyes) merge parent and child ProjectNode into one, parent 
contains all children
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -162,7 +160,7 @@ public class SortTest {
                 .getChildren()
                 .get(0);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
-    // TODO(beyyes) fix the case prune unUsed Columns when the child of 
SortNode is ProjectNode
+    // TODO(beyyes) fix the case prune unused columns when the child of 
SortNode is ProjectNode
     //    assertEquals(
     //        Arrays.asList("time", "tag1", "tag2", "attr1", "s1", "s2"),
     //        tableScanNode.getOutputColumnNames());
@@ -170,9 +168,7 @@ public class SortTest {
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - MergeSortNode - SortNode - 
ProjectNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // Output - Limit - Offset - MergeSort - Sort - Project - Project - Filter 
- TableScan
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -228,8 +224,9 @@ public class SortTest {
             .map(d -> d.getDeviceID().toString())
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
-    // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode - 
TableScanNode
+    // IdentitySink - Sort - Project - Project - Filter - TableScan
     assertTrue(
         distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
     assertTrue(
@@ -298,6 +295,7 @@ public class SortTest {
             .map(d -> d.getDeviceID().toString())
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
     // TODO(beyyes) add only one device entry optimization and verifies
   }
@@ -348,10 +346,9 @@ public class SortTest {
     assertEquals(9, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
-    // OutputNode - LimitNode - OffsetNode - MergeSortNode - SortNode - 
ProjectNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // Output - Limit - Offset - MergeSort - Sort - Project - Project - Filter 
- TableScan
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -407,8 +404,9 @@ public class SortTest {
             .map(d -> d.getDeviceID().toString())
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
-    // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode - 
TableScanNode
+    // IdentitySink - Sort - Project - Project - Filter - TableScan
     assertTrue(
         distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
     assertTrue(
@@ -477,6 +475,7 @@ public class SortTest {
             .map(d -> d.getDeviceID().toString())
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
   }
 
   // order by time, some_ids, others
@@ -492,8 +491,7 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - SortNode - ProjectNode - 
ProjectNode - FilterNode -
-    // TableScanNode
+    // Output - Limit - Offset - Sort - Project - Project - Filter - TableScan
     assertTrue(rootNode instanceof OutputNode);
     assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
     assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
@@ -525,10 +523,8 @@ public class SortTest {
     assertEquals(9, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
-    // OutputNode - LimitNode - OffsetNode - MergeSortNode - SortNode - 
ProjectNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
@@ -584,6 +580,7 @@ public class SortTest {
             .map(d -> d.getDeviceID().toString())
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
 
     // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode - 
TableScanNode
     assertTrue(
@@ -654,6 +651,7 @@ public class SortTest {
             .map(d -> d.getDeviceID().toString())
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
+    assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
   }
 
   // order by time, all_ids, others
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
new file mode 100644
index 00000000000..bcf776321f0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestPlanBuilder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fluent helper for assembling a chain of plan nodes in tests.
+ *
+ * <p>Every builder call except {@code tableScan} wraps the current root in a new node, which then
+ * becomes the root; {@code tableScan} replaces the root with a new {@link TableScanNode}. All
+ * builder calls return this builder so they can be chained.
+ */
+public class TestPlanBuilder {
+
+  // Top-most node built so far; null until the first builder call.
+  private PlanNode root;
+
+  public TestPlanBuilder() {}
+
+  /** Returns the top-most node built so far, or {@code null} if nothing has been built yet. */
+  public PlanNode getRoot() {
+    return root;
+  }
+
+  /** Wraps the current root in an {@link OutputNode} with the given column names and symbols. */
+  public TestPlanBuilder output(String id, List<String> columnNames, List<Symbol> outputSymbols) {
+    PlanNodeId nodeId = new PlanNodeId(id);
+    root = new OutputNode(nodeId, root, columnNames, outputSymbols);
+    return this;
+  }
+
+  /** Wraps the current root in a {@link LimitNode}; its fourth constructor argument is null. */
+  public TestPlanBuilder limit(String id, long count) {
+    PlanNodeId nodeId = new PlanNodeId(id);
+    root = new LimitNode(nodeId, root, count, null);
+    return this;
+  }
+
+  /** Wraps the current root in an {@link OffsetNode} with the given count. */
+  public TestPlanBuilder offset(String id, long count) {
+    PlanNodeId nodeId = new PlanNodeId(id);
+    root = new OffsetNode(nodeId, root, count);
+    return this;
+  }
+
+  /** Wraps the current root in a {@link ProjectNode} with the given assignments. */
+  public TestPlanBuilder project(String id, Assignments assignments) {
+    PlanNodeId nodeId = new PlanNodeId(id);
+    root = new ProjectNode(nodeId, root, assignments);
+    return this;
+  }
+
+  /** Wraps the current root in a {@link FilterNode} with the given predicate. */
+  public TestPlanBuilder filter(String id, Expression predicate) {
+    PlanNodeId nodeId = new PlanNodeId(id);
+    root = new FilterNode(nodeId, root, predicate);
+    return this;
+  }
+
+  /**
+   * Replaces the current root with a new {@link TableScanNode}; the previous root is not passed to
+   * the new node. The arguments are forwarded to the node's constructor in the same order.
+   */
+  public TestPlanBuilder tableScan(
+      String id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<DeviceEntry> deviceEntries,
+      Map<Symbol, Integer> idAndAttributeIndexMap,
+      Ordering scanOrder,
+      Expression timePredicate,
+      Expression pushDownPredicate) {
+    PlanNodeId nodeId = new PlanNodeId(id);
+    root =
+        new TableScanNode(
+            nodeId,
+            qualifiedObjectName,
+            outputSymbols,
+            assignments,
+            deviceEntries,
+            idAndAttributeIndexMap,
+            scanOrder,
+            timePredicate,
+            pushDownPredicate);
+    return this;
+  }
+}

Reply via email to