This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/sort_transform_elimate_topk
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4188b0edba9a9472dac77ef51d9608f2c7825aa6
Author: Beyyes <[email protected]>
AuthorDate: Tue Jul 16 15:11:38 2024 +0800

    add streamsort and sort elimination
---
 .../relational/planner/node/StreamSortNode.java    |  12 +
 .../analyzer/LimitOffsetPushDownTest.java          |   9 +-
 .../plan/relational/analyzer/SortTest.java         | 486 +++++----------------
 3 files changed, 133 insertions(+), 374 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
index 6f5b7582418..7ee9dd322fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java
@@ -26,11 +26,13 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 public class StreamSortNode extends SortNode {
 
@@ -50,6 +52,16 @@ public class StreamSortNode extends SortNode {
     return streamCompareKeyEndIndex;
   }
 
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    return new StreamSortNode(
+        id,
+        Iterables.getOnlyElement(newChildren),
+        orderingScheme,
+        partial,
+        streamCompareKeyEndIndex);
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitStreamSort(this, context);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
index 31b33a772b4..59490b86269 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan;
@@ -108,7 +109,7 @@ public class LimitOffsetPushDownTest {
   }
 
   // order by all tags, limit can be pushed into TableScan, 
pushLimitToEachDevice==false
-  // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan
+  // Output - Limit - Offset - Project - MergeSort -  Project - TableScan
   @Test
   public void orderByAllTagsTest() {
     sql =
@@ -128,9 +129,9 @@ public class LimitOffsetPushDownTest {
         (MergeSortNode)
             
getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 
5);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 2);
+    tableScanNode = (TableScanNode) 
getChildrenNode(mergeSortNode.getChildren().get(1), 1);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(DESC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 15 && 
tableScanNode.getPushDownOffset() == 0);
@@ -138,7 +139,7 @@ public class LimitOffsetPushDownTest {
 
     tableScanNode =
         (TableScanNode)
-            
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 
3);
+            
getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 
2);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(DESC, tableScanNode.getScanOrder());
     assertTrue(tableScanNode.getPushDownLimit() == 15 && 
tableScanNode.getPushDownOffset() == 0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
index 3286313881a..4501848c444 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java
@@ -79,6 +79,10 @@ public class SortTest {
   LogicalPlanner logicalPlanner;
   LogicalQueryPlan logicalQueryPlan;
   PlanNode rootNode;
+  OutputNode outputNode;
+  PlanNode mergeSortNode;
+  ProjectNode projectNode;
+  StreamSortNode streamSortNode;
   TableDistributionPlanner distributionPlanner;
   DistributedQueryPlan distributedQueryPlan;
   TableScanNode tableScanNode;
@@ -704,72 +708,40 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // LogicalPlan: `Output - Limit - Offset - Project - StreamSort - Project 
- Filter - TableScan`
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
     assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
     assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
     assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
     assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
-    StreamSortNode streamSortNode =
-        (StreamSortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(streamSortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(streamSortNode.getChildren().get(0).getChildren().get(0) 
instanceof FilterNode);
-    assertTrue(
-        
streamSortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof TableScanNode);
-    tableScanNode =
-        (TableScanNode)
-            
streamSortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // DistributePlan: `Output - Limit - Offset - Project - MergeSort - 
StreamSort - Project -
-    // Filter - TableScan`
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
     assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
     streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(streamSortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(streamSortNode.getChildren().get(0).getChildren().get(0) 
instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            
streamSortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -783,47 +755,12 @@ public class SortTest {
     assertEquals(DESC, tableScanNode.getScanOrder());
 
     // DistributePlan: `IdentitySink - StreamSort - Project - Filter - 
TableScan`
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    streamSortNode =
+        (StreamSortNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -846,70 +783,39 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - 
ProjectNode - FilterNode -
-    // TableScanNode
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    StreamSortNode streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 
4);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    projectNode = (ProjectNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -922,48 +828,12 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(DESC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink-Project-Filter-TableScan`
+    projectNode =
+        (ProjectNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -986,73 +856,41 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode  - 
ProjectNode - FilterNode -
-    // TableScanNode
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+    assertEquals(2, streamSortNode.getStreamCompareKeyEndIndex());
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    assertTrue(
-        sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof TableScanNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -1065,48 +903,13 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(ASC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink - StreamSort - Project - Filter - 
TableScan`
+    streamSortNode =
+        (StreamSortNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
@@ -1129,70 +932,48 @@ public class SortTest {
             .plan(actualAnalysis);
     rootNode = logicalQueryPlan.getRootNode();
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - 
ProjectNode - FilterNode -
-    // TableScanNode
+    context = new MPPQueryContext(sql, queryId, sessionInfo, null, null);
+    actualAnalysis = analyzeSQL(sql, metadata);
+    logicalQueryPlan =
+        new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, 
WarningCollector.NOOP)
+            .plan(actualAnalysis);
+    rootNode = logicalQueryPlan.getRootNode();
+
+    // LogicalPlan: 
`Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan`
     assertTrue(rootNode instanceof OutputNode);
-    assertTrue(rootNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    SortNode sortNode =
-        (SortNode)
-            rootNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode);
+    assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode);
+    streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4);
+    assertEquals(3, streamSortNode.getStreamCompareKeyEndIndex());
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals("testdb.table1", 
tableScanNode.getQualifiedObjectName().toString());
     assertEquals(8, tableScanNode.getAssignments().size());
     assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
 
-    // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - 
SortNode - ProjectNode -
-    // FilterNode -
-    // TableScanNode
+    // DistributePlan: 
`Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan`
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
     distributedQueryPlan = distributionPlanner.plan();
     assertEquals(3, distributedQueryPlan.getFragments().size());
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
-            instanceof OutputNode);
-    OutputNode outputNode =
+    outputNode =
         (OutputNode)
             
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
-    assertTrue(outputNode.getChildren().get(0) instanceof LimitNode);
-    assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof 
OffsetNode);
-    assertTrue(
-        
outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof ProjectNode);
-    MergeSortNode mergeSortNode =
-        (MergeSortNode)
-            outputNode
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode);
+    assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode);
+    assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 
4);
     assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
-    assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode);
     assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
-    sortNode = (SortNode) mergeSortNode.getChildren().get(1);
-    assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode);
-    assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
-    tableScanNode =
-        (TableScanNode) 
sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0);
+    streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(4, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList(
@@ -1205,48 +986,13 @@ public class SortTest {
             .collect(Collectors.toList()));
     assertEquals(ASC, tableScanNode.getScanOrder());
 
-    // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode - 
TableScanNode
-    assertTrue(
-        distributedQueryPlan.getFragments().get(1).getPlanNodeTree() 
instanceof IdentitySinkNode);
-    assertTrue(
-        
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0)
-            instanceof SortNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof ProjectNode);
-    assertTrue(
-        distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof FilterNode);
-    tableScanNode =
-        (TableScanNode)
-            distributedQueryPlan
-                .getFragments()
-                .get(1)
-                .getPlanNodeTree()
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0);
+    // DistributePlan: `IdentitySink - StreamSort - Project - Filter - 
TableScan`
+    streamSortNode =
+        (StreamSortNode)
+            
distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0);
+    assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode);
+    assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode);
+    tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3);
     assertEquals(2, tableScanNode.getDeviceEntries().size());
     assertEquals(
         Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),

Reply via email to