This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cf073ec8ce6 [runtimefilter](nerieds)support Non equal runtime filter 
for nested loop join #25193
cf073ec8ce6 is described below

commit cf073ec8ce6c7d5654249a8a699bc9185a822770
Author: minghong <[email protected]>
AuthorDate: Mon Oct 16 17:49:47 2023 +0800

    [runtimefilter](nerieds)support Non equal runtime filter for nested loop 
join #25193
---
 .../glue/translator/PhysicalPlanTranslator.java    |   3 +-
 .../glue/translator/RuntimeFilterTranslator.java   |   4 +-
 .../processor/post/RuntimeFilterGenerator.java     | 132 ++++++++++++++++++---
 .../trees/plans/physical/AbstractPhysicalJoin.java |  28 +++++
 .../trees/plans/physical/PhysicalHashJoin.java     |  29 -----
 .../plans/physical/PhysicalNestedLoopJoin.java     |  23 ++--
 .../trees/plans/physical/RuntimeFilter.java        |  22 +++-
 .../org/apache/doris/planner/DataStreamSink.java   |  19 +--
 .../java/org/apache/doris/planner/PlanNode.java    |  19 +--
 .../org/apache/doris/planner/RuntimeFilter.java    |  66 +++++++++--
 .../correctness_p0/test_runtime_filter.groovy      | 111 +++++++++++++++++
 11 files changed, 350 insertions(+), 106 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9361b7c63e3..5e4097bbfec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -180,6 +180,7 @@ import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TPushAggOp;
+import org.apache.doris.thrift.TRuntimeFilterType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -1392,7 +1393,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         .getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
                 filters.forEach(filter -> runtimeFilterTranslator
                         .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, 
context));
-                if (!filters.isEmpty()) {
+                if (filters.stream().anyMatch(filter -> filter.getType() == 
TRuntimeFilterType.BITMAP)) {
                     nestedLoopJoinNode.setOutputLeftSideOnly(true);
                 }
             });
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 982c8677aa7..8ffd307e328 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -152,8 +152,8 @@ public class RuntimeFilterTranslator {
         if (!hasInvalidTarget) {
             org.apache.doris.planner.RuntimeFilter origFilter
                     = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
-                    filter.getId(), node, src, filter.getExprOrder(), 
targetExprList,
-                    targetTupleIdMapList, filter.getType(), 
context.getLimits(), filter.getBuildSideNdv());
+                    filter, node, src, targetExprList,
+                    targetTupleIdMapList, context.getLimits());
             if (node instanceof HashJoinNode) {
                 origFilter.setIsBroadcast(((HashJoinNode) 
node).getDistributionMode() == DistributionMode.BROADCAST);
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 2683754d77e..b99b8904e5e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -23,9 +23,14 @@ import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.stats.ExpressionEstimation;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
+import org.apache.doris.nereids.trees.expressions.LessThan;
+import org.apache.doris.nereids.trees.expressions.LessThanEqual;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.Slot;
@@ -53,6 +58,7 @@ import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
 import com.google.common.base.Preconditions;
@@ -113,6 +119,10 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
         join.right().accept(this, context);
         join.left().accept(this, context);
+        if 
(RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || 
join.isMarkJoin()) {
+            join.right().getOutput().forEach(slot ->
+                    
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
+        }
         collectPushDownCTEInfos(join, context);
         if (!getPushDownCTECandidates(ctx).isEmpty()) {
             pushDownRuntimeFilterIntoCTE(ctx);
@@ -142,29 +152,19 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         return producer;
     }
 
-    @Override
-    public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? 
extends Plan, ? extends Plan> join,
-            CascadesContext context) {
-        // TODO: we need to support all type join
-        join.right().accept(this, context);
-        join.left().accept(this, context);
+    private void generateBitMapRuntimeFilterForNLJ(PhysicalNestedLoopJoin<? 
extends Plan, ? extends Plan> join,
+                                                   RuntimeFilterContext ctx) {
         if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && 
join.getJoinType() != JoinType.CROSS_JOIN) {
-            return join;
+            return;
         }
-        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
         Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
-
-        if ((ctx.getSessionVariable().getRuntimeFilterType() & 
TRuntimeFilterType.BITMAP.getValue()) == 0) {
-            //only generate BITMAP filter for nested loop join
-            return join;
-        }
         List<Slot> leftSlots = join.left().getOutput();
         List<Slot> rightSlots = join.right().getOutput();
         List<Expression> bitmapRuntimeFilterConditions = 
JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots,
                 rightSlots, join.getOtherJoinConjuncts());
         if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots, 
join.getOtherJoinConjuncts())
                 .first.isEmpty()) {
-            return join;
+            return;
         }
         int bitmapRFCount = bitmapRuntimeFilterConditions.size();
         for (int i = 0; i < bitmapRFCount; i++) {
@@ -193,6 +193,104 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                 
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
             }
         }
+    }
+
+    /**
+     * A join B on B.x < A.x
+     * transform B.x < A.x to A.x > B.x,
+     * otherwise return null
+     */
+    private ComparisonPredicate normalizeNonEqual(AbstractPhysicalJoin<? 
extends Plan, ? extends Plan> join,
+                                                  Expression expr) {
+        if (!(expr instanceof ComparisonPredicate)) {
+            return null;
+        }
+        if (!(expr.child(0) instanceof SlotReference)) {
+            return null;
+        }
+        if (!(expr.child(1) instanceof SlotReference)) {
+            return null;
+        }
+        if (! join.left().getOutput().contains(expr.child(0))
+                || ! join.right().getOutput().contains(expr.child(1))) {
+            if (join.left().getOutput().contains(expr.child(1))
+                    && join.right().getOutput().contains(expr.child(0))) {
+                return ((ComparisonPredicate) expr).commute();
+            }
+        } else {
+            return (ComparisonPredicate) expr;
+        }
+        return null;
+    }
+
+    private TMinMaxRuntimeFilterType getMinMaxType(ComparisonPredicate 
compare) {
+        if (compare instanceof LessThan || compare instanceof LessThanEqual) {
+            return TMinMaxRuntimeFilterType.MAX;
+        }
+        if (compare instanceof GreaterThan || compare instanceof 
GreaterThanEqual) {
+            return TMinMaxRuntimeFilterType.MIN;
+        }
+        return TMinMaxRuntimeFilterType.MIN_MAX;
+    }
+
+    /**
+     * A join B on A.x < B.y
+     * min-max filter (A.x < N, N=max(B.y)) could be applied to A.x
+     */
+    private void generateMinMaxRuntimeFilter(AbstractPhysicalJoin<? extends 
Plan, ? extends Plan> join,
+                                                   RuntimeFilterContext ctx) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        int hashCondionSize = join.getHashJoinConjuncts().size();
+        for (int idx = 0; idx < join.getOtherJoinConjuncts().size(); idx++) {
+            int exprOrder = idx + hashCondionSize;
+            Expression expr = join.getOtherJoinConjuncts().get(exprOrder);
+            ComparisonPredicate compare = normalizeNonEqual(join, expr);
+            if (compare != null) {
+                Slot unwrappedSlot = checkTargetChild(compare.child(0));
+                if (unwrappedSlot == null) {
+                    continue;
+                }
+                Pair<PhysicalRelation, Slot> pair = 
aliasTransferMap.get(unwrappedSlot);
+                if (pair == null) {
+                    continue;
+                }
+                Slot olapScanSlot = pair.second;
+                PhysicalRelation scan = pair.first;
+                Preconditions.checkState(olapScanSlot != null && scan != null);
+                long buildSideNdv = getBuildSideNdv(join, compare);
+                RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                        compare.child(1), ImmutableList.of(olapScanSlot), 
ImmutableList.of(olapScanSlot),
+                        TRuntimeFilterType.MIN_MAX, exprOrder, join, true, 
buildSideNdv,
+                        getMinMaxType(compare));
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+                ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
+            }
+        }
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? 
extends Plan, ? extends Plan> join,
+            CascadesContext context) {
+        // TODO: we need to support all type join
+        join.right().accept(this, context);
+        join.left().accept(this, context);
+
+        if 
(RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || 
join.isMarkJoin()) {
+            join.right().getOutput().forEach(slot ->
+                    
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
+            return join;
+        }
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+
+        if ((ctx.getSessionVariable().getRuntimeFilterType() & 
TRuntimeFilterType.BITMAP.getValue()) != 0) {
+            generateBitMapRuntimeFilterForNLJ(join, ctx);
+        }
+
+        if ((ctx.getSessionVariable().getRuntimeFilterType() & 
TRuntimeFilterType.MIN_MAX.getValue()) != 0) {
+            generateMinMaxRuntimeFilter(join, ctx);
+        }
+
         return join;
     }
 
@@ -233,14 +331,16 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         return relation;
     }
 
-    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, EqualTo equalTo) {
+    // runtime filter build side ndv
+    private long getBuildSideNdv(AbstractPhysicalJoin<? extends Plan, ? 
extends Plan> join,
+                                 ComparisonPredicate compare) {
         AbstractPlan right = (AbstractPlan) join.right();
         //make ut test friendly
         if (right.getStats() == null) {
             return -1L;
         }
         ExpressionEstimation estimator = new ExpressionEstimation();
-        ColumnStatistic buildColStats = equalTo.right().accept(estimator, 
right.getStats());
+        ColumnStatistic buildColStats = compare.right().accept(estimator, 
right.getStats());
         return buildColStats.isUnKnown ? -1 : Math.max(1, (long) 
buildColStats.ndv);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
index eb9ed7cfc97..ad7e8ba8cc8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Join;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.statistics.Statistics;
 
 import com.google.common.collect.ImmutableList;
@@ -41,6 +42,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Abstract class for all physical join node.
@@ -214,4 +216,30 @@ public abstract class AbstractPhysicalJoin<
                         ? ImmutableList.of(markJoinSlotReference.get()) : 
ImmutableList.of())
                 .build();
     }
+
+    @Override
+    public String toString() {
+        List<Object> args = Lists.newArrayList("type", joinType,
+                "hashCondition", hashJoinConjuncts,
+                "otherCondition", otherJoinConjuncts,
+                "stats", statistics);
+        if (markJoinSlotReference.isPresent()) {
+            args.add("isMarkJoin");
+            args.add("true");
+        }
+        if (markJoinSlotReference.isPresent()) {
+            args.add("MarkJoinSlotReference");
+            args.add(markJoinSlotReference.get());
+        }
+        if (hint != JoinHint.NONE) {
+            args.add("hint");
+            args.add(hint);
+        }
+        if (!runtimeFilters.isEmpty()) {
+            args.add("runtimeFilters");
+            args.add(runtimeFilters.stream().map(rf -> rf.toString() + " 
").collect(Collectors.toList()));
+        }
+        return Utils.toSqlString(this.getClass().getName() + "[" + id.asInt() 
+ "]" + getGroupIdWithPrefix(),
+                args.toArray());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 6d9583504ea..b60afd67308 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -31,14 +31,12 @@ import 
org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.MutableState;
-import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.Statistics;
@@ -140,33 +138,6 @@ public class PhysicalHashJoin<
         return visitor.visitPhysicalHashJoin(this, context);
     }
 
-    @Override
-    public String toString() {
-        List<Object> args = Lists.newArrayList("type", joinType,
-                "hashJoinCondition", hashJoinConjuncts,
-                "otherJoinCondition", otherJoinConjuncts,
-                "stats", statistics,
-                "fr", getMutableState(AbstractPlan.FRAGMENT_ID));
-        if (markJoinSlotReference.isPresent()) {
-            args.add("isMarkJoin");
-            args.add("true");
-        }
-        if (markJoinSlotReference.isPresent()) {
-            args.add("MarkJoinSlotReference");
-            args.add(markJoinSlotReference.get());
-        }
-        if (hint != JoinHint.NONE) {
-            args.add("hint");
-            args.add(hint);
-        }
-        if (!runtimeFilters.isEmpty()) {
-            args.add("runtimeFilters");
-            args.add(runtimeFilters.stream().map(rf -> rf.toString() + " 
").collect(Collectors.toList()));
-        }
-        return Utils.toSqlString("PhysicalHashJoin[" + id.asInt() + "]" + 
getGroupIdWithPrefix(),
-                args.toArray());
-    }
-
     @Override
     public PhysicalHashJoin<Plan, Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 2);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
index d6b08d02c54..d2dca0d2e73 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.MutableState;
-import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.statistics.Statistics;
 
 import com.google.common.base.Preconditions;
@@ -113,17 +112,17 @@ public class PhysicalNestedLoopJoin<
         return visitor.visitPhysicalNestedLoopJoin(this, context);
     }
 
-    @Override
-    public String toString() {
-        // TODO: Maybe we could pull up this to the abstract class in the 
future.
-        return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + "]" 
+ getGroupIdWithPrefix(),
-                "type", joinType,
-                "otherJoinCondition", otherJoinConjuncts,
-                "isMarkJoin", markJoinSlotReference.isPresent(),
-                "markJoinSlotReference", markJoinSlotReference.isPresent() ? 
markJoinSlotReference.get() : "empty",
-                "stats", statistics
-        );
-    }
+    // @Override
+    // public String toString() {
+    //     // TODO: Maybe we could pull up this to the abstract class in the 
future.
+    //     return Utils.toSqlString("PhysicalNestedLoopJoin[" + id.asInt() + 
"]" + getGroupIdWithPrefix(),
+    //             "type", joinType,
+    //             "otherJoinCondition", otherJoinConjuncts,
+    //             "isMarkJoin", markJoinSlotReference.isPresent(),
+    //             "markJoinSlotReference", markJoinSlotReference.isPresent() 
? markJoinSlotReference.get() : "empty",
+    //             "stats", statistics
+    //     );
+    // }
 
     @Override
     public PhysicalNestedLoopJoin<Plan, Plan> withChildren(List<Plan> 
children) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 74a100dc237..f213a8996ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.physical;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
 import com.google.common.collect.ImmutableList;
@@ -45,21 +46,31 @@ public class RuntimeFilter {
     private final boolean bitmapFilterNotIn;
 
     private final long buildSideNdv;
+    // use for min-max filter only. specify if the min or max side is valid
+    private final TMinMaxRuntimeFilterType tMinMaxType;
 
     /**
      * constructor
      */
     public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> 
targets, TRuntimeFilterType type,
             int exprOrder, AbstractPhysicalJoin builderNode, long 
buildSideNdv) {
-        this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder, 
builderNode, false, buildSideNdv);
+        this(id, src, targets, ImmutableList.copyOf(targets), type, exprOrder,
+                builderNode, false, buildSideNdv, 
TMinMaxRuntimeFilterType.MIN_MAX);
+    }
+
+    public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> 
targets, List<Expression> targetExpressions,
+                         TRuntimeFilterType type, int exprOrder, 
AbstractPhysicalJoin builderNode,
+                         boolean bitmapFilterNotIn, long buildSideNdv) {
+        this(id, src, targets, targetExpressions, type, exprOrder,
+                builderNode, bitmapFilterNotIn, buildSideNdv, 
TMinMaxRuntimeFilterType.MIN_MAX);
     }
 
     /**
      * constructor
      */
     public RuntimeFilter(RuntimeFilterId id, Expression src, List<Slot> 
targets, List<Expression> targetExpressions,
-            TRuntimeFilterType type, int exprOrder, AbstractPhysicalJoin 
builderNode, boolean bitmapFilterNotIn,
-            long buildSideNdv) {
+                         TRuntimeFilterType type, int exprOrder, 
AbstractPhysicalJoin builderNode,
+                         boolean bitmapFilterNotIn, long buildSideNdv, 
TMinMaxRuntimeFilterType tMinMaxType) {
         this.id = id;
         this.srcSlot = src;
         this.targetSlots = Lists.newArrayList(targets);
@@ -69,9 +80,14 @@ public class RuntimeFilter {
         this.builderNode = builderNode;
         this.bitmapFilterNotIn = bitmapFilterNotIn;
         this.buildSideNdv = buildSideNdv <= 0 ? -1L : buildSideNdv;
+        this.tMinMaxType = tMinMaxType;
         builderNode.addRuntimeFilter(this);
     }
 
+    public TMinMaxRuntimeFilterType gettMinMaxType() {
+        return tMinMaxType;
+    }
+
     public Expression getSrcExpr() {
         return srcSlot;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 8f5ce78304b..4d4a2c641a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -150,24 +150,7 @@ public class DataStreamSink extends DataSink {
         }
         List<String> filtersStr = new ArrayList<>();
         for (RuntimeFilter filter : runtimeFilters) {
-            StringBuilder filterStr = new StringBuilder();
-            filterStr.append(filter.getFilterId());
-            if (!isBrief) {
-                filterStr.append("[");
-                filterStr.append(filter.getType().toString().toLowerCase());
-                filterStr.append("]");
-                if (isBuildNode) {
-                    filterStr.append(" <- ");
-                    filterStr.append(filter.getSrcExpr().toSql());
-                    
filterStr.append("(").append(filter.getEstimateNdv()).append("/")
-                            
.append(filter.getExpectFilterSizeBytes()).append("/")
-                            .append(filter.getFilterSizeBytes()).append(")");
-                } else {
-                    filterStr.append(" -> ");
-                    
filterStr.append(filter.getTargetExpr(getExchNodeId()).toSql());
-                }
-            }
-            filtersStr.add(filterStr.toString());
+            filtersStr.add(filter.getExplainString(isBuildNode, isBrief, 
getExchNodeId()));
         }
         return Joiner.on(", ").join(filtersStr) + "\n";
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 40cfe876b89..754b9fcfcb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1116,24 +1116,7 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
         }
         List<String> filtersStr = new ArrayList<>();
         for (RuntimeFilter filter : runtimeFilters) {
-            StringBuilder filterStr = new StringBuilder();
-            filterStr.append(filter.getFilterId());
-            if (!isBrief) {
-                filterStr.append("[");
-                filterStr.append(filter.getType().toString().toLowerCase());
-                filterStr.append("]");
-                if (isBuildNode) {
-                    filterStr.append(" <- ");
-                    filterStr.append(filter.getSrcExpr().toSql());
-                    
filterStr.append("(").append(filter.getEstimateNdv()).append("/")
-                            
.append(filter.getExpectFilterSizeBytes()).append("/")
-                            .append(filter.getFilterSizeBytes()).append(")");
-                } else {
-                    filterStr.append(" -> ");
-                    filterStr.append(filter.getTargetExpr(getId()).toSql());
-                }
-            }
-            filtersStr.add(filterStr.toString());
+            filtersStr.add(filter.getExplainString(isBuildNode, isBrief, 
getId()));
         }
         return Joiner.on(", ").join(filtersStr) + "\n";
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index c6781e7ed4c..d21f390c04d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.Pair;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TMinMaxRuntimeFilterType;
 import org.apache.doris.thrift.TRuntimeFilterDesc;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
@@ -109,6 +110,8 @@ public final class RuntimeFilter {
 
     private boolean useRemoteRfOpt = true;
 
+    private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType;
+
     /**
      * Internal representation of a runtime filter target.
      */
@@ -142,8 +145,10 @@ public final class RuntimeFilter {
     }
 
     private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, 
Expr srcExpr, int exprOrder,
-            List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> 
targetSlots, TRuntimeFilterType type,
-            RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long 
buildSizeNdv) {
+                          List<Expr> origTargetExprs, List<Map<TupleId, 
List<SlotId>>> targetSlots,
+                          TRuntimeFilterType type,
+                          RuntimeFilterGenerator.FilterSizeLimits 
filterSizeLimits, long buildSizeNdv,
+                          TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType) {
         this.id = filterId;
         this.builderNode = filterSrcNode;
         this.srcExpr = srcExpr;
@@ -152,16 +157,27 @@ public final class RuntimeFilter {
         this.targetSlotsByTid = ImmutableList.copyOf(targetSlots);
         this.runtimeFilterType = type;
         this.ndvEstimate = buildSizeNdv;
+        this.tMinMaxRuntimeFilterType = tMinMaxRuntimeFilterType;
         computeNdvEstimate();
         calculateFilterSize(filterSizeLimits);
     }
 
+    private RuntimeFilter(RuntimeFilterId filterId, PlanNode filterSrcNode, 
Expr srcExpr, int exprOrder,
+            List<Expr> origTargetExprs, List<Map<TupleId, List<SlotId>>> 
targetSlots, TRuntimeFilterType type,
+            RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, long 
buildSizeNdv) {
+        this(filterId, filterSrcNode, srcExpr, exprOrder, origTargetExprs,
+                targetSlots, type, filterSizeLimits, buildSizeNdv, 
TMinMaxRuntimeFilterType.MIN_MAX);
+    }
+
     // only for nereids planner
-    public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, 
JoinNodeBase node, Expr srcExpr,
-            int exprOrder, List<Expr> origTargetExprs, List<Map<TupleId, 
List<SlotId>>> targetSlots,
-            TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits 
filterSizeLimits, long buildSizeNdv) {
-        return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExprs,
-                targetSlots, type, filterSizeLimits, buildSizeNdv);
+    public static RuntimeFilter fromNereidsRuntimeFilter(
+            org.apache.doris.nereids.trees.plans.physical.RuntimeFilter 
nereidsFilter,
+            JoinNodeBase node, Expr srcExpr, List<Expr> origTargetExprs,
+            List<Map<TupleId, List<SlotId>>> targetSlots,
+            RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
+        return new RuntimeFilter(nereidsFilter.getId(), node, srcExpr, 
nereidsFilter.getExprOrder(), origTargetExprs,
+                targetSlots, nereidsFilter.getType(), filterSizeLimits, 
nereidsFilter.getBuildSideNdv(),
+                nereidsFilter.gettMinMaxType());
     }
 
     @Override
@@ -224,6 +240,9 @@ public final class RuntimeFilter {
             tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
             tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
         }
+        if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) {
+            tFilter.setMinMaxType(tMinMaxRuntimeFilterType);
+        }
         tFilter.setOptRemoteRf(optRemoteRf);
         return tFilter;
     }
@@ -256,6 +275,18 @@ public final class RuntimeFilter {
         return runtimeFilterType;
     }
 
+    public String getTypeDesc() {
+        String desc = runtimeFilterType.toString().toLowerCase();
+        if (runtimeFilterType == TRuntimeFilterType.MIN_MAX) {
+            if (tMinMaxRuntimeFilterType == TMinMaxRuntimeFilterType.MIN) {
+                desc = "min";
+            } else if (tMinMaxRuntimeFilterType == 
TMinMaxRuntimeFilterType.MAX) {
+                desc = "max";
+            }
+        }
+        return desc;
+    }
+
     public void setType(TRuntimeFilterType type) {
         runtimeFilterType = type;
     }
@@ -685,4 +716,25 @@ public final class RuntimeFilter {
     public long getExpectFilterSizeBytes() {
         return expectFilterSizeBytes;
     }
+
+    public String getExplainString(boolean isBuildNode, boolean isBrief, 
PlanNodeId targetNodeId) {
+        StringBuilder filterStr = new StringBuilder();
+        filterStr.append(getFilterId());
+        if (!isBrief) {
+            filterStr.append("[");
+            filterStr.append(getTypeDesc());
+            filterStr.append("]");
+            if (isBuildNode) {
+                filterStr.append(" <- ");
+                filterStr.append(getSrcExpr().toSql());
+                filterStr.append("(").append(getEstimateNdv()).append("/")
+                        .append(getExpectFilterSizeBytes()).append("/")
+                        .append(getFilterSizeBytes()).append(")");
+            } else {
+                filterStr.append(" -> ");
+                filterStr.append(getTargetExpr(targetNodeId).toSql());
+            }
+        }
+        return filterStr.toString();
+    }
 }
diff --git a/regression-test/suites/correctness_p0/test_runtime_filter.groovy 
b/regression-test/suites/correctness_p0/test_runtime_filter.groovy
new file mode 100644
index 00000000000..f691bd16060
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_runtime_filter.groovy
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases is copied from https://github.com/trinodb/trino/tree/master
+// 
/testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+suite("test_runtime_filter") {
+
+    sql """ DROP TABLE IF EXISTS rf_tblA """
+    sql """
+            CREATE TABLE IF NOT EXISTS rf_tblA (
+                a int
+            )
+            DUPLICATE KEY(a)
+            DISTRIBUTED BY HASH(a) BUCKETS 1
+            PROPERTIES (
+              "replication_num" = "1"
+            )
+        """
+    
+    sql """ DROP TABLE IF EXISTS rf_tblB """
+    sql """
+            CREATE TABLE IF NOT EXISTS rf_tblB (
+                b int
+            )
+            DUPLICATE KEY(b)
+            DISTRIBUTED BY HASH(b) BUCKETS 1
+            PROPERTIES (
+              "replication_num" = "1"
+            )
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS rf_tblC (
+                c int
+            )
+            DUPLICATE KEY(c)
+            DISTRIBUTED BY HASH(c) BUCKETS 1
+            PROPERTIES (
+              "replication_num" = "1"
+            )
+        """
+
+    sql "set enable_pipeline_engine=true;"
+    sql "set runtime_filter_type=4"
+    sql "set enable_nereids_planner=true"
+    sql "set enable_fallback_to_original_planner=false"
+    sql "set disable_join_reorder=true"
+
+    explain{
+        sql ("""select * from rf_tblA join rf_tblB on a < b""")
+        contains "runtime filters: RF000[max] -> a"
+        contains "runtime filters: RF000[max] <- b"
+    } 
+
+    explain{
+        sql ("""select * from rf_tblA join rf_tblB on a > b""")
+        contains "runtime filters: RF000[min] -> a"
+        contains "runtime filters: RF000[min] <- b"
+    } 
+
+    explain{
+        sql ("""select * from rf_tblA join rf_tblB on b < a""")
+        contains "runtime filters: RF000[min] -> a"
+        contains "runtime filters: RF000[min] <- b"
+    } 
+
+    explain{
+        sql ("""select * from rf_tblA right outer join rf_tblB on a < b""")
+        contains "runtime filters: RF000[max] <- b"
+        contains "runtime filters: RF000[max] -> a"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA left join rf_tblB on a < b; """)
+        notContains "runtime filters"
+    }
+
+    explain{
+        sql ("""select * from rf_tblA full outer join rf_tblB on a = b; """)
+        notContains "runtime filters"
+    }
+
+    explain{
+        sql ("""
+            with x as (select * from rf_tblA join rf_tblB on a=b)
+            select * from x join rf_tblC on x.b <= rf_tblC.c
+            union 
+            select * from x join rf_tblC on x.b <= rf_tblC.c
+            """)
+        contains "runtime filters: RF001[max] -> b"
+        contains "runtime filters: RF002[max] -> b"
+        contains "runtime filters: RF001[max] <- c"
+        contains "runtime filters: RF002[max] <- c"
+
+    }   
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to