This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpch500 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 744daec24309e2bcedbf87d03ee5de4c93cb547a Author: englefly <[email protected]> AuthorDate: Tue Dec 26 13:53:53 2023 +0800 prune some rf for external db --- .../nereids/processor/post/PlanPostProcessors.java | 3 + .../processor/post/RuntimeFilterContext.java | 20 ++- .../processor/post/RuntimeFilterGenerator.java | 6 +- .../post/RuntimeFilterPrunerForExternalTable.java | 158 +++++++++++++++++++++ .../trees/plans/physical/AbstractPhysicalPlan.java | 4 +- .../trees/plans/physical/PhysicalHashJoin.java | 1 + .../trees/plans/physical/RuntimeFilter.java | 23 ++- .../apache/doris/nereids/util/MutableState.java | 2 + .../java/org/apache/doris/qe/SessionVariable.java | 6 + 9 files changed, 206 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index 7e69db04773..17538d55d45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -69,6 +69,9 @@ public class PlanPostProcessors { builder.add(new RuntimeFilterGenerator()); if (ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) { builder.add(new RuntimeFilterPruner()); + if (ConnectContext.get().getSessionVariable().runtimeFilterPruneForExternal) { + builder.add(new RuntimeFilterPrunerForExternalTable()); + } } } builder.add(new Validator()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index e986921d72e..b7858d42768 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -96,6 +96,8 @@ public class RuntimeFilterContext { public List<RuntimeFilter> prunedRF = Lists.newArrayList(); + public final List<Plan> needRfPlans = Lists.newArrayList(); + private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator(); // exprId of target to runtime filter. @@ -197,14 +199,20 @@ public class RuntimeFilterContext { RuntimeFilter rf = iter.next(); if (rf.getBuilderNode().equals(builderNode)) { builderNode.getRuntimeFilters().remove(rf); - for (Slot target : rf.getTargetSlots()) { - if (target.getExprId().equals(targetId)) { - Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(target); - if (pair != null) { - pair.first.removeAppliedRuntimeFilter(rf); - } + for (int i = 0; i < rf.getTargetSlots().size(); i++) { + Slot targetSlot = rf.getTargetSlots().get(i); + if (targetSlot.getExprId().equals(targetId)) { + rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf); } } + // for (Slot target : rf.getTargetSlots()) { + // if (target.getExprId().equals(targetId)) { + // Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(target); + // if (pair != null) { + // pair.first.removeAppliedRuntimeFilter(rf); + // } + // } + // } iter.remove(); prunedRF.add(rf); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index cff906df208..4aa71279f4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -186,7 +186,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { PhysicalRelation scan = aliasTransferMap.get(targetSlot).first; RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), bitmapContains.child(0), ImmutableList.of(scanSlot), - ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L); + ImmutableList.of(bitmapContains.child(1)), type, i, join, isNot, -1L, scan); scan.addAppliedRuntimeFilter(filter); ctx.addJoinToTargetMap(join, scanSlot.getExprId()); ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); @@ -267,7 +267,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), compare.child(1), ImmutableList.of(olapScanSlot), ImmutableList.of(olapScanSlot), TRuntimeFilterType.MIN_MAX, exprOrder, join, true, buildSideNdv, - getMinMaxType(compare)); + getMinMaxType(compare), scan); scan.addAppliedRuntimeFilter(filter); ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); @@ -618,7 +618,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { // build multi-target runtime filter // since always on different join, set the expr_order as 0 RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), - equalTo.right(), targetList, type, 0, join, buildSideNdv); + equalTo.right(), targetList, type, 0, join, buildSideNdv, cteNode); targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter)); for (Slot slot : targetList) { ctx.setTargetExprIdToFilter(slot.getExprId(), filter); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java new file mode 100644 index 00000000000..dd104173b21 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Join; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; +import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; +import org.apache.doris.nereids.util.MutableState; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Optional; + +/** + * prune rf for external db + */ +public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor { + /** + * add parent to plan node and then remove rf if it is an external scan and only used as probe + */ + @Override + public Plan processRoot(Plan plan, CascadesContext ctx) { + plan = plan.accept(this, ctx); + RuntimeFilterContext rfCtx = ctx.getRuntimeFilterContext(); + for (RuntimeFilter rf : rfCtx.getNereidsRuntimeFilter()) { + AbstractPhysicalJoin join = rf.getBuilderNode(); + if (join instanceof PhysicalHashJoin) { + List<Plan> joinAncestors = getAncestors(rf.getBuilderNode()); + for (int i = 0; i < rf.getTargetScans().size(); i++) { + PhysicalRelation scan = rf.getTargetScans().get(i); + if (canPrune(scan, joinAncestors)) { + rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join); + } + } + } + } + return plan; + } + + @Override + public Plan visit(Plan plan, CascadesContext context) { + for (Plan child : plan.children()) { + child.setMutableState(MutableState.KEY_PARENT, plan); + child.accept(this, context); + } + setMaxChildRuntimeFilterJump(plan); + return plan; + } + + @Override + public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, CascadesContext context) { + RuntimeFilterContext rfCtx = context.getRuntimeFilterContext(); + List<Slot> slots = rfCtx.getTargetListByScan(scan); + int maxJump = -1; + for (Slot slot : slots) { + if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) { + for (RuntimeFilter rf : rfCtx.getTargetExprIdToFilter().get(slot.getExprId())) { + Optional<Object> oJump = rf.getBuilderNode().getMutableState(MutableState.KEY_RF_JUMP); + if (oJump.isPresent()) { + Integer jump = (Integer) (oJump.get()); + if (jump > maxJump) { + maxJump = jump; + } + } + } + } + } + scan.setMutableState(MutableState.KEY_RF_JUMP, maxJump + 1); + return scan; + } + + @Override + public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, + CascadesContext context) { + join.right().accept(this, context); + join.right().setMutableState(MutableState.KEY_PARENT, join); + join.setMutableState(MutableState.KEY_RF_JUMP, join.right().getMutableState(MutableState.KEY_RF_JUMP).get()); + join.left().accept(this, context); + join.left().setMutableState(MutableState.KEY_PARENT, join); + return join; + } + + private List<Plan> getAncestors(Plan plan) { + List<Plan> ancestors = Lists.newArrayList(); + ancestors.add(plan); + Optional<Object> parent = plan.getMutableState(MutableState.KEY_PARENT); + while (parent.isPresent()) { + ancestors.add((Plan) parent.get()); + parent = ((Plan) parent.get()).getMutableState(MutableState.KEY_PARENT); + } + return ancestors; + } + + private boolean canPrune(PhysicalRelation scan, List<Plan> joinAndAncestors) { + if (!(scan instanceof PhysicalFileScan)) { + return false; + } + Plan cursor = scan; + Optional<Plan> parent = cursor.getMutableState(MutableState.KEY_PARENT); + while (parent.isPresent()) { + if (joinAndAncestors.contains(parent.get())) { + Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP); + if (oi.isPresent() && ConnectContext.get() != null + && (int) (oi.get()) > ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) { + return true; + } + } else { + if (isBuildSide(parent.get(), cursor)) { + return false; + } + } + cursor = parent.get(); + parent = cursor.getMutableState(MutableState.KEY_PARENT); + } + return false; + } + + private boolean isBuildSide(Plan parent, Plan child) { + return parent instanceof Join && child.equals(parent.child(1)); + } + + private void setMaxChildRuntimeFilterJump(Plan plan) { + int maxJump = 0; + for (Plan child : plan.children()) { + Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP); + if (oi.isPresent()) { + int jump = (Integer) (oi.get()); + if (jump > maxJump) { + maxJump = jump; + } + } + } + plan.setMutableState(MutableState.KEY_RF_JUMP, maxJump); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 14aa44e7943..47b1d67ced4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -123,14 +123,14 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi Preconditions.checkState(scanSlot != null, "scan slot is null"); if (filter != null) { this.addAppliedRuntimeFilter(filter); - filter.addTargetSlot(scanSlot); + filter.addTargetSlot(scanSlot, scan); filter.addTargetExpression(scanSlot); ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first, scanSlot); } else { filter = new RuntimeFilter(generator.getNextId(), - src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv); + src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan); this.addAppliedRuntimeFilter(filter); ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java index 183ccaabfa8..39462be71ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java @@ -260,6 +260,7 @@ public class PhysicalHashJoin< builder.append(" build RFs:").append(runtimeFilters.stream() .map(rf -> rf.shapeInfo()).collect(Collectors.joining(";"))); } + // builder.append("jump: ").append(getMutableState(MutableState.KEY_RF_JUMP)); return builder.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index d928c45078b..a8f4c3cd7c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -50,20 +50,24 @@ public class RuntimeFilter { // use for min-max filter only. specify if the min or max side is valid private final TMinMaxRuntimeFilterType tMinMaxType; + private final List<PhysicalRelation> targetScans = Lists.newArrayList(); + /** * constructor */ public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, TRuntimeFilterType type, - int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv) { + int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv, + PhysicalRelation scan) { this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, - builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX); + builderNode, false, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan); } public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, - boolean bitmapFilterNotIn, long buildSideNdv) { + boolean bitmapFilterNotIn, long buildSideNdv, + PhysicalRelation scan) { this(id, src, targets, targetExpressions, type, exprOrder, - builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX); + builderNode, bitmapFilterNotIn, buildSideNdv, TMinMaxRuntimeFilterType.MIN_MAX, scan); } /** @@ -71,7 +75,8 @@ public class RuntimeFilter { */ public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> targets, List<Expression> targetExpressions, TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin builderNode, - boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType) { + boolean bitmapFilterNotIn, long buildSideNdv, TMinMaxRuntimeFilterType tMinMaxType, + PhysicalRelation scan) { this.id = id; this.srcSlot = src; this.targetSlots = Lists.newArrayList(targets); @@ -83,6 +88,7 @@ public class RuntimeFilter { this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv; this.tMinMaxType = tMinMaxType; builderNode.addRuntimeFilter(this); + this.targetScans.add(scan); } public TMinMaxRuntimeFilterType gettMinMaxType() { @@ -125,8 +131,9 @@ public class RuntimeFilter { return buildSideNdv; } - public void addTargetSlot(Slot target) { + public void addTargetSlot(Slot target, PhysicalRelation scan) { targetSlots.add(target); + targetScans.add(scan); } public List<Slot> getTargetSlots() { @@ -137,6 +144,10 @@ public class RuntimeFilter { targetExpressions.add(targetExpr); } + public List<PhysicalRelation> getTargetScans() { + return targetScans; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java index 37234e954df..6f03de77fa5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java @@ -25,6 +25,8 @@ import java.util.Optional; public interface MutableState { String KEY_GROUP = "group"; String KEY_FRAGMENT = "fragment"; + String KEY_PARENT = "parent"; + String KEY_RF_JUMP = "rf-jump"; <T> Optional<T> get(String key); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8bd68a93d3e..8761f82ed3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -555,6 +555,12 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; + @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external") + public boolean runtimeFilterPruneForExternal = true; + + @VariableMgr.VarAttr(name = "runtime_filter_jump_threshold") + public int runtimeFilterJumpThreshold = 2; + // using hashset instead of group by + count can improve performance // but may cause rpc failed when cluster has less BE // Whether this switch is turned on depends on the BE number --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
