This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new af2d93c9b80 [Fix]add min scan thread num for workload group's scan
thread (#38096)
af2d93c9b80 is described below
commit af2d93c9b8015eccba7a5fbd4e99a9cb90d7617f
Author: wangbo <[email protected]>
AuthorDate: Fri Jul 19 18:05:17 2024 +0800
[Fix]add min scan thread num for workload group's scan thread (#38096)
## Proposed changes
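Set a minimum thread count for the remote scan thread pools of both workload
groups and queries without a workload group (config
`doris_scanner_min_thread_pool_thread_num`, default 8). Previously these pools
were started with min == max threads. Letting them shrink to the minimum
reduces the total thread count and prevents the BE from crashing (core dump)
due to thread exhaustion.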
before:
<img width="582" alt="image"
src="https://github.com/user-attachments/assets/3a861191-c5a9-4b73-8a08-0aec0bed1cd5">
after:
<img width="522" alt="image"
src="https://github.com/user-attachments/assets/4024bbc8-d9d3-45bd-a895-07a6d87a6fd8">
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/runtime/workload_group/workload_group.cpp | 5 +-
be/src/util/s3_util.cpp | 2 +-
be/src/vec/exec/scan/scanner_scheduler.cpp | 60 +++++++++---------------
be/src/vec/exec/scan/scanner_scheduler.h | 13 +++--
6 files changed, 38 insertions(+), 44 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b152111011e..5222100170e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -250,6 +250,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num,
[](const int config) -> b
}
return true;
});
+DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f4ed1decaa0..53261ab2fb9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -299,6 +299,7 @@
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
+DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num);
// number of batch size to fetch the remote split source
DECLARE_mInt32(remote_split_source_batch_size);
// max number of remote scanner thread pool size
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index fd45093758e..f4d1e0d4f7e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -316,7 +316,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
}
// 11 min remote scan thread num
- int min_remote_scan_thread_num =
vectorized::ScannerScheduler::get_remote_scan_thread_num();
+ int min_remote_scan_thread_num =
config::doris_scanner_min_thread_pool_thread_num;
if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
tworkload_group_info.min_remote_scan_thread_num > 0) {
min_remote_scan_thread_num =
tworkload_group_info.min_remote_scan_thread_num;
@@ -415,7 +415,8 @@ void
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
std::unique_ptr<vectorized::SimplifiedScanScheduler>
remote_scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_"
+ tg_name,
cg_cpu_ctl_ptr);
- Status ret = remote_scan_scheduler->start(remote_max_thread_num,
remote_max_thread_num,
+ Status ret = remote_scan_scheduler->start(remote_max_thread_num,
+
config::doris_scanner_min_thread_pool_thread_num,
remote_scan_thread_queue_size);
if (ret.ok()) {
_remote_scan_task_sched = std::move(remote_scan_scheduler);
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index d7a83fa2cff..ffb93c2d9d9 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -257,7 +257,7 @@ std::shared_ptr<io::ObjStorageClient>
S3ClientFactory::_create_s3_client(
aws_config.maxConnections =
config::doris_scanner_thread_pool_thread_num;
#else
aws_config.maxConnections =
-
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size();
+
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
#endif
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 4d07e66917d..351f5d4e275 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -80,28 +80,33 @@ void ScannerScheduler::stop() {
_is_closed = true;
- _local_scan_thread_pool->shutdown();
- _remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();
-
- _local_scan_thread_pool->join();
- _remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();
+ _local_scan_thread_pool->stop();
+ _remote_scan_thread_pool->stop();
+
LOG(INFO) << "ScannerScheduler stopped";
}
Status ScannerScheduler::init(ExecEnv* env) {
// 1. local scan thread pool
- _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
- config::doris_scanner_thread_pool_thread_num,
- config::doris_scanner_thread_pool_queue_size, "local_scan");
+ _local_scan_thread_pool =
+
std::make_unique<vectorized::SimplifiedScanScheduler>("local_scan", nullptr);
+ Status ret1 =
_local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num,
+
config::doris_scanner_thread_pool_thread_num,
+
config::doris_scanner_thread_pool_queue_size);
+ RETURN_IF_ERROR(ret1);
// 2. remote scan thread pool
- _remote_thread_pool_max_size =
ScannerScheduler::get_remote_scan_thread_num();
+ _remote_thread_pool_max_thread_num =
ScannerScheduler::get_remote_scan_thread_num();
int remote_scan_pool_queue_size =
ScannerScheduler::get_remote_scan_thread_queue_size();
- _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
- _remote_thread_pool_max_size, remote_scan_pool_queue_size,
"RemoteScanThreadPool");
+ _remote_scan_thread_pool =
+
std::make_unique<vectorized::SimplifiedScanScheduler>("RemoteScanThreadPool",
nullptr);
+ Status ret2 =
_remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num,
+
config::doris_scanner_min_thread_pool_thread_num,
+ remote_scan_pool_queue_size);
+ RETURN_IF_ERROR(ret2);
// 3. limited scan thread pool
RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
@@ -127,9 +132,6 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return;
}
- // Submit scanners to thread pool
- // TODO(cmy): How to handle this "nice"?
- int nice = 1;
if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
@@ -163,27 +165,13 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
- auto* scan_sched =
+ SimplifiedScanScheduler* scan_sched =
is_local ? ctx->get_simple_scan_scheduler() :
ctx->get_remote_scan_scheduler();
- auto& thread_pool = is_local ? _local_scan_thread_pool :
_remote_scan_thread_pool;
- if (scan_sched) {
- auto work_func = [scanner_ref = scan_task, ctx]() {
- auto status = [&] {
- RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx,
scanner_ref));
- return Status::OK();
- }();
-
- if (!status.ok()) {
- scanner_ref->set_status(status);
- ctx->append_block_to_queue(scanner_ref);
- }
- };
- SimplifiedScanTask simple_scan_task = {work_func, ctx};
- return scan_sched->submit_scan_task(simple_scan_task);
+ if (!scan_sched) { // query without workload group
+ scan_sched =
+ is_local ? _local_scan_thread_pool.get() :
_remote_scan_thread_pool.get();
}
-
- PriorityThreadPool::Task task;
- task.work_function = [scanner_ref = scan_task, ctx]() {
+ auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -194,10 +182,8 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
ctx->append_block_to_queue(scanner_ref);
}
};
- task.priority = nice;
- return thread_pool->offer(task)
- ? Status::OK()
- : Status::InternalError("Scan thread pool had
shutdown");
+ SimplifiedScanTask simple_scan_task = {work_func, ctx};
+ return scan_sched->submit_scan_task(simple_scan_task);
};
if (auto ret = sumbit_task(); !ret) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 238afc15bf6..ddc61396e23 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -39,6 +39,7 @@ namespace doris::vectorized {
class ScannerDelegate;
class ScanTask;
class ScannerContext;
+class SimplifiedScanScheduler;
// Responsible for the scheduling and execution of all Scanners of a BE node.
// Execution thread pool
@@ -63,7 +64,7 @@ public:
std::unique_ptr<ThreadPoolToken>
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
int
max_concurrency);
- int remote_thread_pool_max_size() const { return
_remote_thread_pool_max_size; }
+ int remote_thread_pool_max_thread_num() const { return
_remote_thread_pool_max_thread_num; }
static int get_remote_scan_thread_num();
@@ -81,14 +82,14 @@ private:
// _local_scan_thread_pool is for local scan task(typically, olap scanner)
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs,
etc.)
// _limited_scan_thread_pool is a special pool for queries with resource
limit
- std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
- std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+ std::unique_ptr<vectorized::SimplifiedScanScheduler>
_local_scan_thread_pool;
+ std::unique_ptr<vectorized::SimplifiedScanScheduler>
_remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
- int _remote_thread_pool_max_size;
+ int _remote_thread_pool_max_thread_num;
};
struct SimplifiedScanTask {
@@ -193,6 +194,10 @@ public:
}
}
+ int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
+
+ int get_active_threads() { return _scan_thread_pool->num_active_threads();
}
+
std::vector<int> thread_debug_info() { return
_scan_thread_pool->debug_info(); }
private:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]