This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 357d75638ec [enhance](boot) Limit the concurrency of load data dirs (#35852)
357d75638ec is described below

commit 357d75638ec9dd215b47e2f8c8ee7cf8efd2593e
Author: plat1ko <[email protected]>
AuthorDate: Fri Jun 7 09:55:33 2024 +0800

    [enhance](boot) Limit the concurrency of load data dirs (#35852)
    
    Limit the concurrency of load data dirs, which can reduce memory usage
    during BE boot up.
---
 be/src/common/config.cpp       |  2 ++
 be/src/common/config.h         |  3 +++
 be/src/olap/storage_engine.cpp | 60 +++++++++++++++++++++++++++---------------
 3 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 432fcc3648b..0258d73989e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1289,6 +1289,8 @@ DEFINE_Int64(s3_file_system_local_upload_buffer_size, "5242880");
 //JVM monitoring enable. To prevent be from crashing due to jvm compatibility issues. The default setting is off.
 DEFINE_Bool(enable_jvm_monitor, "false");
 
+DEFINE_Int32(load_data_dirs_threads, "-1");
+
 // Skip loading stale rowset meta when initializing `TabletMeta` from protobuf
 DEFINE_mBool(skip_loading_stale_rowset_meta, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ad37b2b2238..ab2b912fa68 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1367,6 +1367,9 @@ DECLARE_Int64(s3_file_system_local_upload_buffer_size);
 //JVM monitoring enable. To prevent be from crashing due to jvm compatibility issues.
 DECLARE_Bool(enable_jvm_monitor);
 
+// Num threads to load data dirs, default value -1 indicates the same number of threads as the number of data dirs
+DECLARE_Int32(load_data_dirs_threads);
+
 // Skip loading stale rowset meta when initializing `TabletMeta` from protobuf
 DECLARE_mBool(skip_loading_stale_rowset_meta);
 // Whether to use file to record log. When starting BE with --console,
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3f1ee02a93b..0c167cfacfe 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -174,31 +174,49 @@ StorageEngine::~StorageEngine() {
     stop();
 }
 
-// Note: Only the previously existing root path can be reloaded here, that is, the root path registered when re load starts is allowed,
-// but the brand new path of re load is not allowed because the ce scheduler information has not been thoroughly updated here
 static Status load_data_dirs(const std::vector<DataDir*>& data_dirs) {
-    std::vector<std::thread> threads;
-    std::vector<Status> results(data_dirs.size());
-    for (size_t i = 0; i < data_dirs.size(); ++i) {
-        threads.emplace_back(
-                [&results, data_dir = data_dirs[i]](size_t index) {
-                    results[index] = data_dir->load();
-                    if (!results[index].ok()) {
-                        LOG(WARNING) << "io error when init load tables. res=" 
<< results[index]
-                                     << ", data dir=" << data_dir->path();
-                    }
-                },
-                i);
-    }
-    for (auto& thread : threads) {
-        thread.join();
+    std::unique_ptr<ThreadPool> pool;
+
+    int num_threads = config::load_data_dirs_threads;
+    if (num_threads <= 0) {
+        num_threads = data_dirs.size();
     }
-    for (const auto& result : results) {
-        if (!result.ok()) {
-            return result;
+
+    auto st = ThreadPoolBuilder("load_data_dir")
+                      .set_min_threads(num_threads)
+                      .set_max_threads(num_threads)
+                      .build(&pool);
+    CHECK(st.ok()) << st;
+
+    std::mutex result_mtx;
+    Status result;
+
+    for (auto* data_dir : data_dirs) {
+        st = pool->submit_func([&, data_dir] {
+            {
+                std::lock_guard lock(result_mtx);
+                if (!result.ok()) { // Some data dir has failed
+                    return;
+                }
+            }
+
+            auto st = data_dir->load();
+            if (!st.ok()) {
+                LOG(WARNING) << "error occured when init load tables. res=" << 
st
+                             << ", data dir=" << data_dir->path();
+                std::lock_guard lock(result_mtx);
+                result = std::move(st);
+            }
+        });
+
+        if (!st.ok()) {
+            return st;
         }
     }
-    return Status::OK();
+
+    pool->wait();
+
+    return result;
 }
 
 Status StorageEngine::_open() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to