This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f4124518e70 branch-4.0: [Chore](runtime-filter) add more rf merger info to debug string and add release_undone_filters logic #56632 (#56719)
f4124518e70 is described below
commit f4124518e70ec95fe22111ec17954e42f75a042c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Oct 11 13:36:31 2025 +0800
branch-4.0: [Chore](runtime-filter) add more rf merger info to debug string and add release_undone_filters logic #56632 (#56719)
Cherry-picked from #56632
Co-authored-by: Pxl <[email protected]>
---
be/src/runtime/fragment_mgr.cpp | 19 +--
be/src/runtime/query_context.cpp | 3 +
be/src/runtime_filter/runtime_filter.h | 2 +-
be/src/runtime_filter/runtime_filter_mgr.cpp | 182 +++++++++++++++++----------
be/src/runtime_filter/runtime_filter_mgr.h | 9 ++
5 files changed, 139 insertions(+), 76 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index edde9773ad2..e7f3ca3426c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -753,13 +753,11 @@ Status FragmentMgr::_get_or_create_query_ctx(const
TPipelineFragmentParams& para
if (parent.__isset.runtime_filter_info) {
auto info = parent.runtime_filter_info;
if (info.__isset.runtime_filter_params) {
- if
(!info.runtime_filter_params.rid_to_runtime_filter.empty()) {
- auto handler =
-
std::make_shared<RuntimeFilterMergeControllerEntity>();
- RETURN_IF_ERROR(
- handler->init(query_ctx,
info.runtime_filter_params));
-
query_ctx->set_merge_controller_handler(handler);
- }
+ auto handler =
+
std::make_shared<RuntimeFilterMergeControllerEntity>();
+ RETURN_IF_ERROR(
+ handler->init(query_ctx,
info.runtime_filter_params));
+
query_ctx->set_merge_controller_handler(handler);
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
info.runtime_filter_params);
@@ -806,6 +804,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t
duration) {
debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
print_id(it.first.first),
it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
+
+ if
(it.second->get_query_ctx()->get_merge_controller_handler()) {
+ fmt::format_to(debug_string_buffer, "{}\n",
+ it.second->get_query_ctx()
+ ->get_merge_controller_handler()
+ ->debug_string());
+ }
}
auto timeout_second = it.second->timeout_second();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c19a27e2d39..ba633da034d 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -231,6 +231,9 @@ QueryContext::~QueryContext() {
_runtime_predicates.clear();
file_scan_range_params_map.clear();
obj_pool.clear();
+ if (_merge_controller_handler) {
+ _merge_controller_handler->release_undone_filters(this);
+ }
_merge_controller_handler.reset();
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
diff --git a/be/src/runtime_filter/runtime_filter.h
b/be/src/runtime_filter/runtime_filter.h
index 7273db88ecb..2605dd30519 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -60,7 +60,7 @@ public:
auto state = _wrapper->get_state();
if (state != RuntimeFilterWrapper::State::READY) {
- request->set_disabled(state ==
RuntimeFilterWrapper::State::DISABLED);
+ request->set_disabled(true);
return Status::OK();
}
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 25f017dae57..ba9e9f5bc9d 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -48,7 +48,8 @@ namespace doris {
#include "common/compile_check_begin.h"
RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global)
: _is_global(is_global),
-
_tracker(std::make_unique<MemTracker>("RuntimeFilterMgr(experimental)")) {}
+ _tracker(std::make_unique<MemTracker>(
+ fmt::format("RuntimeFilterMgr({})", is_global ? "global" :
"local"))) {}
std::vector<std::shared_ptr<RuntimeFilterConsumer>>
RuntimeFilterMgr::get_consume_filters(
int filter_id) {
@@ -284,10 +285,19 @@ Status RuntimeFilterMgr::sync_filter_size(const
PSyncFilterSizeRequest* request)
}
std::string RuntimeFilterMgr::debug_string() {
- std::string result = "Merger Info:\n";
+ std::string result = "Local Merger Info:\n";
std::lock_guard l(_lock);
- for (const auto& [filter_id, merger] : _local_merge_map) {
- result += fmt::format("{}\n", merger.merger->debug_string());
+ for (const auto& [filter_id, ctx] : _local_merge_map) {
+ result += fmt::format("{}\n", ctx.merger->debug_string());
+ for (const auto& producer : ctx.producers) {
+ result += fmt::format("{}\n", producer->debug_string());
+ }
+ }
+ result += "Consumer Info:\n";
+ for (const auto& [filter_id, consumers] : _consumer_map) {
+ for (const auto& consumer : consumers) {
+ result += fmt::format("{}\n", consumer->debug_string());
+ }
}
return result;
}
@@ -300,7 +310,6 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
int64_t merge_time = 0;
auto filter_id = request->filter_id();
std::map<int, GlobalMergeContext>::iterator iter;
- Status st = Status::OK();
{
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
iter = _filter_map.find(filter_id);
@@ -331,81 +340,118 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
}
if (is_ready) {
- DCHECK_GT(cnt_val.targetv2_info.size(), 0);
-
- butil::IOBuf request_attachment;
+ return _send_rf_to_target(cnt_val,
+ query_ctx->ignore_runtime_filter_error()
+ ? std::weak_ptr<QueryContext> {}
+ : query_ctx,
+ merge_time, request->query_id(),
query_ctx->execution_timeout());
+ }
+ return Status::OK();
+}
- PPublishFilterRequestV2 apply_request;
- // serialize filter
- void* data = nullptr;
- int len = 0;
- bool has_attachment = false;
+Status
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext&
cnt_val,
+
std::weak_ptr<QueryContext> ctx,
+ int64_t
merge_time,
+ PUniqueId
query_id,
+ int
execution_timeout) {
+ DCHECK_GT(cnt_val.targetv2_info.size(), 0);
+
+ butil::IOBuf request_attachment;
+
+ PPublishFilterRequestV2 apply_request;
+ // serialize filter
+ void* data = nullptr;
+ int len = 0;
+ bool has_attachment = false;
+
+ RETURN_IF_ERROR(cnt_val.merger->serialize(&apply_request, &data, &len));
+
+ if (data != nullptr && len > 0) {
+ void* allocated = malloc(len);
+ memcpy(allocated, data, len);
+ // control the memory by doris self to avoid using brpc's thread local storage
+ // because the memory of tls will not be released
+ request_attachment.append_user_data(allocated, len, [](void* ptr) {
free(ptr); });
+ has_attachment = true;
+ }
- RETURN_IF_ERROR(cnt_val.merger->serialize(&apply_request, &data,
&len));
+ std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val.targetv2_info;
+ auto st = Status::OK();
+ for (auto& target : targets) {
+ auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
+
DummyBrpcCallback<PPublishFilterResponse>>::
+
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
+
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
+
+ closure->request_->set_merge_time(merge_time);
+ *closure->request_->mutable_query_id() = query_id;
+ if (has_attachment) {
+ closure->cntl_->request_attachment().append(request_attachment);
+ }
- if (data != nullptr && len > 0) {
- void* allocated = malloc(len);
- memcpy(allocated, data, len);
- // control the memory by doris self to avoid using brpc's thread local storage
- // because the memory of tls will not be released
- request_attachment.append_user_data(allocated, len, [](void* ptr)
{ free(ptr); });
- has_attachment = true;
+
closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(execution_timeout));
+ if (config::execution_ignore_eovercrowded) {
+ closure->cntl_->ignore_eovercrowded();
}
- auto ctx = query_ctx->ignore_runtime_filter_error() ?
std::weak_ptr<QueryContext> {}
- : query_ctx;
- std::vector<TRuntimeFilterTargetParamsV2>& targets =
cnt_val.targetv2_info;
-
- for (auto& target : targets) {
- auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
-
DummyBrpcCallback<PPublishFilterResponse>>::
-
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
-
- closure->request_->set_merge_time(merge_time);
- *closure->request_->mutable_query_id() = request->query_id();
- if (has_attachment) {
-
closure->cntl_->request_attachment().append(request_attachment);
+ // set fragment-id
+ if (target.__isset.target_fragment_ids) {
+ for (auto& target_fragment_id : target.target_fragment_ids) {
+ closure->request_->add_fragment_ids(target_fragment_id);
}
-
- closure->cntl_->set_timeout_ms(
-
get_execution_rpc_timeout_ms(query_ctx->execution_timeout()));
- if (config::execution_ignore_eovercrowded) {
- closure->cntl_->ignore_eovercrowded();
+ } else {
+ // FE not upgraded yet.
+ for (auto& target_fragment_instance_id :
target.target_fragment_instance_ids) {
+ PUniqueId* cur_id =
closure->request_->add_fragment_instance_ids();
+ cur_id->set_hi(target_fragment_instance_id.hi);
+ cur_id->set_lo(target_fragment_instance_id.lo);
}
+ }
- // set fragment-id
- if (target.__isset.target_fragment_ids) {
- for (auto& target_fragment_id : target.target_fragment_ids) {
- closure->request_->add_fragment_ids(target_fragment_id);
- }
- } else {
- // FE not upgraded yet.
- for (auto& target_fragment_instance_id :
target.target_fragment_instance_ids) {
- PUniqueId* cur_id =
closure->request_->add_fragment_instance_ids();
- cur_id->set_hi(target_fragment_instance_id.hi);
- cur_id->set_lo(target_fragment_instance_id.lo);
- }
- }
+ std::shared_ptr<PBackendService_Stub> stub(
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+ target.target_fragment_instance_addr));
+ if (stub == nullptr) {
+ LOG(WARNING) << "Failed to init rpc to "
+ << target.target_fragment_instance_addr.hostname <<
":"
+ << target.target_fragment_instance_addr.port;
+ st = Status::InternalError("Failed to init rpc to {}:{}",
+
target.target_fragment_instance_addr.hostname,
+
target.target_fragment_instance_addr.port);
+ continue;
+ }
+ stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
+ closure->response_.get(), closure.get());
+ closure.release();
+ }
- std::shared_ptr<PBackendService_Stub> stub(
-
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
- target.target_fragment_instance_addr));
- if (stub == nullptr) {
- LOG(WARNING) << "Failed to init rpc to "
- << target.target_fragment_instance_addr.hostname
<< ":"
- << target.target_fragment_instance_addr.port;
- st = Status::InternalError("Failed to init rpc to {}:{}",
-
target.target_fragment_instance_addr.hostname,
-
target.target_fragment_instance_addr.port);
- continue;
+ cnt_val.done = true;
+ return st;
+}
+
+void RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext*
query_ctx) {
+ std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+ for (auto& [filter_id, ctx] : _filter_map) {
+ if (!ctx.done && !ctx.targetv2_info.empty()) {
+ auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {},
0,
+
UniqueId(query_ctx->query_id()).to_proto(),
+ query_ctx->execution_timeout());
+ if (!st.ok()) {
+ LOG(WARNING)
+ << "Failed to send runtime filter to target before query done. filter_id:"
+ << filter_id << " " << ctx.merger->debug_string() << " reason:" << st;
}
- stub->apply_filterv2(closure->cntl_.get(), closure->request_.get(),
- closure->response_.get(), closure.get());
- closure.release();
}
}
- return st;
+ _filter_map.clear();
}
+std::string RuntimeFilterMergeControllerEntity::debug_string() {
+ std::string result = "RuntimeFilterMergeControllerEntity Info:\n";
+ std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+ for (const auto& [filter_id, ctx] : _filter_map) {
+ result += fmt::format("{}\n", ctx.merger->debug_string());
+ }
+ return result;
+}
} // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h
b/be/src/runtime_filter/runtime_filter_mgr.h
index 622d1d1742f..6941e5e5631 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -21,6 +21,7 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
+#include <glog/logging.h>
#include <cstdint>
#include <map>
@@ -71,6 +72,7 @@ struct GlobalMergeContext {
std::vector<TRuntimeFilterTargetParamsV2> targetv2_info;
std::unordered_set<UniqueId> arrive_id;
std::vector<PNetworkAddress> source_addrs;
+ bool done = false;
};
// owned by RuntimeState
@@ -146,12 +148,19 @@ public:
Status send_filter_size(std::shared_ptr<QueryContext> query_ctx,
const PSendFilterSizeRequest* request);
+ std::string debug_string();
+
+ void release_undone_filters(QueryContext* query_ctx);
+
private:
Status _init_with_desc(std::shared_ptr<QueryContext> query_ctx,
const TRuntimeFilterDesc* runtime_filter_desc,
const std::vector<TRuntimeFilterTargetParamsV2>&&
target_info,
const int producer_size);
+ Status _send_rf_to_target(GlobalMergeContext& cnt_val,
std::weak_ptr<QueryContext> ctx,
+ int64_t merge_time, PUniqueId query_id, int
execution_timeout);
+
// protect _filter_map
std::shared_mutex _filter_map_mutex;
std::shared_ptr<MemTracker> _mem_tracker;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]