This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f4124518e70 branch-4.0: [Chore](runtime-filter) add more rf merger info to debug string and add release_undone_filters logic #56632 (#56719)
f4124518e70 is described below

commit f4124518e70ec95fe22111ec17954e42f75a042c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 13:36:31 2025 +0800

    branch-4.0: [Chore](runtime-filter) add more rf merger info to debug string and add release_undone_filters logic #56632 (#56719)
    
    Cherry-picked from #56632
    
    Co-authored-by: Pxl <[email protected]>
---
 be/src/runtime/fragment_mgr.cpp              |  19 +--
 be/src/runtime/query_context.cpp             |   3 +
 be/src/runtime_filter/runtime_filter.h       |   2 +-
 be/src/runtime_filter/runtime_filter_mgr.cpp | 182 +++++++++++++++++----------
 be/src/runtime_filter/runtime_filter_mgr.h   |   9 ++
 5 files changed, 139 insertions(+), 76 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index edde9773ad2..e7f3ca3426c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -753,13 +753,11 @@ Status FragmentMgr::_get_or_create_query_ctx(const 
TPipelineFragmentParams& para
                         if (parent.__isset.runtime_filter_info) {
                             auto info = parent.runtime_filter_info;
                             if (info.__isset.runtime_filter_params) {
-                                if 
(!info.runtime_filter_params.rid_to_runtime_filter.empty()) {
-                                    auto handler =
-                                            
std::make_shared<RuntimeFilterMergeControllerEntity>();
-                                    RETURN_IF_ERROR(
-                                            handler->init(query_ctx, 
info.runtime_filter_params));
-                                    
query_ctx->set_merge_controller_handler(handler);
-                                }
+                                auto handler =
+                                        
std::make_shared<RuntimeFilterMergeControllerEntity>();
+                                RETURN_IF_ERROR(
+                                        handler->init(query_ctx, 
info.runtime_filter_params));
+                                
query_ctx->set_merge_controller_handler(handler);
 
                                 
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
                                         info.runtime_filter_params);
@@ -806,6 +804,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t 
duration) {
                             debug_string_buffer, "QueryId: {}, 
global_runtime_filter_mgr: {}\n",
                             print_id(it.first.first),
                             
it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
+
+                    if 
(it.second->get_query_ctx()->get_merge_controller_handler()) {
+                        fmt::format_to(debug_string_buffer, "{}\n",
+                                       it.second->get_query_ctx()
+                                               ->get_merge_controller_handler()
+                                               ->debug_string());
+                    }
                 }
 
                 auto timeout_second = it.second->timeout_second();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c19a27e2d39..ba633da034d 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -231,6 +231,9 @@ QueryContext::~QueryContext() {
     _runtime_predicates.clear();
     file_scan_range_params_map.clear();
     obj_pool.clear();
+    if (_merge_controller_handler) {
+        _merge_controller_handler->release_undone_filters(this);
+    }
     _merge_controller_handler.reset();
 
     DorisMetrics::instance()->query_ctx_cnt->increment(-1);
diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h
index 7273db88ecb..2605dd30519 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -60,7 +60,7 @@ public:
 
         auto state = _wrapper->get_state();
         if (state != RuntimeFilterWrapper::State::READY) {
-            request->set_disabled(state == 
RuntimeFilterWrapper::State::DISABLED);
+            request->set_disabled(true);
             return Status::OK();
         }
 
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 25f017dae57..ba9e9f5bc9d 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -48,7 +48,8 @@ namespace doris {
 #include "common/compile_check_begin.h"
 RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global)
         : _is_global(is_global),
-          
_tracker(std::make_unique<MemTracker>("RuntimeFilterMgr(experimental)")) {}
+          _tracker(std::make_unique<MemTracker>(
+                  fmt::format("RuntimeFilterMgr({})", is_global ? "global" : 
"local"))) {}
 
 std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consume_filters(
         int filter_id) {
@@ -284,10 +285,19 @@ Status RuntimeFilterMgr::sync_filter_size(const 
PSyncFilterSizeRequest* request)
 }
 
 std::string RuntimeFilterMgr::debug_string() {
-    std::string result = "Merger Info:\n";
+    std::string result = "Local Merger Info:\n";
     std::lock_guard l(_lock);
-    for (const auto& [filter_id, merger] : _local_merge_map) {
-        result += fmt::format("{}\n", merger.merger->debug_string());
+    for (const auto& [filter_id, ctx] : _local_merge_map) {
+        result += fmt::format("{}\n", ctx.merger->debug_string());
+        for (const auto& producer : ctx.producers) {
+            result += fmt::format("{}\n", producer->debug_string());
+        }
+    }
+    result += "Consumer Info:\n";
+    for (const auto& [filter_id, consumers] : _consumer_map) {
+        for (const auto& consumer : consumers) {
+            result += fmt::format("{}\n", consumer->debug_string());
+        }
     }
     return result;
 }
@@ -300,7 +310,6 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
     int64_t merge_time = 0;
     auto filter_id = request->filter_id();
     std::map<int, GlobalMergeContext>::iterator iter;
-    Status st = Status::OK();
     {
         std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
         iter = _filter_map.find(filter_id);
@@ -331,81 +340,118 @@ Status 
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
     }
 
     if (is_ready) {
-        DCHECK_GT(cnt_val.targetv2_info.size(), 0);
-
-        butil::IOBuf request_attachment;
+        return _send_rf_to_target(cnt_val,
+                                  query_ctx->ignore_runtime_filter_error()
+                                          ? std::weak_ptr<QueryContext> {}
+                                          : query_ctx,
+                                  merge_time, request->query_id(), 
query_ctx->execution_timeout());
+    }
+    return Status::OK();
+}
 
-        PPublishFilterRequestV2 apply_request;
-        // serialize filter
-        void* data = nullptr;
-        int len = 0;
-        bool has_attachment = false;
+Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext& 
cnt_val,
+                                                              
std::weak_ptr<QueryContext> ctx,
+                                                              int64_t 
merge_time,
+                                                              PUniqueId 
query_id,
+                                                              int 
execution_timeout) {
+    DCHECK_GT(cnt_val.targetv2_info.size(), 0);
+
+    butil::IOBuf request_attachment;
+
+    PPublishFilterRequestV2 apply_request;
+    // serialize filter
+    void* data = nullptr;
+    int len = 0;
+    bool has_attachment = false;
+
+    RETURN_IF_ERROR(cnt_val.merger->serialize(&apply_request, &data, &len));
+
+    if (data != nullptr && len > 0) {
+        void* allocated = malloc(len);
+        memcpy(allocated, data, len);
+        // control the memory by doris self to avoid using brpc's thread local 
storage
+        // because the memory of tls will not be released
+        request_attachment.append_user_data(allocated, len, [](void* ptr) { 
free(ptr); });
+        has_attachment = true;
+    }
 
-        RETURN_IF_ERROR(cnt_val.merger->serialize(&apply_request, &data, 
&len));
+    std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val.targetv2_info;
+    auto st = Status::OK();
+    for (auto& target : targets) {
+        auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
+                                          
DummyBrpcCallback<PPublishFilterResponse>>::
+                
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
+                              
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
+
+        closure->request_->set_merge_time(merge_time);
+        *closure->request_->mutable_query_id() = query_id;
+        if (has_attachment) {
+            closure->cntl_->request_attachment().append(request_attachment);
+        }
 
-        if (data != nullptr && len > 0) {
-            void* allocated = malloc(len);
-            memcpy(allocated, data, len);
-            // control the memory by doris self to avoid using brpc's thread 
local storage
-            // because the memory of tls will not be released
-            request_attachment.append_user_data(allocated, len, [](void* ptr) 
{ free(ptr); });
-            has_attachment = true;
+        
closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(execution_timeout));
+        if (config::execution_ignore_eovercrowded) {
+            closure->cntl_->ignore_eovercrowded();
         }
 
-        auto ctx = query_ctx->ignore_runtime_filter_error() ? 
std::weak_ptr<QueryContext> {}
-                                                            : query_ctx;
-        std::vector<TRuntimeFilterTargetParamsV2>& targets = 
cnt_val.targetv2_info;
-
-        for (auto& target : targets) {
-            auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
-                                              
DummyBrpcCallback<PPublishFilterResponse>>::
-                    
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-                                  
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
-
-            closure->request_->set_merge_time(merge_time);
-            *closure->request_->mutable_query_id() = request->query_id();
-            if (has_attachment) {
-                
closure->cntl_->request_attachment().append(request_attachment);
+        // set fragment-id
+        if (target.__isset.target_fragment_ids) {
+            for (auto& target_fragment_id : target.target_fragment_ids) {
+                closure->request_->add_fragment_ids(target_fragment_id);
             }
-
-            closure->cntl_->set_timeout_ms(
-                    
get_execution_rpc_timeout_ms(query_ctx->execution_timeout()));
-            if (config::execution_ignore_eovercrowded) {
-                closure->cntl_->ignore_eovercrowded();
+        } else {
+            // FE not upgraded yet.
+            for (auto& target_fragment_instance_id : 
target.target_fragment_instance_ids) {
+                PUniqueId* cur_id = 
closure->request_->add_fragment_instance_ids();
+                cur_id->set_hi(target_fragment_instance_id.hi);
+                cur_id->set_lo(target_fragment_instance_id.lo);
             }
+        }
 
-            // set fragment-id
-            if (target.__isset.target_fragment_ids) {
-                for (auto& target_fragment_id : target.target_fragment_ids) {
-                    closure->request_->add_fragment_ids(target_fragment_id);
-                }
-            } else {
-                // FE not upgraded yet.
-                for (auto& target_fragment_instance_id : 
target.target_fragment_instance_ids) {
-                    PUniqueId* cur_id = 
closure->request_->add_fragment_instance_ids();
-                    cur_id->set_hi(target_fragment_instance_id.hi);
-                    cur_id->set_lo(target_fragment_instance_id.lo);
-                }
-            }
+        std::shared_ptr<PBackendService_Stub> stub(
+                
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                        target.target_fragment_instance_addr));
+        if (stub == nullptr) {
+            LOG(WARNING) << "Failed to init rpc to "
+                         << target.target_fragment_instance_addr.hostname << 
":"
+                         << target.target_fragment_instance_addr.port;
+            st = Status::InternalError("Failed to init rpc to {}:{}",
+                                       
target.target_fragment_instance_addr.hostname,
+                                       
target.target_fragment_instance_addr.port);
+            continue;
+        }
+        stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
+                             closure->response_.get(), closure.get());
+        closure.release();
+    }
 
-            std::shared_ptr<PBackendService_Stub> stub(
-                    
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
-                            target.target_fragment_instance_addr));
-            if (stub == nullptr) {
-                LOG(WARNING) << "Failed to init rpc to "
-                             << target.target_fragment_instance_addr.hostname 
<< ":"
-                             << target.target_fragment_instance_addr.port;
-                st = Status::InternalError("Failed to init rpc to {}:{}",
-                                           
target.target_fragment_instance_addr.hostname,
-                                           
target.target_fragment_instance_addr.port);
-                continue;
+    cnt_val.done = true;
+    return st;
+}
+
+void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* 
query_ctx) {
+    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+    for (auto& [filter_id, ctx] : _filter_map) {
+        if (!ctx.done && !ctx.targetv2_info.empty()) {
+            auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {}, 
0,
+                                         
UniqueId(query_ctx->query_id()).to_proto(),
+                                         query_ctx->execution_timeout());
+            if (!st.ok()) {
+                LOG(WARNING)
+                        << "Failed to send runtime filter to target before 
query done. filter_id:"
+                        << filter_id << " " << ctx.merger->debug_string() << " 
reason:" << st;
             }
-            stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
-                                 closure->response_.get(), closure.get());
-            closure.release();
         }
     }
-    return st;
+    _filter_map.clear();
 }
 
+std::string RuntimeFilterMergeControllerEntity::debug_string() {
+    std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
+    std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+    for (const auto& [filter_id, ctx] : _filter_map) {
+        result += fmt::format("{}\n", ctx.merger->debug_string());
+    }
+    return result;
+}
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h
index 622d1d1742f..6941e5e5631 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -21,6 +21,7 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
+#include <glog/logging.h>
 
 #include <cstdint>
 #include <map>
@@ -71,6 +72,7 @@ struct GlobalMergeContext {
     std::vector<TRuntimeFilterTargetParamsV2> targetv2_info;
     std::unordered_set<UniqueId> arrive_id;
     std::vector<PNetworkAddress> source_addrs;
+    bool done = false;
 };
 
 // owned by RuntimeState
@@ -146,12 +148,19 @@ public:
     Status send_filter_size(std::shared_ptr<QueryContext> query_ctx,
                             const PSendFilterSizeRequest* request);
 
+    std::string debug_string();
+
+    void release_undone_filters(QueryContext* query_ctx);
+
 private:
     Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,
                            const TRuntimeFilterDesc* runtime_filter_desc,
                            const std::vector<TRuntimeFilterTargetParamsV2>&& 
target_info,
                            const int producer_size);
 
+    Status _send_rf_to_target(GlobalMergeContext& cnt_val, 
std::weak_ptr<QueryContext> ctx,
+                              int64_t merge_time, PUniqueId query_id, int 
execution_timeout);
+
     // protect _filter_map
     std::shared_mutex _filter_map_mutex;
     std::shared_ptr<MemTracker> _mem_tracker;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to