This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3034ac3fe29 [chore](config) Add config to control BufferedReader and S3FileWriter's thread pool's min max nums (#33974)
3034ac3fe29 is described below

commit 3034ac3fe29dab050cbe9572f1db26635a18ec0d
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 10:27:19 2024 +0800

    [chore](config) Add config to control BufferedReader and S3FileWriter's thread pool's min max nums (#33974)
---
 be/src/common/config.cpp                        | 14 +++++++++++---
 be/src/common/config.h                          | 14 +++++++++++---
 be/src/io/cache/block_file_cache_downloader.cpp |  2 +-
 be/src/io/fs/s3_file_writer.cpp                 |  2 +-
 be/src/runtime/exec_env_init.cpp                | 25 +++++++++++++++++++++----
 5 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9df1a184f60..de1458c240d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1025,9 +1025,8 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
 DEFINE_mInt64(row_column_page_size, "4096");
 // it must be larger than or equal to 5MB
 DEFINE_mInt64(s3_write_buffer_size, "5242880");
-DEFINE_mInt32(s3_task_check_interval, "60");
-// The timeout config for S3 buffer allocation
-DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
+// Log interval when doing s3 upload task
+DEFINE_mInt32(s3_file_writer_log_interval_second, "60");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
 DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
 
@@ -1217,6 +1216,15 @@ DEFINE_mBool(enable_injection_point, "false");
 
 DEFINE_mBool(ignore_schema_change_check, "false");
 
+// The min thread num for BufferedReaderPrefetchThreadPool
+DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
+// The max thread num for BufferedReaderPrefetchThreadPool
+DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
+// The min thread num for S3FileUploadThreadPool
+DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
+// The max thread num for S3FileUploadThreadPool
+DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 81910dd2553..4139d76b6bc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1070,9 +1070,8 @@ DECLARE_mInt32(tablet_path_check_batch_size);
 DECLARE_mInt64(row_column_page_size);
 // it must be larger than or equal to 5MB
 DECLARE_mInt64(s3_write_buffer_size);
-DECLARE_mInt32(s3_task_check_interval);
-// The timeout config for S3 buffer allocation
-DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
+// Log interval when doing s3 upload task
+DECLARE_mInt32(s3_file_writer_log_interval_second);
 // the max number of cached file handle for block segemnt
 DECLARE_mInt64(file_cache_max_file_reader_cache_size);
 DECLARE_mInt64(hdfs_write_batch_buffer_size_mb);
@@ -1296,6 +1295,15 @@ DECLARE_mBool(enable_injection_point);
 
 DECLARE_mBool(ignore_schema_change_check);
 
+// The min thread num for BufferedReaderPrefetchThreadPool
+DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
+// The max thread num for BufferedReaderPrefetchThreadPool
+DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
+// The min thread num for S3FileUploadThreadPool
+DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
+// The max thread num for S3FileUploadThreadPool
+DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 30fb3a86338..283605f23be 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -184,7 +184,7 @@ struct DownloadTaskExecutor {
                 LOG_WARNING("").error(st);
             }
         }
-        auto timeout_duration = config::s3_task_check_interval;
+        auto timeout_duration = config::s3_file_writer_log_interval_second;
         timespec current_time;
         // We don't need high accuracy here, so we use time(nullptr)
         // since it's the fastest way to get current time(second)
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 9df1ac847af..84487f496ac 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -135,7 +135,7 @@ Status S3FileWriter::_create_multi_upload_request() {
 }
 
 void S3FileWriter::_wait_until_finish(std::string_view task_name) {
-    auto timeout_duration = config::s3_writer_buffer_allocation_timeout;
+    auto timeout_duration = config::s3_file_writer_log_interval_second;
     auto msg = fmt::format(
            "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
             task_name, timeout_duration, _bucket, _path.native(), _upload_id);
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 5cbb5829ee0..5a7e39cf158 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -142,6 +142,17 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces);
 }
 
+// Used to calculate the num of min thread and max thread based on the passed config
+static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
+    auto num_cores = doris::CpuInfo::num_cores();
+    min_num = (min_num == 0) ? num_cores : min_num;
+    max_num = (max_num == 0) ? num_cores : max_num;
+    auto factor = max_num / min_num;
+    min_num = std::min(num_cores * factor, min_num);
+    max_num = std::min(min_num * factor, max_num);
+    return {min_num, max_num};
+}
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
@@ -184,9 +195,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                               .set_max_queue_size(config::send_batch_thread_pool_queue_size)
                               .build(&_send_batch_thread_pool));
 
+    auto [buffered_reader_min_threads, buffered_reader_max_threads] =
+            get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
+                            config::num_buffered_reader_prefetch_thread_pool_max_thread);
     static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
-                              .set_min_threads(16)
-                              .set_max_threads(64)
+                              .set_min_threads(buffered_reader_min_threads)
+                              .set_max_threads(buffered_reader_max_threads)
                               .build(&_buffered_reader_prefetch_thread_pool));
 
     static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
@@ -199,9 +213,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                               .set_max_threads(16)
                               .build(&_s3_downloader_download_poller_thread_pool));
 
+    auto [s3_file_upload_min_threads, s3_file_upload_max_threads] =
+            get_num_threads(config::num_s3_file_upload_thread_pool_min_thread,
+                            config::num_s3_file_upload_thread_pool_max_thread);
     static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
-                              .set_min_threads(16)
-                              .set_max_threads(64)
+                              .set_min_threads(s3_file_upload_min_threads)
+                              .set_max_threads(s3_file_upload_max_threads)
                               .build(&_s3_file_upload_thread_pool));
 
     // min num equal to fragment pool's min num


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to