github-actions[bot] commented on code in PR #62737:
URL: https://github.com/apache/doris/pull/62737#discussion_r3325720221


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -323,6 +332,337 @@ private boolean isUniqueValueEqualTo(PhysicalHashJoin<? 
extends Plan, ? extends
         return false;
     }
 
+    /**
+     * Generate decoupled runtime filters for the given join.
+     *
+     * For each equi-conjunct left=probe, right=build:
+     *   Walk the probe subtree to find a descendant join whose build side 
contains
+     *   the probe expression. If found, create an RF produced by that 
descendant join,
+     *   pushed down into this join's build subtree to reach the target scan.
+     *
+     * Decision logic to avoid circular wait between standard and decoupled 
RFs:
+     * - Stats known: if decoupled_ndv / standard_ndv < threshold → prefer 
decoupled (standard RF is kept but made non-blocking)
+     *                else → keep both, decoupled is non-blocking (wait_time=0)
+     * - Stats unknown: if probe subtree has filters but build subtree doesn't 
→ prefer decoupled
+     *                  else → keep both, decoupled is non-blocking
+     */
+    private void generateDecoupledRuntimeFilters(
+            PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            List<Expression> hashJoinConjuncts,
+            List<TRuntimeFilterType> legalTypes,
+            RuntimeFilterContext ctx,
+            CascadesContext context) {
+        // Only generate decoupled RFs for INNER/CROSS joins. For 
SEMI/ANTI/OUTER joins,
+        // the standard RF has specialized semantics and a reverse-direction 
decoupled RF
+        // may interfere or be semantically incorrect.
+        if (!join.getJoinType().isInnerJoin() && 
!join.getJoinType().isCrossJoin()) {
+            return;
+        }
+        double ndvRatioThreshold = 
ctx.getSessionVariable().decoupledRfNdvRatioThreshold;
+
+        for (int i = 0; i < hashJoinConjuncts.size(); i++) {
+            EqualPredicate equalTo = JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualPredicate) hashJoinConjuncts.get(i), 
join.left().getOutputSet());
+            Expression probeExpr = equalTo.left();
+            Expression buildExpr = equalTo.right();
+            if (buildExpr.getInputSlots().size() != 1) {
+                continue;
+            }
+            Pair<AbstractPhysicalJoin<?, ?>, Expression> result =
+                    findBuilderForDecoupledRf(probeExpr, join.left());
+            if (result == null) {
+                continue;
+            }
+            AbstractPhysicalJoin<?, ?> decoupledBuilder = result.first;
+            Expression resolvedSrcExpr = result.second;
+
+            long decoupledNdv = getDecoupledBuildSideNdv(decoupledBuilder, 
resolvedSrcExpr);
+            // Use strict NDV (returns -1 for unknown) for the preference 
comparison,
+            // to avoid biased ratio when one side has real NDV and the other 
uses rowCount fallback.
+            long strictDecoupledNdv = getStrictNdv(decoupledBuilder.right(), 
resolvedSrcExpr);
+            long strictStandardNdv = getStrictNdv(join.right(), 
equalTo.right());
+
+            // Prune decoupled RFs that won't be selective enough when stats 
are unknown.
+            // With known stats: always create the decoupled RF — 
shouldPreferDecoupledRf()
+            // decides below whether to prefer it or keep both with decoupled 
non-blocking.
+            // Without stats: if the builder's build side has no filter 
predicates,
+            // it likely outputs all distinct values -> non-selective.
+            if (!(strictDecoupledNdv > 0 && strictStandardNdv > 0)
+                    && !hasFilterInSubtree(decoupledBuilder.right())) {
+                continue;
+            }
+
+            boolean preferDecoupled = shouldPreferDecoupledRf(
+                    strictDecoupledNdv, strictStandardNdv, ndvRatioThreshold,
+                    decoupledBuilder, join);
+
+            for (TRuntimeFilterType type : legalTypes) {
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+
+                RuntimeFilterPushDownVisitor.PushDownContext pushDownContext =
+                        
RuntimeFilterPushDownVisitor.PushDownContext.createPushDownContext(
+                                ctx, decoupledBuilder, resolvedSrcExpr, 
buildExpr, type, false,
+                                
context.getStatementContext().isHasUnknownColStats(),
+                                decoupledNdv, -1 /*sentinel: decoupled RF*/);
+                boolean decoupledRfPushed = false;
+                if (pushDownContext.isValid()) {
+                    decoupledRfPushed = join.right().accept(
+                            new RuntimeFilterPushDownVisitor(), 
pushDownContext);
+                }
+                if (decoupledRfPushed) {
+                    if (preferDecoupled) {
+                        // When preferring decoupled RF, make the standard RF 
non-blocking.
+                        // This preserves both filters while prioritizing the 
decoupled RF
+                        // (which blocks). The standard RF still applies if it 
arrives in
+                        // time (with wait_time=0).
+                        markStandardRfAsNonBlocking(ctx, join, 
equalTo.right(), type, i);
+                    } else {
+                        markDecoupledRfAsNonBlocking(ctx, pushDownContext);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Decide whether the decoupled RF should take priority over the standard RF.
+     *
+     * @return true if the decoupled RF is preferred (the standard RF is kept but
+     *         made non-blocking); false means keep both, with the decoupled RF
+     *         made non-blocking instead.
+     */
+    private boolean shouldPreferDecoupledRf(
+            long decoupledNdv, long standardNdv, double ndvRatioThreshold,
+            AbstractPhysicalJoin<?, ?> decoupledBuilder,
+            PhysicalHashJoin<? extends Plan, ? extends Plan> conditionJoin) {
+        boolean statsKnown = decoupledNdv > 0 && standardNdv > 0;
+        if (statsKnown) {
+            double ratio = (double) decoupledNdv / standardNdv;
+            return ratio < ndvRatioThreshold;
+        }
+        // Unknown stats: use filter-presence heuristic.
+        // The decoupled RF's source is in the probe subtree (via 
decoupledBuilder),
+        // the standard RF's source is in conditionJoin's build subtree.
+        // If probe source has filters but build source doesn't → decoupled RF 
is more selective.
+        boolean probeHasFilter = hasFilterInSubtree(decoupledBuilder.right());
+        boolean buildHasFilter = hasFilterInSubtree(conditionJoin.right());
+        return probeHasFilter && !buildHasFilter;
+    }
+
+    /**
+     * Check if the given subtree contains a PhysicalFilter whose predicates
+     * reference at least one slot that has no original column or whose original
+     * column is visible. Recurses into every child of every plan node.
+     */
+    private boolean hasFilterInSubtree(Plan subtree) {
+        if (subtree instanceof PhysicalFilter) {
+            PhysicalFilter<?> filter = (PhysicalFilter<?>) subtree;
+            for (Expression expr : filter.getExpressions()) {
+                for (Slot slot : expr.getInputSlots()) {
+                    if (slot instanceof SlotReference) {
+                        SlotReference slotRef = (SlotReference) slot;
+                        if (!slotRef.getOriginalColumn().isPresent()
+                                || 
slotRef.getOriginalColumn().get().isVisible()) {
+                            return true;
+                        }
+                    }
+                }
+            }
+        }
+        for (Plan child : subtree.children()) {
+            if (hasFilterInSubtree(child)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * After push-down, find the newly created decoupled RF and mark it as 
non-blocking.
+     * The decoupled RF was just pushed down and registered in 
targetExprIdToFilter.
+     * We identify it by exprOrder == -1 and the matching builderNode.
+     */
+    private void markDecoupledRfAsNonBlocking(
+            RuntimeFilterContext ctx,
+            RuntimeFilterPushDownVisitor.PushDownContext pushDownContext) {
+        for (List<RuntimeFilter> filters : 
ctx.getTargetExprIdToFilter().values()) {
+            for (RuntimeFilter rf : filters) {
+                if (rf.getExprOrder() == -1
+                        && 
rf.getBuilderNode().equals(pushDownContext.builderNode)
+                        && rf.getSrcExpr().equals(pushDownContext.srcExpr)
+                        && rf.getType().equals(pushDownContext.type)) {
+                    rf.setNonBlocking(true);
+                }
+            }
+        }
+    }
+
+    private void markStandardRfAsNonBlocking(
+            RuntimeFilterContext ctx,
+            PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+            Expression srcExpr,
+            TRuntimeFilterType type,
+            int exprOrder) {
+        for (RuntimeFilter rf : ctx.getNereidsRuntimeFilter()) {
+            if (rf.getExprOrder() == exprOrder
+                    && rf.getBuilderNode().equals(join)
+                    && rf.getSrcExpr().equals(srcExpr)
+                    && rf.getType().equals(type)) {
+                rf.setNonBlocking(true);
+            }
+        }
+    }
+
+    /**
+     * Walk the probe subtree to find a descendant join whose build side 
(right child)
+     * contains all input slots of the given expression.
+     *
+     * @return Pair of (builderNode, resolvedSrcExpr) where resolvedSrcExpr is 
the
+     *         expression rewritten through any intermediate Projects, or null 
if not found.
+     */
+    private Pair<AbstractPhysicalJoin<?, ?>, Expression> 
findBuilderForDecoupledRf(
+            Expression expr, Plan subtree) {
+        //Constraints:
+        //   - Only INNER/CROSS joins on the path (OUTER/ANTI/SEMI block 
traversal)
+        //   - Expr must come from a join's build side (not a leaf scan 
directly)
+        if (subtree instanceof AbstractPhysicalJoin) {
+            AbstractPhysicalJoin<?, ?> join = (AbstractPhysicalJoin<?, ?>) 
subtree;
+            if (join.getJoinType() != JoinType.INNER_JOIN
+                    && join.getJoinType() != JoinType.CROSS_JOIN) {
+                return null;
+            }
+            if (join.isMarkJoin()) {
+                return null;
+            }
+            if (join.right().getOutputSet().containsAll(expr.getInputSlots())) 
{
+                // Try to find a deeper builder within the build subtree.
+                // A deeper builder produces the bloom filter earlier because 
it's built
+                // from a smaller, more selective dataset (e.g., a filtered 
dimension table).
+                Pair<AbstractPhysicalJoin<?, ?>, Expression> deeper =
+                        tryFindDeeperBuilder(expr, join.right());
+                return deeper != null ? deeper : Pair.of(join, expr);
+            }
+            if (join.left().getOutputSet().containsAll(expr.getInputSlots())) {
+                return findBuilderForDecoupledRf(expr, join.left());
+            }
+        } else if (subtree instanceof PhysicalProject) {
+            PhysicalProject<?> project = (PhysicalProject<?>) subtree;
+            Map<Slot, Expression> replaceMap = 
ExpressionUtils.generateReplaceMap(project.getProjects());
+            Expression rewritten = expr.rewriteDownShortCircuit(e -> 
replaceMap.getOrDefault(e, e));
+            if (rewritten.getInputSlots().size() == 1) {
+                return findBuilderForDecoupledRf(rewritten, project.child());
+            }
+        } else if (subtree instanceof PhysicalDistribute || subtree instanceof 
PhysicalFilter) {
+            // Transparent operators: pass through without expression rewriting
+            return findBuilderForDecoupledRf(expr, subtree.child(0));
+        }
+        return null;
+    }
+
+    /**
+     * Dive into the build subtree to find a deeper join that can serve as the 
decoupled RF
+     * builder. When expr is on the probe side of an inner join and an 
equi-condition maps it
+     * to the build side, the inner join is a better builder — its bloom 
filter is produced
+     * earlier (when the smaller build-side hash table is ready) rather than 
waiting for the
+     * entire join result.
+     *
+     * Example: plan is orders ⋈ (lineitem ⋈ part[filter]) with expr=l_partkey.
+     *   l_partkey is on the probe side of lineitem ⋈ part, equi-cond 
l_partkey = p_partkey
+     *   maps to p_partkey on part (build side). Returning (lineitem ⋈ part, 
p_partkey) means
+     *   the bloom filter is built from part's hash table, ready immediately 
after part scan.
+     */
+    private Pair<AbstractPhysicalJoin<?, ?>, Expression> tryFindDeeperBuilder(
+            Expression expr, Plan subtree) {
+        if (subtree instanceof AbstractPhysicalJoin) {
+            AbstractPhysicalJoin<?, ?> join = (AbstractPhysicalJoin<?, ?>) 
subtree;
+            if (join.getJoinType() != JoinType.INNER_JOIN
+                    && join.getJoinType() != JoinType.CROSS_JOIN) {
+                return null;
+            }
+            if (join.isMarkJoin()) {
+                return null;
+            }
+            if (join.right().getOutputSet().containsAll(expr.getInputSlots())) 
{
+                // expr is already on the build side; try to go even deeper
+                Pair<AbstractPhysicalJoin<?, ?>, Expression> deeper =
+                        tryFindDeeperBuilder(expr, join.right());
+                return deeper != null ? deeper : Pair.of(join, expr);
+            }
+            if (join.left().getOutputSet().containsAll(expr.getInputSlots())) {
+                // expr is on the probe side; check equi-conditions for 
equivalent build expr
+                Expression buildEquiv = findEquivalentBuildExpr(expr, join);
+                if (buildEquiv != null) {
+                    Pair<AbstractPhysicalJoin<?, ?>, Expression> deeper =
+                            tryFindDeeperBuilder(buildEquiv, join.right());
+                    return deeper != null ? deeper : Pair.of(join, buildEquiv);
+                }
+                // No equi-condition maps expr to build side; try through 
probe side
+                return tryFindDeeperBuilder(expr, join.left());
+            }
+        } else if (subtree instanceof PhysicalProject) {
+            PhysicalProject<?> project = (PhysicalProject<?>) subtree;
+            Map<Slot, Expression> replaceMap = 
ExpressionUtils.generateReplaceMap(project.getProjects());
+            Expression rewritten = expr.rewriteDownShortCircuit(e -> 
replaceMap.getOrDefault(e, e));
+            if (rewritten.getInputSlots().size() == 1) {
+                return tryFindDeeperBuilder(rewritten, project.child());
+            }
+        } else if (subtree instanceof PhysicalDistribute || subtree instanceof 
PhysicalFilter) {
+            return tryFindDeeperBuilder(expr, subtree.child(0));
+        }
+        return null;
+    }
+
+    /**
+     * Find an equivalent build-side expression through a join's 
equi-conditions.
+     * For example, if expr is l_partkey (probe) and join has l_partkey = 
p_partkey,
+     * returns p_partkey (build side).
+     */
+    private Expression findEquivalentBuildExpr(Expression expr, 
AbstractPhysicalJoin<?, ?> join) {
+        Set<Slot> exprSlots = expr.getInputSlots();
+        Set<Slot> leftOutputSet = join.left().getOutputSet();
+        for (Expression condition : join.getHashJoinConjuncts()) {
+            if (condition instanceof EqualPredicate) {
+                EqualPredicate eq = JoinUtils.swapEqualToForChildrenOrder(
+                        (EqualPredicate) condition, leftOutputSet);
+                if (eq.left().getInputSlots().equals(exprSlots)

Review Comment:
   This matches only the input slot set and drops the actual expression shape 
when mapping through a descendant join. For example, in `(t1 join t2 on t1.k = 
t2.k) join t3 on t1.k + 1 = t3.k`, `probeExpr` is `t1.k + 1`, but `exprSlots` 
is just `{t1.k}`, so this returns `t2.k` and builds a decoupled RF `t2.k -> 
t3.k`. That filter can reject valid rows where `t3.k = t1.k + 1`. Please either 
require the equi-conjunct side to equal `expr` exactly, or rewrite the full 
expression by replacing equivalent slots so the source becomes `t2.k + 1` 
rather than `t2.k`.



##########
be/src/exec/runtime_filter/runtime_filter_producer_helper.cpp:
##########
@@ -103,6 +143,12 @@ Status RuntimeFilterProducerHelper::build(
         uint64_t hash_table_size = block ? block->rows() : 0;
         RETURN_IF_ERROR(_init_filters(state, hash_table_size));
         if (hash_table_size > 1) {
+            // Evaluate decoupled RF expressions on the block before insert.
+            // Standard RF exprs are already evaluated during hash table 
building.
+            for (size_t idx : _decoupled_filter_indices) {
+                int result_column_id = -1;
+                RETURN_IF_ERROR(_filter_expr_contexts[idx]->execute(block, 
&result_column_id));
+            }

Review Comment:
   After executing a decoupled RF source expression, the result column should 
be materialized before `_insert()`. Standard hash-join build expressions do 
this in `HashJoinBuildSinkLocalState::_do_evaluate()`, and the cross-join 
helper also calls `convert_to_full_column_if_const()`. Without the same 
conversion here, a valid one-slot decoupled source expression that evaluates to 
`ColumnConst` (for example `k * 0`) is inserted starting at offset 1 as if it 
were a full-length column, which can assert/read past the const column or build 
an invalid runtime filter. Convert 
`block->get_by_position(result_column_id).column` to a full column before 
calling `_insert()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to