This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 4db3af748b6 branch-4.0: [Bug](runtime-filter) set rf disabled before 
release_undone_filters #57462 (#57482)
4db3af748b6 is described below

commit 4db3af748b6641fdb473f442a8b62ab2c0906be5
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 31 09:35:30 2025 +0800

    branch-4.0: [Bug](runtime-filter) set rf disabled before 
release_undone_filters #57462 (#57482)
    
    Cherry-picked from #57462
    
    Co-authored-by: Pxl <[email protected]>
---
 be/src/runtime_filter/runtime_filter_merger.h | 10 ++++++++++
 be/src/runtime_filter/runtime_filter_mgr.cpp  | 15 +++++++++++++--
 be/src/runtime_filter/runtime_filter_mgr.h    |  2 +-
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime_filter/runtime_filter_merger.h 
b/be/src/runtime_filter/runtime_filter_merger.h
index d373bb8be05..00bac845473 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -93,6 +93,16 @@ public:
 
     bool ready() const { return _rf_state == State::READY; }
 
+    void set_wrapper_state_and_ready_to_apply(RuntimeFilterWrapper::State 
state,
+                                              std::string reason = "") {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
+        if (_rf_state == State::READY) {
+            return;
+        }
+        _wrapper->set_state(state, reason);
+        _rf_state = State::READY;
+    }
+
 private:
     RuntimeFilterMerger(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc)
             : RuntimeFilter(desc), _rf_state(State::WAITING_FOR_PRODUCT) {}
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index ba9e9f5bc9d..48645ffdb8e 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -356,6 +356,12 @@ Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
                                                               int 
execution_timeout) {
     DCHECK_GT(cnt_val.targetv2_info.size(), 0);
 
+    if (cnt_val.done) {
+        return Status::InternalError("Runtime filter has been sent",
+                                     cnt_val.merger->debug_string());
+    }
+    cnt_val.done = true;
+
     butil::IOBuf request_attachment;
 
     PPublishFilterRequestV2 apply_request;
@@ -424,8 +430,6 @@ Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
                              closure->response_.get(), closure.get());
         closure.release();
     }
-
-    cnt_val.done = true;
     return st;
 }
 
@@ -433,6 +437,13 @@ void 
RuntimeFilterMergeControllerEntity::release_undone_filters(QueryContext* qu
     std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
     for (auto& [filter_id, ctx] : _filter_map) {
         if (!ctx.done && !ctx.targetv2_info.empty()) {
+            {
+                std::lock_guard<std::mutex> l(ctx.mtx);
+                ctx.merger->set_wrapper_state_and_ready_to_apply(
+                        RuntimeFilterWrapper::State::DISABLED,
+                        "rf coordinator's query context released before 
runtime filter is ready to "
+                        "apply");
+            }
             auto st = _send_rf_to_target(ctx, std::weak_ptr<QueryContext> {}, 
0,
                                          
UniqueId(query_ctx->query_id()).to_proto(),
                                          query_ctx->execution_timeout());
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 6941e5e5631..160babf278d 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -72,7 +72,7 @@ struct GlobalMergeContext {
     std::vector<TRuntimeFilterTargetParamsV2> targetv2_info;
     std::unordered_set<UniqueId> arrive_id;
     std::vector<PNetworkAddress> source_addrs;
-    bool done = false;
+    std::atomic<bool> done = false;
 };
 
 // owned by RuntimeState


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to