This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/sort_transform_elimate_topk in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4188b0edba9a9472dac77ef51d9608f2c7825aa6 Author: Beyyes <[email protected]> AuthorDate: Tue Jul 16 15:11:38 2024 +0800 add streamsort and sort elimination --- .../relational/planner/node/StreamSortNode.java | 12 + .../analyzer/LimitOffsetPushDownTest.java | 9 +- .../plan/relational/analyzer/SortTest.java | 486 +++++---------------- 3 files changed, 133 insertions(+), 374 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java index 6f5b7582418..7ee9dd322fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java @@ -26,11 +26,13 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import com.google.common.base.Objects; +import com.google.common.collect.Iterables; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; public class StreamSortNode extends SortNode { @@ -50,6 +52,16 @@ public class StreamSortNode extends SortNode { return streamCompareKeyEndIndex; } + @Override + public PlanNode replaceChildren(List<PlanNode> newChildren) { + return new StreamSortNode( + id, + Iterables.getOnlyElement(newChildren), + orderingScheme, + partial, + streamCompareKeyEndIndex); + } + @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitStreamSort(this, context); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java index 31b33a772b4..59490b86269 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/LimitOffsetPushDownTest.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan; @@ -108,7 +109,7 @@ public class LimitOffsetPushDownTest { } // order by all tags, limit can be pushed into TableScan, pushLimitToEachDevice==false - // Output - Limit - Offset - Project - MergeSort - Sort - Project - TableScan + // Output - Limit - Offset - Project - MergeSort - Project - TableScan @Test public void orderByAllTagsTest() { sql = @@ -128,9 +129,9 @@ public class LimitOffsetPushDownTest { (MergeSortNode) getChildrenNode(distributedQueryPlan.getFragments().get(0).getPlanNodeTree(), 5); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - tableScanNode = (TableScanNode) getChildrenNode(mergeSortNode.getChildren().get(1), 2); + tableScanNode = (TableScanNode) getChildrenNode(mergeSortNode.getChildren().get(1), 1); assertEquals(4, tableScanNode.getDeviceEntries().size()); assertEquals(DESC, tableScanNode.getScanOrder()); assertTrue(tableScanNode.getPushDownLimit() == 15 && tableScanNode.getPushDownOffset() == 0); @@ -138,7 +139,7 @@ public class LimitOffsetPushDownTest { tableScanNode = (TableScanNode) - getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 3); + getChildrenNode(distributedQueryPlan.getFragments().get(1).getPlanNodeTree(), 2); assertEquals(2, tableScanNode.getDeviceEntries().size()); assertEquals(DESC, tableScanNode.getScanOrder()); assertTrue(tableScanNode.getPushDownLimit() == 15 && tableScanNode.getPushDownOffset() == 0); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java index 3286313881a..4501848c444 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java @@ -79,6 +79,10 @@ public class SortTest { LogicalPlanner logicalPlanner; LogicalQueryPlan logicalQueryPlan; PlanNode rootNode; + OutputNode outputNode; + PlanNode mergeSortNode; + ProjectNode projectNode; + StreamSortNode streamSortNode; TableDistributionPlanner distributionPlanner; DistributedQueryPlan distributedQueryPlan; TableScanNode tableScanNode; @@ -704,72 +708,40 @@ public class SortTest { .plan(actualAnalysis); rootNode = logicalQueryPlan.getRootNode(); - // LogicalPlan: `Output - Limit - Offset - Project - StreamSort - Project - Filter - TableScan` + // LogicalPlan: `Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan` assertTrue(rootNode instanceof OutputNode); assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode); assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode); assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode); assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode); - StreamSortNode streamSortNode = - (StreamSortNode) - rootNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); - assertTrue(streamSortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(streamSortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - assertTrue( - streamSortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TableScanNode); - tableScanNode = - (TableScanNode) - streamSortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals("testdb.table1", tableScanNode.getQualifiedObjectName().toString()); assertEquals(8, tableScanNode.getAssignments().size()); assertEquals(6, tableScanNode.getDeviceEntries().size()); assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); - // DistributePlan: `Output - Limit - Offset - Project - MergeSort - StreamSort - Project - - // Filter - TableScan` + // DistributePlan: `Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan` distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); distributedQueryPlan = distributionPlanner.plan(); assertEquals(3, distributedQueryPlan.getFragments().size()); - assertTrue( - distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0) - instanceof OutputNode); - OutputNode outputNode = + outputNode = (OutputNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0); - assertTrue(outputNode.getChildren().get(0) instanceof LimitNode); - assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - MergeSortNode mergeSortNode = - (MergeSortNode) - outputNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode); + MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 4); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); - assertTrue(streamSortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(streamSortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - tableScanNode = - (TableScanNode) - streamSortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals(4, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList( @@ -783,47 +755,12 @@ public class SortTest { assertEquals(DESC, tableScanNode.getScanOrder()); // DistributePlan: `IdentitySink - StreamSort - Project - Filter - TableScan` - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree() instanceof IdentitySinkNode); - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0) - instanceof SortNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof ProjectNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof FilterNode); - tableScanNode = - (TableScanNode) - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + streamSortNode = + (StreamSortNode) + distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals(2, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"), @@ -846,70 +783,39 @@ public class SortTest { .plan(actualAnalysis); rootNode = logicalQueryPlan.getRootNode(); - // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - ProjectNode - FilterNode - - // TableScanNode + // LogicalPlan: `Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan` assertTrue(rootNode instanceof OutputNode); - assertTrue(rootNode.getChildren().get(0) instanceof LimitNode); - assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - SortNode sortNode = - (SortNode) - rootNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); - assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - tableScanNode = - (TableScanNode) sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode); + assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode); + StreamSortNode streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals("testdb.table1", tableScanNode.getQualifiedObjectName().toString()); assertEquals(8, tableScanNode.getAssignments().size()); assertEquals(6, tableScanNode.getDeviceEntries().size()); assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); - // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - SortNode - ProjectNode - - // FilterNode - - // TableScanNode + // DistributePlan: `Output-Limit-Offset-Project-MergeSort-Project-Filter-TableScan` distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); distributedQueryPlan = distributionPlanner.plan(); assertEquals(3, distributedQueryPlan.getFragments().size()); - assertTrue( - distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0) - instanceof OutputNode); - OutputNode outputNode = + outputNode = (OutputNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0); - assertTrue(outputNode.getChildren().get(0) instanceof LimitNode); - assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - MergeSortNode mergeSortNode = - (MergeSortNode) - outputNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode); + MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 4); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof ProjectNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); - assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - tableScanNode = - (TableScanNode) sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + projectNode = (ProjectNode) mergeSortNode.getChildren().get(1); + assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2); assertEquals(4, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList( @@ -922,48 +828,12 @@ public class SortTest { .collect(Collectors.toList())); assertEquals(DESC, tableScanNode.getScanOrder()); - // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree() instanceof IdentitySinkNode); - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0) - instanceof SortNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof ProjectNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof FilterNode); - tableScanNode = - (TableScanNode) - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + // DistributePlan: `IdentitySink-Project-Filter-TableScan` + projectNode = + (ProjectNode) + distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0); + assertTrue(getChildrenNode(projectNode, 1) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(projectNode, 2); assertEquals(2, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"), @@ -986,73 +856,41 @@ public class SortTest { .plan(actualAnalysis); rootNode = logicalQueryPlan.getRootNode(); - // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - ProjectNode - FilterNode - - // TableScanNode + // LogicalPlan: `Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan` assertTrue(rootNode instanceof OutputNode); - assertTrue(rootNode.getChildren().get(0) instanceof LimitNode); - assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - SortNode sortNode = - (SortNode) - rootNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); - assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - tableScanNode = - (TableScanNode) sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode); + assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode); + streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4); + assertEquals(2, streamSortNode.getStreamCompareKeyEndIndex()); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals("testdb.table1", tableScanNode.getQualifiedObjectName().toString()); assertEquals(8, tableScanNode.getAssignments().size()); assertEquals(6, tableScanNode.getDeviceEntries().size()); assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); - // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - SortNode - ProjectNode - - // FilterNode - - // TableScanNode + // DistributePlan: `Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan` distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); distributedQueryPlan = distributionPlanner.plan(); assertEquals(3, distributedQueryPlan.getFragments().size()); - assertTrue( - distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0) - instanceof OutputNode); - OutputNode outputNode = + outputNode = (OutputNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0); - assertTrue(outputNode.getChildren().get(0) instanceof LimitNode); - assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - MergeSortNode mergeSortNode = - (MergeSortNode) - outputNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode); + MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 4); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); - assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - assertTrue( - sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof TableScanNode); - tableScanNode = - (TableScanNode) sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals(4, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList( @@ -1065,48 +903,13 @@ public class SortTest { .collect(Collectors.toList())); assertEquals(ASC, tableScanNode.getScanOrder()); - // IdentitySinkNode - SortNode - ProjectNode - FilterNode - TableScanNode - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree() instanceof IdentitySinkNode); - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0) - instanceof SortNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof ProjectNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof FilterNode); - tableScanNode = - (TableScanNode) - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + // DistributePlan: `IdentitySink - StreamSort - Project - Filter - TableScan` + streamSortNode = + (StreamSortNode) + distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals(2, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"), @@ -1129,70 +932,48 @@ public class SortTest { .plan(actualAnalysis); rootNode = logicalQueryPlan.getRootNode(); - // OutputNode - LimitNode - OffsetNode - ProjectNode - SortNode - ProjectNode - FilterNode - - // TableScanNode + context = new MPPQueryContext(sql, queryId, sessionInfo, null, null); + actualAnalysis = analyzeSQL(sql, metadata); + logicalQueryPlan = + new LogicalPlanner(context, metadata, sessionInfo, planOptimizerList, WarningCollector.NOOP) + .plan(actualAnalysis); + rootNode = logicalQueryPlan.getRootNode(); + + // LogicalPlan: `Output-Limit-Offset-Project-StreamSort-Project-Filter-TableScan` assertTrue(rootNode instanceof OutputNode); - assertTrue(rootNode.getChildren().get(0) instanceof LimitNode); - assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - SortNode sortNode = - (SortNode) - rootNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); - assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - tableScanNode = - (TableScanNode) sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + assertTrue(getChildrenNode(rootNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(rootNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(rootNode, 3) instanceof ProjectNode); + assertTrue(getChildrenNode(rootNode, 4) instanceof StreamSortNode); + streamSortNode = (StreamSortNode) getChildrenNode(rootNode, 4); + assertEquals(3, streamSortNode.getStreamCompareKeyEndIndex()); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + assertTrue(getChildrenNode(streamSortNode, 3) instanceof TableScanNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals("testdb.table1", tableScanNode.getQualifiedObjectName().toString()); assertEquals(8, tableScanNode.getAssignments().size()); assertEquals(6, tableScanNode.getDeviceEntries().size()); assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size()); - // OutputNode - LimitNode - OffsetNode - ProjectNode - MergeSortNode - SortNode - ProjectNode - - // FilterNode - - // TableScanNode + // DistributePlan: `Output-Limit-Offset-Project-MergeSort-StreamSort-Project-Filter-TableScan` distributionPlanner = new TableDistributionPlanner(actualAnalysis, logicalQueryPlan, context); distributedQueryPlan = distributionPlanner.plan(); assertEquals(3, distributedQueryPlan.getFragments().size()); - assertTrue( - distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0) - instanceof OutputNode); - OutputNode outputNode = + outputNode = (OutputNode) distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0); - assertTrue(outputNode.getChildren().get(0) instanceof LimitNode); - assertTrue(outputNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); - assertTrue( - outputNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof ProjectNode); - MergeSortNode mergeSortNode = - (MergeSortNode) - outputNode - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + assertTrue(getChildrenNode(outputNode, 1) instanceof LimitNode); + assertTrue(getChildrenNode(outputNode, 2) instanceof OffsetNode); + assertTrue(getChildrenNode(outputNode, 3) instanceof ProjectNode); + MergeSortNode mergeSortNode = (MergeSortNode) getChildrenNode(outputNode, 4); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); - assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); - assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof FilterNode); - tableScanNode = - (TableScanNode) sortNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + streamSortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals(4, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList( @@ -1205,48 +986,13 @@ public class SortTest { .collect(Collectors.toList())); assertEquals(ASC, tableScanNode.getScanOrder()); - // IdentitySinkNode - SortNode - ProjectNode - ProjectNode - FilterNode - TableScanNode - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree() instanceof IdentitySinkNode); - assertTrue( - distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0) - instanceof SortNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof ProjectNode); - assertTrue( - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - instanceof FilterNode); - tableScanNode = - (TableScanNode) - distributedQueryPlan - .getFragments() - .get(1) - .getPlanNodeTree() - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0) - .getChildren() - .get(0); + // DistributePlan: `IdentitySink - StreamSort - Project - Filter - TableScan` + streamSortNode = + (StreamSortNode) + distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0); + assertTrue(getChildrenNode(streamSortNode, 1) instanceof ProjectNode); + assertTrue(getChildrenNode(streamSortNode, 2) instanceof FilterNode); + tableScanNode = (TableScanNode) getChildrenNode(streamSortNode, 3); assertEquals(2, tableScanNode.getDeviceEntries().size()); assertEquals( Arrays.asList("table1.shenzhen.B2.ZZ", "table1.shenzhen.B1.XX"),
