This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 4db3af748b6 branch-4.0: [Bug](runtime-filter) set rf disabled before
release_undone_filters #57462 (#57482)
4db3af748b6 is described below
commit 4db3af748b6641fdb473f442a8b62ab2c0906be5
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 31 09:35:30 2025 +0800
branch-4.0: [Bug](runtime-filter) set rf disabled before
release_undone_filters #57462 (#57482)
Cherry-picked from #57462
Co-authored-by: Pxl <[email protected]>
---
be/src/runtime_filter/runtime_filter_merger.h | 10 ++++++++++
be/src/runtime_filter/runtime_filter_mgr.cpp | 15 +++++++++++++--
be/src/runtime_filter/runtime_filter_mgr.h | 2 +-
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/be/src/runtime_filter/runtime_filter_merger.h
b/be/src/runtime_filter/runtime_filter_merger.h
index d373bb8be05..00bac845473 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -93,6 +93,16 @@ public:
bool ready() const { return _rf_state == State::READY; }
+ void set_wrapper_state_and_ready_to_apply(RuntimeFilterWrapper::State
state,
+ std::string reason = "") {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
+ if (_rf_state == State::READY) {
+ return;
+ }
+ _wrapper->set_state(state, reason);
+ _rf_state = State::READY;
+ }
+
private:
RuntimeFilterMerger(const QueryContext* query_ctx, const
TRuntimeFilterDesc* desc)
: RuntimeFilter(desc), _rf_state(State::WAITING_FOR_PRODUCT) {}
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index ba9e9f5bc9d..48645ffdb8e 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -356,6 +356,12 @@ Status
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
int
execution_timeout) {
DCHECK_GT(cnt_val.targetv2_info.size(), 0);
+ if (cnt_val.done) {
+ return Status::InternalError("Runtime filter has been sent",
+ cnt_val.merger->debug_string());
+ }
+ cnt_val.done = true;
+
butil::IOBuf request_attachment;
PPublishFilterRequestV2 apply_request;
@@ -424,8 +430,6 @@ Status
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
closure->response_.get(), closure.get());
closure.release();
}
-
- cnt_val.done = true;
return st;
}
@@ -433,6 +437,13 @@ void
RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu
std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
for (auto& [filter_id, ctx] : _filter_map) {
if (!ctx.done && !ctx.targetv2_info.empty()) {
+ {
+ std::lock_guard<std::mutex> l(ctx.mtx);
+ ctx.merger->set_wrapper_state_and_ready_to_apply(
+ RuntimeFilterWrapper::State::DISABLED,
+ "rf coordinator's query context released before
runtime filter is ready to "
+ "apply");
+ }
auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {},
0,
UniqueId(query_ctx->query_id()).to_proto(),
query_ctx->execution_timeout());
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h
b/be/src/runtime_filter/runtime_filter_mgr.h
index 6941e5e5631..160babf278d 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -72,7 +72,7 @@ struct GlobalMergeContext {
std::vector<TRuntimeFilterTargetParamsV2> targetv2_info;
std::unordered_set<UniqueId> arrive_id;
std::vector<PNetworkAddress> source_addrs;
- bool done = false;
+ std::atomic<bool> done = false;
};
// owned by RuntimeState
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]