This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 41ff3a086c71d15612ea966f8ba5f4ebd878ab09
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 21:09:34 2026 +0800

    [fix](runtime filter) truthful local-merge signal for filters crossing a local exchange
    
    A same-fragment runtime filter was applied per-instance without merging
    whenever the target scan reported is_serial_operator=false: the scan passes
    that flag as need_local_merge (scan_operator.cpp), and the producer only
    merges when a global consumer exists. The contract silently encoded
    'pooled scan => serial => local exchange between join and scan => merge';
    parallelizing the pooled scan breaks the inference and partial IN/MIN_MAX
    filters built from hash-sliced build sides get applied unmerged to
    bucket-sliced scans, dropping legitimate rows.
    
    Make the signal truthful instead of inferred:
    - TRuntimeFilterDesc.force_local_merge (new optional field): set by FE when a
      LocalExchangeNode actually sits between the filter builder and a
      same-fragment target (computed after FE local exchange planning by walking
      the plan from the builder to the target).
    - BE register_consumer_runtime_filter ORs the bit into need_merge, putting the
      consumer into the global manager so producers merge before publishing.
    
    BE-planned mode (planner off) has no FE LocalExchangeNodes, the bit stays
    false, and the old serial-flag inference continues to cover that world.
---
 be/src/runtime/runtime_state.cpp                   |  3 +-
 .../org/apache/doris/planner/RuntimeFilter.java    | 37 ++++++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                     |  6 ++++
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 9e173acab7e..54989511eb8 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -508,7 +508,8 @@ Status RuntimeState::register_consumer_runtime_filter(
         const TRuntimeFilterDesc& desc, bool need_local_merge, int node_id,
         std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
     _registered_runtime_filter_ids.insert(desc.filter_id);
-    bool need_merge = desc.has_remote_targets || need_local_merge;
+    bool need_merge = desc.has_remote_targets || need_local_merge ||
+                      (desc.__isset.force_local_merge && 
desc.force_local_merge);
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
     RETURN_IF_ERROR(mgr->register_consumer_filter(this, desc, node_id, 
consumer_filter));
     // Stamp the consumer with the current recursive CTE stage so that 
incoming publish RPCs
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 9c92f35c82f..3aff4c590de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -263,6 +263,29 @@ public final class RuntimeFilter {
         this.bitmapFilterNotIn = bitmapFilterNotIn;
     }
 
+    /**
+     * Depth-first search from {@code node} for {@code target}; children that are ExchangeNodes are skipped.
+     * Returns null if {@code target} is not reached, false if reached without passing a LocalExchangeNode
+     * child (including {@code node == target}), and true if some child on the path is a LocalExchangeNode.
+     */
+    private static Boolean pathCrossesLocalExchange(PlanNode node, PlanNode 
target) {
+        if (node == target) {
+            return false;
+        }
+        for (PlanNode child : node.getChildren()) {
+            if (child instanceof ExchangeNode) {
+                // fragment boundary: a target behind it is a remote target, 
handled by
+                // has_remote_targets
+                continue;
+            }
+            Boolean sub = pathCrossesLocalExchange(child, target);
+            if (sub != null) {
+                return sub || child instanceof LocalExchangeNode;
+            }
+        }
+        return null;
+    }
+
     /**
      * Serializes a runtime filter to Thrift.
      */
@@ -276,11 +299,25 @@ public final class RuntimeFilter {
         tFilter.setHasRemoteTargets(hasRemoteTargets);
 
         boolean hasSerialTargets = false;
+        boolean forceLocalMerge = false;
         for (RuntimeFilterTarget target : targets) {
             tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), 
ExprToThriftVisitor.treeToThrift(target.expr));
             hasSerialTargets = hasSerialTargets
                     || target.node.isSerialOperatorOnBe(ConnectContext.get());
+            // Truthful merge signal: if a LocalExchangeNode sits between the 
builder join
+            // and a same-fragment target scan, per-instance partial filters 
are not aligned
+            // with the scan's data slice and must be merged before being 
applied. BE used to
+            // infer this from the target scan's is_serial_operator (scan 
pooled => LE
+            // in between), which silently breaks once the scan is 
parallelized; this bit is
+            // computed from the actual plan after FE local exchange planning. 
In BE-planned
+            // mode (planner off) the FE tree has no LocalExchangeNodes and 
the bit stays
+            // false — the serial-flag inference still covers that world.
+            if (!forceLocalMerge && target.isLocalTarget) {
+                Boolean crossed = pathCrossesLocalExchange(builderNode, 
target.node);
+                forceLocalMerge = crossed != null && crossed;
+            }
         }
+        tFilter.setForceLocalMerge(forceLocalMerge);
 
         boolean enableSyncFilterSize = ConnectContext.get() != null
                 && 
ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index eb4d0265178..e0a402a2728 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1596,6 +1596,12 @@ struct TRuntimeFilterDesc {
   // the listed partitions with the listed direction; absent partitions are
   // unsafe for this RF target and must not be pruned by it.
   20: optional map<Types.TPlanNodeId, list<TPartitionTargetExprMonotonicity>> 
planId_to_partition_target_monotonicity;
+
+  // True when a local exchange sits between the filter builder (join) and a 
same-fragment
+  // target scan: per-instance partial filters are then NOT aligned with the 
scan's data
+  // slice and must be merged before being applied. Computed truthfully by FE 
after local
+  // exchange planning; replaces inferring this from the target scan's 
is_serial_operator.
+  21: optional bool force_local_merge;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to