This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/refactorColumnInjectionPushDown in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 48cc9f0a9e2f0ff70caecc116a1ed9448ab8edf4 Author: Minghui Liu <[email protected]> AuthorDate: Mon Dec 18 12:07:20 2023 +0800 refactor ColumnInjectionPushDown --- .../plan/optimization/ColumnInjectionPushDown.java | 150 +++++++++++++++++++++ .../plan/optimization/PlanNodePushDown.java | 76 ----------- .../optimization/base/ColumnInjectionPushDown.java | 24 ---- .../planner/distribution/DistributionPlanner.java | 12 +- .../plan/planner/plan/node/PlanVisitor.java | 11 +- .../planner/plan/node/process/AggregationNode.java | 4 +- .../plan/node/process/ColumnInjectNode.java | 22 +++ .../node/process/SlidingWindowAggregationNode.java | 5 +- .../node/source/SeriesAggregationSourceNode.java | 11 +- 9 files changed, 192 insertions(+), 123 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java new file mode 100644 index 00000000000..475ce1c34d4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.optimization; + +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; + +/** <b>Optimization phase:</b> Distributed plan planning */ +public class ColumnInjectionPushDown implements PlanOptimizer { + + @Override + public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) { + if (analysis.getStatement().getType() != StatementType.QUERY) { + return plan; + } + QueryStatement queryStatement = (QueryStatement) analysis.getStatement(); + if (queryStatement.isGroupByTime() && queryStatement.isOutputEndTime()) { + // When the aggregation with GROUP BY TIME isn't rawDataQuery, there are AggregationNode and + // SeriesAggregationNode, + // If it is and has overlap in groupByParameter, there is SlidingWindowNode + // There will be a ColumnInjectNode on them, so we need to check if it can be pushed down. + return plan.accept(new Rewriter(), new RewriterContext()); + } + return plan; + } + + private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> { + + @Override + public PlanNode visitPlan(PlanNode node, RewriterContext context) { + for (PlanNode child : node.getChildren()) { + context.setParent(node); + child.accept(this, context); + } + return node; + } + + @Override + public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext context) { + PlanNode parent = context.getParent(); + + context.setParent(node); + context.setMeetColumnInject(true); + node.getChild().accept(this, context); + if (context.columnInjectPushDown()) { + return concatParentWithChild(parent, node.getChild()); + } + return node; + } + + private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) { + if (parent != null) { + ((SingleChildProcessNode) parent).setChild(child); + return parent; + } else { + return child; + } + } + + @Override + public PlanNode visitSeriesAggregationSourceNode( + SeriesAggregationSourceNode node, RewriterContext context) { + if (context.meetColumnInject()) { + node.setOutputEndTime(true); + context.setColumnInjectPushDown(true); + } + // meet leaf node, stop visiting + return node; + } + + @Override + public PlanNode visitSlidingWindowAggregation( + SlidingWindowAggregationNode node, RewriterContext context) { + if (context.meetColumnInject()) { + node.setOutputEndTime(true); + context.setColumnInjectPushDown(true); + } + // stop visiting its child + return node; + } + + @Override + public PlanNode visitAggregation(AggregationNode node, RewriterContext context) { + if (context.meetColumnInject()) { + node.setOutputEndTime(true); + context.setColumnInjectPushDown(true); + } + // stop visiting its children + return node; + } + } + + private static class RewriterContext { + + private PlanNode parent; + + private boolean meetColumnInject = false; + private boolean columnInjectPushDown = false; + + public PlanNode getParent() { + return parent; + } + + public void setParent(PlanNode parent) { + this.parent = parent; + } + + public boolean meetColumnInject() { + return meetColumnInject; + } + + public void setMeetColumnInject(boolean meetColumnInject) { + this.meetColumnInject = meetColumnInject; + } + + public boolean columnInjectPushDown() { + return columnInjectPushDown; + } + + public void setColumnInjectPushDown(boolean columnInjectPushDown) { + this.columnInjectPushDown = columnInjectPushDown; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PlanNodePushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PlanNodePushDown.java deleted file mode 100644 index fb16d13d65e..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PlanNodePushDown.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.optimization; - -import org.apache.iotdb.db.queryengine.common.MPPQueryContext; -import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; -import org.apache.iotdb.db.queryengine.plan.optimization.base.ColumnInjectionPushDown; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; -import org.apache.iotdb.db.queryengine.plan.statement.StatementType; -import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; - -import java.util.ArrayList; -import java.util.List; - -public class PlanNodePushDown implements PlanOptimizer { - @Override - public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) { - if (analysis.getStatement().getType() != StatementType.QUERY) { - return plan; - } - QueryStatement queryStatement = (QueryStatement) analysis.getStatement(); - if (queryStatement.isGroupByTime() && queryStatement.isOutputEndTime()) { - // When the aggregation with GROUP BY TIME isn't rawDataQuery, there are AggregationNode and - // SeriesAggregationNode, - // If it is and has overlap in groupByParameter, there is SlidingWindowNode - // There will be a ColumnInjectNode on them, so we need to check if it can be pushed down. - return plan.accept(new NodePushDownVisitor(), new NodePushDownRewriter()); - } - return plan; - } - - private static class NodePushDownVisitor extends PlanVisitor<PlanNode, NodePushDownRewriter> { - @Override - public PlanNode visitPlan(PlanNode node, NodePushDownRewriter context) { - List<PlanNode> children = new ArrayList<>(); - for (PlanNode child : node.getChildren()) { - children.add(child.accept(this, context)); - } - return node.cloneWithChildren(children); - } - - @Override - public PlanNode visitColumnInject(ColumnInjectNode node, NodePushDownRewriter context) { - PlanNode child = node.getChild(); - child.accept(this, context); - - if (child instanceof ColumnInjectionPushDown) { - ((ColumnInjectionPushDown) child).setOutputEndTime(true); - return child; - } - - return node; - } - } - - private static class NodePushDownRewriter {} -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/base/ColumnInjectionPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/base/ColumnInjectionPushDown.java deleted file mode 100644 index a9b693bcf5e..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/base/ColumnInjectionPushDown.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.optimization.base; - -public interface ColumnInjectionPushDown { - void setOutputEndTime(boolean outputEndTime); -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java index 64f5f693bae..a0ba04cba5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java @@ -24,8 +24,8 @@ import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.optimization.ColumnInjectionPushDown; import org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown; -import org.apache.iotdb.db.queryengine.plan.optimization.PlanNodePushDown; import org.apache.iotdb.db.queryengine.plan.optimization.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; @@ -45,7 +45,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.commons.lang3.Validate; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -67,13 +67,7 @@ public class DistributionPlanner { this.logicalPlan = logicalPlan; this.context = logicalPlan.getContext(); - this.optimizers = - new ArrayList<PlanOptimizer>() { - { - add(new LimitOffsetPushDown()); - add(new PlanNodePushDown()); - } - }; + this.optimizers = Arrays.asList(new LimitOffsetPushDown(), new ColumnInjectionPushDown()); } public PlanNode rewriteSource() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 104242486d7..10bde9ccf6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -92,6 +92,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeri import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; @@ -124,16 +125,20 @@ public abstract class PlanVisitor<R, C> { return visitSourceNode(node, context); } - public R visitSeriesAggregationScan(SeriesAggregationScanNode node, C context) { + public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) { return visitSourceNode(node, context); } - public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) { + public R visitSeriesAggregationSourceNode(SeriesAggregationSourceNode node, C context) { return visitSourceNode(node, context); } + public R visitSeriesAggregationScan(SeriesAggregationScanNode node, C context) { + return visitSeriesAggregationSourceNode(node, context); + } + public R visitAlignedSeriesAggregationScan(AlignedSeriesAggregationScanNode node, C context) { - return visitSourceNode(node, context); + return visitSeriesAggregationSourceNode(node, context); } public R visitLastQueryScan(LastQueryScanNode node, C context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java index d6fb7972dd2..40a171b8622 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationNode.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.optimization.base.ColumnInjectionPushDown; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -51,7 +50,7 @@ import java.util.stream.Collectors; * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the * final series aggregated result represented by TsBlock. */ -public class AggregationNode extends MultiChildProcessNode implements ColumnInjectionPushDown { +public class AggregationNode extends MultiChildProcessNode { // The list of aggregate functions, each AggregateDescriptor will be output as one or two column // of result TsBlock @@ -153,7 +152,6 @@ public class AggregationNode extends MultiChildProcessNode implements ColumnInje return outputEndTime; } - @Override public void setOutputEndTime(boolean outputEndTime) { this.outputEndTime = outputEndTime; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ColumnInjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ColumnInjectNode.java index 9563b9e5e72..826b4f2440f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ColumnInjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ColumnInjectNode.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Objects; public class ColumnInjectNode extends SingleChildProcessNode { @@ -120,4 +121,25 @@ public class ColumnInjectNode extends SingleChildProcessNode { PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new ColumnInjectNode(planNodeId, targetIndex, columnGeneratorParameter); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ColumnInjectNode that = (ColumnInjectNode) o; + return targetIndex == that.targetIndex + && columnGeneratorParameter.equals(that.columnGeneratorParameter); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), targetIndex, columnGeneratorParameter); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SlidingWindowAggregationNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SlidingWindowAggregationNode.java index e11adf54543..dddb3f50022 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SlidingWindowAggregationNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SlidingWindowAggregationNode.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; -import org.apache.iotdb.db.queryengine.plan.optimization.base.ColumnInjectionPushDown; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -38,8 +37,7 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -public class SlidingWindowAggregationNode extends SingleChildProcessNode - implements ColumnInjectionPushDown { +public class SlidingWindowAggregationNode extends SingleChildProcessNode { // The list of aggregate functions, each AggregateDescriptor will be output as one column of // result TsBlock @@ -81,7 +79,6 @@ public class SlidingWindowAggregationNode extends SingleChildProcessNode return outputEndTime; } - @Override public void setOutputEndTime(boolean outputEndTime) { this.outputEndTime = outputEndTime; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java index a3645f3d28a..791122583f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/SeriesAggregationSourceNode.java @@ -21,8 +21,8 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.optimization.base.ColumnInjectionPushDown; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; @@ -34,8 +34,7 @@ import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -public abstract class SeriesAggregationSourceNode extends SeriesSourceNode - implements ColumnInjectionPushDown { +public abstract class SeriesAggregationSourceNode extends SeriesSourceNode { // The list of aggregate functions, each AggregateDescriptor will be output as one column in // result TsBlock @@ -88,7 +87,6 @@ public abstract class SeriesAggregationSourceNode extends SeriesSourceNode return outputEndTime; } - @Override public void setOutputEndTime(boolean outputEndTime) { this.outputEndTime = outputEndTime; } @@ -112,6 +110,11 @@ public abstract class SeriesAggregationSourceNode extends SeriesSourceNode return outputColumnNames; } + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitSeriesAggregationSourceNode(this, context); + } + @Override public boolean equals(Object o) { if (this == o) {
