This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpch500
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 744daec24309e2bcedbf87d03ee5de4c93cb547a
Author: englefly <[email protected]>
AuthorDate: Tue Dec 26 13:53:53 2023 +0800

    Prune some runtime filters (RFs) that target external-table scans
---
 .../nereids/processor/post/PlanPostProcessors.java |   3 +
 .../processor/post/RuntimeFilterContext.java       |  20 ++-
 .../processor/post/RuntimeFilterGenerator.java     |   6 +-
 .../post/RuntimeFilterPrunerForExternalTable.java  | 158 +++++++++++++++++++++
 .../trees/plans/physical/AbstractPhysicalPlan.java |   4 +-
 .../trees/plans/physical/PhysicalHashJoin.java     |   1 +
 .../trees/plans/physical/RuntimeFilter.java        |  23 ++-
 .../apache/doris/nereids/util/MutableState.java    |   2 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   6 +
 9 files changed, 206 insertions(+), 17 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index 7e69db04773..17538d55d45 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -69,6 +69,9 @@ public class PlanPostProcessors {
             builder.add(new RuntimeFilterGenerator());
             if 
(ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) {
                 builder.add(new RuntimeFilterPruner());
+                if 
(ConnectContext.get().getSessionVariable().runtimeFilterPruneForExternal) {
+                    builder.add(new RuntimeFilterPrunerForExternalTable());
+                }
             }
         }
         builder.add(new Validator());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index e986921d72e..b7858d42768 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -96,6 +96,8 @@ public class RuntimeFilterContext {
 
     public List<RuntimeFilter> prunedRF = Lists.newArrayList();
 
+    public final List<Plan> needRfPlans = Lists.newArrayList();
+
     private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
 
     // exprId of target to runtime filter.
@@ -197,14 +199,20 @@ public class RuntimeFilterContext {
                 RuntimeFilter rf = iter.next();
                 if (rf.getBuilderNode().equals(builderNode)) {
                     builderNode.getRuntimeFilters().remove(rf);
-                    for (Slot target : rf.getTargetSlots()) {
-                        if (target.getExprId().equals(targetId)) {
-                            Pair<PhysicalRelation, Slot> pair = 
aliasTransferMap.get(target);
-                            if (pair != null) {
-                                pair.first.removeAppliedRuntimeFilter(rf);
-                            }
+                    for (int i = 0; i < rf.getTargetSlots().size(); i++) {
+                        Slot targetSlot = rf.getTargetSlots().get(i);
+                        if (targetSlot.getExprId().equals(targetId)) {
+                            
rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf);
                         }
                     }
+                    // for (Slot target : rf.getTargetSlots()) {
+                    //     if (target.getExprId().equals(targetId)) {
+                    //         Pair<PhysicalRelation, Slot> pair = 
aliasTransferMap.get(target);
+                    //         if (pair != null) {
+                    //             pair.first.removeAppliedRuntimeFilter(rf);
+                    //         }
+                    //     }
+                    // }
                     iter.remove();
                     prunedRF.add(rf);
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index cff906df208..4aa71279f4d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -186,7 +186,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                 PhysicalRelation scan = aliasTransferMap.get(targetSlot).first;
                 RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
                         bitmapContains.child(0), ImmutableList.of(scanSlot),
-                        ImmutableList.of(bitmapContains.child(1)), type, i, 
join, isNot, -1L);
+                        ImmutableList.of(bitmapContains.child(1)), type, i, 
join, isNot, -1L, scan);
                 scan.addAppliedRuntimeFilter(filter);
                 ctx.addJoinToTargetMap(join, scanSlot.getExprId());
                 ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
@@ -267,7 +267,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                 RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
                         compare.child(1), ImmutableList.of(olapScanSlot), 
ImmutableList.of(olapScanSlot),
                         TRuntimeFilterType.MIN_MAX, exprOrder, join, true, 
buildSideNdv,
-                        getMinMaxType(compare));
+                        getMinMaxType(compare), scan);
                 scan.addAppliedRuntimeFilter(filter);
                 ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
                 ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
@@ -618,7 +618,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                     // build multi-target runtime filter
                     // since always on different join, set the expr_order as 0
                     RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
-                            equalTo.right(), targetList, type, 0, join, 
buildSideNdv);
+                            equalTo.right(), targetList, type, 0, join, 
buildSideNdv, cteNode);
                     targetNodes.forEach(node -> 
node.addAppliedRuntimeFilter(filter));
                     for (Slot slot : targetList) {
                         ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
new file mode 100644
index 00000000000..dd104173b21
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Join;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.nereids.util.MutableState;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * prune rf for external db
+ */
+public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor {
+    /**
+     * add parent to plan node and then remove rf if it is an external scan 
and only used as probe
+     */
+    @Override
+    public Plan processRoot(Plan plan, CascadesContext ctx) {
+        plan = plan.accept(this, ctx);
+        RuntimeFilterContext rfCtx = ctx.getRuntimeFilterContext();
+        for (RuntimeFilter rf : rfCtx.getNereidsRuntimeFilter()) {
+            AbstractPhysicalJoin join = rf.getBuilderNode();
+            if (join instanceof PhysicalHashJoin) {
+                List<Plan> joinAncestors = getAncestors(rf.getBuilderNode());
+                for (int i = 0; i < rf.getTargetScans().size(); i++) {
+                    PhysicalRelation scan = rf.getTargetScans().get(i);
+                    if (canPrune(scan, joinAncestors)) {
+                        
rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) 
join);
+                    }
+                }
+            }
+        }
+        return plan;
+    }
+
+    @Override
+    public Plan visit(Plan plan, CascadesContext context) {
+        for (Plan child : plan.children()) {
+            child.setMutableState(MutableState.KEY_PARENT, plan);
+            child.accept(this, context);
+        }
+        setMaxChildRuntimeFilterJump(plan);
+        return plan;
+    }
+
+    @Override
+    public PhysicalRelation visitPhysicalRelation(PhysicalRelation scan, 
CascadesContext context) {
+        RuntimeFilterContext rfCtx = context.getRuntimeFilterContext();
+        List<Slot> slots = rfCtx.getTargetListByScan(scan);
+        int maxJump = -1;
+        for (Slot slot : slots) {
+            if 
(!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
+                for (RuntimeFilter rf : 
rfCtx.getTargetExprIdToFilter().get(slot.getExprId())) {
+                    Optional<Object> oJump = 
rf.getBuilderNode().getMutableState(MutableState.KEY_RF_JUMP);
+                    if (oJump.isPresent()) {
+                        Integer jump = (Integer) (oJump.get());
+                        if (jump > maxJump) {
+                            maxJump = jump;
+                        }
+                    }
+                }
+            }
+        }
+        scan.setMutableState(MutableState.KEY_RF_JUMP, maxJump + 1);
+        return scan;
+    }
+
+    @Override
+    public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends 
Plan, ? extends Plan> join,
+                                                  CascadesContext context) {
+        join.right().accept(this, context);
+        join.right().setMutableState(MutableState.KEY_PARENT, join);
+        join.setMutableState(MutableState.KEY_RF_JUMP, 
join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
+        join.left().accept(this, context);
+        join.left().setMutableState(MutableState.KEY_PARENT, join);
+        return join;
+    }
+
+    private List<Plan> getAncestors(Plan plan) {
+        List<Plan> ancestors = Lists.newArrayList();
+        ancestors.add(plan);
+        Optional<Object> parent = 
plan.getMutableState(MutableState.KEY_PARENT);
+        while (parent.isPresent()) {
+            ancestors.add((Plan) parent.get());
+            parent = ((Plan) 
parent.get()).getMutableState(MutableState.KEY_PARENT);
+        }
+        return ancestors;
+    }
+
+    private boolean canPrune(PhysicalRelation scan, List<Plan> 
joinAndAncestors) {
+        if (!(scan instanceof PhysicalFileScan)) {
+            return false;
+        }
+        Plan cursor = scan;
+        Optional<Plan> parent = 
cursor.getMutableState(MutableState.KEY_PARENT);
+        while (parent.isPresent()) {
+            if (joinAndAncestors.contains(parent.get())) {
+                Optional oi = 
parent.get().getMutableState(MutableState.KEY_RF_JUMP);
+                if (oi.isPresent() && ConnectContext.get() != null
+                        && (int) (oi.get()) > 
ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
+                    return true;
+                }
+            } else {
+                if (isBuildSide(parent.get(), cursor)) {
+                    return false;
+                }
+            }
+            cursor = parent.get();
+            parent = cursor.getMutableState(MutableState.KEY_PARENT);
+        }
+        return false;
+    }
+
+    private boolean isBuildSide(Plan parent, Plan child) {
+        return parent instanceof Join && child.equals(parent.child(1));
+    }
+
+    private void setMaxChildRuntimeFilterJump(Plan plan) {
+        int maxJump = 0;
+        for (Plan child : plan.children()) {
+            Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP);
+            if (oi.isPresent()) {
+                int jump = (Integer) (oi.get());
+                if (jump > maxJump) {
+                    maxJump = jump;
+                }
+            }
+        }
+        plan.setMutableState(MutableState.KEY_RF_JUMP, maxJump);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index 14aa44e7943..47b1d67ced4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -123,14 +123,14 @@ public abstract class AbstractPhysicalPlan extends 
AbstractPlan implements Physi
         Preconditions.checkState(scanSlot != null, "scan slot is null");
         if (filter != null) {
             this.addAppliedRuntimeFilter(filter);
-            filter.addTargetSlot(scanSlot);
+            filter.addTargetSlot(scanSlot, scan);
             filter.addTargetExpression(scanSlot);
             ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
             ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
             ctx.setTargetsOnScanNode(aliasTransferMap.get(probeExpr).first, 
scanSlot);
         } else {
             filter = new RuntimeFilter(generator.getNextId(),
-                    src, ImmutableList.of(scanSlot), type, exprOrder, 
builderNode, buildSideNdv);
+                    src, ImmutableList.of(scanSlot), type, exprOrder, 
builderNode, buildSideNdv, scan);
             this.addAppliedRuntimeFilter(filter);
             ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
             ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 183ccaabfa8..39462be71ef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -260,6 +260,7 @@ public class PhysicalHashJoin<
             builder.append(" build RFs:").append(runtimeFilters.stream()
                     .map(rf -> 
rf.shapeInfo()).collect(Collectors.joining(";")));
         }
+        // builder.append("jump: 
").append(getMutableState(MutableState.KEY_RF_JUMP));
         return builder.toString();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index d928c45078b..a8f4c3cd7c8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -50,20 +50,24 @@ public class RuntimeFilter {
     // use for min-max filter only. specify if the min or max side is valid
     private final TMinMaxRuntimeFilterType tMinMaxType;
 
+    private final List<PhysicalRelation> targetScans = Lists.newArrayList();
+
     /**
      * constructor
      */
     public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> 
targets, TRuntimeFilterType type,
-            int exprOrder, AbstractPhysicalJoin builderNode, long 
buildSideNdv) {
+            int exprOrder, AbstractPhysicalJoin builderNode, long buildSideNdv,
+                         PhysicalRelation scan) {
         this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
-                builderNode, false, buildSideNdv, 
TMinMaxRuntimeFilterType.MIN_MAX);
+                builderNode, false, buildSideNdv, 
TMinMaxRuntimeFilterType.MIN_MAX, scan);
     }
 
     public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> 
targets, List<Expression> targetExpressions,
                          TRuntimeFilterType type, int exprOrder, 
AbstractPhysicalJoin builderNode,
-                         boolean bitmapFilterNotIn, long buildSideNdv) {
+                         boolean bitmapFilterNotIn, long buildSideNdv,
+                         PhysicalRelation scan) {
         this(id, src, targets, targetExpressions, type, exprOrder,
-                builderNode, bitmapFilterNotIn, buildSideNdv, 
TMinMaxRuntimeFilterType.MIN_MAX);
+                builderNode, bitmapFilterNotIn, buildSideNdv, 
TMinMaxRuntimeFilterType.MIN_MAX, scan);
     }
 
     /**
@@ -71,7 +75,8 @@ public class RuntimeFilter {
      */
     public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> 
targets, List<Expression> targetExpressions,
                          TRuntimeFilterType type, int exprOrder, 
AbstractPhysicalJoin builderNode,
-                         boolean bitmapFilterNotIn, long buildSideNdv, 
TMinMaxRuntimeFilterType tMinMaxType) {
+                         boolean bitmapFilterNotIn, long buildSideNdv, 
TMinMaxRuntimeFilterType tMinMaxType,
+                         PhysicalRelation scan) {
         this.id = id;
         this.srcSlot = src;
         this.targetSlots = Lists.newArrayList(targets);
@@ -83,6 +88,7 @@ public class RuntimeFilter {
         this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv;
         this.tMinMaxType = tMinMaxType;
         builderNode.addRuntimeFilter(this);
+        this.targetScans.add(scan);
     }
 
     public TMinMaxRuntimeFilterType gettMinMaxType() {
@@ -125,8 +131,9 @@ public class RuntimeFilter {
         return buildSideNdv;
     }
 
-    public void addTargetSlot(Slot target) {
+    public void addTargetSlot(Slot target, PhysicalRelation scan) {
         targetSlots.add(target);
+        targetScans.add(scan);
     }
 
     public List<Slot> getTargetSlots() {
@@ -137,6 +144,10 @@ public class RuntimeFilter {
         targetExpressions.add(targetExpr);
     }
 
+    public List<PhysicalRelation> getTargetScans() {
+        return targetScans;
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java
index 37234e954df..6f03de77fa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/MutableState.java
@@ -25,6 +25,8 @@ import java.util.Optional;
 public interface MutableState {
     String KEY_GROUP = "group";
     String KEY_FRAGMENT = "fragment";
+    String KEY_PARENT = "parent";
+    String KEY_RF_JUMP = "rf-jump";
 
     <T> Optional<T> get(String key);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8bd68a93d3e..8761f82ed3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -555,6 +555,12 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
+    public boolean runtimeFilterPruneForExternal = true;
+
+    @VariableMgr.VarAttr(name = "runtime_filter_jump_threshold")
+    public int runtimeFilterJumpThreshold = 2;
+
     // using hashset instead of group by + count can improve performance
     //        but may cause rpc failed when cluster has less BE
     // Whether this switch is turned on depends on the BE number


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to