This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this 
push:
     new d3d3b606f9c Add stream sort and sort elimination optimize rule
d3d3b606f9c is described below

commit d3d3b606f9cff833152bf7e22d567d425c4b9f9b
Author: Beyyes <[email protected]>
AuthorDate: Thu Jul 18 13:10:07 2024 +0800

    Add stream sort and sort elimination optimize rule
---
 .../plan/relational/analyzer/Analysis.java         |  15 +-
 .../plan/relational/planner/LogicalPlanner.java    |   5 +-
 .../plan/relational/planner/QueryPlanner.java      |   1 +
 .../distribute/DistributedPlanGenerator.java       |  32 ++
 .../distribute/TableDistributionPlanner.java       |   4 +-
 .../TableModelTypeProviderExtractor.java           |   7 +
 .../plan/relational/planner/node/FilterNode.java   |   2 +-
 .../plan/relational/planner/node/LimitNode.java    |   2 +-
 .../plan/relational/planner/node/OffsetNode.java   |   2 +-
 .../plan/relational/planner/node/OutputNode.java   |   2 +-
 .../plan/relational/planner/node/ProjectNode.java  |   2 +-
 .../plan/relational/planner/node/SortNode.java     |   6 +-
 .../relational/planner/node/StreamSortNode.java    |  17 +
 .../planner/optimizations/OptimizeFactory.java     |   4 +-
 .../PushLimitOffsetIntoTableScan.java              |   6 +
 .../planner/optimizations/SortElimination.java     | 134 ++++++
 .../optimizations/TransformSortToStreamSort.java   | 123 +++++
 .../analyzer/LimitOffsetPushDownTest.java          |  11 +-
 .../plan/relational/analyzer/SortTest.java         | 503 +++++----------------
 19 files changed, 483 insertions(+), 395 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index f913ddbf98e..095248a7391 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -169,9 +169,12 @@ public class Analysis implements IAnalysis {
 
   private boolean finishQueryAfterAnalyze;
 
-  // indicate is there a value filter
+  // indicate if value filter exists in query
   private boolean hasValueFilter = false;
 
+  // indicate if sort node exists in query
+  private boolean hasSortNode = false;
+
   // if emptyDataSource, there is no need to execute the query in BE
   private boolean emptyDataSource = false;
 
@@ -585,10 +588,18 @@ public class Analysis implements IAnalysis {
     return hasValueFilter;
   }
 
-  public void setHasValueFilter(boolean hasValueFilter) {
+  public void setValueFilter(boolean hasValueFilter) {
     this.hasValueFilter = hasValueFilter;
   }
 
+  public boolean hasSortNode() {
+    return hasSortNode;
+  }
+
+  public void setSortNode(boolean hasSortNode) {
+    this.hasSortNode = hasSortNode;
+  }
+
   public boolean isEmptyDataSource() {
     return emptyDataSource;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 7d778d2ebf2..c88dc681a13 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -75,8 +75,9 @@ public class LogicalPlanner {
     this.metadata = metadata;
     this.sessionInfo = requireNonNull(sessionInfo, "session is null");
     this.warningCollector = requireNonNull(warningCollector, "warningCollector 
is null");
-    PlannerContext plannerContext = new PlannerContext(metadata, new 
InternalTypeManager());
-    this.planOptimizers = new 
OptimizeFactory(plannerContext).getPlanOptimizers();
+    this.planOptimizers =
+        new OptimizeFactory(new PlannerContext(metadata, new 
InternalTypeManager()))
+            .getPlanOptimizers();
   }
 
   public LogicalQueryPlan plan(Analysis analysis) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index ae7f47f3058..a9c094d7d9a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -152,6 +152,7 @@ public class QueryPlanner {
       outputs.stream().map(builder::translate).forEach(newFields::add);
 
       builder = 
builder.withScope(analysis.getScope(node.getOrderBy().orElse(null)), newFields);
+      analysis.setSortNode(true);
     }
 
     List<Expression> orderBy = analysis.getOrderByExpressions(node);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 61543eda7d1..f37c384e4d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -218,6 +219,37 @@ public class DistributedPlanGenerator
     return Collections.singletonList(mergeSortNode);
   }
 
+  @Override
+  public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext 
context) {
+    context.expectedOrderingScheme = node.getOrderingScheme();
+    context.hasSortNode = true;
+    nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
+
+    List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      node.setChild(childrenNodes.get(0));
+      return Collections.singletonList(node);
+    }
+
+    // may have ProjectNode above SortNode later, so use MergeSortNode but not 
return SortNode list
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            queryId.genPlanNodeId(), node.getOrderingScheme(), 
node.getOutputSymbols());
+    for (PlanNode child : childrenNodes) {
+      StreamSortNode subSortNode =
+          new StreamSortNode(
+              queryId.genPlanNodeId(),
+              child,
+              node.getOrderingScheme(),
+              false,
+              node.getStreamCompareKeyEndIndex());
+      mergeSortNode.addChild(subSortNode);
+    }
+    nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), 
mergeSortNode.getOrderingScheme());
+
+    return Collections.singletonList(mergeSortNode);
+  }
+
   @Override
   public List<PlanNode> visitFilter(FilterNode node, PlanContext context) {
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index ce2c70b166b..04caab49710 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -30,8 +30,10 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.execution.querystats.Plan
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushLimitOffsetIntoTableScan;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SortElimination;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,7 +53,7 @@ public class TableDistributionPlanner {
     this.analysis = analysis;
     this.logicalQueryPlan = logicalQueryPlan;
     this.mppQueryContext = mppQueryContext;
-    this.optimizers = Collections.singletonList(new 
PushLimitOffsetIntoTableScan());
+    this.optimizers = Arrays.asList(new PushLimitOffsetIntoTableScan(), new 
SortElimination());
   }
 
   public DistributedQueryPlan plan() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 9065ddaa4dc..08369519559 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 
@@ -119,6 +120,12 @@ public class TableModelTypeProviderExtractor {
       return null;
     }
 
+    @Override
+    public Void visitStreamSort(StreamSortNode node, Void context) {
+      node.getChild().accept(this, context);
+      return null;
+    }
+
     @Override
     public Void visitMergeSort(MergeSortNode node, Void context) {
       node.getChildren().forEach(c -> c.accept(this, context));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
index 52485ca9789..812979ed239 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
@@ -50,7 +50,7 @@ public class FilterNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNode(id, child, predicate);
+    return new FilterNode(id, null, predicate);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
index a0862384d87..8e7b0292b0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
@@ -54,7 +54,7 @@ public class LimitNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LimitNode(id, child, count, tiesResolvingScheme);
+    return new LimitNode(id, null, count, tiesResolvingScheme);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
index 334e22420a9..fc363190d2b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
@@ -40,7 +40,7 @@ public class OffsetNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new OffsetNode(id, child, count);
+    return new OffsetNode(id, null, count);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
index 8061dd043cb..219b68d3e72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
@@ -62,7 +62,7 @@ public class OutputNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new OutputNode(id, child, columnNames, outputSymbols);
+    return new OutputNode(id, null, columnNames, outputSymbols);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
index 0653ac161d8..05fb45ecd07 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
@@ -49,7 +49,7 @@ public class ProjectNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new ProjectNode(id, child, assignments);
+    return new ProjectNode(id, null, assignments);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index 8ba5839fa00..2e85d983fb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -43,7 +43,7 @@ public class SortNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new SortNode(id, child, orderingScheme, partial);
+    return new SortNode(id, null, orderingScheme, partial);
   }
 
   @Override
@@ -91,6 +91,10 @@ public class SortNode extends SingleChildProcessNode {
     return orderingScheme;
   }
 
+  public boolean isPartial() {
+    return this.partial;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
index b0d419b0840..7ee9dd322fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
@@ -26,11 +26,13 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 public class StreamSortNode extends SortNode {
 
@@ -50,11 +52,26 @@ public class StreamSortNode extends SortNode {
     return streamCompareKeyEndIndex;
   }
 
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    return new StreamSortNode(
+        id,
+        Iterables.getOnlyElement(newChildren),
+        orderingScheme,
+        partial,
+        streamCompareKeyEndIndex);
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitStreamSort(this, context);
   }
 
+  @Override
+  public PlanNode clone() {
+    return new StreamSortNode(id, null, orderingScheme, partial, 
streamCompareKeyEndIndex);
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.TABLE_STREAM_SORT_NODE.serialize(byteBuffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
index 486721b46e9..08de786d49d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/OptimizeFactory.java
@@ -41,6 +41,7 @@ public class OptimizeFactory {
 
     PlanOptimizer simplifyExpressionOptimizer = new SimplifyExpressions();
     PlanOptimizer pushPredicateIntoTableScanOptimizer = new 
PushPredicateIntoTableScan();
+    PlanOptimizer transformSortToStreamSortOptimizer = new 
TransformSortToStreamSort();
 
     Set<Rule<?>> columnPruningRules =
         ImmutableSet.of(
@@ -69,7 +70,8 @@ public class OptimizeFactory {
             pushPredicateIntoTableScanOptimizer,
             // redo columnPrune and inlineProjections after 
pushPredicateIntoTableScan
             columnPruningOptimizer,
-            inlineProjectionsOptimizer);
+            inlineProjectionsOptimizer,
+            transformSortToStreamSortOptimizer);
   }
 
   public List<PlanOptimizer> getPlanOptimizers() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index 081ea33983e..a11683dde28 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
@@ -162,6 +163,11 @@ public class PushLimitOffsetIntoTableScan implements 
PlanOptimizer {
       return node;
     }
 
+    @Override
+    public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+      return visitSort(node, context);
+    }
+
     @Override
     public PlanNode visitFilter(FilterNode node, Context context) {
       // If there is still a FilterNode here, it means that there are read 
filter conditions that
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
new file mode 100644
index 00000000000..23bb78c8da4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
+
+/**
+ * <b>Optimization phase:</b> Distributed plan planning.
+ *
+ * <p>This optimize rule implement the rules below.
+ * <li>When order by time and there is only one device entry in TableScanNode 
below, the SortNode
+ *     can be eliminated.
+ * <li>When order by all IDColumns and time, the SortNode can be eliminated.
+ */
+public class SortElimination implements PlanOptimizer {
+
+  @Override
+  public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+    if (!context.getAnalysis().hasSortNode()) {
+      return plan;
+    }
+
+    return plan.accept(new Rewriter(context.getAnalysis()), new Context());
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+    private final Analysis analysis;
+
+    public Rewriter(Analysis analysis) {
+      this.analysis = analysis;
+    }
+
+    @Override
+    public PlanNode visitPlan(PlanNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitSort(SortNode node, Context context) {
+      PlanNode child = node.getChild().accept(this, context);
+      TableScanNode tableScanNode = context.getTableScanNode();
+      OrderingScheme orderingScheme = node.getOrderingScheme();
+      if (tableScanNode.getDeviceEntries().size() == 1
+          && 
TIMESTAMP_STR.equalsIgnoreCase(orderingScheme.getOrderBy().get(0).getName())) {
+        return child;
+      }
+      return tryRemoveSortWhenOrderByAllIDsAndTime(node, child, tableScanNode);
+    }
+
+    @Override
+    public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+      PlanNode child = node.getChild().accept(this, context);
+      TableScanNode tableScanNode = context.getTableScanNode();
+      return tryRemoveSortWhenOrderByAllIDsAndTime(node, child, tableScanNode);
+    }
+
+    @Override
+    public PlanNode visitTableScan(TableScanNode node, Context context) {
+      context.setTableScanNode(node);
+      return node;
+    }
+
+    private PlanNode tryRemoveSortWhenOrderByAllIDsAndTime(
+        SortNode sortNode, PlanNode child, TableScanNode tableScanNode) {
+      Set<Symbol> sortSymbolsBeforeTime = new HashSet<>();
+      Map<Symbol, ColumnSchema> tableColumnSchema =
+          
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+      for (Symbol orderBy : sortNode.getOrderingScheme().getOrderBy()) {
+        if (!tableColumnSchema.containsKey(orderBy)
+            || tableColumnSchema.get(orderBy).getColumnCategory()
+                == TsTableColumnCategory.MEASUREMENT) {
+          return sortNode;
+        } else if (tableColumnSchema.get(orderBy).getColumnCategory()
+            == TsTableColumnCategory.TIME) {
+          break;
+        } else {
+          sortSymbolsBeforeTime.add(orderBy);
+        }
+      }
+
+      for (Map.Entry<Symbol, ColumnSchema> entry : 
tableColumnSchema.entrySet()) {
+        if (entry.getValue().getColumnCategory() == TsTableColumnCategory.ID
+            && !sortSymbolsBeforeTime.contains(entry.getKey())) {
+          return sortNode;
+        }
+      }
+
+      return child;
+    }
+  }
+
+  private static class Context {
+    private TableScanNode tableScanNode;
+
+    public TableScanNode getTableScanNode() {
+      return tableScanNode;
+    }
+
+    public void setTableScanNode(TableScanNode tableScanNode) {
+      this.tableScanNode = tableScanNode;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
new file mode 100644
index 00000000000..9ea378df087
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.Map;
+
+/**
+ * <b>Optimization phase:</b> Logical plan planning.
+ *
+ * <p>This optimize rule implement the rules below.
+ * <li>When the sort order is `IDColumns,Time` or `IDColumns,Others` in 
SortNode, SortNode can be
+ *     transformed to StreamSortNode.
+ */
+public class TransformSortToStreamSort implements PlanOptimizer {
+
+  @Override
+  public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+    if (!context.getAnalysis().hasSortNode()) {
+      return plan;
+    }
+
+    return plan.accept(
+        new Rewriter(context.getAnalysis(), context.getQueryContext()), new 
Context());
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+    private final Analysis analysis;
+    private final MPPQueryContext queryContext;
+
+    public Rewriter(Analysis analysis, MPPQueryContext queryContext) {
+      this.analysis = analysis;
+      this.queryContext = queryContext;
+    }
+
+    @Override
+    public PlanNode visitPlan(PlanNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    @Override
+    public PlanNode visitSort(SortNode node, Context context) {
+      PlanNode child = node.getChild().accept(this, context);
+      TableScanNode tableScanNode = context.getTableScanNode();
+      Map<Symbol, ColumnSchema> tableColumnSchema =
+          
analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+
+      OrderingScheme orderingScheme = node.getOrderingScheme();
+      int streamSortIndex = 0;
+      for (Symbol orderBy : orderingScheme.getOrderBy()) {
+        if (!tableColumnSchema.containsKey(orderBy)
+            || tableColumnSchema.get(orderBy).getColumnCategory()
+                == TsTableColumnCategory.MEASUREMENT
+            || tableColumnSchema.get(orderBy).getColumnCategory() == 
TsTableColumnCategory.TIME) {
+          break;
+        } else {
+          streamSortIndex++;
+        }
+      }
+
+      if (streamSortIndex > 0) {
+        return new StreamSortNode(
+            queryContext.getQueryId().genPlanNodeId(),
+            child,
+            node.getOrderingScheme(),
+            node.isPartial(),
+            streamSortIndex);
+      }
+
+      return node;
+    }
+
+    @Override
+    public PlanNode visitTableScan(TableScanNode node, Context context) {
+      context.setTableScanNode(node);
+      return node;
+    }
+  }
+
+  private static class Context {
+    private TableScanNode tableScanNode;
+
+    public TableScanNode getTableScanNode() {
+      return tableScanNode;
+    }
+
+    public void setTableScanNode(TableScanNode tableScanNode) {
+      this.tableScanNode = tableScanNode;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
index ba8b20794a3..c612a697540 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 
@@ -101,7 +102,7 @@ public class LimitOffsetPushDownTest {
   }
 
   // order by all tags, limit can be pushed into TableScan, 
pushLimitToEachDevice==false
-  // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
+  // Output - Limit - Offset - Project - MergeSort -  Project - TableScan
   @Test
   public void orderByAllTagsTest() {
     sql =
@@ -121,9 +122,9 @@ public class LimitOffsetPushDownTest {
         (MergeSortNode)
             
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
5);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+    tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 1);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(DESC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 15 && 
tableScanNode.getPushDownOffset() == 0);
@@ -131,7 +132,7 @@ public class LimitOffsetPushDownTest {
 
     tableScanNode =
         (TableScanNode)
-            
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 
3);
+            
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 
2);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(DESC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 15 && 
tableScanNode.getPushDownOffset() == 0);
@@ -250,7 +251,7 @@ public class LimitOffsetPushDownTest {
     assertTrue(tableScanNode.getPushDownLimit() == 0 && 
tableScanNode.getPushDownOffset() == 0);
   }
 
-  private PlanNode getChildrenNode(PlanNode root, int idx) {
+  static PlanNode getChildrenNode(PlanNode root, int idx) {
     PlanNode result = root;
     for (int i = 1; i <= idx; i++) {
       result = result.getChildren().get(0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index 1fcbc31ed98..3ca16e80040 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 
 import org.junit.Test;
@@ -49,6 +50,7 @@ import java.util.Arrays;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.LimitOffsetPushDownTest.getChildrenNode;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.ASC;
 import static 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering.DESC;
 import static org.junit.Assert.assertEquals;
@@ -72,6 +74,10 @@ public class SortTest {
   LogicalPlanner logicalPlanner;
   LogicalQueryPlan logicalQueryPlan;
   PlanNode rootNode;
+  OutputNode outputNode;
+  PlanNode mergeSortNode;
+  ProjectNode projectNode;
+  StreamSortNode streamSortNode;
   TableDistributionPlanner distributionPlanner;
   DistributedQueryPlan distributedQueryPlan;
   TableScanNode tableScanNode;
@@ -694,73 +700,40 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - 
ProjectNode - FilterNode -
-    // TableScanNode
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    assertTrue(
-        sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof TableScanNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -773,48 +746,13 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink - StreamSort - Project - Filter - 
TableScan`
+    streamSortNode =
+        (StreamSortNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -837,70 +775,39 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - 
ProjectNode - FilterNode -
-    // TableScanNode
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    StreamSortNode streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 
4);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    projectNode = (ProjectNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -913,48 +820,12 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink-Project-Filter-TableScan`
+    projectNode =
+        (ProjectNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -977,73 +848,41 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode  - 
ProjectNode - FilterNode -
-    // TableScanNode
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+    assertEquals(2, streamSortNode.getStreamCompareKeyEndIndex());
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    assertTrue(
-        sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof TableScanNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -1056,48 +895,13 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(ASC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink - StreamSort - Project - Filter - 
TableScan`
+    streamSortNode =
+        (StreamSortNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -1120,70 +924,48 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - 
ProjectNode - FilterNode -
-    // TableScanNode
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, 
WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+    assertEquals(3, streamSortNode.getStreamCompareKeyEndIndex());
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -1196,48 +978,13 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(ASC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode - 
TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink - StreamSort - Project - Filter - 
TableScan`
+    streamSortNode =
+        (StreamSortNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),

Reply via email to