This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/PredicatePushDown in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 39de47d3bfd8002044fc52338ced29a186680ff7 Author: Minghui Liu <[email protected]> AuthorDate: Sun Jan 7 22:31:13 2024 +0800 implement new PlanOptimizer: PredicatePushDown --- .../plan/optimization/PredicatePushDown.java | 307 +++++++++++++++++++++ 1 file changed, 307 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java new file mode 100644 index 00000000000..d4d412017e7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.optimization; + +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer; +import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanSourceNode; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; + +import java.util.ArrayList; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +public class PredicatePushDown implements PlanOptimizer { + + @Override + public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) { + if (analysis.getStatement().getType() != StatementType.QUERY) { + return plan; + } + QueryStatement queryStatement = (QueryStatement) analysis.getStatement(); + if (queryStatement.isLastQuery() || !analysis.hasValueFilter()) { + return plan; + } + return plan.accept(new Rewriter(), new RewriterContext(context.getQueryId())); + } + + private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> { + + @Override + public PlanNode visitPlan(PlanNode node, RewriterContext context) { + throw new IllegalArgumentException("Unexpected plan node: " + node); + } + + @Override + public PlanNode visitSingleChildProcess(SingleChildProcessNode node, RewriterContext context) { + PlanNode rewrittenChild = node.getChild().accept(this, context); + node.setChild(rewrittenChild); + return node; + } + + @Override + public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) { + List<PlanNode> rewrittenChildren = new ArrayList<>(); + for (PlanNode child : node.getChildren()) { + rewrittenChildren.add(child.accept(this, context)); + } + node.setChildren(rewrittenChildren); + return node; + } + + @Override + public PlanNode visitFilter(FilterNode node, RewriterContext context) { + if (fromHaving(node.getPredicate())) { + return visitSingleChildProcess(node, context); + } + + context.setPushDownFilterNode(node); + PlanNode rewrittenChild = node.getChild().accept(this, context); + + boolean enablePushDown = context.isEnablePushDown(); + context.reset(); + + if (enablePushDown) { + return rewrittenChild; + } + return node; + } + + private boolean fromHaving(Expression predicate) { + List<Expression> aggregations = ExpressionAnalyzer.searchAggregationExpressions(predicate); + return aggregations != null && !aggregations.isEmpty(); + } + + @Override + public PlanNode visitFullOuterTimeJoin(FullOuterTimeJoinNode node, RewriterContext context) { + if (context.hasNotInheritedPredicate()) { + return node; + } + + Expression inheritedPredicate = context.getInheritedPredicate(); + List<Expression> conjuncts = PredicateUtils.extractConjuncts(inheritedPredicate); + + List<PlanNode> children = node.getChildren(); + + List<List<Expression>> pushDownConjunctsForEachChild = new ArrayList<>(children.size()); + // empty list for each child at first + for (int i = 0; i < children.size(); i++) { + pushDownConjunctsForEachChild.add(new ArrayList<>()); + } + + List<Expression> cannotPushDownConjuncts = new ArrayList<>(); + extractPushDownConjunctsForEachChild( + conjuncts, children, pushDownConjunctsForEachChild, cannotPushDownConjuncts); + + if (cannotPushDownConjuncts.size() == conjuncts.size()) { + // all conjuncts cannot push down + return node; + } + + context.setEnablePushDown(true); + + List<PlanNode> childrenWithPredicate = new ArrayList<>(); + List<PlanNode> childrenWithoutPredicate = new ArrayList<>(); + for (int i = 0; i < children.size(); i++) { + SeriesScanSourceNode child = (SeriesScanSourceNode) children.get(i); + if (pushDownConjunctsForEachChild.get(i).isEmpty()) { + childrenWithoutPredicate.add(child); + } else { + child.setPushDownPredicate( + PredicateUtils.combineConjuncts(pushDownConjunctsForEachChild.get(i))); + childrenWithPredicate.add(child); + } + } + + PlanNode left = planInnerTimeJoin(childrenWithPredicate, node.getMergeOrder(), context); + PlanNode right = + planFullOuterTimeJoin(childrenWithoutPredicate, node.getMergeOrder(), context); + + PlanNode resultNode = planLeftOuterTimeJoin(left, right, node.getMergeOrder(), context); + + if (!cannotPushDownConjuncts.isEmpty()) { + resultNode = + planFilter( + resultNode, PredicateUtils.combineConjuncts(cannotPushDownConjuncts), context); + } + return resultNode; + } + + private void extractPushDownConjunctsForEachChild( + List<Expression> conjuncts, + List<PlanNode> children, + List<List<Expression>> pushDownConjunctsForEachChild, + List<Expression> cannotPushDownConjuncts) { + // find the source symbol for each child + List<String> sourceSymbolForEachChild = new ArrayList<>(children.size()); + for (PlanNode child : children) { + checkArgument( + child instanceof SeriesScanSourceNode, "Unexpected node type: " + child.getClass()); + sourceSymbolForEachChild.add(((SeriesScanSourceNode) child).getSourceSymbol()); + } + + // distinguish conjuncts that can push down and cannot push down + for (Expression conjunct : conjuncts) { + boolean canPushDown = false; + for (int i = 0; i < sourceSymbolForEachChild.size(); i++) { + if (PredicateUtils.isPredicateOnlyContainSourceSymbol( + conjunct, sourceSymbolForEachChild.get(i))) { + pushDownConjunctsForEachChild.get(i).add(conjunct); + canPushDown = true; + break; + } + } + if (!canPushDown) { + cannotPushDownConjuncts.add(conjunct); + } + } + } + + private PlanNode planInnerTimeJoin( + List<PlanNode> children, Ordering mergeOrder, RewriterContext context) { + PlanNode resultNode = null; + if (children.size() == 1) { + resultNode = children.get(0); + } else if (children.size() > 1) { + resultNode = new InnerTimeJoinNode(context.genPlanNodeId(), mergeOrder, children); + } + return resultNode; + } + + private PlanNode planFullOuterTimeJoin( + List<PlanNode> children, Ordering mergeOrder, RewriterContext context) { + PlanNode resultNode = null; + if (children.size() == 1) { + resultNode = children.get(0); + } else if (children.size() > 1) { + resultNode = new FullOuterTimeJoinNode(context.genPlanNodeId(), mergeOrder, children); + } + return resultNode; + } + + private PlanNode planLeftOuterTimeJoin( + PlanNode left, PlanNode right, Ordering mergeOrder, RewriterContext context) { + checkState(left != null || right != null); + PlanNode resultNode; + if (left == null) { + resultNode = right; + } else if (right == null) { + resultNode = left; + } else { + resultNode = new LeftOuterTimeJoinNode(context.genPlanNodeId(), mergeOrder, left, right); + } + return resultNode; + } + + private PlanNode planFilter(PlanNode child, Expression predicate, RewriterContext context) { + FilterNode pushDownFilterNode = context.getPushDownFilterNode(); + return new FilterNode( + context.genPlanNodeId(), + child, + pushDownFilterNode.getOutputExpressions(), + predicate, + pushDownFilterNode.isKeepNull(), + pushDownFilterNode.getZoneId(), + pushDownFilterNode.getScanOrder()); + } + + @Override + public PlanNode visitSeriesScanSource(SeriesScanSourceNode node, RewriterContext context) { + if (context.hasNotInheritedPredicate()) { + return node; + } + + Expression inheritedPredicate = context.getInheritedPredicate(); + if (PredicateUtils.isPredicateOnlyContainSourceSymbol( + inheritedPredicate, node.getSourceSymbol())) { + node.setPushDownPredicate(inheritedPredicate); + context.setEnablePushDown(true); + return node; + } + + // cannot push down + return node; + } + } + + private static class RewriterContext { + + private final QueryId queryId; + + private FilterNode pushDownFilterNode; + + private boolean enablePushDown = false; + + private RewriterContext(QueryId queryId) { + this.queryId = queryId; + } + + public PlanNodeId genPlanNodeId() { + return queryId.genPlanNodeId(); + } + + public FilterNode getPushDownFilterNode() { + return pushDownFilterNode; + } + + public void setPushDownFilterNode(FilterNode pushDownFilterNode) { + this.pushDownFilterNode = pushDownFilterNode; + } + + public boolean hasNotInheritedPredicate() { + return pushDownFilterNode == null; + } + + public Expression getInheritedPredicate() { + checkState(pushDownFilterNode != null); + return pushDownFilterNode.getPredicate(); + } + + public boolean isEnablePushDown() { + return enablePushDown; + } + + public void setEnablePushDown(boolean enablePushDown) { + this.enablePushDown = enablePushDown; + } + + public void reset() { + this.pushDownFilterNode = null; + this.enablePushDown = false; + } + } +}
