This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/sort_transform_elimate_topk
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 15ae2f7249f6e34a35f023be0ce740502233382a
Author: Beyyes <[email protected]>
AuthorDate: Thu Jul 11 23:44:19 2024 +0800

    add streamsort impl
---
 .../plan/relational/analyzer/Analysis.java         |  15 ++-
 .../plan/relational/planner/LogicalPlanner.java    |  10 +-
 .../distribute/DistributedPlanGenerator.java       |  32 +++++
 .../distribute/TableDistributionPlanner.java       |  13 +-
 .../TableModelTypeProviderExtractor.java           |   7 ++
 .../plan/relational/planner/node/FilterNode.java   |   2 +-
 .../plan/relational/planner/node/LimitNode.java    |   2 +-
 .../plan/relational/planner/node/OffsetNode.java   |   2 +-
 .../plan/relational/planner/node/OutputNode.java   |   2 +-
 .../plan/relational/planner/node/ProjectNode.java  |   2 +-
 .../plan/relational/planner/node/SortNode.java     |   6 +-
 .../relational/planner/node/StreamSortNode.java    |   5 +
 .../planner/optimizations/AddStreamSort.java       | 131 +++++++++++++++++++++
 .../planner/optimizations/LimitOffsetPushDown.java |   6 +
 .../planner/optimizations/PruneUnUsedColumns.java  |  16 ++-
 .../RemoveRedundantIdentityProjections.java        |  15 ++-
 .../planner/optimizations/SortElimination.java     |  35 ++++++
 .../plan/relational/analyzer/SortTest.java         |  47 ++++----
 18 files changed, 294 insertions(+), 54 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index f913ddbf98e..095248a7391 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -169,9 +169,12 @@ public class Analysis implements IAnalysis {
 
   private boolean finishQueryAfterAnalyze;
 
-  // indicate is there a value filter
+  // indicate if value filter exists in query
   private boolean hasValueFilter = false;
 
+  // indicate if sort node exists in query
+  private boolean hasSortNode = false;
+
   // if emptyDataSource, there is no need to execute the query in BE
   private boolean emptyDataSource = false;
 
@@ -585,10 +588,18 @@ public class Analysis implements IAnalysis {
     return hasValueFilter;
   }
 
-  public void setHasValueFilter(boolean hasValueFilter) {
+  public void setValueFilter(boolean hasValueFilter) {
     this.hasValueFilter = hasValueFilter;
   }
 
+  public boolean hasSortNode() {
+    return hasSortNode;
+  }
+
+  public void setSortNode(boolean hasSortNode) {
+    this.hasSortNode = hasSortNode;
+  }
+
   public boolean isEmptyDataSource() {
     return emptyDataSource;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index ac17c3ff500..8cbab03fa81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.AddStreamSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
@@ -85,7 +86,8 @@ public class LogicalPlanner {
             new SimplifyExpressions(),
             new PruneUnUsedColumns(),
             new PushPredicateIntoTableScan(),
-            new RemoveRedundantIdentityProjections());
+            new RemoveRedundantIdentityProjections(),
+            new AddStreamSort());
   }
 
   public LogicalPlanner(
@@ -103,9 +105,9 @@ public class LogicalPlanner {
 
   public LogicalQueryPlan plan(Analysis analysis) {
     PlanNode planNode = planStatement(analysis, analysis.getStatement());
-
-    tablePlanOptimizers.forEach(
-        optimizer -> optimizer.optimize(planNode, analysis, metadata, 
sessionInfo, context));
+    for (TablePlanOptimizer optimizer : tablePlanOptimizers) {
+      planNode = optimizer.optimize(planNode, analysis, metadata, sessionInfo, 
context);
+    }
 
     return new LogicalQueryPlan(context, planNode);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 61543eda7d1..f37c384e4d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -218,6 +219,37 @@ public class DistributedPlanGenerator
     return Collections.singletonList(mergeSortNode);
   }
 
+  @Override
+  public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext 
context) {
+    context.expectedOrderingScheme = node.getOrderingScheme();
+    context.hasSortNode = true;
+    nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme());
+
+    List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+    if (childrenNodes.size() == 1) {
+      node.setChild(childrenNodes.get(0));
+      return Collections.singletonList(node);
+    }
+
+    // a ProjectNode may be placed above the SortNode later, so wrap the per-child sort nodes 
in one MergeSortNode instead of returning them as a list
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            queryId.genPlanNodeId(), node.getOrderingScheme(), 
node.getOutputSymbols());
+    for (PlanNode child : childrenNodes) {
+      StreamSortNode subSortNode =
+          new StreamSortNode(
+              queryId.genPlanNodeId(),
+              child,
+              node.getOrderingScheme(),
+              false,
+              node.getStreamCompareKeyEndIndex());
+      mergeSortNode.addChild(subSortNode);
+    }
+    nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), 
mergeSortNode.getOrderingScheme());
+
+    return Collections.singletonList(mergeSortNode);
+  }
+
   @Override
   public List<PlanNode> visitFilter(FilterNode node, PlanContext context) {
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index b47eb51db94..107eff8c6a2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -63,16 +63,15 @@ public class TableDistributionPlanner {
             .genResult(logicalQueryPlan.getRootNode(), planContext);
     checkArgument(distributedPlanResult.size() == 1, "Root node must return 
only one");
 
-    // distribute plan optimize rule
-    this.optimizers.forEach(
-        optimizer ->
-            optimizer.optimize(
-                distributedPlanResult.get(0), analysis, null, null, 
mppQueryContext));
+    PlanNode optimizedPlanNode = distributedPlanResult.get(0);
+    for (TablePlanOptimizer optimizer : optimizers) {
+      optimizedPlanNode =
+          optimizer.optimize(optimizedPlanNode, analysis, null, null, 
mppQueryContext);
+    }
 
     // add exchange node for distributed plan
     PlanNode outputNodeWithExchange =
-        new AddExchangeNodes(mppQueryContext)
-            .addExchangeNodes(distributedPlanResult.get(0), planContext);
+        new 
AddExchangeNodes(mppQueryContext).addExchangeNodes(optimizedPlanNode, 
planContext);
     if (analysis.getStatement() instanceof Query) {
       analysis
           .getRespDatasetHeader()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 9065ddaa4dc..08369519559 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 
@@ -119,6 +120,12 @@ public class TableModelTypeProviderExtractor {
       return null;
     }
 
+    @Override
+    public Void visitStreamSort(StreamSortNode node, Void context) {
+      node.getChild().accept(this, context);
+      return null;
+    }
+
     @Override
     public Void visitMergeSort(MergeSortNode node, Void context) {
       node.getChildren().forEach(c -> c.accept(this, context));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
index 2258e35ffeb..f75b45489cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
@@ -49,7 +49,7 @@ public class FilterNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNode(id, child, predicate);
+    return new FilterNode(id, null, predicate);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
index 03fa0b97ec0..f5f4b93a6ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
@@ -48,7 +48,7 @@ public class LimitNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new LimitNode(id, child, count, tiesResolvingScheme);
+    return new LimitNode(id, null, count, tiesResolvingScheme);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
index f08c3ae2dfc..174f564687e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
@@ -39,7 +39,7 @@ public class OffsetNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new OffsetNode(id, child, count);
+    return new OffsetNode(id, null, count);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
index 87f15146a11..c68d213001c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
@@ -61,7 +61,7 @@ public class OutputNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new OutputNode(id, child, columnNames, outputSymbols);
+    return new OutputNode(id, null, columnNames, outputSymbols);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
index 140951a0e97..dcf6f0aa7e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
@@ -48,7 +48,7 @@ public class ProjectNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new ProjectNode(id, child, assignments);
+    return new ProjectNode(id, null, assignments);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index 64ecf359045..e90fb1c2112 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -42,7 +42,7 @@ public class SortNode extends SingleChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new SortNode(id, child, orderingScheme, partial);
+    return new SortNode(id, null, orderingScheme, partial);
   }
 
   @Override
@@ -85,6 +85,10 @@ public class SortNode extends SingleChildProcessNode {
     return orderingScheme;
   }
 
+  public boolean isPartial() {
+    return this.partial;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
index b0d419b0840..6f5b7582418 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
@@ -55,6 +55,11 @@ public class StreamSortNode extends SortNode {
     return visitor.visitStreamSort(this, context);
   }
 
+  @Override
+  public PlanNode clone() {
+    return new StreamSortNode(id, null, orderingScheme, partial, 
streamCompareKeyEndIndex);
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.TABLE_STREAM_SORT_NODE.serialize(byteBuffer);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/AddStreamSort.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/AddStreamSort.java
new file mode 100644
index 00000000000..91900dbefbb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/AddStreamSort.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.Map;
+
+/**
+ * <b>Optimization phase:</b> Logical plan planning.
+ *
+ * <p>This optimization rule implements the rule below.
+ *
+ * <ul>
+ *   <li>When the sort order is `IDColumns,Time` or `IDColumns,Others` in SortNode, SortNode can
+ *       be transformed to StreamSortNode.
+ * </ul>
+ */
+public class AddStreamSort implements TablePlanOptimizer {
+
+  /**
+   * Rewrites the plan only when the analysis has recorded a sort node; otherwise the input plan is
+   * returned unchanged.
+   *
+   * @param planNode root of the plan to rewrite
+   * @param analysis analysis result, used for the sort flag and the table column schemas
+   * @param metadata not used by this rule
+   * @param sessionInfo not used by this rule
+   * @param context query context, used to generate ids for newly created nodes
+   * @return the rewritten plan, or {@code planNode} itself when there is nothing to rewrite
+   */
+  @Override
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
+    if (!analysis.hasSortNode()) {
+      return planNode;
+    }
+
+    return planNode.accept(new Rewriter(analysis, context), new Context());
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
+    private final Analysis analysis;
+    private final MPPQueryContext queryContext;
+
+    public Rewriter(Analysis analysis, MPPQueryContext queryContext) {
+      this.analysis = analysis;
+      this.queryContext = queryContext;
+    }
+
+    /** Default rewrite: clone the node and attach the rewritten children to the clone. */
+    @Override
+    public PlanNode visitPlan(PlanNode node, Context context) {
+      PlanNode newNode = node.clone();
+      for (PlanNode child : node.getChildren()) {
+        newNode.addChild(child.accept(this, context));
+      }
+      return newNode;
+    }
+
+    /**
+     * Replaces the SortNode with a StreamSortNode when at least one leading sort key qualifies
+     * (see {@link #countLeadingStreamSortKeys}). Otherwise the SortNode is kept, but rebuilt on
+     * top of the rewritten child so that rewrites made further down the tree are not lost.
+     */
+    @Override
+    public PlanNode visitSort(SortNode node, Context context) {
+      // visit the child first so that the TableScanNode below is recorded in the context
+      PlanNode child = node.getChild().accept(this, context);
+
+      int streamSortIndex = countLeadingStreamSortKeys(node, context.getTableScanNode());
+      if (streamSortIndex > 0) {
+        return new StreamSortNode(
+            queryContext.getQueryId().genPlanNodeId(),
+            child,
+            node.getOrderingScheme(),
+            node.isPartial(),
+            streamSortIndex);
+      }
+
+      // same pattern as visitPlan: returning `node` itself would drop the rewritten child
+      PlanNode newNode = node.clone();
+      newNode.addChild(child);
+      return newNode;
+    }
+
+    /**
+     * Counts the leading order-by symbols that are columns of the scanned table and whose column
+     * category is neither MEASUREMENT nor TIME. Counting stops at the first symbol that does not
+     * qualify. Returns 0 when no TableScanNode was visited below the sort.
+     */
+    private int countLeadingStreamSortKeys(SortNode node, TableScanNode tableScanNode) {
+      if (tableScanNode == null) {
+        return 0;
+      }
+
+      Map<Symbol, ColumnSchema> tableColumnSchema =
+          analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName());
+      OrderingScheme orderingScheme = node.getOrderingScheme();
+
+      int streamSortIndex = 0;
+      for (Symbol orderBy : orderingScheme.getOrderBy()) {
+        ColumnSchema columnSchema = tableColumnSchema.get(orderBy);
+        if (columnSchema == null
+            || columnSchema.getColumnCategory() == TsTableColumnCategory.MEASUREMENT
+            || columnSchema.getColumnCategory() == TsTableColumnCategory.TIME) {
+          break;
+        }
+        streamSortIndex++;
+      }
+      return streamSortIndex;
+    }
+
+    /** Records the scan node in the context so that an enclosing sort can look up its schema. */
+    @Override
+    public PlanNode visitTableScan(TableScanNode node, Context context) {
+      context.setTableScanNode(node);
+      return node;
+    }
+  }
+
+  /** Rewrite state: holds the most recently visited TableScanNode. */
+  private static class Context {
+    private TableScanNode tableScanNode;
+
+    public Context() {}
+
+    public TableScanNode getTableScanNode() {
+      return tableScanNode;
+    }
+
+    public void setTableScanNode(TableScanNode tableScanNode) {
+      this.tableScanNode = tableScanNode;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
index 4bf01ac1f72..efbf67ec127 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
@@ -170,6 +171,11 @@ public class LimitOffsetPushDown implements 
TablePlanOptimizer {
       return node;
     }
 
+    @Override
+    public PlanNode visitStreamSort(StreamSortNode node, Context context) {
+      return visitSort(node, context);
+    }
+
     @Override
     public PlanNode visitFilter(FilterNode node, Context context) {
       // If there is still a FilterNode here, it means that there are read 
filter conditions that
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java
index c67a7a59b60..e2640efca56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java
@@ -43,11 +43,11 @@ import java.util.Set;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
 
 /**
- * Remove unused columns in TableScanNode.
+ * <b>Optimization phase:</b> Logical plan planning.
  *
- * <p>For example, The output columns of TableScanNode in `select * from 
table1` query are `tag1,
- * attr1, s1`, but the output columns of TableScanNode in `select s1 from 
table1` query can only be
- * `s1`.
+ * <p>Remove unused columns in TableScanNode. For example, The output columns 
of TableScanNode in
+ * `select * from table1` query are `tag1, attr1, s1`, but the output columns 
of TableScanNode in
+ * `select s1 from table1` query can only be `s1`.
  */
 public class PruneUnUsedColumns implements TablePlanOptimizer {
 
@@ -58,10 +58,15 @@ public class PruneUnUsedColumns implements 
TablePlanOptimizer {
       Metadata metadata,
       SessionInfo sessionInfo,
       MPPQueryContext context) {
-    return planNode.accept(new Rewriter(), new RewriterContext());
+    return planNode.accept(new Rewriter(analysis), new RewriterContext());
   }
 
   private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> 
{
+    private final Analysis analysis;
+
+    public Rewriter(Analysis analysis) {
+      this.analysis = analysis;
+    }
 
     @Override
     public PlanNode visitPlan(PlanNode node, RewriterContext context) {
@@ -80,6 +85,7 @@ public class PruneUnUsedColumns implements TablePlanOptimizer 
{
 
     @Override
     public PlanNode visitSort(SortNode node, RewriterContext context) {
+      analysis.setSortNode(true);
       context.allUsedSymbolSet.addAll(node.getOutputSymbols());
       node.getChild().accept(this, context);
       return node;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
index 7e89c5e1f62..e9b7fb498a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
 import java.util.Collections;
 import java.util.List;
 
+/** <b>Optimization phase:</b> Logical plan planning. */
 public class RemoveRedundantIdentityProjections implements TablePlanOptimizer {
 
   @Override
@@ -36,13 +37,13 @@ public class RemoveRedundantIdentityProjections implements 
TablePlanOptimizer {
       Metadata metadata,
       SessionInfo sessionInfo,
       MPPQueryContext context) {
-    return planNode.accept(new Rewriter(), new RewriterContext());
+    return planNode.accept(new Rewriter(), new Context());
   }
 
-  private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> 
{
+  private static class Rewriter extends PlanVisitor<PlanNode, Context> {
 
     @Override
-    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+    public PlanNode visitPlan(PlanNode node, Context context) {
       PlanNode newNode = node.clone();
       for (PlanNode child : node.getChildren()) {
         context.setParent(node);
@@ -52,7 +53,7 @@ public class RemoveRedundantIdentityProjections implements 
TablePlanOptimizer {
     }
 
     @Override
-    public PlanNode visitProject(ProjectNode projectNode, RewriterContext 
context) {
+    public PlanNode visitProject(ProjectNode projectNode, Context context) {
       // TODO change the impl using the method of context.getParent()
       if (projectNode.getChild() instanceof ProjectNode
           && 
projectNode.getOutputSymbols().equals(projectNode.getChild().getOutputSymbols()))
 {
@@ -77,16 +78,14 @@ public class RemoveRedundantIdentityProjections implements 
TablePlanOptimizer {
     }
 
     @Override
-    public PlanNode visitTableScan(TableScanNode node, RewriterContext 
context) {
+    public PlanNode visitTableScan(TableScanNode node, Context context) {
       return node;
     }
   }
 
-  private static class RewriterContext {
+  private static class Context {
     private PlanNode parent;
 
-    public RewriterContext() {}
-
     public PlanNode getParent() {
       return this.parent;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
new file mode 100644
index 00000000000..424f170e0d5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+
+/** <b>Optimization phase:</b> Distributed plan planning. */
+public class SortElimination implements TablePlanOptimizer {
+
+  @Override
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
+    return planNode; // TODO: not implemented yet; pass the plan through instead of returning null
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index aa539c1cf19..71330d8cf46 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -41,7 +41,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.AddStreamSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
@@ -87,7 +89,8 @@ public class SortTest {
           new SimplifyExpressions(),
           new PruneUnUsedColumns(),
           new PushPredicateIntoTableScan(),
-          new RemoveRedundantIdentityProjections());
+          new RemoveRedundantIdentityProjections(),
+          new AddStreamSort());
 
   /*
    * order by time, others, some_ids
@@ -851,9 +854,9 @@ public class SortTest {
     assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
     assertTrue(
         rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SortNode);
-    SortNode sortNode =
-        (SortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+            instanceof StreamSortNode);
+    StreamSortNode sortNode =
+        (StreamSortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -903,9 +906,9 @@ public class SortTest {
             .map(Symbol::getName)
             .collect(Collectors.toList()));
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
+    sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1028,9 +1031,9 @@ public class SortTest {
     assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
     assertTrue(
         rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SortNode);
-    SortNode sortNode =
-        (SortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+            instanceof StreamSortNode);
+    StreamSortNode sortNode =
+        (StreamSortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1082,7 +1085,7 @@ public class SortTest {
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
     assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
+    sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1205,9 +1208,9 @@ public class SortTest {
     assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
     assertTrue(
         rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SortNode);
-    SortNode sortNode =
-        (SortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+            instanceof StreamSortNode);
+    StreamSortNode sortNode =
+        (StreamSortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1257,9 +1260,9 @@ public class SortTest {
             .map(Symbol::getName)
             .collect(Collectors.toList()));
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
+    sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1296,7 +1299,7 @@ public class SortTest {
         distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
     assertTrue(
         
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
+            instanceof StreamSortNode);
     assertTrue(
         distributedQueryPlan
                 .getFragments()
@@ -1382,9 +1385,9 @@ public class SortTest {
     assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
     assertTrue(
         rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SortNode);
-    SortNode sortNode =
-        (SortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+            instanceof StreamSortNode);
+    StreamSortNode sortNode =
+        (StreamSortNode) 
rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1434,9 +1437,9 @@ public class SortTest {
             .map(Symbol::getName)
             .collect(Collectors.toList()));
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
+    sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
     assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
     assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
ProjectNode);
     assertTrue(
@@ -1473,7 +1476,7 @@ public class SortTest {
         distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
     assertTrue(
         
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
+            instanceof StreamSortNode);
     assertTrue(
         distributedQueryPlan
                 .getFragments()


Reply via email to