This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 357d75638ec [enhance](boot) Limit the concurrency of load data dirs (#35852)
357d75638ec is described below
commit 357d75638ec9dd215b47e2f8c8ee7cf8efd2593e
Author: plat1ko <[email protected]>
AuthorDate: Fri Jun 7 09:55:33 2024 +0800
[enhance](boot) Limit the concurrency of load data dirs (#35852)
Limit the concurrency of load data dirs, which can reduce memory usage
during BE boot up.
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 3 +++
be/src/olap/storage_engine.cpp | 60 +++++++++++++++++++++++++++---------------
3 files changed, 44 insertions(+), 21 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 432fcc3648b..0258d73989e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1289,6 +1289,8 @@ DEFINE_Int64(s3_file_system_local_upload_buffer_size, "5242880");
//JVM monitoring enable. To prevent be from crashing due to jvm compatibility issues. The default setting is off.
DEFINE_Bool(enable_jvm_monitor, "false");
+DEFINE_Int32(load_data_dirs_threads, "-1");
+
// Skip loading stale rowset meta when initializing `TabletMeta` from protobuf
DEFINE_mBool(skip_loading_stale_rowset_meta, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ad37b2b2238..ab2b912fa68 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1367,6 +1367,9 @@ DECLARE_Int64(s3_file_system_local_upload_buffer_size);
//JVM monitoring enable. To prevent be from crashing due to jvm compatibility issues.
DECLARE_Bool(enable_jvm_monitor);
+// Num threads to load data dirs, default value -1 indicates the same number of threads as the number of data dirs
+DECLARE_Int32(load_data_dirs_threads);
+
// Skip loading stale rowset meta when initializing `TabletMeta` from protobuf
DECLARE_mBool(skip_loading_stale_rowset_meta);
// Whether to use file to record log. When starting BE with --console,
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3f1ee02a93b..0c167cfacfe 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -174,31 +174,49 @@ StorageEngine::~StorageEngine() {
stop();
}
-// Note: Only the previously existing root path can be reloaded here, that is, the root path registered when re load starts is allowed,
-// but the brand new path of re load is not allowed because the ce scheduler information has not been thoroughly updated here
static Status load_data_dirs(const std::vector<DataDir*>& data_dirs) {
- std::vector<std::thread> threads;
- std::vector<Status> results(data_dirs.size());
- for (size_t i = 0; i < data_dirs.size(); ++i) {
- threads.emplace_back(
- [&results, data_dir = data_dirs[i]](size_t index) {
- results[index] = data_dir->load();
- if (!results[index].ok()) {
- LOG(WARNING) << "io error when init load tables. res=" << results[index]
- << ", data dir=" << data_dir->path();
- }
- },
- i);
- }
- for (auto& thread : threads) {
- thread.join();
+ std::unique_ptr<ThreadPool> pool;
+
+ int num_threads = config::load_data_dirs_threads;
+ if (num_threads <= 0) {
+ num_threads = data_dirs.size();
}
- for (const auto& result : results) {
- if (!result.ok()) {
- return result;
+
+ auto st = ThreadPoolBuilder("load_data_dir")
+ .set_min_threads(num_threads)
+ .set_max_threads(num_threads)
+ .build(&pool);
+ CHECK(st.ok()) << st;
+
+ std::mutex result_mtx;
+ Status result;
+
+ for (auto* data_dir : data_dirs) {
+ st = pool->submit_func([&, data_dir] {
+ {
+ std::lock_guard lock(result_mtx);
+ if (!result.ok()) { // Some data dir has failed
+ return;
+ }
+ }
+
+ auto st = data_dir->load();
+ if (!st.ok()) {
+ LOG(WARNING) << "error occured when init load tables. res=" << st
+ << ", data dir=" << data_dir->path();
+ std::lock_guard lock(result_mtx);
+ result = std::move(st);
+ }
+ });
+
+ if (!st.ok()) {
+ return st;
}
}
- return Status::OK();
+
+ pool->wait();
+
+ return result;
}
Status StorageEngine::_open() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]