This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7d70c1739c0bae681c01fb544e62f9eabaedc2d5
Author: wangbo <[email protected]>
AuthorDate: Fri Jul 19 18:05:17 2024 +0800

    [Fix] Add min scan thread num for workload group's scan thread (#38096)

    ## Proposed changes

    Set a minimum thread count for both workload-group and non-workload-group
    remote scan thread pools, reducing the overall number of threads and
    preventing BE core dumps caused by thread exhaustion.

    before:
    <img width="582" alt="image" src="https://github.com/user-attachments/assets/3a861191-c5a9-4b73-8a08-0aec0bed1cd5">

    after:
    <img width="522" alt="image" src="https://github.com/user-attachments/assets/4024bbc8-d9d3-45bd-a895-07a6d87a6fd8">
---
 be/src/common/config.cpp                         |  1 +
 be/src/common/config.h                           |  1 +
 be/src/runtime/workload_group/workload_group.cpp |  5 +-
 be/src/util/s3_util.cpp                          |  2 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp       | 60 +++++++++---------------
 be/src/vec/exec/scan/scanner_scheduler.h         | 13 +++--
 6 files changed, 38 insertions(+), 44 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b152111011e..5222100170e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -250,6 +250,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> b
     }
     return true;
 });
+DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
 DEFINE_Int32(remote_split_source_batch_size, "10240");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f4ed1decaa0..53261ab2fb9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -299,6 +299,7 @@ DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
 // number of scanner thread pool size for olap table
 // and the min thread num of remote scanner thread pool
 DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
+DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num);
 // number of batch size to fetch the remote split source
 DECLARE_mInt32(remote_split_source_batch_size);
 // max number of remote scanner thread pool size
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index fd45093758e..f4d1e0d4f7e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -316,7 +316,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
     }
 
     // 11 min remote scan thread num
-    int min_remote_scan_thread_num = vectorized::ScannerScheduler::get_remote_scan_thread_num();
+    int min_remote_scan_thread_num = config::doris_scanner_min_thread_pool_thread_num;
     if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
         tworkload_group_info.min_remote_scan_thread_num > 0) {
         min_remote_scan_thread_num = tworkload_group_info.min_remote_scan_thread_num;
@@ -415,7 +415,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
                                                                       cg_cpu_ctl_ptr);
-        Status ret = remote_scan_scheduler->start(remote_max_thread_num, remote_max_thread_num,
+        Status ret = remote_scan_scheduler->start(remote_max_thread_num,
+                                                  config::doris_scanner_min_thread_pool_thread_num,
                                                   remote_scan_thread_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
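The workload_group.cpp hunk above boils down to a small fallback rule: use the new
BE-wide minimum unless the workload group explicitly sets a positive min of its own.
A minimal standalone sketch of that rule; TWorkloadGroupInfoStub is a hypothetical
stand-in for the Thrift-generated TWorkloadGroupInfo, modeling only the two fields
this change touches:

```cpp
#include <iostream>

namespace config {
int doris_scanner_min_thread_pool_thread_num = 8; // new BE config knob (default "8")
}

// Hypothetical stand-in for the Thrift-generated TWorkloadGroupInfo.
struct TWorkloadGroupInfoStub {
    bool isset_min_remote_scan_thread_num = false;
    int min_remote_scan_thread_num = 0;
};

int resolve_min_remote_scan_threads(const TWorkloadGroupInfoStub& info) {
    // Default to the BE-wide minimum; before this patch the default came from
    // get_remote_scan_thread_num(), i.e. the pool's max size.
    int min_threads = config::doris_scanner_min_thread_pool_thread_num;
    if (info.isset_min_remote_scan_thread_num && info.min_remote_scan_thread_num > 0) {
        min_threads = info.min_remote_scan_thread_num; // explicit per-group override
    }
    return min_threads;
}

int main() {
    TWorkloadGroupInfoStub defaults;        // no override set
    TWorkloadGroupInfoStub tuned{true, 32}; // group explicitly asks for 32
    std::cout << resolve_min_remote_scan_threads(defaults) << '\n'; // prints 8
    std::cout << resolve_min_remote_scan_threads(tuned) << '\n';    // prints 32
}
```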
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index d7a83fa2cff..ffb93c2d9d9 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -257,7 +257,7 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_s3_client(
         aws_config.maxConnections = config::doris_scanner_thread_pool_thread_num;
 #else
         aws_config.maxConnections =
-                ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size();
+                ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
 #endif
     }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 4d07e66917d..351f5d4e275 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -80,28 +80,33 @@ void ScannerScheduler::stop() {
     _is_closed = true;
 
-    _local_scan_thread_pool->shutdown();
-    _remote_scan_thread_pool->shutdown();
     _limited_scan_thread_pool->shutdown();
-
-    _local_scan_thread_pool->join();
-    _remote_scan_thread_pool->join();
     _limited_scan_thread_pool->wait();
 
+    _local_scan_thread_pool->stop();
+    _remote_scan_thread_pool->stop();
+    LOG(INFO) << "ScannerScheduler stopped";
 }
 
 Status ScannerScheduler::init(ExecEnv* env) {
     // 1. local scan thread pool
-    _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
-            config::doris_scanner_thread_pool_thread_num,
-            config::doris_scanner_thread_pool_queue_size, "local_scan");
+    _local_scan_thread_pool =
+            std::make_unique<vectorized::SimplifiedScanScheduler>("local_scan", nullptr);
+    Status ret1 = _local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num,
+                                                 config::doris_scanner_thread_pool_thread_num,
+                                                 config::doris_scanner_thread_pool_queue_size);
+    RETURN_IF_ERROR(ret1);
 
     // 2. remote scan thread pool
-    _remote_thread_pool_max_size = ScannerScheduler::get_remote_scan_thread_num();
+    _remote_thread_pool_max_thread_num = ScannerScheduler::get_remote_scan_thread_num();
     int remote_scan_pool_queue_size = ScannerScheduler::get_remote_scan_thread_queue_size();
-    _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
-            _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool");
+    _remote_scan_thread_pool =
+            std::make_unique<vectorized::SimplifiedScanScheduler>("RemoteScanThreadPool", nullptr);
+    Status ret2 = _remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num,
+                                                  config::doris_scanner_min_thread_pool_thread_num,
+                                                  remote_scan_pool_queue_size);
+    RETURN_IF_ERROR(ret2);
 
     // 3. limited scan thread pool
     RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
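Why pin the min below the max? The savings compound, because each workload group
gets its own remote scan scheduler on top of the shared one, and before this patch
every such pool was started with min == max. A back-of-the-envelope sketch with
hypothetical numbers (512 and the pool count are illustrative; the real max comes
from doris_max_remote_scanner_thread_pool_thread_num, seemingly derived when left
at its -1 default):

```cpp
#include <iostream>

// Idle-thread accounting sketch: with min == max, every pool keeps max_threads
// resident even when idle; with a pinned min, only min_threads stay resident.
int idle_resident_threads(int num_pools, int resident_per_pool) {
    return num_pools * resident_per_pool;
}

int main() {
    const int max_threads = 512; // hypothetical per-pool max
    const int min_threads = 8;   // doris_scanner_min_thread_pool_thread_num default
    const int pools = 10;        // e.g., shared pool + 9 workload groups

    std::cout << "idle threads before: "
              << idle_resident_threads(pools, max_threads) << '\n'; // 5120
    std::cout << "idle threads after:  "
              << idle_resident_threads(pools, min_threads) << '\n'; // 80
}
```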
@@ -127,9 +132,6 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         return;
     }
 
-    // Submit scanners to thread pool
-    // TODO(cmy): How to handle this "nice"?
-    int nice = 1;
     if (ctx->thread_token != nullptr) {
         std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
         if (scanner_delegate == nullptr) {
@@ -163,27 +165,13 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
         auto sumbit_task = [&]() {
             bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
-            auto* scan_sched =
+            SimplifiedScanScheduler* scan_sched =
                     is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
-            auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
-            if (scan_sched) {
-                auto work_func = [scanner_ref = scan_task, ctx]() {
-                    auto status = [&] {
-                        RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
-                        return Status::OK();
-                    }();
-
-                    if (!status.ok()) {
-                        scanner_ref->set_status(status);
-                        ctx->append_block_to_queue(scanner_ref);
-                    }
-                };
-                SimplifiedScanTask simple_scan_task = {work_func, ctx};
-                return scan_sched->submit_scan_task(simple_scan_task);
+            if (!scan_sched) { // query without workload group
+                scan_sched =
+                        is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get();
             }
-
-            PriorityThreadPool::Task task;
-            task.work_function = [scanner_ref = scan_task, ctx]() {
+            auto work_func = [scanner_ref = scan_task, ctx]() {
                 auto status = [&] {
                     RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
                     return Status::OK();
@@ -194,10 +182,8 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                     ctx->append_block_to_queue(scanner_ref);
                 }
             };
-            task.priority = nice;
-            return thread_pool->offer(task)
-                           ? Status::OK()
-                           : Status::InternalError("Scan thread pool had shutdown");
+            SimplifiedScanTask simple_scan_task = {work_func, ctx};
+            return scan_sched->submit_scan_task(simple_scan_task);
         };
 
         if (auto ret = sumbit_task(); !ret) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 238afc15bf6..ddc61396e23 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -39,6 +39,7 @@ namespace doris::vectorized {
 class ScannerDelegate;
 class ScanTask;
 class ScannerContext;
+class SimplifiedScanScheduler;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
 // Execution thread pool
@@ -63,7 +64,7 @@ public:
     std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int max_concurrency);
 
-    int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }
+    int remote_thread_pool_max_thread_num() const { return _remote_thread_pool_max_thread_num; }
 
     static int get_remote_scan_thread_num();
 
@@ -81,14 +82,14 @@ private:
     // _local_scan_thread_pool is for local scan task(typically, olap scanner)
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.)
    // _limited_scan_thread_pool is a special pool for queries with resource limit
-    std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
-    std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> _local_scan_thread_pool;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_thread_pool;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
     bool _is_init = false;
-    int _remote_thread_pool_max_size;
+    int _remote_thread_pool_max_thread_num;
 };
 
 struct SimplifiedScanTask {
@@ -193,6 +194,10 @@ public:
         }
     }
 
+    int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
+
+    int get_active_threads() { return _scan_thread_pool->num_active_threads(); }
+
     std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); }
 
 private:
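With both process-wide pools now SimplifiedScanScheduler instances, submit() can
treat the workload-group and non-workload-group paths uniformly: take the group's
scheduler from the context if one is set, otherwise fall back to the shared pool.
A condensed standalone sketch of that selection logic; all types are hypothetical
stand-ins for the classes in the diff:

```cpp
#include <cassert>

struct SimplifiedScanSchedulerStub {}; // stand-in for SimplifiedScanScheduler

// Stand-in for ScannerContext: the scheduler pointers are set only when the
// query runs under a workload group.
struct ScannerContextStub {
    SimplifiedScanSchedulerStub* simple_scan_scheduler = nullptr;
    SimplifiedScanSchedulerStub* remote_scan_scheduler = nullptr;
};

SimplifiedScanSchedulerStub* pick_scheduler(const ScannerContextStub& ctx, bool is_local,
                                            SimplifiedScanSchedulerStub* shared_local,
                                            SimplifiedScanSchedulerStub* shared_remote) {
    SimplifiedScanSchedulerStub* scan_sched =
            is_local ? ctx.simple_scan_scheduler : ctx.remote_scan_scheduler;
    if (scan_sched == nullptr) { // query without workload group
        scan_sched = is_local ? shared_local : shared_remote;
    }
    return scan_sched;
}

int main() {
    SimplifiedScanSchedulerStub shared_local, shared_remote, group_remote;
    ScannerContextStub no_group;                           // no workload group attached
    ScannerContextStub with_group{nullptr, &group_remote}; // group-owned remote pool

    assert(pick_scheduler(no_group, false, &shared_local, &shared_remote) == &shared_remote);
    assert(pick_scheduler(with_group, false, &shared_local, &shared_remote) == &group_remote);
}
```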
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]