This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7d3a2c4f4 [fix](be) Avoid local runtime filter merge deadlock (#64866)
9d7d3a2c4f4 is described below

commit 9d7d3a2c4f437e0c2962bde1a49848ed0535063c
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 1 09:30:05 2026 +0800

    [fix](be) Avoid local runtime filter merge deadlock (#64866)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: None
    
    Problem Summary: Local runtime filter merge can deadlock when one join
    build instance publishes a local-merge runtime filter while another
    instance sends its runtime filter size. The old LocalMergeContext mutex
    protected both the merger and the producer list, so the two paths took
    locks in opposite orders: one path could hold a producer runtime filter
    lock and then wait for the context lock, while the other path held the
    context lock and then waited for that producer's lock.
    
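    This change gives RuntimeFilterMerger its own internal synchronization
    and removes the LocalMergeContext mutex. RuntimeFilterMgr now stores each
    LocalMergeContext behind a shared_ptr and hands callers a copy of that
    pointer, holding its own lock only for the lookup. The publish,
    send-size, and sync-size paths then merge filters or update producer
    sizes without holding the RuntimeFilterMgr lock. RuntimeFilterMerger's
    merge_from now reports, through an out-parameter, whether that call
    made the merger ready, replacing the separate unlocked ready() check.
    
    Because the expected producer count is now applied to the merger after
    RuntimeFilterMgr releases its lock, concurrent registrations may apply
    it out of order; set_expected_producer_num is therefore replaced by
    increase_expected_producer_num, which only ever raises the count. When
    a producer from a newer recursive CTE stage registers, the whole
    LocalMergeContext is replaced, and lookups for a different stage return
    no context.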
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Unit Test
        - build-support/clang-format.sh
          be/src/exec/runtime_filter/runtime_filter_merger.h
          be/src/exec/runtime_filter/runtime_filter_mgr.cpp
          be/src/exec/runtime_filter/runtime_filter_mgr.h
          be/src/exec/runtime_filter/runtime_filter_producer.cpp
          be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
          be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
        - git diff --cached --check
        - ./run-be-ut.sh --run --filter=RuntimeFilterMgrTest.*
        - ./run-be-ut.sh --run --filter=RuntimeFilterMergerTest.*
    - Behavior changed: No
    - Does this need documentation: No
---
 be/src/exec/runtime_filter/runtime_filter_merger.h |  27 +++-
 be/src/exec/runtime_filter/runtime_filter_mgr.cpp  | 142 ++++++++++++---------
 be/src/exec/runtime_filter/runtime_filter_mgr.h    |  35 ++---
 .../runtime_filter/runtime_filter_producer.cpp     |  42 +++---
 be/src/runtime/runtime_state.cpp                   |   2 +-
 .../runtime_filter/runtime_filter_merger_test.cpp  |  26 ++--
 .../runtime_filter/runtime_filter_mgr_test.cpp     |  33 ++---
 7 files changed, 181 insertions(+), 126 deletions(-)

diff --git a/be/src/exec/runtime_filter/runtime_filter_merger.h 
b/be/src/exec/runtime_filter/runtime_filter_merger.h
index 9405d2405a5..a7bd3605f71 100644
--- a/be/src/exec/runtime_filter/runtime_filter_merger.h
+++ b/be/src/exec/runtime_filter/runtime_filter_merger.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <algorithm>
+
 #include "exec/runtime_filter/runtime_filter.h"
 #include "exec/runtime_filter/runtime_filter_definitions.h"
 #include "exprs/vexpr.h"
@@ -46,6 +48,7 @@ public:
     }
 
     std::string debug_string() override {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         return fmt::format(
                 "Merger: ({}, expected_producer_num: {}, 
received_producer_num: {}, "
                 "received_rf_size_num: {}, received_sum_size: {})",
@@ -54,12 +57,15 @@ public:
     }
 
     // If input is a disabled predicate, the final result is a disabled 
predicate.
-    Status merge_from(const RuntimeFilter* other) {
+    // Returns true only for the call that makes the merger ready.
+    Status merge_from(const RuntimeFilter* other, bool* ready) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         _received_producer_num++;
         if (_expected_producer_num < _received_producer_num) {
             return Status::InternalError(
                     "runtime filter merger input product more than expected, 
{}", debug_string());
         }
+        *ready = _received_producer_num == _expected_producer_num;
         if (_received_producer_num == _expected_producer_num) {
             _rf_state = State::READY;
         }
@@ -71,18 +77,26 @@ public:
         return st;
     }
 
-    void set_expected_producer_num(int num) {
+    // Only raise the expected producer count. RuntimeFilterMgr may compute the
+    // count under its own lock and apply it after releasing that lock, so
+    // concurrent registrations can update the merger out of order.
+    void increase_expected_producer_num(int num) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         if (_received_producer_num > 0 || _received_rf_size_num > 0) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
                             "runtime filter merger set expected producer after 
receive data, {}",
                             debug_string());
         }
-        _expected_producer_num = num;
+        _expected_producer_num = std::max(_expected_producer_num, num);
     }
 
-    int get_expected_producer_num() const { return _expected_producer_num; }
+    int get_expected_producer_num() {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
+        return _expected_producer_num;
+    }
 
     bool add_rf_size(uint64_t size) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         _received_rf_size_num++;
         if (_expected_producer_num < _received_rf_size_num) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -93,7 +107,10 @@ public:
         return (_received_rf_size_num == _expected_producer_num);
     }
 
-    uint64_t get_received_sum_size() const { return _received_sum_size; }
+    uint64_t get_received_sum_size() {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
+        return _received_sum_size;
+    }
 
     bool ready() const { return _rf_state == State::READY; }
 
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
index f8e687f09b7..9de74e5acb3 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
@@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global)
 
 std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consume_filters(
         int filter_id) {
-    std::lock_guard<std::mutex> l(_lock);
+    LockGuard l(_lock);
     auto iter = _consumer_map.find(filter_id);
     if (iter == _consumer_map.end()) {
         return {};
@@ -69,13 +69,17 @@ Status RuntimeFilterMgr::register_consumer_filter(
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
-    std::lock_guard<std::mutex> l(_lock);
-    RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, 
consumer));
-    _consumer_map[key].push_back(*consumer);
+    std::shared_ptr<RuntimeFilterConsumer> new_consumer;
+    RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, 
&new_consumer));
+    {
+        LockGuard l(_lock);
+        _consumer_map[key].push_back(new_consumer);
+    }
+    *consumer = new_consumer;
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::register_local_merger_producer_filter(
+Status RuntimeFilterMgr::register_local_merge_producer_filter(
         const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
         std::shared_ptr<RuntimeFilterProducer> producer) {
     if (!_is_global) [[unlikely]] {
@@ -90,58 +94,57 @@ Status 
RuntimeFilterMgr::register_local_merger_producer_filter(
     }
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
+    uint32_t producer_stage = producer->stage();
 
-    LocalMergeContext* context;
+    std::shared_ptr<LocalMergeContext> context;
+    std::shared_ptr<RuntimeFilterMerger> merger;
+    int expected_producer_num = 0;
     {
-        std::lock_guard<std::mutex> l(_lock);
-        context = &_local_merge_map[key]; // may inplace construct default 
object
-    }
+        LockGuard l(_lock);
+        auto iter = _local_merge_map.find(key);
+        if (iter == _local_merge_map.end() || !iter->second ||
+            producer_stage > iter->second->stage) {
+            auto new_context = std::make_shared<LocalMergeContext>();
+            RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, 
&new_context->merger));
+            new_context->stage = producer_stage;
+            _local_merge_map.insert_or_assign(key, new_context);
+            context = new_context;
+        } else {
+            context = iter->second;
+        }
 
-    RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer));
-    return Status::OK();
-}
+        context->producers.emplace_back(producer);
+        merger = context->merger;
+        expected_producer_num = cast_set<int>(context->producers.size());
+    }
 
-Status LocalMergeContext::register_producer(const QueryContext* query_ctx,
-                                            const TRuntimeFilterDesc* desc,
-                                            
std::shared_ptr<RuntimeFilterProducer> producer) {
-    std::lock_guard<std::mutex> l(mtx);
-    if (producer->stage() > stage) {
-        // New recursive CTE round: discard stale merger and producers from
-        // the previous round and recreate the merger for the new round.
-        merger.reset();
-        producers.clear();
-        stage = producer->stage();
-    }
-    if (!merger) {
-        RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
-    }
-    producers.emplace_back(producer);
-    merger->set_expected_producer_num(cast_set<int>(producers.size()));
+    merger->increase_expected_producer_num(expected_producer_num);
     // Sync the local merger's stage from the producer so that outgoing merge 
RPCs
     // (via _push_to_remote) carry the correct recursive CTE round number.
-    merger->set_stage(producer->stage());
+    merger->set_stage(producer_stage);
     return Status::OK();
 }
 
-Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
-                                                          LocalMergeContext** 
local_merge_filters) {
+Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t 
expected_stage,
+                                                 
std::shared_ptr<LocalMergeContext>* context) {
     if (!_is_global) [[unlikely]] {
         return Status::InternalError(
                 "A local merge filter can not be registered in Local 
RuntimeFilterMgr");
     }
-    std::lock_guard<std::mutex> l(_lock);
+    context->reset();
+    LockGuard l(_lock);
     auto iter = _local_merge_map.find(filter_id);
     if (iter == _local_merge_map.end()) {
-        // Filter may have been removed during a recursive CTE stage reset.
         // Return OK with nullptr to let the caller skip gracefully.
-        *local_merge_filters = nullptr;
         return Status::OK();
     }
-    *local_merge_filters = &iter->second;
-    if (!iter->second.merger) {
-        return Status::InternalError("local merge context merger is nullptr 
for filter_id: {}",
-                                     filter_id);
+    if (!iter->second) {
+        return Status::InternalError("local merge context is nullptr for 
filter_id: {}", filter_id);
+    }
+    if (expected_stage != iter->second->stage) {
+        return Status::OK();
     }
+    *context = iter->second;
     return Status::OK();
 }
 
@@ -155,18 +158,28 @@ Status RuntimeFilterMgr::register_producer_filter(
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
-    std::lock_guard<std::mutex> l(_lock);
-    if (_producer_id_set.contains(key)) {
-        return Status::InvalidArgument("filter {} has been registered", key);
+    {
+        LockGuard l(_lock);
+        if (_producer_id_set.contains(key)) {
+            return Status::InvalidArgument("filter {} has been registered", 
key);
+        }
+    }
+    std::shared_ptr<RuntimeFilterProducer> new_producer;
+    RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, 
&new_producer));
+    {
+        LockGuard l(_lock);
+        if (_producer_id_set.contains(key)) {
+            return Status::InvalidArgument("filter {} has been registered", 
key);
+        }
+        _producer_id_set.insert(key);
     }
-    RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer));
-    _producer_id_set.insert(key);
+    *producer = new_producer;
     return Status::OK();
 }
 
 bool RuntimeFilterMgr::set_runtime_filter_params(
         const TRuntimeFilterParams& runtime_filter_params) {
-    std::lock_guard l(_lock);
+    LockGuard l(_lock);
     if (!_has_merge_addr) {
         _merge_addr = runtime_filter_params.runtime_filter_merge_addr;
         _has_merge_addr = true;
@@ -199,7 +212,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     cnt_val->targetv2_info = targetv2_info;
     RETURN_IF_ERROR(
             RuntimeFilterMerger::create(query_ctx.get(), runtime_filter_desc, 
&cnt_val->merger));
-    cnt_val->merger->set_expected_producer_num(producer_size);
+    cnt_val->merger->increase_expected_producer_num(producer_size);
 
     return Status::OK();
 }
@@ -304,13 +317,13 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
 }
 
 Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* 
request) {
-    LocalMergeContext* local_merge_filters = nullptr;
-    RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(), 
&local_merge_filters));
-    if (local_merge_filters == nullptr) {
+    std::shared_ptr<LocalMergeContext> context;
+    RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), 
request->stage(), &context));
+    if (!context) {
         // Filter was removed during a recursive CTE stage reset; discard 
stale request.
         return Status::OK();
     }
-    for (auto producer : local_merge_filters->producers) {
+    for (const auto& producer : context->producers) {
         producer->set_synced_size(request->filter_size());
     }
     return Status::OK();
@@ -318,18 +331,32 @@ Status RuntimeFilterMgr::sync_filter_size(const 
PSyncFilterSizeRequest* request)
 
 std::string RuntimeFilterMgr::debug_string() {
     std::string result = "Local Merger Info:\n";
-    std::lock_guard l(_lock);
-    for (const auto& [filter_id, ctx] : _local_merge_map) {
+    struct LocalMergeContextSnapshot {
+        std::shared_ptr<RuntimeFilterMerger> merger;
+        std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
+    };
+    std::vector<LocalMergeContextSnapshot> local_merge_contexts;
+    std::vector<std::shared_ptr<RuntimeFilterConsumer>> consumers;
+    {
+        LockGuard l(_lock);
+        for (const auto& [filter_id, ctx] : _local_merge_map) {
+            DORIS_CHECK(ctx);
+            DORIS_CHECK(ctx->merger);
+            local_merge_contexts.push_back({ctx->merger, ctx->producers});
+        }
+        for (const auto& [filter_id, filter_consumers] : _consumer_map) {
+            consumers.insert(consumers.end(), filter_consumers.begin(), 
filter_consumers.end());
+        }
+    }
+    for (const auto& ctx : local_merge_contexts) {
         result += fmt::format("{}\n", ctx.merger->debug_string());
         for (const auto& producer : ctx.producers) {
             result += fmt::format("{}\n", producer->debug_string());
         }
     }
     result += "Consumer Info:\n";
-    for (const auto& [filter_id, consumers] : _consumer_map) {
-        for (const auto& consumer : consumers) {
-            result += fmt::format("{}\n", consumer->debug_string());
-        }
+    for (const auto& consumer : consumers) {
+        result += fmt::format("{}\n", consumer->debug_string());
     }
     return result;
 }
@@ -373,10 +400,9 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
 
         RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));
 
-        RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get()));
+        RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), 
&is_ready));
 
         cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id()));
-        is_ready = cnt_val.merger->ready(); // update is_ready in locked scope
     }
 
     if (is_ready) {
@@ -491,7 +517,7 @@ Status GlobalMergeContext::reset(QueryContext* query_ctx) {
     DORIS_CHECK(merger);
     int producer_size = merger->get_expected_producer_num();
     RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, 
&runtime_filter_desc, &merger));
-    merger->set_expected_producer_num(producer_size);
+    merger->increase_expected_producer_num(producer_size);
     arrive_id.clear();
     source_addrs.clear();
     sync_size_callbacks.clear();
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h 
b/be/src/exec/runtime_filter/runtime_filter_mgr.h
index 536eb63e152..e5c6494917c 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h
@@ -56,15 +56,12 @@ class HandleErrorBrpcCallback;
 class SyncSizeCallback;
 
 struct LocalMergeContext {
-    std::mutex mtx;
     std::shared_ptr<RuntimeFilterMerger> merger;
     std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
     // Tracks the recursive CTE round.  When a producer from a newer round
-    // registers, the context is reset (merger recreated, old producers 
dropped).
+    // registers, RuntimeFilterMgr replaces the whole context and old in-flight
+    // users keep the previous context alive through shared_ptr.
     uint32_t stage = 0;
-
-    Status register_producer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc,
-                             std::shared_ptr<RuntimeFilterProducer> producer);
 };
 
 struct GlobalMergeContext {
@@ -98,11 +95,12 @@ public:
                                     int node_id,
                                     std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter);
 
-    Status register_local_merger_producer_filter(const QueryContext* query_ctx,
-                                                 const TRuntimeFilterDesc& 
desc,
-                                                 
std::shared_ptr<RuntimeFilterProducer> producer);
+    Status register_local_merge_producer_filter(const QueryContext* query_ctx,
+                                                const TRuntimeFilterDesc& desc,
+                                                
std::shared_ptr<RuntimeFilterProducer> producer);
 
-    Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** 
local_merge_filters);
+    Status get_local_merge_context(int filter_id, uint32_t expected_stage,
+                                   std::shared_ptr<LocalMergeContext>* 
context);
 
     // Create local producer. This producer is hold by 
RuntimeFilterProducerHelper.
     Status register_producer_filter(const QueryContext* query_ctx, const 
TRuntimeFilterDesc& desc,
@@ -116,10 +114,10 @@ public:
     std::string debug_string();
 
     void remove_filter(int32_t filter_id) {
-        std::lock_guard<std::mutex> l(_lock);
+        LockGuard l(_lock);
         _consumer_map.erase(filter_id);
-        // NOTE: _local_merge_map is NOT erased here.  It is reset lazily in
-        // LocalMergeContext::register_producer when a producer from a newer
+        // NOTE: _local_merge_map is NOT erased here.  It is replaced lazily in
+        // register_local_merge_producer_filter when a producer from a newer
         // recursive CTE round registers.  Erasing eagerly here would race with
         // multi-fragment REBUILD: a consumer-only fragment's remove_filter 
could
         // delete the entry that the producer fragment just re-registered.
@@ -142,16 +140,21 @@ private:
     // RuntimeFilterMgr is owned by RuntimeState, so we only
     // use filter_id as key
     // key: "filter-id"
-    std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> 
_consumer_map;
-    std::set<int32_t> _producer_id_set;
-    std::map<int32_t, LocalMergeContext> _local_merge_map;
+    // Protects fields marked GUARDED_BY(_lock). While holding this lock, only
+    // access RuntimeFilterMgr-owned state or copy shared_ptr snapshots; do not
+    // call methods on existing RuntimeFilter objects, because RF objects have
+    // their own locks and may call back into RuntimeFilterMgr.
+    AnnotatedMutex _lock;
+    std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>> 
_consumer_map
+            GUARDED_BY(_lock);
+    std::set<int32_t> _producer_id_set GUARDED_BY(_lock);
+    std::map<int32_t, std::shared_ptr<LocalMergeContext>> _local_merge_map 
GUARDED_BY(_lock);
 
     std::unique_ptr<MemTracker> _tracker;
 
     TNetworkAddress _merge_addr;
 
     bool _has_merge_addr = false;
-    std::mutex _lock;
 };
 
 // controller -> <query-id, entity>
diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp 
b/be/src/exec/runtime_filter/runtime_filter_producer.cpp
index 7da8c687112..5a342911627 100644
--- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp
@@ -53,16 +53,16 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, 
bool build_hash_table
             // when global consumer not exist, send_to_local_targets will do 
nothing, so merge rf is useless
             return Status::OK();
         }
-        LocalMergeContext* context = nullptr;
-        
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
-                _wrapper->filter_id(), &context));
-        if (context == nullptr) {
+        std::shared_ptr<LocalMergeContext> context;
+        
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
+                _wrapper->filter_id(), _stage, &context));
+        if (!context) {
             // Filter was removed during a recursive CTE stage reset; this 
producer is stale.
             return Status::OK();
         }
-        std::lock_guard l(context->mtx);
-        RETURN_IF_ERROR(context->merger->merge_from(this));
-        if (context->merger->ready()) {
+        bool ready = false;
+        RETURN_IF_ERROR(context->merger->merge_from(this, &ready));
+        if (ready) {
             if (_has_remote_target) {
                 RETURN_IF_ERROR(_send_to_remote_targets(state, 
context->merger.get()));
             } else {
@@ -123,26 +123,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState* 
state, uint64_t local_filt
     set_state(State::WAITING_FOR_SYNCED_SIZE);
 
     if (_need_do_merge(state)) {
-        LocalMergeContext* merger_context = nullptr;
-        
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
-                _wrapper->filter_id(), &merger_context));
-        if (merger_context == nullptr) {
+        std::shared_ptr<LocalMergeContext> context;
+        
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
+                _wrapper->filter_id(), _stage, &context));
+        if (!context) {
             // Filter was removed during a recursive CTE stage reset; this 
producer is stale.
             return Status::OK();
         }
-        std::lock_guard merger_lock(merger_context->mtx);
-        if (merger_context->merger->add_rf_size(local_filter_size)) {
-            if (!_has_remote_target) {
-                for (auto filter : merger_context->producers) {
-                    
filter->set_synced_size(merger_context->merger->get_received_sum_size());
-                }
-                return Status::OK();
-            } else {
-                local_filter_size = 
merger_context->merger->get_received_sum_size();
+        uint64_t received_sum_size = 0;
+        bool ready_to_sync = context->merger->add_rf_size(local_filter_size);
+        if (!ready_to_sync) {
+            return Status::OK();
+        }
+        received_sum_size = context->merger->get_received_sum_size();
+        if (!_has_remote_target) {
+            for (const auto& filter : context->producers) {
+                filter->set_synced_size(received_sum_size);
             }
-        } else {
             return Status::OK();
         }
+        local_filter_size = received_sum_size;
 
     } else if (!_has_remote_target) {
         set_synced_size(local_filter_size);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 54989511eb8..5dfd027d42d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -499,7 +499,7 @@ Status RuntimeState::register_producer_runtime_filter(
         DORIS_CHECK(pfc);
         (*producer_filter)->set_stage(pfc->rec_cte_stage());
     }
-    
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
+    
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
             _query_ctx, desc, *producer_filter));
     return Status::OK();
 }
diff --git a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp 
b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
index da9d5b66291..3d0bb701c8b 100644
--- a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
@@ -35,15 +35,17 @@ public:
         auto desc = TRuntimeFilterDescBuilder().build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
                 RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger));
-        merger->set_expected_producer_num(2);
+        merger->increase_expected_producer_num(2);
         ASSERT_FALSE(merger->ready());
         ASSERT_EQ(merger->_wrapper->_state, 
RuntimeFilterWrapper::State::UNINITED);
 
+        bool ready = false;
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
                 _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
         producer->set_wrapper_state_and_ready_to_publish(first_product_state);
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), 
&ready));
+        ASSERT_FALSE(ready);
         ASSERT_FALSE(merger->ready());
         ASSERT_EQ(merger->_wrapper->_state, first_expected_state);
 
@@ -51,7 +53,8 @@ public:
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
                 _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2));
         
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), 
&ready));
+        ASSERT_TRUE(ready);
         ASSERT_TRUE(merger->ready());
         ASSERT_EQ(merger->_wrapper->_state, second_expected_state);
     }
@@ -63,15 +66,17 @@ public:
         std::shared_ptr<RuntimeFilterMerger> merger;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
                 RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger));
-        merger->set_expected_producer_num(1);
+        merger->increase_expected_producer_num(1);
         ASSERT_FALSE(merger->ready());
 
+        bool ready = false;
         std::shared_ptr<RuntimeFilterProducer> producer;
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
                 _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
         producer->set_wrapper_state_and_ready_to_publish(state);
-        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), 
&ready));
+        ASSERT_TRUE(ready);
         ASSERT_TRUE(merger->ready());
 
         PMergeFilterRequest request;
@@ -99,7 +104,7 @@ TEST_F(RuntimeFilterMergerTest, add_rf_size) {
     std::shared_ptr<RuntimeFilterMerger> merger;
     auto desc = TRuntimeFilterDescBuilder().build();
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), 
&desc, &merger));
-    merger->set_expected_producer_num(2);
+    merger->increase_expected_producer_num(2);
 
     ASSERT_FALSE(merger->add_rf_size(123));
     ASSERT_TRUE(merger->add_rf_size(1));
@@ -118,22 +123,25 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) {
     std::shared_ptr<RuntimeFilterMerger> merger;
     auto desc = TRuntimeFilterDescBuilder().build();
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), 
&desc, &merger));
-    merger->set_expected_producer_num(1);
+    merger->increase_expected_producer_num(1);
     ASSERT_FALSE(merger->ready());
     ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED);
 
+    bool ready = false;
     std::shared_ptr<RuntimeFilterProducer> producer;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
             _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer));
     
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
-    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // 
ready wrapper
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), 
&ready));
+    ASSERT_TRUE(ready);
+    ASSERT_TRUE(merger->ready());
     ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY);
 
     std::shared_ptr<RuntimeFilterProducer> producer2;
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
             _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2));
     
producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
-    auto st = merger->merge_from(producer2.get());
+    auto st = merger->merge_from(producer2.get(), &ready);
     ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR);
 }
 
diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
index d6ccc080961..5a50e762c25 100644
--- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
@@ -77,12 +77,12 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
         // producer_filter should not be nullptr
         EXPECT_FALSE(
                 global_runtime_filter_mgr
-                        ->register_local_merger_producer_filter(ctx.get(), 
desc, producer_filter)
+                        ->register_local_merge_producer_filter(ctx.get(), 
desc, producer_filter)
                         .ok());
         // local merge filter should not be registered in local mgr
         EXPECT_FALSE(
                 local_runtime_filter_mgr
-                        ->register_local_merger_producer_filter(ctx.get(), 
desc, producer_filter)
+                        ->register_local_merge_producer_filter(ctx.get(), 
desc, producer_filter)
                         .ok());
         // producer should not registered in global mgr
         EXPECT_FALSE(global_runtime_filter_mgr
@@ -103,28 +103,29 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
                              .ok());
         EXPECT_NE(producer_filter, nullptr);
 
-        LocalMergeContext* local_merge_filters = nullptr;
+        std::shared_ptr<LocalMergeContext> context;
         // filter_id not yet registered: global mgr returns OK with nullptr
         // (graceful skip for recursive CTE stage reset).
         EXPECT_TRUE(global_runtime_filter_mgr
-                            ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
+                            ->get_local_merge_context(filter_id, 
producer_filter->stage(), &context)
                             .ok());
-        EXPECT_EQ(local_merge_filters, nullptr);
+        EXPECT_EQ(context, nullptr);
         // local mgr always returns error (not supported)
-        EXPECT_FALSE(local_runtime_filter_mgr
-                             ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
-                             .ok());
-        // Register local merge filter
-        EXPECT_TRUE(
-                global_runtime_filter_mgr
-                        ->register_local_merger_producer_filter(ctx.get(), 
desc, producer_filter)
+        EXPECT_FALSE(
+                local_runtime_filter_mgr
+                        ->get_local_merge_context(filter_id, 
producer_filter->stage(), &context)
                         .ok());
+        // Register local merge filter
+        EXPECT_TRUE(global_runtime_filter_mgr
+                            ->register_local_merge_producer_filter(ctx.get(), 
desc, producer_filter)
+                            .ok());
         EXPECT_TRUE(global_runtime_filter_mgr
-                            ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
+                            ->get_local_merge_context(filter_id, 
producer_filter->stage(), &context)
                             .ok());
-        EXPECT_NE(local_merge_filters, nullptr);
-        EXPECT_EQ(local_merge_filters->producers.size(), 1);
-        local_merge_filters->producers.front()->_rf_state =
+        EXPECT_NE(context, nullptr);
+        EXPECT_NE(context->merger, nullptr);
+        EXPECT_EQ(context->producers.size(), 1);
+        context->producers.front()->_rf_state =
                 RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE;
     }
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to