github-actions[bot] commented on code in PR #62737:
URL: https://github.com/apache/doris/pull/62737#discussion_r3323013304
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -323,6 +332,343 @@ private boolean isUniqueValueEqualTo(PhysicalHashJoin<?
extends Plan, ? extends
return false;
}
+ /**
+ * Generate decoupled runtime filters for the given join.
+ *
+ * For each equi-conjunct left=probe, right=build:
+ * Walk the probe subtree to find a descendant join whose build side
contains
+ * the probe expression. If found, create an RF produced by that
descendant join,
+ * pushed down into this join's build subtree to reach the target scan.
+ *
+ * Decision logic to avoid circular wait between standard and decoupled
RFs:
+ * - Stats known: if decoupled_ndv / standard_ndv < threshold → prefer
decoupled (remove standard)
+ * else → keep both, decoupled is non-blocking (wait_time=0)
+ * - Stats unknown: if probe subtree has filters but build subtree doesn't
→ prefer decoupled
+ * else → keep both, decoupled is non-blocking
+ */
+ private void generateDecoupledRuntimeFilters(
+ PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+ List<Expression> hashJoinConjuncts,
+ List<TRuntimeFilterType> legalTypes,
+ RuntimeFilterContext ctx,
+ CascadesContext context) {
+ // Only generate decoupled RFs for INNER/CROSS joins. For
SEMI/ANTI/OUTER joins,
+ // the standard RF has specialized semantics and a reverse-direction
decoupled RF
+ // may interfere or be semantically incorrect.
+ if (!join.getJoinType().isInnerJoin() &&
!join.getJoinType().isCrossJoin()) {
+ return;
+ }
+ double ndvRatioThreshold =
ctx.getSessionVariable().decoupledRfNdvRatioThreshold;
+
+ for (int i = 0; i < hashJoinConjuncts.size(); i++) {
+ EqualPredicate equalTo = JoinUtils.swapEqualToForChildrenOrder(
+ (EqualPredicate) hashJoinConjuncts.get(i),
join.left().getOutputSet());
+ Expression probeExpr = equalTo.left();
+ Expression buildExpr = equalTo.right();
+ if (buildExpr.getInputSlots().size() != 1) {
+ continue;
+ }
+ Pair<AbstractPhysicalJoin<?, ?>, Expression> result =
+ findBuilderForDecoupledRf(probeExpr, join.left());
+ if (result == null) {
+ continue;
+ }
+ AbstractPhysicalJoin<?, ?> decoupledBuilder = result.first;
+ Expression resolvedSrcExpr = result.second;
+
+ long decoupledNdv = getDecoupledBuildSideNdv(decoupledBuilder,
resolvedSrcExpr);
+ // Use strict NDV (returns -1 for unknown) for the preference
comparison,
+ // to avoid biased ratio when one side has real NDV and the other
uses rowCount fallback.
+ long strictDecoupledNdv = getStrictNdv(decoupledBuilder.right(),
resolvedSrcExpr);
+ long strictStandardNdv = getStrictNdv(join.right(),
equalTo.right());
+
+ // Prune decoupled RFs that won't be selective enough.
+ // With stats: if source NDV covers a large fraction of target
NDV, bloom filter
+ // passes most rows -> useless.
+ // Without stats: if the builder's build side has no filter
predicates,
+ // it likely outputs all distinct values -> non-selective.
+ if (strictDecoupledNdv > 0 && strictStandardNdv > 0) {
+ if ((double) strictDecoupledNdv / strictStandardNdv >=
ndvRatioThreshold) {
+ continue;
+ }
Review Comment:
This `continue` makes the documented threshold behavior unreachable when stats
are known. `decoupled_rf_ndv_ratio_threshold` is documented as: ratio <
threshold => prefer the decoupled RF; otherwise keep the standard RF and make
the decoupled RF non-blocking. However, this branch skips creating the
decoupled RF whenever `strictDecoupledNdv / strictStandardNdv >= threshold`,
so the later `shouldPreferDecoupledRf()` call only ever sees ratios below the
threshold and always returns `true` for known stats. For example, under the
documented behavior a ratio of 0.8 at threshold 0.5 should keep both filters,
with the decoupled one non-blocking; the current code creates no decoupled
filter at all. The threshold section of the new regression test only logs
EXPLAIN output, so it would not catch this.
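
To make the mismatch concrete, here is a minimal, self-contained sketch of the
known-stats path only. The class, enum, and method names are hypothetical and
exist only for illustration; they are not part of the PR. `current` mirrors
the early `continue` followed by `shouldPreferDecoupledRf()`, and `documented`
follows the Javadoc as I read it. Both assume the two NDVs are positive.

```java
final class DecoupledRfDecisionSketch {
    /** Hypothetical labels for the three outcomes discussed in this comment. */
    enum Decision { SKIP_DECOUPLED, PREFER_DECOUPLED, KEEP_BOTH_DECOUPLED_NON_BLOCKING }

    /** Mirrors the reviewed code for known stats: prune first, then decide. */
    static Decision current(long decoupledNdv, long standardNdv, double threshold) {
        requirePositive(decoupledNdv, standardNdv);
        double ratio = (double) decoupledNdv / standardNdv;
        if (ratio >= threshold) {
            // Corresponds to the early `continue`: no decoupled RF is created.
            return Decision.SKIP_DECOUPLED;
        }
        // Only ratios below the threshold reach this point, so for ordinary
        // (non-NaN) thresholds the second branch is never taken.
        return ratio < threshold
                ? Decision.PREFER_DECOUPLED
                : Decision.KEEP_BOTH_DECOUPLED_NON_BLOCKING;
    }

    /** Follows the Javadoc: the threshold only picks which RF is non-blocking. */
    static Decision documented(long decoupledNdv, long standardNdv, double threshold) {
        requirePositive(decoupledNdv, standardNdv);
        double ratio = (double) decoupledNdv / standardNdv;
        return ratio < threshold
                ? Decision.PREFER_DECOUPLED
                : Decision.KEEP_BOTH_DECOUPLED_NON_BLOCKING;
    }

    private static void requirePositive(long decoupledNdv, long standardNdv) {
        if (decoupledNdv <= 0 || standardNdv <= 0) {
            throw new IllegalArgumentException("sketch covers known stats only");
        }
    }
}
```

With NDVs of 80 and 100 and a threshold of 0.5, `current` yields
`SKIP_DECOUPLED` while `documented` yields `KEEP_BOTH_DECOUPLED_NON_BLOCKING`.
A regression test that asserts on the resulting filter set for such a case,
rather than only logging EXPLAIN output, would expose the difference.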
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -323,6 +332,343 @@ private boolean isUniqueValueEqualTo(PhysicalHashJoin<?
extends Plan, ? extends
return false;
}
+ /**
+ * Generate decoupled runtime filters for the given join.
+ *
+ * For each equi-conjunct left=probe, right=build:
+ * Walk the probe subtree to find a descendant join whose build side
contains
+ * the probe expression. If found, create an RF produced by that
descendant join,
+ * pushed down into this join's build subtree to reach the target scan.
+ *
+ * Decision logic to avoid circular wait between standard and decoupled
RFs:
+ * - Stats known: if decoupled_ndv / standard_ndv < threshold → prefer
decoupled (remove standard)
+ * else → keep both, decoupled is non-blocking (wait_time=0)
+ * - Stats unknown: if probe subtree has filters but build subtree doesn't
→ prefer decoupled
+ * else → keep both, decoupled is non-blocking
+ */
+ private void generateDecoupledRuntimeFilters(
+ PhysicalHashJoin<? extends Plan, ? extends Plan> join,
+ List<Expression> hashJoinConjuncts,
+ List<TRuntimeFilterType> legalTypes,
+ RuntimeFilterContext ctx,
+ CascadesContext context) {
+ // Only generate decoupled RFs for INNER/CROSS joins. For
SEMI/ANTI/OUTER joins,
+ // the standard RF has specialized semantics and a reverse-direction
decoupled RF
+ // may interfere or be semantically incorrect.
+ if (!join.getJoinType().isInnerJoin() &&
!join.getJoinType().isCrossJoin()) {
+ return;
+ }
+ double ndvRatioThreshold =
ctx.getSessionVariable().decoupledRfNdvRatioThreshold;
+
+ for (int i = 0; i < hashJoinConjuncts.size(); i++) {
+ EqualPredicate equalTo = JoinUtils.swapEqualToForChildrenOrder(
+ (EqualPredicate) hashJoinConjuncts.get(i),
join.left().getOutputSet());
+ Expression probeExpr = equalTo.left();
+ Expression buildExpr = equalTo.right();
+ if (buildExpr.getInputSlots().size() != 1) {
+ continue;
+ }
+ Pair<AbstractPhysicalJoin<?, ?>, Expression> result =
+ findBuilderForDecoupledRf(probeExpr, join.left());
+ if (result == null) {
+ continue;
+ }
+ AbstractPhysicalJoin<?, ?> decoupledBuilder = result.first;
+ Expression resolvedSrcExpr = result.second;
+
+ long decoupledNdv = getDecoupledBuildSideNdv(decoupledBuilder,
resolvedSrcExpr);
+ // Use strict NDV (returns -1 for unknown) for the preference
comparison,
+ // to avoid biased ratio when one side has real NDV and the other
uses rowCount fallback.
+ long strictDecoupledNdv = getStrictNdv(decoupledBuilder.right(),
resolvedSrcExpr);
+ long strictStandardNdv = getStrictNdv(join.right(),
equalTo.right());
+
+ // Prune decoupled RFs that won't be selective enough.
+ // With stats: if source NDV covers a large fraction of target
NDV, bloom filter
+ // passes most rows -> useless.
+ // Without stats: if the builder's build side has no filter
predicates,
+ // it likely outputs all distinct values -> non-selective.
+ if (strictDecoupledNdv > 0 && strictStandardNdv > 0) {
+ if ((double) strictDecoupledNdv / strictStandardNdv >=
ndvRatioThreshold) {
+ continue;
+ }
+ } else {
+ if (!hasFilterInSubtree(decoupledBuilder.right())) {
+ continue;
+ }
+ }
+
+ boolean preferDecoupled = shouldPreferDecoupledRf(
+ strictDecoupledNdv, strictStandardNdv, ndvRatioThreshold,
+ decoupledBuilder, join);
+
+ for (TRuntimeFilterType type : legalTypes) {
+ if (type == TRuntimeFilterType.BITMAP) {
+ continue;
+ }
+
+ RuntimeFilterPushDownVisitor.PushDownContext pushDownContext =
+
RuntimeFilterPushDownVisitor.PushDownContext.createPushDownContext(
+ ctx, decoupledBuilder, resolvedSrcExpr,
buildExpr, type, false,
+
context.getStatementContext().isHasUnknownColStats(),
+ decoupledNdv, -1 /*sentinel: decoupled RF*/);
+ boolean decoupledRfPushed = false;
+ if (pushDownContext.isValid()) {
+ decoupledRfPushed = join.right().accept(
+ new RuntimeFilterPushDownVisitor(),
pushDownContext);
+ }
+ if (decoupledRfPushed) {
+ if (preferDecoupled) {
+ // When preferring decoupled RF, make the standard RF
non-blocking.
+ // This preserves both filters while prioritizing the
decoupled RF
+ // (which blocks). The standard RF still applies if it
arrives in
+ // time (with wait_time=0).
+ markStandardRfAsNonBlocking(ctx, join,
equalTo.right(), type, i);
+ } else {
+ markDecoupledRfAsNonBlocking(ctx, pushDownContext);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Decide whether decoupled RF should replace the standard RF.
+ *
+ * @return true if decoupled RF is preferred (standard RF should be
removed);
+ * false means keep both but decoupled RF will be non-blocking.
+ */
+ private boolean shouldPreferDecoupledRf(
+ long decoupledNdv, long standardNdv, double ndvRatioThreshold,
+ AbstractPhysicalJoin<?, ?> decoupledBuilder,
+ PhysicalHashJoin<? extends Plan, ? extends Plan> conditionJoin) {
+ boolean statsKnown = decoupledNdv > 0 && standardNdv > 0;
+ if (statsKnown) {
+ double ratio = (double) decoupledNdv / standardNdv;
+ return ratio < ndvRatioThreshold;
+ }
+ // Unknown stats: use filter-presence heuristic.
+ // The decoupled RF's source is in the probe subtree (via
decoupledBuilder),
+ // the standard RF's source is in conditionJoin's build subtree.
+ // If probe source has filters but build source doesn't → decoupled RF
is more selective.
+ boolean probeHasFilter = hasFilterInSubtree(decoupledBuilder.right());
+ boolean buildHasFilter = hasFilterInSubtree(conditionJoin.right());
+ return probeHasFilter && !buildHasFilter;
+ }
+
+ /**
+ * Check if the given subtree contains a PhysicalFilter with visible
predicates.
+ * Walks through Project, Filter, Distribute, and Join nodes.
+ */
+ private boolean hasFilterInSubtree(Plan subtree) {
+ if (subtree instanceof PhysicalFilter) {
+ PhysicalFilter<?> filter = (PhysicalFilter<?>) subtree;
+ for (Expression expr : filter.getExpressions()) {
+ for (Slot slot : expr.getInputSlots()) {
+ if (slot instanceof SlotReference) {
+ SlotReference slotRef = (SlotReference) slot;
+ if (!slotRef.getOriginalColumn().isPresent()
+ ||
slotRef.getOriginalColumn().get().isVisible()) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ for (Plan child : subtree.children()) {
+ if (hasFilterInSubtree(child)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * After push-down, find the newly created decoupled RF and mark it as
non-blocking.
+ * The decoupled RF was just pushed down and registered in
targetExprIdToFilter.
+ * We identify it by exprOrder == -1 and the matching builderNode.
+ */
+ private void markDecoupledRfAsNonBlocking(
+ RuntimeFilterContext ctx,
+ RuntimeFilterPushDownVisitor.PushDownContext pushDownContext) {
+ for (List<RuntimeFilter> filters :
ctx.getTargetExprIdToFilter().values()) {
+ for (RuntimeFilter rf : filters) {
+ if (rf.getExprOrder() == -1
+ &&
rf.getBuilderNode().equals(pushDownContext.builderNode)
+ && rf.getSrcExpr().equals(pushDownContext.srcExpr)
+ && rf.getType().equals(pushDownContext.type)) {
+ rf.setNonBlocking(true);
+ return;
Review Comment:
When a single decoupled pushdown creates multiple RF targets, this marks only
the first matching RF as non-blocking and then returns.
`RuntimeFilterPushDownVisitor` can create more than one `RuntimeFilter` for the
same `(exprOrder=-1, builderNode, srcExpr, type)` by expanding through joins or
set operations, and the non-preferred path requires all of those decoupled
filters to use `wait_time_ms=0` to avoid circular waits with the standard RF.
As written, the remaining matching filters stay blocking. In addition,
`RuntimeFilterTranslator` groups filters without including `nonBlocking` in
`RuntimeFilterGroupKey`, so the final legacy filter wait time can depend on
which target becomes the group head. Please mark every matching newly created
decoupled RF (or otherwise make `nonBlocking` part of the grouping semantics)
instead of returning after the first one.
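
A minimal sketch of the "mark every match" variant follows. It is
self-contained and uses a hypothetical `Rf` class with plain string fields as
a stand-in for `RuntimeFilter`; the real types, accessors, and map key type in
`RuntimeFilterContext` will differ. The only point it illustrates is dropping
the early `return` so that every filter matching the same
`(exprOrder=-1, builderNode, srcExpr, type)` tuple is updated.

```java
import java.util.List;
import java.util.Map;

final class MarkAllDecoupledRfSketch {
    /** Hypothetical stand-in for RuntimeFilter; only the fields used for matching. */
    static final class Rf {
        final int exprOrder;
        final String builderNode;
        final String srcExpr;
        final String type;
        boolean nonBlocking;

        Rf(int exprOrder, String builderNode, String srcExpr, String type) {
            this.exprOrder = exprOrder;
            this.builderNode = builderNode;
            this.srcExpr = srcExpr;
            this.type = type;
        }
    }

    /**
     * Marks every decoupled filter (exprOrder == -1) that matches the given
     * builder, source expression, and type. Returns how many matches were
     * visited; a filter registered under several targets is counted once per
     * registration, which is harmless because setting the flag is idempotent.
     */
    static int markAllAsNonBlocking(Map<String, List<Rf>> targetToFilters,
            String builderNode, String srcExpr, String type) {
        int marked = 0;
        for (List<Rf> filters : targetToFilters.values()) {
            for (Rf rf : filters) {
                if (rf.exprOrder == -1
                        && rf.builderNode.equals(builderNode)
                        && rf.srcExpr.equals(srcExpr)
                        && rf.type.equals(type)) {
                    rf.nonBlocking = true;
                    marked++; // keep scanning instead of returning here
                }
            }
        }
        return marked;
    }
}
```

The alternative mentioned above, including `nonBlocking` in the grouping key,
would address the group-head dependence in the translator but is not shown
here.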
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]