This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5d450f5ae90 [Chore](runtime-filter) add lock for all runtime filter producer/consumer's public methods (#50110)
5d450f5ae90 is described below
commit 5d450f5ae90cefc42acf31e5defa65028eecbb11
Author: Pxl <[email protected]>
AuthorDate: Sat Apr 19 08:11:26 2025 +0800
[Chore](runtime-filter) add lock for all runtime filter producer/consumer's public methods (#50110)
---
be/src/runtime_filter/runtime_filter.h | 6 +-
be/src/runtime_filter/runtime_filter_consumer.cpp | 5 +-
be/src/runtime_filter/runtime_filter_consumer.h | 7 +-
be/src/runtime_filter/runtime_filter_merger.h | 2 +-
be/src/runtime_filter/runtime_filter_producer.cpp | 6 +-
be/src/runtime_filter/runtime_filter_producer.h | 30 ++++----
.../runtime_filter_producer_helper_set_test.cpp | 81 ++++++++++++++++++++++
7 files changed, 114 insertions(+), 23 deletions(-)
diff --git a/be/src/runtime_filter/runtime_filter.h
b/be/src/runtime_filter/runtime_filter.h
index e5c833d8f67..8b42576c7b4 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -46,11 +46,13 @@ public:
template <class T>
Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
return _wrapper->assign(request, data);
}
template <class T>
Status serialize(T* request, void** data, int* len) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
auto real_runtime_filter_type = _wrapper->get_real_type();
request->set_filter_type(get_type(real_runtime_filter_type));
@@ -81,7 +83,7 @@ public:
return Status::OK();
}
- virtual std::string debug_string() const = 0;
+ virtual std::string debug_string() = 0;
protected:
RuntimeFilter(const TRuntimeFilterDesc* desc)
@@ -118,6 +120,8 @@ protected:
friend class RuntimeFilterProducer;
friend class RuntimeFilterConsumer;
friend class RuntimeFilterMerger;
+
+ std::recursive_mutex _rmtx; // lock all member function of runtime filter
producer/consumer
};
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp
b/be/src/runtime_filter/runtime_filter_consumer.cpp
index 4b1842c8170..acc25ac46cc 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer.cpp
@@ -45,6 +45,7 @@ Status RuntimeFilterConsumer::_apply_ready_expr(
}
Status
RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>&
push_exprs) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state == State::READY) {
RETURN_IF_ERROR(_apply_ready_expr(push_exprs));
}
@@ -55,6 +56,7 @@ Status
RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilte
}
void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time)
* NANOS_PER_MILLIS));
_set_state(State::READY, other->_wrapper);
if (!_filter_timer.empty()) {
@@ -66,6 +68,7 @@ void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
std::shared_ptr<pipeline::RuntimeFilterTimer>
RuntimeFilterConsumer::create_filter_timer(
std::shared_ptr<pipeline::Dependency> dependencies) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
auto timer =
std::make_shared<pipeline::RuntimeFilterTimer>(_registration_time,
_rf_wait_time_ms, dependencies);
_filter_timer.push_back(timer);
@@ -211,13 +214,13 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
}
void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile*
parent_operator_profile) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
DCHECK(parent_operator_profile != nullptr);
int filter_id = -1;
{
// since debug_string will read from RuntimeFilter::_wrapper
// and it is a shared_ptr, instead of a atomic_shared_ptr
// so it is not thread safe
- std::unique_lock<std::mutex> l(_mtx);
filter_id = _wrapper->filter_id();
parent_operator_profile->add_description(fmt::format("RF{} Info",
filter_id),
debug_string(),
"RuntimeFilterInfo");
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h
b/be/src/runtime_filter/runtime_filter_consumer.h
index 3fb72ef8881..e0e42e509d4 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -58,7 +58,8 @@ public:
// Called after `State` is ready (e.g. signaled)
Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>&
push_exprs);
- std::string debug_string() const override {
+ std::string debug_string() override {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
return fmt::format("Consumer: ({}, state: {}, reached_timeout: {},
timeout_limit: {}ms)",
_debug_string(), to_string(_rf_state),
_reached_timeout ? "true" : "false",
std::to_string(_rf_wait_time_ms));
@@ -112,7 +113,6 @@ private:
}
void _set_state(State rf_state, std::shared_ptr<RuntimeFilterWrapper>
other = nullptr) {
- std::unique_lock<std::mutex> l(_mtx);
if (rf_state == State::TIMEOUT) {
DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1);
_reached_timeout = true;
@@ -154,9 +154,6 @@ private:
const int64_t _registration_time;
std::atomic<State> _rf_state;
- // only used to lock _set_state() to make _wrapper and _rf_state is
protected
- // signal and acquire_expr may called in different threads at the same time
- std::mutex _mtx;
bool _reached_timeout = false;
diff --git a/be/src/runtime_filter/runtime_filter_merger.h
b/be/src/runtime_filter/runtime_filter_merger.h
index 63dca0a39ae..bfce64e204a 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -46,7 +46,7 @@ public:
return Status::OK();
}
- std::string debug_string() const override {
+ std::string debug_string() override {
return fmt::format(
"Merger: ({}, expected_producer_num: {},
received_producer_num: {}, "
"received_rf_size_num: {}, received_sum_size: {})",
diff --git a/be/src/runtime_filter/runtime_filter_producer.cpp
b/be/src/runtime_filter/runtime_filter_producer.cpp
index 788aea3b5c0..85d55f9f5c7 100644
--- a/be/src/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer.cpp
@@ -46,6 +46,7 @@ Status
RuntimeFilterProducer::_send_to_local_targets(RuntimeState* state, Runtim
};
Status RuntimeFilterProducer::publish(RuntimeState* state, bool
build_hash_table) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
_check_state({State::READY_TO_PUBLISH});
auto do_merge = [&]() {
@@ -141,6 +142,7 @@ public:
void RuntimeFilterProducer::latch_dependency(
const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
_check_state({State::WAITING_FOR_DATA});
return;
@@ -151,6 +153,7 @@ void RuntimeFilterProducer::latch_dependency(
}
Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t
local_filter_size) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
_check_state({State::WAITING_FOR_DATA});
return Status::OK();
@@ -166,7 +169,7 @@ Status RuntimeFilterProducer::send_size(RuntimeState*
state, uint64_t local_filt
LocalMergeContext* merger_context = nullptr;
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
_wrapper->filter_id(), &merger_context));
- std::lock_guard l(merger_context->mtx);
+ std::lock_guard merger_lock(merger_context->mtx);
if (merger_context->merger->add_rf_size(local_filter_size)) {
if (!_has_remote_target) {
for (auto filter : merger_context->producers) {
@@ -230,6 +233,7 @@ Status RuntimeFilterProducer::send_size(RuntimeState*
state, uint64_t local_filt
}
void RuntimeFilterProducer::set_synced_size(uint64_t global_size) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (!set_state(State::WAITING_FOR_DATA)) {
_check_wrapper_state({RuntimeFilterWrapper::State::DISABLED});
}
diff --git a/be/src/runtime_filter/runtime_filter_producer.h
b/be/src/runtime_filter/runtime_filter_producer.h
index 620262f6051..ea013625462 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -61,6 +61,7 @@ public:
// insert data to build filter
Status insert(vectorized::ColumnPtr column, size_t start) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (!_wrapper->is_valid() || _rf_state == State::READY_TO_PUBLISH ||
_rf_state == State::PUBLISHED) {
return Status::OK();
@@ -71,7 +72,8 @@ public:
Status publish(RuntimeState* state, bool build_hash_table);
- std::string debug_string() const override {
+ std::string debug_string() override {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
auto result =
fmt::format("Producer: ({}, state: {}", _debug_string(),
to_string(_rf_state));
if (_need_sync_filter_size) {
@@ -85,6 +87,7 @@ public:
void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State
state,
std::string reason = "") {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state == State::PUBLISHED || _rf_state ==
State::READY_TO_PUBLISH) {
return;
}
@@ -110,7 +113,7 @@ public:
}
bool set_state(State state) {
- std::unique_lock<std::mutex> l(_mtx);
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state == State::PUBLISHED ||
(state != State::PUBLISHED && _rf_state ==
State::READY_TO_PUBLISH)) {
return false;
@@ -119,10 +122,17 @@ public:
return true;
}
- std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; }
- void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper
= wrapper; }
+ std::shared_ptr<RuntimeFilterWrapper> wrapper() {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
+ return _wrapper;
+ }
+ void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
+ _wrapper = wrapper;
+ }
void collect_realtime_profile(RuntimeProfile* parent_operator_profile) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
DCHECK(parent_operator_profile != nullptr);
if (parent_operator_profile == nullptr) {
return;
@@ -131,12 +141,8 @@ public:
RuntimeFilterInfo:
- RF0 Info: xxxx
*/
- {
- std::unique_lock<std::mutex> l(_mtx);
- parent_operator_profile->add_description(
- fmt::format("RF{} Info", _wrapper->filter_id()),
debug_string(),
- "RuntimeFilterInfo");
- }
+ parent_operator_profile->add_description(fmt::format("RF{} Info",
_wrapper->filter_id()),
+ debug_string(),
"RuntimeFilterInfo");
}
private:
@@ -168,10 +174,6 @@ private:
std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
std::atomic<State> _rf_state;
-
- // only used to lock set_state() to make _rf_state is protected
- // set_synced_size and RuntimeFilterProducerHelper::terminate may called
in different threads at the same time
- std::mutex _mtx;
};
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp
b/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp
new file mode 100644
index 00000000000..a3bf632c13f
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_producer_helper_set.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+
+class RuntimeFilterProducerHelperSetTest : public RuntimeFilterTest {
+ void SetUp() override {
+ RuntimeFilterTest::SetUp();
+ _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM,
INSTANCE_NUM);
+ _op.reset(new pipeline::MockOperatorX());
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2));
+
+ _sink.reset(new pipeline::HashJoinBuildSinkOperatorX(
+ &_pool, 0, _op->operator_id(),
+ TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(),
_tbl));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink));
+
+ _task.reset(new pipeline::PipelineTask(_pipeline, 0,
_runtime_states[0].get(), nullptr,
+ &_profile, {}, 0));
+ }
+
+ pipeline::OperatorPtr _op;
+ pipeline::DataSinkOperatorPtr _sink;
+ pipeline::PipelinePtr _pipeline;
+ std::shared_ptr<pipeline::PipelineTask> _task;
+ ObjectPool _pool;
+};
+
+TEST_F(RuntimeFilterProducerHelperSetTest, basic) {
+ auto helper = RuntimeFilterProducerHelperSet();
+
+ vectorized::VExprContextSPtr ctx;
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+ TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+ ctx->_last_result_column_id = 0;
+
+ vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+ std::vector<TRuntimeFilterDesc> runtime_filter_descs =
{TRuntimeFilterDescBuilder().build()};
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+ helper.init(_runtime_states[0].get(), build_expr_ctxs,
runtime_filter_descs));
+
+ vectorized::Block block;
+ auto column = vectorized::ColumnInt32::create();
+ column->insert(1);
+ column->insert(2);
+ block.insert({std::move(column),
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+ std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.process(_runtime_states[0].get(),
&block, 2));
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]