This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e3943ef952f  [Bug](runtime-filter) use rf_lock to lock 
local_state.conjuncts read operator #59688  (#59770)
e3943ef952f is described below

commit e3943ef952f7fbefa2c9621e55de9b2785d197f6
Author: Pxl <[email protected]>
AuthorDate: Tue Jan 13 09:52:43 2026 +0800

     [Bug](runtime-filter) use rf_lock to lock local_state.conjuncts read 
operator #59688  (#59770)
    
    #59688
---
 be/src/pipeline/exec/scan_operator.cpp                   | 12 ------------
 be/src/pipeline/exec/scan_operator.h                     |  3 ---
 be/src/runtime_filter/runtime_filter_consumer_helper.cpp | 11 +++++++++++
 be/src/runtime_filter/runtime_filter_consumer_helper.h   |  4 ++++
 be/src/vec/exec/scan/scanner.cpp                         |  3 ++-
 5 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index c6ca051adec..602fc5949e0 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1119,18 +1119,6 @@ int64_t ScanLocalState<Derived>::limit_per_scanner() {
     return _parent->cast<typename Derived::Parent>()._limit_per_scanner;
 }
 
-template <typename Derived>
-Status 
ScanLocalState<Derived>::clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
conjuncts) {
-    if (!_conjuncts.empty()) {
-        std::unique_lock l(_conjunct_lock);
-        conjuncts.resize(_conjuncts.size());
-        for (size_t i = 0; i != _conjuncts.size(); ++i) {
-            RETURN_IF_ERROR(_conjuncts[i]->clone(state(), conjuncts[i]));
-        }
-    }
-    return Status::OK();
-}
-
 template <typename Derived>
 Status ScanLocalState<Derived>::_init_profile() {
     // 1. counters for scan node
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 00c39269c25..1733bc16c1c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -77,7 +77,6 @@ public:
 
     virtual int64_t limit_per_scanner() = 0;
 
-    virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
conjuncts) = 0;
     virtual void set_scan_ranges(RuntimeState* state,
                                  const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
     virtual TPushAggOp::type get_push_down_agg_type() = 0;
@@ -131,7 +130,6 @@ protected:
     RuntimeProfile::Counter* _scan_bytes = nullptr;
 
     RuntimeFilterConsumerHelper _helper;
-    std::mutex _conjunct_lock;
 };
 
 template <typename LocalStateType>
@@ -159,7 +157,6 @@ class ScanLocalState : public ScanLocalStateBase {
 
     int64_t limit_per_scanner() override;
 
-    Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) 
override;
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override {}
 
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
index afb6007e35e..bf871d34fec 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
@@ -130,6 +130,17 @@ Status 
RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter(
     return Status::OK();
 }
 
+Status RuntimeFilterConsumerHelper::clone_conjunct_ctxs(
+        RuntimeState* state, vectorized::VExprContextSPtrs& scanner_conjuncts,
+        vectorized::VExprContextSPtrs& local_state_conjuncts) {
+    std::unique_lock l(_rf_locks);
+    scanner_conjuncts.resize(local_state_conjuncts.size());
+    for (size_t i = 0; i != local_state_conjuncts.size(); ++i) {
+        RETURN_IF_ERROR(local_state_conjuncts[i]->clone(state, 
scanner_conjuncts[i]));
+    }
+    return Status::OK();
+}
+
 void RuntimeFilterConsumerHelper::collect_realtime_profile(
         RuntimeProfile* parent_operator_profile) {
     std::ignore = parent_operator_profile->add_counter("RuntimeFilterInfo", 
TUnit::NONE,
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h 
b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 36da3cd10c0..6b077ac3885 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -48,6 +48,10 @@ public:
                                                   
vectorized::VExprContextSPtrs& conjuncts,
                                                   const RowDescriptor& 
row_descriptor);
 
+    Status clone_conjunct_ctxs(RuntimeState* state,
+                               vectorized::VExprContextSPtrs& 
scanner_conjuncts,
+                               vectorized::VExprContextSPtrs& 
local_state_conjuncts);
+
     // Called by XXXLocalState::close()
     // parent_operator_profile is owned by LocalState so update it is safe at 
here.
     void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index 90a07d07047..517c45d14cc 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -211,7 +211,8 @@ Status Scanner::try_append_late_arrival_runtime_filter() {
     }
     // Notice that the number of runtime filters may be larger than 
_applied_rf_num.
     // But it is ok because it will be updated at next time.
-    RETURN_IF_ERROR(_local_state->clone_conjunct_ctxs(_conjuncts));
+    RETURN_IF_ERROR(_local_state->_helper.clone_conjunct_ctxs(_state, 
_conjuncts,
+                                                              
_local_state->_conjuncts));
     _applied_rf_num = arrived_rf_num;
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to