This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e3943ef952f [Bug](runtime-filter) use rf_lock to lock local_state.conjuncts read operator #59688 (#59770)
e3943ef952f is described below
commit e3943ef952f7fbefa2c9621e55de9b2785d197f6
Author: Pxl <[email protected]>
AuthorDate: Tue Jan 13 09:52:43 2026 +0800
[Bug](runtime-filter) use rf_lock to lock local_state.conjuncts read operator #59688 (#59770)
#59688
---
be/src/pipeline/exec/scan_operator.cpp | 12 ------------
be/src/pipeline/exec/scan_operator.h | 3 ---
be/src/runtime_filter/runtime_filter_consumer_helper.cpp | 11 +++++++++++
be/src/runtime_filter/runtime_filter_consumer_helper.h | 4 ++++
be/src/vec/exec/scan/scanner.cpp | 3 ++-
5 files changed, 17 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index c6ca051adec..602fc5949e0 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1119,18 +1119,6 @@ int64_t ScanLocalState<Derived>::limit_per_scanner() {
return _parent->cast<typename Derived::Parent>()._limit_per_scanner;
}
-template <typename Derived>
-Status ScanLocalState<Derived>::clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) {
- if (!_conjuncts.empty()) {
- std::unique_lock l(_conjunct_lock);
- conjuncts.resize(_conjuncts.size());
- for (size_t i = 0; i != _conjuncts.size(); ++i) {
- RETURN_IF_ERROR(_conjuncts[i]->clone(state(), conjuncts[i]));
- }
- }
- return Status::OK();
-}
-
template <typename Derived>
Status ScanLocalState<Derived>::_init_profile() {
// 1. counters for scan node
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 00c39269c25..1733bc16c1c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -77,7 +77,6 @@ public:
virtual int64_t limit_per_scanner() = 0;
- virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) = 0;
virtual TPushAggOp::type get_push_down_agg_type() = 0;
@@ -131,7 +130,6 @@ protected:
RuntimeProfile::Counter* _scan_bytes = nullptr;
RuntimeFilterConsumerHelper _helper;
- std::mutex _conjunct_lock;
};
template <typename LocalStateType>
@@ -159,7 +157,6 @@ class ScanLocalState : public ScanLocalStateBase {
int64_t limit_per_scanner() override;
- Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override {}
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
index afb6007e35e..bf871d34fec 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
@@ -130,6 +130,17 @@ Status RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter(
return Status::OK();
}
+Status RuntimeFilterConsumerHelper::clone_conjunct_ctxs(
+ RuntimeState* state, vectorized::VExprContextSPtrs& scanner_conjuncts,
+ vectorized::VExprContextSPtrs& local_state_conjuncts) {
+ std::unique_lock l(_rf_locks);
+ scanner_conjuncts.resize(local_state_conjuncts.size());
+ for (size_t i = 0; i != local_state_conjuncts.size(); ++i) {
+ RETURN_IF_ERROR(local_state_conjuncts[i]->clone(state, scanner_conjuncts[i]));
+ }
+ return Status::OK();
+}
+
void RuntimeFilterConsumerHelper::collect_realtime_profile(
RuntimeProfile* parent_operator_profile) {
std::ignore = parent_operator_profile->add_counter("RuntimeFilterInfo",
TUnit::NONE,
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 36da3cd10c0..6b077ac3885 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -48,6 +48,10 @@ public:
vectorized::VExprContextSPtrs& conjuncts,
const RowDescriptor& row_descriptor);
+ Status clone_conjunct_ctxs(RuntimeState* state,
+ vectorized::VExprContextSPtrs& scanner_conjuncts,
+ vectorized::VExprContextSPtrs& local_state_conjuncts);
+
// Called by XXXLocalState::close()
// parent_operator_profile is owned by LocalState so update it is safe at here.
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index 90a07d07047..517c45d14cc 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -211,7 +211,8 @@ Status Scanner::try_append_late_arrival_runtime_filter() {
}
// Notice that the number of runtime filters may be larger than _applied_rf_num.
// But it is ok because it will be updated at next time.
- RETURN_IF_ERROR(_local_state->clone_conjunct_ctxs(_conjuncts));
+ RETURN_IF_ERROR(_local_state->_helper.clone_conjunct_ctxs(_state, _conjuncts,
+ _local_state->_conjuncts));
_applied_rf_num = arrived_rf_num;
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]