This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 41ff3a086c71d15612ea966f8ba5f4ebd878ab09 Author: 924060929 <[email protected]> AuthorDate: Thu Jun 4 21:09:34 2026 +0800 [fix](runtime filter) truthful local-merge signal for filters crossing a local exchange A same-fragment runtime filter was applied per-instance without merging whenever the target scan reported is_serial_operator=false: the scan passes that flag as need_local_merge (scan_operator.cpp), and the producer only merges when a global consumer exists. The contract implicitly encoded the chain 'pooled scan => serial => local exchange between join and scan => merge'; parallelizing the pooled scan breaks this inference, so partial IN/MIN_MAX filters built from hash-sliced build sides are applied unmerged to bucket-sliced scans, dropping legitimate rows. Make the signal truthful instead of inferred: - TRuntimeFilterDesc.force_local_merge (new optional field): set by FE when a LocalExchangeNode actually sits between the filter builder and a same-fragment target (computed after FE local exchange planning by walking the plan from the builder to the target). - BE register_consumer_runtime_filter ORs the bit into need_merge, putting the consumer into the global manager so producers merge before publishing. In BE-planned mode (planner off) there are no FE LocalExchangeNodes, so the bit stays false and the old serial-flag inference continues to cover that case. 
--- be/src/runtime/runtime_state.cpp | 3 +- .../org/apache/doris/planner/RuntimeFilter.java | 37 ++++++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 6 ++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9e173acab7e..54989511eb8 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -508,7 +508,8 @@ Status RuntimeState::register_consumer_runtime_filter( const TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) { _registered_runtime_filter_ids.insert(desc.filter_id); - bool need_merge = desc.has_remote_targets || need_local_merge; + bool need_merge = desc.has_remote_targets || need_local_merge || + (desc.__isset.force_local_merge && desc.force_local_merge); RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : local_runtime_filter_mgr(); RETURN_IF_ERROR(mgr->register_consumer_filter(this, desc, node_id, consumer_filter)); // Stamp the consumer with the current recursive CTE stage so that incoming publish RPCs diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 9c92f35c82f..3aff4c590de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -263,6 +263,29 @@ public final class RuntimeFilter { this.bitmapFilterNotIn = bitmapFilterNotIn; } + /** + * DFS from {@code node} down to {@code target} within the fragment (stopping at + * ExchangeNode boundaries). Returns null if target is not under node, otherwise + * whether the path crosses a LocalExchangeNode. 
+ */ + private static Boolean pathCrossesLocalExchange(PlanNode node, PlanNode target) { + if (node == target) { + return false; + } + for (PlanNode child : node.getChildren()) { + if (child instanceof ExchangeNode) { + // fragment boundary: a target behind it is a remote target, handled by + // has_remote_targets + continue; + } + Boolean sub = pathCrossesLocalExchange(child, target); + if (sub != null) { + return sub || child instanceof LocalExchangeNode; + } + } + return null; + } + /** * Serializes a runtime filter to Thrift. */ @@ -276,11 +299,25 @@ public final class RuntimeFilter { tFilter.setHasRemoteTargets(hasRemoteTargets); boolean hasSerialTargets = false; + boolean forceLocalMerge = false; for (RuntimeFilterTarget target : targets) { tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), ExprToThriftVisitor.treeToThrift(target.expr)); hasSerialTargets = hasSerialTargets || target.node.isSerialOperatorOnBe(ConnectContext.get()); + // Truthful merge signal: if a LocalExchangeNode sits between the builder join + // and a same-fragment target scan, per-instance partial filters are not aligned + // with the scan's data slice and must be merged before being applied. BE used to + // infer this from the target scan's is_serial_operator (scan pooled => LE + // in between), which silently breaks once the scan is parallelized; this bit is + // computed from the actual plan after FE local exchange planning. In BE-planned + // mode (planner off) the FE tree has no LocalExchangeNodes and the bit stays + // false — the serial-flag inference still covers that world. 
+ if (!forceLocalMerge && target.isLocalTarget) { + Boolean crossed = pathCrossesLocalExchange(builderNode, target.node); + forceLocalMerge = crossed != null && crossed; + } } + tFilter.setForceLocalMerge(forceLocalMerge); boolean enableSyncFilterSize = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index eb4d0265178..e0a402a2728 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1596,6 +1596,12 @@ struct TRuntimeFilterDesc { // the listed partitions with the listed direction; absent partitions are // unsafe for this RF target and must not be pruned by it. 20: optional map<Types.TPlanNodeId, list<TPartitionTargetExprMonotonicity>> planId_to_partition_target_monotonicity; + + // True when a local exchange sits between the filter builder (join) and a same-fragment + // target scan: per-instance partial filters are then NOT aligned with the scan's data + // slice and must be merged before being applied. Computed truthfully by FE after local + // exchange planning; replaces inferring this from the target scan's is_serial_operator. + 21: optional bool force_local_merge; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
