This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b50888aaf [feature](Nereids) prune runtime filters which cannot 
reduce the tuple number of probe table (#13990)
9b50888aaf is described below

commit 9b50888aaf10d3d1f95372b74b54bf2a3b7f83bb
Author: minghong <[email protected]>
AuthorDate: Fri Nov 11 13:13:29 2022 +0800

    [feature](Nereids) prune runtime filters which cannot reduce the tuple 
number of probe table (#13990)
    
    1. add a post processor: runtime filter pruner
    Doris generates RFs (runtime filters) on join nodes to reduce the probe table at the scan stage. But some RFs have no effect because their selectivity is 100%; this PR removes them.
    An RF is effective if
    a. the build column's value range covers only part of the probe column's value range, OR
    b. the build column's ndv is less than that of the probe column, OR
    c. the build column's ColumnStats.selectivity < 1, OR
    d. the build column is reduced by another RF that satisfies the above criteria.
    
    2. explain graph
    a. add RF info in Join and Scan node
    b. add predicate count in Scan node
    
    3. Rename session variable
    rename `enable_remove_no_conjuncts_runtime_filter_policy` to 
`enable_runtime_filter_prune`
    
    4. fix min/max column stats derive bug
    `select max(A) as X from T group by B`
    X.min is A.min, not A.max
---
 .../nereids/processor/post/PlanPostProcessors.java |  10 +
 .../processor/post/RuntimeFilterContext.java       |  41 +++++
 .../processor/post/RuntimeFilterGenerator.java     |   1 +
 .../processor/post/RuntimeFilterPruner.java        | 204 +++++++++++++++++++++
 .../doris/nereids/stats/ExpressionEstimation.java  |  12 +-
 .../org/apache/doris/planner/HashJoinNode.java     |   4 +
 .../org/apache/doris/planner/OlapScanNode.java     |   7 +
 .../java/org/apache/doris/planner/PlanNode.java    |  26 ++-
 .../org/apache/doris/planner/RuntimeFilter.java    |   2 +-
 .../doris/planner/RuntimeFilterGenerator.java      |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  26 ++-
 .../org/apache/doris/statistics/ColumnStat.java    |  19 +-
 .../apache/doris/statistics/ColumnStatistic.java   |  18 ++
 .../nereids/stats/ExpressionEstimationTest.java    |   9 +-
 .../suites/tpch_sf1_p1/tpch_sf1/nereids/q9.groovy  |   2 +-
 tools/tpch-tools/queries/q9.sql                    |   2 +-
 16 files changed, 352 insertions(+), 33 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b0f822cac1..23fec67a58 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.processor.post;
 
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
@@ -36,6 +37,12 @@ public class PlanPostProcessors {
         this.cascadesContext = Objects.requireNonNull(cascadesContext, 
"cascadesContext can not be null");
     }
 
+    /**
+     * post process
+     *
+     * @param physicalPlan input plan
+     * @return physcial plan
+     */
     public PhysicalPlan process(PhysicalPlan physicalPlan) {
         PhysicalPlan resultPlan = physicalPlan;
         for (PlanPostProcessor processor : getProcessors()) {
@@ -52,6 +59,9 @@ public class PlanPostProcessors {
         Builder<PlanPostProcessor> builder = ImmutableList.builder();
         if 
(cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsRuntimeFilter())
 {
             builder.add(new RuntimeFilterGenerator());
+            if 
(ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) {
+                builder.add(new RuntimeFilterPruner());
+            }
         }
         builder.add(new Validator());
         return builder.build();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 03c7be5f17..58b871344e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
@@ -35,10 +36,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * runtime filter context used at post process and translation.
@@ -50,6 +54,8 @@ public class RuntimeFilterContext {
     // exprId of target to runtime filter.
     private final Map<ExprId, List<RuntimeFilter>> targetExprIdToFilter = 
Maps.newHashMap();
 
+    private final Map<Plan, List<ExprId>> joinToTargetExprId = 
Maps.newHashMap();
+
     // olap scan node that contains target of a runtime filter.
     private final Map<RelationId, List<Slot>> targetOnOlapScanNodeMap = 
Maps.newHashMap();
 
@@ -67,6 +73,7 @@ public class RuntimeFilterContext {
 
     private final Map<Slot, OlapScanNode> scanNodeOfLegacyRuntimeFilterTarget 
= Maps.newHashMap();
 
+    private final Set<Plan> effectiveSrcNodes = Sets.newHashSet();
     private final SessionVariable sessionVariable;
 
     private final FilterSizeLimits limits;
@@ -91,6 +98,24 @@ public class RuntimeFilterContext {
         this.targetExprIdToFilter.computeIfAbsent(id, k -> 
Lists.newArrayList()).add(filter);
     }
 
+    /**
+     * Removes the runtime filters targeting {@code targetId} that were built by {@code builderNode}.
+     * Does nothing when no filter is registered for {@code targetId}.
+     *
+     * @param targetId expression id of the runtime filter target
+     * @param builderNode hash join that builds the filters to remove
+     */
+    public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) {
+        List<RuntimeFilter> filters = targetExprIdToFilter.get(targetId);
+        if (filters == null) {
+            return;
+        }
+        Iterator<RuntimeFilter> cursor = filters.iterator();
+        while (cursor.hasNext()) {
+            RuntimeFilter candidate = cursor.next();
+            if (candidate.getBuilderNode().equals(builderNode)) {
+                cursor.remove();
+            }
+        }
+    }
+
     public void setTargetsOnScanNode(RelationId id, Slot slot) {
         this.targetOnOlapScanNodeMap.computeIfAbsent(id, k -> 
Lists.newArrayList()).add(slot);
     }
@@ -147,8 +172,24 @@ public class RuntimeFilterContext {
         targetNullCount++;
     }
 
+    /**
+     * Records {@code node} as an effective runtime-filter source node.
+     *
+     * @param node plan node to record
+     */
+    public void addEffectiveSrcNode(Plan node) {
+        this.effectiveSrcNodes.add(node);
+    }
+
+    /**
+     * Tells whether {@code node} was previously recorded via {@link #addEffectiveSrcNode(Plan)}.
+     *
+     * @param node plan node to look up
+     * @return true if the node was recorded as an effective source node
+     */
+    public boolean isEffectiveSrcNode(Plan node) {
+        return this.effectiveSrcNodes.contains(node);
+    }
+
     @VisibleForTesting
     public int getTargetNullCount() {
         return targetNullCount;
     }
+
+    /**
+     * Records that {@code join} generated a runtime filter whose target is {@code exprId}.
+     *
+     * @param join hash join that builds the runtime filter
+     * @param exprId expression id of the filter's target
+     */
+    public void addJoinToTargetMap(PhysicalHashJoin join, ExprId exprId) {
+        joinToTargetExprId.computeIfAbsent(join, k -> Lists.newArrayList()).add(exprId);
+    }
+
+    /**
+     * Returns the target expression ids of the runtime filters generated by {@code join}.
+     *
+     * @param join hash join that builds the runtime filters
+     * @return target ids in the order they were recorded; an empty list (never null) when the join
+     *         generated no runtime filter
+     */
+    public List<ExprId> getTargetExprIdByFilterJoin(PhysicalHashJoin join) {
+        return joinToTargetExprId.getOrDefault(join, Collections.emptyList());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 51c84f2dd6..fe1bcbeac5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -106,6 +106,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                         RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
                                 slots.second, slots.first, type,
                                 cnt.getAndIncrement(), join);
+                        ctx.addJoinToTargetMap(join, slots.first.getExprId());
                         ctx.setTargetExprIdToFilter(slots.first.getExprId(), 
filter);
                         ctx.setTargetsOnScanNode(
                                 aliasTransferMap.get((Slot) 
normalizedChildren.first).first,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
new file mode 100644
index 0000000000..c44f46b0ae
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLocalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.StatsDeriveResult;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Post processor that removes runtime filters (RFs) which cannot reduce the rows of the probe table.
+ *
+ * Doris generates RFs on join nodes to reduce the probe table at the scan stage. Some RFs have no
+ * effect because their selectivity is 100%; this processor removes them. An RF is effective if
+ *
+ * 1. the build column's value range covers only part of the probe column's value range, OR
+ * 2. the build column's ndv is less than that of the probe column, OR
+ * 3. the build column's ColumnStats.selectivity < 1, OR
+ * 4. the build side is already reduced, i.e. it is an effective RF source node (a filter,
+ *    aggregate, limit, top-n, an olap scan that is an RF target, or a node inheriting this
+ *    property from its child).
+ *
+ * Item 2 is applied with a 5% margin (build ndv < 0.95 * probe ndv) because ndv estimation is
+ * not accurate yet.
+ */
+public class RuntimeFilterPruner extends PlanPostProcessor {
+
+    // *******************************
+    // Physical plans
+    // *******************************
+
+    /** Aggregates are always treated as effective RF source nodes. */
+    @Override
+    public PhysicalAggregate visitPhysicalAggregate(PhysicalAggregate<? extends Plan> agg,
+            CascadesContext context) {
+        agg.child().accept(this, context);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(agg);
+        return agg;
+    }
+
+    /** A sort is an effective RF source node only if its child is. */
+    @Override
+    public PhysicalQuickSort visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort,
+            CascadesContext context) {
+        sort.child().accept(this, context);
+        inheritEffectiveness(sort, sort.child(), context);
+        return sort;
+    }
+
+    /** Top-N nodes are always treated as effective RF source nodes. */
+    @Override
+    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext context) {
+        topN.child().accept(this, context);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(topN);
+        return topN;
+    }
+
+    /** Limit nodes are always treated as effective RF source nodes. */
+    @Override
+    public PhysicalLimit visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, CascadesContext context) {
+        limit.child().accept(this, context);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(limit);
+        return limit;
+    }
+
+    /**
+     * Visits the build (right) side first. If it is an effective RF source node, the join is marked
+     * as one too and its RFs are kept. Otherwise the RFs generated by this join are removed unless at
+     * least one hash conjunct passes {@link #isEffectiveRuntimeFilter}. The probe (left) side is
+     * visited last.
+     */
+    @Override
+    public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        join.right().accept(this, context);
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        if (ctx.isEffectiveSrcNode(join.right())) {
+            ctx.addEffectiveSrcNode(join);
+        } else {
+            List<ExprId> exprIds = ctx.getTargetExprIdByFilterJoin(join);
+            if (exprIds != null && !exprIds.isEmpty()) {
+                // stop at the first effective conjunct; a conjunct that is not an EqualTo cannot
+                // be evaluated here and is treated as not effective
+                boolean isEffective = join.getHashJoinConjuncts().stream()
+                        .anyMatch(expr -> expr instanceof EqualTo
+                                && isEffectiveRuntimeFilter((EqualTo) expr, join));
+                if (!isEffective) {
+                    exprIds.forEach(exprId -> ctx.removeFilter(exprId, join));
+                }
+            }
+        }
+        join.left().accept(this, context);
+        return join;
+    }
+
+    /** A project is an effective RF source node only if its child is. */
+    @Override
+    public PhysicalProject visitPhysicalProject(PhysicalProject<? extends Plan> project,
+            CascadesContext context) {
+        project.child().accept(this, context);
+        inheritEffectiveness(project, project.child(), context);
+        return project;
+    }
+
+    /** Filters are always treated as effective RF source nodes. */
+    @Override
+    public PhysicalFilter visitPhysicalFilter(PhysicalFilter<? extends Plan> filter,
+            CascadesContext context) {
+        filter.child().accept(this, context);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(filter);
+        return filter;
+    }
+
+    /** An olap scan is an effective RF source node when the generator recorded RF targets on it. */
+    @Override
+    public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan olapScan, CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<Slot> slots = ctx.getTargetOnOlapScanNodeMap().get(olapScan.getId());
+        if (slots != null && !slots.isEmpty()) {
+            ctx.addEffectiveSrcNode(olapScan);
+        }
+        return olapScan;
+    }
+
+    // *******************************
+    // Physical enforcer
+    // *******************************
+
+    /** A distribute is an effective RF source node only if its child is. */
+    @Override
+    public PhysicalDistribute visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
+            CascadesContext context) {
+        distribute.child().accept(this, context);
+        inheritEffectiveness(distribute, distribute.child(), context);
+        return distribute;
+    }
+
+    /** A local sort is an effective RF source node only if its child is. */
+    @Override
+    public PhysicalLocalQuickSort visitPhysicalLocalQuickSort(PhysicalLocalQuickSort<? extends Plan> sort,
+            CascadesContext context) {
+        sort.child().accept(this, context);
+        inheritEffectiveness(sort, sort.child(), context);
+        return sort;
+    }
+
+    /** Visits the child only; an assert-num-rows node is never marked as an effective RF source node. */
+    @Override
+    public PhysicalAssertNumRows visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
+            CascadesContext context) {
+        assertNumRows.child().accept(this, context);
+        return assertNumRows;
+    }
+
+    /** Marks {@code node} as an effective RF source node if its already-visited {@code child} is one. */
+    private void inheritEffectiveness(Plan node, Plan child, CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        if (ctx.isEffectiveSrcNode(child)) {
+            ctx.addEffectiveSrcNode(node);
+        }
+    }
+
+    /**
+     * Consider L join R on L.a = R.b, which yields the runtime filter L.a <- R.b.
+     * The filter is effective if R.b.selectivity < 1, if the value range of a is only partly
+     * covered by the value range of b, or if ndv(b) < 0.95 * ndv(a).
+     * Returns false when either side of the condition does not reference exactly one slot, or
+     * when the needed statistics are missing.
+     *
+     * @param equalTo join condition
+     * @param join join node
+     * @return true if the runtime filter is effective
+     */
+    private boolean isEffectiveRuntimeFilter(EqualTo equalTo, PhysicalHashJoin join) {
+        StatsDeriveResult leftStats = ((AbstractPlan) join.child(0)).getStats();
+        StatsDeriveResult rightStats = ((AbstractPlan) join.child(1)).getStats();
+        if (leftStats == null || rightStats == null) {
+            return false;
+        }
+        Set<Slot> leftSlots = equalTo.child(0).getInputSlots();
+        Set<Slot> rightSlots = equalTo.child(1).getInputSlots();
+        // each side must reference exactly one slot; an empty set (constant expression) would make
+        // iterator().next() throw NoSuchElementException
+        if (leftSlots.size() != 1 || rightSlots.size() != 1) {
+            return false;
+        }
+        Slot leftSlot = leftSlots.iterator().next();
+        Slot rightSlot = rightSlots.iterator().next();
+        ColumnStatistic probeColumnStat = leftStats.getColumnStatsBySlotId(leftSlot.getExprId());
+        ColumnStatistic buildColumnStat = rightStats.getColumnStatsBySlotId(rightSlot.getExprId());
+        // TODO: remove this fallback once the conjunct's left operand is guaranteed to come from the probe side
+        if (probeColumnStat == null || buildColumnStat == null) {
+            probeColumnStat = leftStats.getColumnStatsBySlotId(rightSlot.getExprId());
+            buildColumnStat = rightStats.getColumnStatsBySlotId(leftSlot.getExprId());
+            if (probeColumnStat == null || buildColumnStat == null) {
+                return false;
+            }
+        }
+        return buildColumnStat.selectivity < 1
+                || probeColumnStat.coverage(buildColumnStat) < 1
+                || buildColumnStat.ndv < probeColumnStat.ndv * 0.95;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java
index 5bb7277ad1..3cd90143a3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/ExpressionEstimation.java
@@ -175,10 +175,14 @@ public class ExpressionEstimation extends 
ExpressionVisitor<ColumnStatistic, Sta
         if (columnStat == ColumnStatistic.UNKNOWN) {
             return ColumnStatistic.UNKNOWN;
         }
+        /*
+        we keep columnStat.min and columnStat.max, but set ndv=1.
+        if there is group-by keys, we will update ndv when visiting group 
clause
+        */
         double width = min.child().getDataType().width();
         return new 
ColumnStatisticBuilder().setCount(1).setNdv(1).setAvgSizeByte(width).setNumNulls(width)
                 
.setDataSize(child.getDataType().width()).setMinValue(columnStat.minValue)
-                .setMaxValue(columnStat.minValue).setSelectivity(1.0)
+                .setMaxValue(columnStat.maxValue).setSelectivity(1.0)
                 .setMinExpr(null).build();
     }
 
@@ -189,9 +193,13 @@ public class ExpressionEstimation extends 
ExpressionVisitor<ColumnStatistic, Sta
         if (columnStat == ColumnStatistic.UNKNOWN) {
             return ColumnStatistic.UNKNOWN;
         }
+        /*
+        we keep columnStat.min and columnStat.max, but set ndv=1.
+        if there is group-by keys, we will update ndv when visiting group 
clause
+        */
         int width = max.child().getDataType().width();
         return new 
ColumnStatisticBuilder().setCount(1D).setNdv(1D).setAvgSizeByte(width).setNumNulls(0)
-                
.setDataSize(width).setMinValue(columnStat.maxValue).setMaxValue(columnStat.maxValue)
+                
.setDataSize(width).setMinValue(columnStat.minValue).setMaxValue(columnStat.maxValue)
                 .setSelectivity(1.0).setMaxExpr(null).setMinExpr(null).build();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 01c6292253..a0bd60c10f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -784,6 +784,10 @@ public class HashJoinNode extends JoinNodeBase {
 
         if (detailLevel == TExplainLevel.BRIEF) {
             output.append(detailPrefix).append(String.format("cardinality=%s", 
cardinality)).append("\n");
+            if (!runtimeFilters.isEmpty()) {
+                output.append(detailPrefix).append("Build RFs: ");
+                output.append(getRuntimeFilterExplainString(true, true));
+            }
             return output.toString();
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 7a09c391dd..00e32338c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -946,6 +946,13 @@ public class OlapScanNode extends ScanNode {
                 .append("(").append(indexName).append(")");
         if (detailLevel == TExplainLevel.BRIEF) {
             
output.append("\n").append(prefix).append(String.format("cardinality=%s", 
cardinality));
+            if (!runtimeFilters.isEmpty()) {
+                output.append("\n").append(prefix).append("Apply RFs: ");
+                output.append(getRuntimeFilterExplainString(false, true));
+            }
+            if (!conjuncts.isEmpty()) {
+                output.append("\n").append(prefix).append("PREDICATES: 
").append(conjuncts.size()).append("\n");
+            }
             return output.toString();
         }
         if (isPreAggregation) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index acb0971cc5..9df9370245 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -918,7 +918,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         runtimeFilters.clear();
     }
 
-    protected String getRuntimeFilterExplainString(boolean isBuildNode) {
+    protected String getRuntimeFilterExplainString(boolean isBuildNode, 
boolean isBrief) {
         if (runtimeFilters.isEmpty()) {
             return "";
         }
@@ -926,21 +926,27 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         for (RuntimeFilter filter : runtimeFilters) {
             StringBuilder filterStr = new StringBuilder();
             filterStr.append(filter.getFilterId());
-            filterStr.append("[");
-            filterStr.append(filter.getType().toString().toLowerCase());
-            filterStr.append("]");
-            if (isBuildNode) {
-                filterStr.append(" <- ");
-                filterStr.append(filter.getSrcExpr().toSql());
-            } else {
-                filterStr.append(" -> ");
-                filterStr.append(filter.getTargetExpr(getId()).toSql());
+            if (!isBrief) {
+                filterStr.append("[");
+                filterStr.append(filter.getType().toString().toLowerCase());
+                filterStr.append("]");
+                if (isBuildNode) {
+                    filterStr.append(" <- ");
+                    filterStr.append(filter.getSrcExpr().toSql());
+                } else {
+                    filterStr.append(" -> ");
+                    filterStr.append(filter.getTargetExpr(getId()).toSql());
+                }
             }
             filtersStr.add(filterStr.toString());
         }
         return Joiner.on(", ").join(filtersStr) + "\n";
     }
 
+    /**
+     * Returns the detailed (non-brief) runtime filter explain string.
+     *
+     * @param isBuildNode true to render each filter from the build side ("id[type] <- src expr"),
+     *         false to render it from the target side ("id[type] -> target expr")
+     * @return comma-separated filter descriptions followed by a newline, or "" when there are none
+     */
+    protected String getRuntimeFilterExplainString(boolean isBuildNode) {
+        return getRuntimeFilterExplainString(isBuildNode, false);
+    }
+
     public void convertToVectoriezd() {
         if (!conjuncts.isEmpty()) {
             vconjunct = convertConjunctsToAndCompoundPredicate(conjuncts);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 13c74472ac..6f71cf5552 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -279,7 +279,7 @@ public final class RuntimeFilter {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Generating runtime filter from predicate " + 
joinPredicate);
         }
-        if 
(ConnectContext.get().getSessionVariable().enableRemoveNoConjunctsRuntimeFilterPolicy)
 {
+        if 
(ConnectContext.get().getSessionVariable().isEnableRuntimeFilterPrune()) {
             if (srcExpr instanceof SlotRef) {
                 if (!tupleHasConjuncts.contains(((SlotRef) 
srcExpr).getDesc().getParent().getId())) {
                     // src tuple has no conjunct, don't create runtime filter
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
index b926c5147b..9f7c2838a6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -162,7 +162,7 @@ public final class RuntimeFilterGenerator {
         Preconditions.checkState(runtimeFilterType >= 0, "runtimeFilterType 
not expected");
         Preconditions.checkState(runtimeFilterType <= 
Arrays.stream(TRuntimeFilterType.values())
                 .mapToInt(TRuntimeFilterType::getValue).sum(), 
"runtimeFilterType not expected");
-        if 
(ConnectContext.get().getSessionVariable().enableRemoveNoConjunctsRuntimeFilterPolicy)
 {
+        if 
(ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) {
             filterGenerator.findAllTuplesHavingConjuncts(plan);
         }
         filterGenerator.generateFilters(plan);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index eda12cf911..f40b8bfcfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -197,8 +197,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String ENABLE_NEREIDS_REORDER_TO_ELIMINATE_CROSS_JOIN =
             "enable_nereids_reorder_to_eliminate_cross_join";
 
-    public static final String ENABLE_REMOVE_NO_CONJUNCTS_RUNTIME_FILTER =
-            "enable_remove_no_conjuncts_runtime_filter_policy";
+    public static final String ENABLE_RUNTIME_FILTER_PRUNE =
+            "enable_runtime_filter_prune";
 
     static final String SESSION_CONTEXT = "session_context";
 
@@ -531,8 +531,8 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_NEREIDS_REORDER_TO_ELIMINATE_CROSS_JOIN)
     private boolean enableNereidsReorderToEliminateCrossJoin = true;
 
-    @VariableMgr.VarAttr(name = ENABLE_REMOVE_NO_CONJUNCTS_RUNTIME_FILTER)
-    public boolean enableRemoveNoConjunctsRuntimeFilterPolicy = false;
+    @VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PRUNE)
+    public boolean enableRuntimeFilterPrune = false;
 
     /**
      * The client can pass some special information by setting this session 
variable in the format: "k1:v1;k2:v2".
@@ -1156,24 +1156,22 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableNereidsStatsDeriveV2 = enableNereidsStatsDeriveV2;
     }
 
-    /**
-     * Serialize to thrift object.
-     * Used for rest api.
-     **/
-    public boolean isEnableRemoveNoConjunctsRuntimeFilterPolicy() {
-        return enableRemoveNoConjunctsRuntimeFilterPolicy;
+    public boolean isEnableRuntimeFilterPrune() {
+        return enableRuntimeFilterPrune;
     }
 
-    public void setEnableRemoveNoConjunctsRuntimeFilterPolicy(boolean 
enableRemoveNoConjunctsRuntimeFilterPolicy) {
-        this.enableRemoveNoConjunctsRuntimeFilterPolicy = 
enableRemoveNoConjunctsRuntimeFilterPolicy;
+    public void setEnableRuntimeFilterPrune(boolean enableRuntimeFilterPrune) {
+        this.enableRuntimeFilterPrune = enableRuntimeFilterPrune;
     }
 
     public void setFragmentTransmissionCompressionCodec(String codec) {
         this.fragmentTransmissionCompressionCodec = codec;
     }
 
-    // Serialize to thrift object
-    // used for rest api
+    /**
+     * Serialize to thrift object.
+     * Used for rest api.
+     */
     public TQueryOptions toThrift() {
         TQueryOptions tResult = new TQueryOptions();
         tResult.setMemLimit(maxExecMemByte);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStat.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStat.java
index f9d64238a7..2c78845e34 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStat.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStat.java
@@ -310,5 +310,22 @@ public class ColumnStat {
         this.selectivity = selectivity;
     }
 
-
+    /**
+     * Estimates how many of this column's distinct values fall inside {@code other}'s value range.
+     * The estimate interpolates linearly over [minValue, maxValue], i.e. it assumes the distinct
+     * values are spread uniformly across this column's range.
+     *
+     * @param other column whose [minValue, maxValue] range is intersected with this one
+     * @return estimated number of distinct values in the overlap (0 when the ranges are disjoint)
+     */
+    public double ndvIntersection(ColumnStat other) {
+        if (minValue == maxValue) {
+            boolean inside = minValue >= other.minValue && minValue <= other.maxValue;
+            return inside ? 1 : 0;
+        }
+        double low = Math.max(minValue, other.minValue);
+        double high = Math.min(maxValue, other.maxValue);
+        if (low < high) {
+            return Math.ceil(ndv * (high - low) / (maxValue - minValue));
+        }
+        // the ranges touch at a single point (or cannot be compared): count one value; disjoint: none
+        return low > high ? 0 : 1;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index 2b2f2f9ec8..c8511c7867 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -190,4 +190,22 @@ public class ColumnStatistic {
         }
     }
 
+    /**
+     * Returns the fraction of this column's value range that overlaps {@code other}'s value range.
+     * When this range is a single value, the result is 1.0 if that value lies inside
+     * {@code other}'s range and 0.0 otherwise.
+     *
+     * @param other column statistic whose [minValue, maxValue] range is intersected with this one
+     * @return the overlapping fraction, in [0, 1] for well-formed finite ranges; 0 when disjoint
+     */
+    public double coverage(ColumnStatistic other) {
+        if (minValue == maxValue) {
+            return other.minValue <= minValue && minValue <= other.maxValue ? 1.0 : 0.0;
+        }
+        double myRange = maxValue - minValue;
+        double intersection = Math.min(maxValue, other.maxValue) - Math.max(minValue, other.minValue);
+        // disjoint ranges produce a negative width; clamp so the result is a real fraction
+        return Math.max(0.0, intersection / myRange);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/ExpressionEstimationTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/ExpressionEstimationTest.java
index 8cdf71d167..991c5713b4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/ExpressionEstimationTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/ExpressionEstimationTest.java
@@ -55,8 +55,11 @@ class ExpressionEstimationTest {
                 .setMaxValue(500);
         slotToColumnStat.put(a.getExprId(), builder.build());
         StatsDeriveResult stat = new StatsDeriveResult(1000, slotToColumnStat);
+
+        //min/max not changed. select max(A) as X from T group by B. X.min is A.min, not A.max
         ColumnStatistic estimated = ExpressionEstimation.estimate(max, stat);
-        Assertions.assertEquals(500, estimated.minValue);
+        Assertions.assertEquals(0, estimated.minValue);
+        Assertions.assertEquals(500, estimated.maxValue);
         Assertions.assertEquals(1, estimated.ndv);
     }
 
@@ -76,8 +79,10 @@ class ExpressionEstimationTest {
         slotToColumnStat.put(a.getExprId(), builder.build());
         StatsDeriveResult stat = new StatsDeriveResult(1000, slotToColumnStat);
         Min max = new Min(a);
+        //min/max not changed. select min(A) as X from T group by B. X.max is A.max, not A.min
         ColumnStatistic estimated = ExpressionEstimation.estimate(max, stat);
-        Assertions.assertEquals(0, estimated.maxValue);
+        Assertions.assertEquals(0, estimated.minValue);
+        Assertions.assertEquals(1000, estimated.maxValue);
         Assertions.assertEquals(1, estimated.ndv);
     }
 
diff --git a/regression-test/suites/tpch_sf1_p1/tpch_sf1/nereids/q9.groovy 
b/regression-test/suites/tpch_sf1_p1/tpch_sf1/nereids/q9.groovy
index 89055edec8..ae46f3f443 100644
--- a/regression-test/suites/tpch_sf1_p1/tpch_sf1/nereids/q9.groovy
+++ b/regression-test/suites/tpch_sf1_p1/tpch_sf1/nereids/q9.groovy
@@ -64,7 +64,7 @@ suite("tpch_sf1_q9_nereids") {
     """
 
     qt_select """
-    select/*+SET_VAR(exec_mem_limit=17179869184, 
parallel_fragment_exec_instance_num=4, enable_vectorized_engine=true, 
batch_size=4096, disable_join_reorder=false, 
enable_cost_based_join_reorder=false, enable_projection=true, 
enable_remove_no_conjuncts_runtime_filter_policy=true, 
runtime_filter_wait_time_ms=10000) */
+    select/*+SET_VAR(exec_mem_limit=17179869184, 
parallel_fragment_exec_instance_num=4, enable_vectorized_engine=true, 
batch_size=4096, disable_join_reorder=false, 
enable_cost_based_join_reorder=false, enable_projection=true, 
enable_runtime_filter_prune=true, runtime_filter_wait_time_ms=10000) */
         nation,
         o_year,
         sum(amount) as sum_profit
diff --git a/tools/tpch-tools/queries/q9.sql b/tools/tpch-tools/queries/q9.sql
index 321266719c..9462dea0b5 100644
--- a/tools/tpch-tools/queries/q9.sql
+++ b/tools/tpch-tools/queries/q9.sql
@@ -17,7 +17,7 @@
 
 -- Modified
 
-select/*+SET_VAR(exec_mem_limit=37179869184, 
parallel_fragment_exec_instance_num=8, enable_vectorized_engine=true, 
batch_size=4096, disable_join_reorder=false, 
enable_cost_based_join_reorder=false, enable_projection=true, 
enable_remove_no_conjuncts_runtime_filter_policy=true, 
runtime_filter_wait_time_ms=10000) */
+select/*+SET_VAR(exec_mem_limit=37179869184, 
parallel_fragment_exec_instance_num=8, enable_vectorized_engine=true, 
batch_size=4096, disable_join_reorder=false, 
enable_cost_based_join_reorder=false, enable_projection=true, 
enable_runtime_filter_prune=true, runtime_filter_wait_time_ms=10000) */
     nation,
     o_year,
     sum(amount) as sum_profit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to