This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/sort_transform_elimate_topk in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15ae2f7249f6e34a35f023be0ce740502233382a Author: Beyyes <[email protected]> AuthorDate: Thu Jul 11 23:44:19 2024 +0800 add streamsort impl --- .../plan/relational/analyzer/Analysis.java | 15 ++- .../plan/relational/planner/LogicalPlanner.java | 10 +- .../distribute/DistributedPlanGenerator.java | 32 +++++ .../distribute/TableDistributionPlanner.java | 13 +- .../TableModelTypeProviderExtractor.java | 7 ++ .../plan/relational/planner/node/FilterNode.java | 2 +- .../plan/relational/planner/node/LimitNode.java | 2 +- .../plan/relational/planner/node/OffsetNode.java | 2 +- .../plan/relational/planner/node/OutputNode.java | 2 +- .../plan/relational/planner/node/ProjectNode.java | 2 +- .../plan/relational/planner/node/SortNode.java | 6 +- .../relational/planner/node/StreamSortNode.java | 5 + .../planner/optimizations/AddStreamSort.java | 131 +++++++++++++++++++++ .../planner/optimizations/LimitOffsetPushDown.java | 6 + .../planner/optimizations/PruneUnUsedColumns.java | 16 ++- .../RemoveRedundantIdentityProjections.java | 15 ++- .../planner/optimizations/SortElimination.java | 35 ++++++ .../plan/relational/analyzer/SortTest.java | 47 ++++---- 18 files changed, 294 insertions(+), 54 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index f913ddbf98e..095248a7391 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -169,9 +169,12 @@ public class Analysis implements IAnalysis { private boolean finishQueryAfterAnalyze; - // indicate is there a value filter + // indicate if value filter exists in query private boolean hasValueFilter = false; + // indicate if sort node exists in query + private boolean hasSortNode = false; + // if emptyDataSource, there is no need to execute the query in BE private boolean emptyDataSource = false; @@ -585,10 +588,18 @@ public class Analysis implements IAnalysis { return hasValueFilter; } - public void setHasValueFilter(boolean hasValueFilter) { + public void setValueFilter(boolean hasValueFilter) { this.hasValueFilter = hasValueFilter; } + public boolean hasSortNode() { + return hasSortNode; + } + + public void setSortNode(boolean hasSortNode) { + this.hasSortNode = hasSortNode; + } + public boolean isEmptyDataSource() { return emptyDataSource; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index ac17c3ff500..8cbab03fa81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.AddStreamSort; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections; @@ -85,7 +86,8 @@ public class LogicalPlanner { new SimplifyExpressions(), new PruneUnUsedColumns(), new PushPredicateIntoTableScan(), - new RemoveRedundantIdentityProjections()); + new RemoveRedundantIdentityProjections(), + new AddStreamSort()); } public LogicalPlanner( @@ -103,9 +105,9 @@ public class LogicalPlanner { public LogicalQueryPlan plan(Analysis analysis) { PlanNode planNode = planStatement(analysis, analysis.getStatement()); - - tablePlanOptimizers.forEach( - optimizer -> optimizer.optimize(planNode, analysis, metadata, sessionInfo, context)); + for (TablePlanOptimizer optimizer : tablePlanOptimizers) { + planNode = optimizer.optimize(planNode, analysis, metadata, sessionInfo, context); + } return new LogicalQueryPlan(context, planNode); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java index 61543eda7d1..f37c384e4d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -218,6 +219,37 @@ public class DistributedPlanGenerator return Collections.singletonList(mergeSortNode); } + @Override + public List<PlanNode> visitStreamSort(StreamSortNode node, PlanContext context) { + context.expectedOrderingScheme = node.getOrderingScheme(); + context.hasSortNode = true; + nodeOrderingMap.put(node.getPlanNodeId(), node.getOrderingScheme()); + + List<PlanNode> childrenNodes = node.getChild().accept(this, context); + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } + + // may have ProjectNode above SortNode later, so use MergeSortNode but not return SortNode list + MergeSortNode mergeSortNode = + new MergeSortNode( + queryId.genPlanNodeId(), node.getOrderingScheme(), node.getOutputSymbols()); + for (PlanNode child : childrenNodes) { + StreamSortNode subSortNode = + new StreamSortNode( + queryId.genPlanNodeId(), + child, + node.getOrderingScheme(), + false, + node.getStreamCompareKeyEndIndex()); + mergeSortNode.addChild(subSortNode); + } + nodeOrderingMap.put(mergeSortNode.getPlanNodeId(), mergeSortNode.getOrderingScheme()); + + return Collections.singletonList(mergeSortNode); + } + @Override public List<PlanNode> visitFilter(FilterNode node, PlanContext context) { List<PlanNode> childrenNodes = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java index b47eb51db94..107eff8c6a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java @@ -63,16 +63,15 @@ public class TableDistributionPlanner { .genResult(logicalQueryPlan.getRootNode(), planContext); checkArgument(distributedPlanResult.size() == 1, "Root node must return only one"); - // distribute plan optimize rule - this.optimizers.forEach( - optimizer -> - optimizer.optimize( - distributedPlanResult.get(0), analysis, null, null, mppQueryContext)); + PlanNode optimizedPlanNode = distributedPlanResult.get(0); + for (TablePlanOptimizer optimizer : optimizers) { + optimizedPlanNode = + optimizer.optimize(optimizedPlanNode, analysis, null, null, mppQueryContext); + } // add exchange node for distributed plan PlanNode outputNodeWithExchange = - new AddExchangeNodes(mppQueryContext) - .addExchangeNodes(distributedPlanResult.get(0), planContext); + new AddExchangeNodes(mppQueryContext).addExchangeNodes(optimizedPlanNode, planContext); if (analysis.getStatement() instanceof Query) { analysis .getRespDatasetHeader() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index 9065ddaa4dc..08369519559 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; @@ -119,6 +120,12 @@ public class TableModelTypeProviderExtractor { return null; } + @Override + public Void visitStreamSort(StreamSortNode node, Void context) { + node.getChild().accept(this, context); + return null; + } + @Override public Void visitMergeSort(MergeSortNode node, Void context) { node.getChildren().forEach(c -> c.accept(this, context)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java index 2258e35ffeb..f75b45489cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java @@ -49,7 +49,7 @@ public class FilterNode extends SingleChildProcessNode { @Override public PlanNode clone() { - return new FilterNode(id, child, predicate); + return new FilterNode(id, null, predicate); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java index 03fa0b97ec0..f5f4b93a6ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java @@ -48,7 +48,7 @@ public class LimitNode extends SingleChildProcessNode { @Override public PlanNode clone() { - return new LimitNode(id, child, count, tiesResolvingScheme); + return new LimitNode(id, null, count, tiesResolvingScheme); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java index f08c3ae2dfc..174f564687e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java @@ -39,7 +39,7 @@ public class OffsetNode extends SingleChildProcessNode { @Override public PlanNode clone() { - return new OffsetNode(id, child, count); + return new OffsetNode(id, null, count); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java index 87f15146a11..c68d213001c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java @@ -61,7 +61,7 @@ public class OutputNode extends SingleChildProcessNode { @Override public PlanNode clone() { - return new OutputNode(id, child, columnNames, outputSymbols); + return new OutputNode(id, null, columnNames, outputSymbols); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java index 140951a0e97..dcf6f0aa7e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java @@ -48,7 +48,7 @@ public class ProjectNode extends SingleChildProcessNode { @Override public PlanNode clone() { - return new ProjectNode(id, child, assignments); + return new ProjectNode(id, null, assignments); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java index 64ecf359045..e90fb1c2112 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java @@ -42,7 +42,7 @@ public class SortNode extends SingleChildProcessNode { @Override public PlanNode clone() { - return new SortNode(id, child, orderingScheme, partial); + return new SortNode(id, null, orderingScheme, partial); } @Override @@ -85,6 +85,10 @@ public class SortNode extends SingleChildProcessNode { return orderingScheme; } + public boolean isPartial() { + return this.partial; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java index b0d419b0840..6f5b7582418 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/StreamSortNode.java @@ -55,6 +55,11 @@ public class StreamSortNode extends SortNode { return visitor.visitStreamSort(this, context); } + @Override + public PlanNode clone() { + return new StreamSortNode(id, null, orderingScheme, partial, streamCompareKeyEndIndex); + } + @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.TABLE_STREAM_SORT_NODE.serialize(byteBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/AddStreamSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/AddStreamSort.java new file mode 100644 index 00000000000..91900dbefbb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/AddStreamSort.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; + +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; + +import java.util.Map; + +/** + * <b>Optimization phase:</b> Logical plan planning. + * + * <p>This optimize rule implement the rules below. + * <li>When the sort order is `IDColumns,Time` or `IDColumns,Others` in SortNode, SortNode can be + * transformed to StreamSortNode. + */ +public class AddStreamSort implements TablePlanOptimizer { + + @Override + public PlanNode optimize( + PlanNode planNode, + Analysis analysis, + Metadata metadata, + SessionInfo sessionInfo, + MPPQueryContext context) { + if (!analysis.hasSortNode()) { + return planNode; + } + + return planNode.accept(new Rewriter(analysis, context), new Context()); + } + + private static class Rewriter extends PlanVisitor<PlanNode, Context> { + private final Analysis analysis; + private final MPPQueryContext queryContext; + + public Rewriter(Analysis analysis, MPPQueryContext queryContext) { + this.analysis = analysis; + this.queryContext = queryContext; + } + + @Override + public PlanNode visitPlan(PlanNode node, Context context) { + PlanNode newNode = node.clone(); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, context)); + } + return newNode; + } + + @Override + public PlanNode visitSort(SortNode node, Context context) { + PlanNode child = node.getChild().accept(this, context); + TableScanNode tableScanNode = context.getTableScanNode(); + Map<Symbol, ColumnSchema> tableColumnSchema = + analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName()); + + OrderingScheme orderingScheme = node.getOrderingScheme(); + int streamSortIndex = 0; + for (Symbol orderBy : orderingScheme.getOrderBy()) { + if (!tableColumnSchema.containsKey(orderBy) + || tableColumnSchema.get(orderBy).getColumnCategory() + == TsTableColumnCategory.MEASUREMENT + || tableColumnSchema.get(orderBy).getColumnCategory() == TsTableColumnCategory.TIME) { + break; + } else { + streamSortIndex++; + } + } + + if (streamSortIndex > 0) { + return new StreamSortNode( + queryContext.getQueryId().genPlanNodeId(), + child, + node.getOrderingScheme(), + node.isPartial(), + streamSortIndex); + } + + return node; + } + + @Override + public PlanNode visitTableScan(TableScanNode node, Context context) { + context.setTableScanNode(node); + return node; + } + } + + private static class Context { + private TableScanNode tableScanNode; + + public Context() {} + + public TableScanNode getTableScanNode() { + return tableScanNode; + } + + public void setTableScanNode(TableScanNode tableScanNode) { + this.tableScanNode = tableScanNode; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java index 4bf01ac1f72..efbf67ec127 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LimitOffsetPushDown.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; @@ -170,6 +171,11 @@ public class LimitOffsetPushDown implements TablePlanOptimizer { return node; } + @Override + public PlanNode visitStreamSort(StreamSortNode node, Context context) { + return visitSort(node, context); + } + @Override public PlanNode visitFilter(FilterNode node, Context context) { // If there is still a FilterNode here, it means that there are read filter conditions that diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java index c67a7a59b60..e2640efca56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneUnUsedColumns.java @@ -43,11 +43,11 @@ import java.util.Set; import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME; /** - * Remove unused columns in TableScanNode. + * <b>Optimization phase:</b> Logical plan planning. * - * <p>For example, The output columns of TableScanNode in `select * from table1` query are `tag1, - * attr1, s1`, but the output columns of TableScanNode in `select s1 from table1` query can only be - * `s1`. + * <p>Remove unused columns in TableScanNode. For example, The output columns of TableScanNode in + * `select * from table1` query are `tag1, attr1, s1`, but the output columns of TableScanNode in + * `select s1 from table1` query can only be `s1`. */ public class PruneUnUsedColumns implements TablePlanOptimizer { @@ -58,10 +58,15 @@ public class PruneUnUsedColumns implements TablePlanOptimizer { Metadata metadata, SessionInfo sessionInfo, MPPQueryContext context) { - return planNode.accept(new Rewriter(), new RewriterContext()); + return planNode.accept(new Rewriter(analysis), new RewriterContext()); } private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> { + private final Analysis analysis; + + public Rewriter(Analysis analysis) { + this.analysis = analysis; + } @Override public PlanNode visitPlan(PlanNode node, RewriterContext context) { @@ -80,6 +85,7 @@ public class PruneUnUsedColumns implements TablePlanOptimizer { @Override public PlanNode visitSort(SortNode node, RewriterContext context) { + analysis.setSortNode(true); context.allUsedSymbolSet.addAll(node.getOutputSymbols()); node.getChild().accept(this, context); return node; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java index 7e89c5e1f62..e9b7fb498a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod import java.util.Collections; import java.util.List; +/** <b>Optimization phase:</b> Logical plan planning. */ public class RemoveRedundantIdentityProjections implements TablePlanOptimizer { @Override @@ -36,13 +37,13 @@ public class RemoveRedundantIdentityProjections implements TablePlanOptimizer { Metadata metadata, SessionInfo sessionInfo, MPPQueryContext context) { - return planNode.accept(new Rewriter(), new RewriterContext()); + return planNode.accept(new Rewriter(), new Context()); } - private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> { + private static class Rewriter extends PlanVisitor<PlanNode, Context> { @Override - public PlanNode visitPlan(PlanNode node, RewriterContext context) { + public PlanNode visitPlan(PlanNode node, Context context) { PlanNode newNode = node.clone(); for (PlanNode child : node.getChildren()) { context.setParent(node); @@ -52,7 +53,7 @@ public class RemoveRedundantIdentityProjections implements TablePlanOptimizer { } @Override - public PlanNode visitProject(ProjectNode projectNode, RewriterContext context) { + public PlanNode visitProject(ProjectNode projectNode, Context context) { // TODO change the impl using the method of context.getParent() if (projectNode.getChild() instanceof ProjectNode && projectNode.getOutputSymbols().equals(projectNode.getChild().getOutputSymbols())) { @@ -77,16 +78,14 @@ public class RemoveRedundantIdentityProjections implements TablePlanOptimizer { } @Override - public PlanNode visitTableScan(TableScanNode node, RewriterContext context) { + public PlanNode visitTableScan(TableScanNode node, Context context) { return node; } } - private static class RewriterContext { + private static class Context { private PlanNode parent; - public RewriterContext() {} - public PlanNode getParent() { return this.parent; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java new file mode 100644 index 00000000000..424f170e0d5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; + +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; + +/** <b>Optimization phase:</b> Distributed plan planning. */ +public class SortElimination implements TablePlanOptimizer { + + @Override + public PlanNode optimize( + PlanNode planNode, + Analysis analysis, + Metadata metadata, + SessionInfo sessionInfo, + MPPQueryContext context) { + return null; + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java index aa539c1cf19..71330d8cf46 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/SortTest.java @@ -41,7 +41,9 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.AddStreamSort; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneUnUsedColumns; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections; @@ -87,7 +89,8 @@ public class SortTest { new SimplifyExpressions(), new PruneUnUsedColumns(), new PushPredicateIntoTableScan(), - new RemoveRedundantIdentityProjections()); + new RemoveRedundantIdentityProjections(), + new AddStreamSort()); /* * order by time, others, some_ids @@ -851,9 +854,9 @@ public class SortTest { assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); assertTrue( rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof SortNode); - SortNode sortNode = - (SortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + instanceof StreamSortNode); + StreamSortNode sortNode = + (StreamSortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -903,9 +906,9 @@ public class SortTest { .map(Symbol::getName) .collect(Collectors.toList())); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); + sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1028,9 +1031,9 @@ public class SortTest { assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); assertTrue( rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof SortNode); - SortNode sortNode = - (SortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + instanceof StreamSortNode); + StreamSortNode sortNode = + (StreamSortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1082,7 +1085,7 @@ public class SortTest { assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); + sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1205,9 +1208,9 @@ public class SortTest { assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); assertTrue( rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof SortNode); - SortNode sortNode = - (SortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + instanceof StreamSortNode); + StreamSortNode sortNode = + (StreamSortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1257,9 +1260,9 @@ public class SortTest { .map(Symbol::getName) .collect(Collectors.toList())); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); + sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1296,7 +1299,7 @@ public class SortTest { distributedQueryPlan.getFragments().get(1).getPlanNodeTree() instanceof IdentitySinkNode); assertTrue( distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0) - instanceof SortNode); + instanceof StreamSortNode); assertTrue( distributedQueryPlan .getFragments() @@ -1382,9 +1385,9 @@ public class SortTest { assertTrue(rootNode.getChildren().get(0).getChildren().get(0) instanceof OffsetNode); assertTrue( rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0) - instanceof SortNode); - SortNode sortNode = - (SortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); + instanceof StreamSortNode); + StreamSortNode sortNode = + (StreamSortNode) rootNode.getChildren().get(0).getChildren().get(0).getChildren().get(0); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1434,9 +1437,9 @@ public class SortTest { .map(Symbol::getName) .collect(Collectors.toList())); assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode); - assertTrue(mergeSortNode.getChildren().get(1) instanceof SortNode); + assertTrue(mergeSortNode.getChildren().get(1) instanceof StreamSortNode); assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode); - sortNode = (SortNode) mergeSortNode.getChildren().get(1); + sortNode = (StreamSortNode) mergeSortNode.getChildren().get(1); assertTrue(sortNode.getChildren().get(0) instanceof ProjectNode); assertTrue(sortNode.getChildren().get(0).getChildren().get(0) instanceof ProjectNode); assertTrue( @@ -1473,7 +1476,7 @@ public class SortTest { distributedQueryPlan.getFragments().get(1).getPlanNodeTree() instanceof IdentitySinkNode); assertTrue( distributedQueryPlan.getFragments().get(1).getPlanNodeTree().getChildren().get(0) - instanceof SortNode); + instanceof StreamSortNode); assertTrue( distributedQueryPlan .getFragments()
