This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 3034ac3fe29 [chore](config) Add config to control BufferedReader and S3FileWriter's thread pool's min max nums (#33974)
3034ac3fe29 is described below
commit 3034ac3fe29dab050cbe9572f1db26635a18ec0d
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 10:27:19 2024 +0800
    [chore](config) Add config to control BufferedReader and S3FileWriter's thread pool's min max nums (#33974)
---
 be/src/common/config.cpp                        | 14 +++++++++++---
 be/src/common/config.h                          | 14 +++++++++++---
 be/src/io/cache/block_file_cache_downloader.cpp |  2 +-
 be/src/io/fs/s3_file_writer.cpp                 |  2 +-
 be/src/runtime/exec_env_init.cpp                | 25 +++++++++++++++++++++----
 5 files changed, 45 insertions(+), 12 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9df1a184f60..de1458c240d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1025,9 +1025,8 @@ DEFINE_mInt32(tablet_path_check_batch_size, "1000");
DEFINE_mInt64(row_column_page_size, "4096");
// it must be larger than or equal to 5MB
DEFINE_mInt64(s3_write_buffer_size, "5242880");
-DEFINE_mInt32(s3_task_check_interval, "60");
-// The timeout config for S3 buffer allocation
-DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
+// Log interval when doing s3 upload task
+DEFINE_mInt32(s3_file_writer_log_interval_second, "60");
DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
@@ -1217,6 +1216,15 @@ DEFINE_mBool(enable_injection_point, "false");
DEFINE_mBool(ignore_schema_change_check, "false");
+// The min thread num for BufferedReaderPrefetchThreadPool
+DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread, "16");
+// The max thread num for BufferedReaderPrefetchThreadPool
+DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
+// The min thread num for S3FileUploadThreadPool
+DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
+// The max thread num for S3FileUploadThreadPool
+DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
+
// clang-format off
#ifdef BE_TEST
// test s3
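
(Editor's note: the four new pool-size options are defined with DEFINE_Int64 rather than DEFINE_mInt64, so, following the usual Doris convention, they are read once at BE startup and are not mutable at runtime. As a hedged illustration only, assuming the standard key = value syntax of be.conf, overriding the defaults of 16/64 with smaller illustrative values would look like:

    num_buffered_reader_prefetch_thread_pool_min_thread = 8
    num_buffered_reader_prefetch_thread_pool_max_thread = 32
    num_s3_file_upload_thread_pool_min_thread = 8
    num_s3_file_upload_thread_pool_max_thread = 32

Per the get_num_threads() helper added in exec_env_init.cpp below, a value of 0 falls back to the machine's core count.)
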
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 81910dd2553..4139d76b6bc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1070,9 +1070,8 @@ DECLARE_mInt32(tablet_path_check_batch_size);
DECLARE_mInt64(row_column_page_size);
// it must be larger than or equal to 5MB
DECLARE_mInt64(s3_write_buffer_size);
-DECLARE_mInt32(s3_task_check_interval);
-// The timeout config for S3 buffer allocation
-DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
+// Log interval when doing s3 upload task
+DECLARE_mInt32(s3_file_writer_log_interval_second);
// the max number of cached file handle for block segment
DECLARE_mInt64(file_cache_max_file_reader_cache_size);
DECLARE_mInt64(hdfs_write_batch_buffer_size_mb);
@@ -1296,6 +1295,15 @@ DECLARE_mBool(enable_injection_point);
DECLARE_mBool(ignore_schema_change_check);
+// The min thread num for BufferedReaderPrefetchThreadPool
+DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread);
+// The max thread num for BufferedReaderPrefetchThreadPool
+DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
+// The min thread num for S3FileUploadThreadPool
+DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
+// The max thread num for S3FileUploadThreadPool
+DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 30fb3a86338..283605f23be 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -184,7 +184,7 @@ struct DownloadTaskExecutor {
                 LOG_WARNING("").error(st);
             }
         }
-        auto timeout_duration = config::s3_task_check_interval;
+        auto timeout_duration = config::s3_file_writer_log_interval_second;
         timespec current_time;
         // We don't need high accuracy here, so we use time(nullptr)
         // since it's the fastest way to get current time(second)
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 9df1ac847af..84487f496ac 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -135,7 +135,7 @@ Status S3FileWriter::_create_multi_upload_request() {
 }

 void S3FileWriter::_wait_until_finish(std::string_view task_name) {
-    auto timeout_duration = config::s3_writer_buffer_allocation_timeout;
+    auto timeout_duration = config::s3_file_writer_log_interval_second;
     auto msg = fmt::format(
             "{} multipart upload already takes {} seconds, bucket={}, key={}, upload_id={}",
             task_name, timeout_duration, _bucket, _path.native(), _upload_id);
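
(Editor's note: both call sites — the downloader above and _wait_until_finish here — now treat the renamed value as a periodic logging interval while waiting, rather than as a hard allocation timeout. A minimal sketch of that pattern, with hypothetical names (wait_and_warn, finished) standing in for the writer's real members, not the actual Doris implementation:

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>

    // Sketch only: warn every log_interval_s seconds until `finished` is set,
    // instead of giving up after a fixed timeout.
    void wait_and_warn(std::condition_variable& cv, std::mutex& mtx, bool& finished,
                       int log_interval_s) {
        std::unique_lock<std::mutex> lock(mtx);
        int waited_s = 0;
        // wait_for returns the predicate's value; false means the interval
        // elapsed without completion, so log and keep waiting.
        while (!cv.wait_for(lock, std::chrono::seconds(log_interval_s),
                            [&] { return finished; })) {
            waited_s += log_interval_s;
            std::cerr << "upload already takes " << waited_s << " seconds\n";
        }
    }

This keeps a stuck multipart upload observable in the logs without imposing an arbitrary failure deadline.)
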
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 5cbb5829ee0..5a7e39cf158 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -142,6 +142,17 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     DorisMetrics::instance()->initialize(init_system_metrics, disk_devices,
                                          network_interfaces);
 }
+// Used to calculate the num of min thread and max thread based on the passed config
+static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
+    auto num_cores = doris::CpuInfo::num_cores();
+    min_num = (min_num == 0) ? num_cores : min_num;
+    max_num = (max_num == 0) ? num_cores : max_num;
+    auto factor = max_num / min_num;
+    min_num = std::min(num_cores * factor, min_num);
+    max_num = std::min(min_num * factor, max_num);
+    return {min_num, max_num};
+}
+
 Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
@@ -184,9 +195,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                               .set_max_queue_size(config::send_batch_thread_pool_queue_size)
                               .build(&_send_batch_thread_pool));
+    auto [buffered_reader_min_threads, buffered_reader_max_threads] =
+            get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
+                            config::num_buffered_reader_prefetch_thread_pool_max_thread);
     static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
-                              .set_min_threads(16)
-                              .set_max_threads(64)
+                              .set_min_threads(buffered_reader_min_threads)
+                              .set_max_threads(buffered_reader_max_threads)
                               .build(&_buffered_reader_prefetch_thread_pool));
     static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
@@ -199,9 +213,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                               .set_max_threads(16)
                               .build(&_s3_downloader_download_poller_thread_pool));
+    auto [s3_file_upload_min_threads, s3_file_upload_max_threads] =
+            get_num_threads(config::num_s3_file_upload_thread_pool_min_thread,
+                            config::num_s3_file_upload_thread_pool_max_thread);
     static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
-                              .set_min_threads(16)
-                              .set_max_threads(64)
+                              .set_min_threads(s3_file_upload_min_threads)
+                              .set_max_threads(s3_file_upload_max_threads)
                               .build(&_s3_file_upload_thread_pool));
     // min num equal to fragment pool's min num
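
(Editor's note: for intuition, the new get_num_threads() helper keeps the configured min/max ratio but scales both numbers down on machines with few cores. Below is a hedged, self-contained restatement for experimentation — num_cores is a parameter here instead of doris::CpuInfo::num_cores(), and main() is illustrative, not part of the commit:

    #include <algorithm>
    #include <cstdio>
    #include <utility>

    static std::pair<size_t, size_t> get_num_threads(size_t num_cores, size_t min_num,
                                                     size_t max_num) {
        min_num = (min_num == 0) ? num_cores : min_num; // 0 means "use the core count"
        max_num = (max_num == 0) ? num_cores : max_num;
        size_t factor = max_num / min_num;              // ratio preserved when shrinking
        min_num = std::min(num_cores * factor, min_num);
        max_num = std::min(min_num * factor, max_num);
        return {min_num, max_num};
    }

    int main() {
        // Defaults (16, 64) on an 8-core box: factor 4, min(32,16)=16, min(64,64)=64.
        auto [mn8, mx8] = get_num_threads(8, 16, 64);
        // The same defaults on a 2-core box shrink to min(8,16)=8 and min(32,64)=32.
        auto [mn2, mx2] = get_num_threads(2, 16, 64);
        std::printf("%zu %zu / %zu %zu\n", mn8, mx8, mn2, mx2); // prints: 16 64 / 8 32
        return 0;
    }

So the hard-coded 16/64 defaults are preserved on larger machines, while small machines no longer over-provision the two pools.)
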
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]