This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 925da90480 [improve](bloomfilter) refactor runtime_filter_mgr with
bloomfilter (#21715)
925da90480 is described below
commit 925da90480f60afc0e5333a536d41e004234874e
Author: Mryange <[email protected]>
AuthorDate: Tue Jul 11 22:35:30 2023 +0800
[improve](bloomfilter) refactor runtime_filter_mgr with bloomfilter (#21715)
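Reduced the lock granularity: previously the entire filter map was locked for the whole merge; now a shared lock guards only the map lookup, and a per-filter spin lock guards each merge.
Changed the filter map key from string to int (map<string, ...> --> map<int, ...>).
The bloom filter (bf) no longer needs to call init_with_fixed_length() during merge.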
---
be/src/runtime/runtime_filter_mgr.cpp | 38 +++++++++++++++++++----------------
be/src/runtime/runtime_filter_mgr.h | 13 ++++++------
2 files changed, 28 insertions(+), 23 deletions(-)
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 4380716671..6614532d85 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -39,6 +39,7 @@
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/brpc_client_cache.h"
+#include "util/spinlock.h"
namespace doris {
@@ -208,7 +209,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions*
query_options,
const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
const int producer_size) {
- std::lock_guard<std::mutex> guard(_filter_map_mutex);
+ std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
std::shared_ptr<RuntimeFilterCntlVal> cntVal =
std::make_shared<RuntimeFilterCntlVal>();
// runtime_filter_desc and target will be released,
// so we need to copy to cntVal
@@ -219,10 +220,10 @@ Status
RuntimeFilterMergeControllerEntity::_init_with_desc(
cntVal->filter =
cntVal->pool->add(new IRuntimeFilter(_state,
&_state->get_query_ctx()->obj_pool));
- std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+ auto filter_id = runtime_filter_desc->filter_id;
// LOG(INFO) << "entity filter id:" << filter_id;
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc,
query_options, -1, false);
- _filter_map.emplace(filter_id, cntVal);
+ _filter_map.emplace(filter_id, CntlValwithLock {cntVal,
std::make_unique<SpinLock>()});
return Status::OK();
}
@@ -230,7 +231,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions*
query_options,
const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
const int producer_size) {
- std::lock_guard<std::mutex> guard(_filter_map_mutex);
+ std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
std::shared_ptr<RuntimeFilterCntlVal> cntVal =
std::make_shared<RuntimeFilterCntlVal>();
// runtime_filter_desc and target will be released,
// so we need to copy to cntVal
@@ -241,10 +242,10 @@ Status
RuntimeFilterMergeControllerEntity::_init_with_desc(
cntVal->filter =
cntVal->pool->add(new IRuntimeFilter(_state,
&_state->get_query_ctx()->obj_pool));
- std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+ auto filter_id = runtime_filter_desc->filter_id;
// LOG(INFO) << "entity filter id:" << filter_id;
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc,
query_options);
- _filter_map.emplace(filter_id, cntVal);
+ _filter_map.emplace(filter_id, CntlValwithLock {cntVal,
std::make_unique<SpinLock>()});
return Status::OK();
}
@@ -312,34 +313,37 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
int64_t merge_time = 0;
+ int64_t start_merge = MonotonicMillis();
+ auto filter_id = request->filter_id();
+ std::map<int, CntlValwithLock>::iterator iter;
{
- int64_t start_merge = MonotonicMillis();
- std::lock_guard<std::mutex> guard(_filter_map_mutex);
- auto iter = _filter_map.find(std::to_string(request->filter_id()));
+ std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
+ iter = _filter_map.find(filter_id);
VLOG_ROW << "recv filter id:" << request->filter_id() << " " <<
request->ShortDebugString();
if (iter == _filter_map.end()) {
return Status::InvalidArgument("unknown filter id {}",
std::to_string(request->filter_id()));
}
- cntVal = iter->second;
- if (auto bf = cntVal->filter->get_bloomfilter()) {
- RETURN_IF_ERROR(bf->init_with_fixed_length());
- }
+ }
+ // iter->second = pair{CntlVal,SpinLock}
+ cntVal = iter->second.first;
+ {
+ std::lock_guard<SpinLock> l(*iter->second.second);
MergeRuntimeFilterParams params(request, attach_data);
- ObjectPool* pool = iter->second->pool.get();
+ ObjectPool* pool = cntVal->pool.get();
RuntimeFilterWrapperHolder holder;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, ¶ms, pool,
holder.getHandle()));
RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));
- cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string());
+ cntVal->arrive_id.insert(UniqueId(request->fragment_id()));
merged_size = cntVal->arrive_id.size();
// TODO: avoid log when we had acquired a lock
VLOG_ROW << "merge size:" << merged_size << ":" <<
cntVal->producer_size;
DCHECK_LE(merged_size, cntVal->producer_size);
- iter->second->merge_time += (MonotonicMillis() - start_merge);
+ cntVal->merge_time += (MonotonicMillis() - start_merge);
if (merged_size < cntVal->producer_size) {
return Status::OK();
} else {
- merge_time = iter->second->merge_time;
+ merge_time = cntVal->merge_time;
}
}
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 7d7e7a0ba1..2048229cd2 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -26,6 +26,7 @@
#include <map>
#include <memory>
#include <mutex>
+#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -145,12 +146,12 @@ public:
std::vector<doris::TRuntimeFilterTargetParams> target_info;
std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
IRuntimeFilter* filter;
- std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
+ std::unordered_set<UniqueId> arrive_id; // fragment_instance_id ?
std::shared_ptr<ObjectPool> pool;
};
public:
- RuntimeFilterCntlVal* get_filter(int id) { return
_filter_map[std::to_string(id)].get(); }
+ RuntimeFilterCntlVal* get_filter(int id) { return
_filter_map[id].first.get(); }
private:
Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
@@ -166,11 +167,11 @@ private:
UniqueId _query_id;
UniqueId _fragment_instance_id;
// protect _filter_map
- std::mutex _filter_map_mutex;
+ std::shared_mutex _filter_map_mutex;
std::shared_ptr<MemTracker> _mem_tracker;
- // TODO: convert filter id to i32
- // filter-id -> val
- std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
+ using CntlValwithLock =
+ std::pair<std::shared_ptr<RuntimeFilterCntlVal>,
std::unique_ptr<SpinLock>>;
+ std::map<int, CntlValwithLock> _filter_map;
RuntimeState* _state;
bool _opt_remote_rf = true;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]