This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7b141ffde72 [pick]add min scan thread num for workload group's scan thread (#38123)
7b141ffde72 is described below
commit 7b141ffde72e2527b5b1143d625cfa6e033c4ec8
Author: wangbo <[email protected]>
AuthorDate: Fri Jul 19 18:43:05 2024 +0800
[pick]add min scan thread num for workload group's scan thread (#38123)
## Proposed changes
pick #38096
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/runtime/workload_group/workload_group.cpp | 2 +-
be/src/util/s3_util.cpp | 2 +-
be/src/vec/exec/scan/scanner_scheduler.cpp | 60 +++++++++---------------
be/src/vec/exec/scan/scanner_scheduler.h | 13 +++--
6 files changed, 36 insertions(+), 43 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 68e05b41d3b..863d69338bc 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -241,6 +241,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num,
[](const int config) -> b
}
return true;
});
+DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "10240");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1d93e4ff606..9050701261c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -289,6 +289,7 @@
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
+DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num);
// number of batch size to fetch the remote split source
DECLARE_mInt32(remote_split_source_batch_size);
// max number of remote scanner thread pool size
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index f1c49d9763c..b68b1765a52 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -327,7 +327,7 @@ Status WorkloadGroupInfo::parse_topic_info(const
TWorkloadGroupInfo& tworkload_g
// 11 min remote scan thread num
workload_group_info->min_remote_scan_thread_num =
- vectorized::ScannerScheduler::get_remote_scan_thread_num();
+ config::doris_scanner_min_thread_pool_thread_num;
if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
tworkload_group_info.min_remote_scan_thread_num > 0) {
workload_group_info->min_remote_scan_thread_num =
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 063cc16c67b..c9406474526 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -189,7 +189,7 @@ std::shared_ptr<Aws::S3::S3Client>
S3ClientFactory::create(const S3Conf& s3_conf
aws_config.maxConnections =
config::doris_scanner_thread_pool_thread_num;
#else
aws_config.maxConnections =
- ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size();
+ ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
#endif
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 351912f5b17..e8a22374b32 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -79,28 +79,33 @@ void ScannerScheduler::stop() {
_is_closed = true;
- _local_scan_thread_pool->shutdown();
- _remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();
-
- _local_scan_thread_pool->join();
- _remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();
+ _local_scan_thread_pool->stop();
+ _remote_scan_thread_pool->stop();
+
LOG(INFO) << "ScannerScheduler stopped";
}
Status ScannerScheduler::init(ExecEnv* env) {
// 1. local scan thread pool
- _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
- config::doris_scanner_thread_pool_thread_num,
- config::doris_scanner_thread_pool_queue_size, "local_scan");
+ _local_scan_thread_pool =
+ std::make_unique<vectorized::SimplifiedScanScheduler>("local_scan", nullptr);
+ Status ret1 = _local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num,
+ config::doris_scanner_thread_pool_thread_num,
+ config::doris_scanner_thread_pool_queue_size);
+ RETURN_IF_ERROR(ret1);
// 2. remote scan thread pool
- _remote_thread_pool_max_size = ScannerScheduler::get_remote_scan_thread_num();
+ _remote_thread_pool_max_thread_num = ScannerScheduler::get_remote_scan_thread_num();
int remote_scan_pool_queue_size = ScannerScheduler::get_remote_scan_thread_queue_size();
- _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
- _remote_thread_pool_max_size, remote_scan_pool_queue_size, "RemoteScanThreadPool");
+ _remote_scan_thread_pool =
+ std::make_unique<vectorized::SimplifiedScanScheduler>("RemoteScanThreadPool", nullptr);
+ Status ret2 = _remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num,
+ config::doris_scanner_min_thread_pool_thread_num,
+ remote_scan_pool_queue_size);
+ RETURN_IF_ERROR(ret2);
// 3. limited scan thread pool
RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
@@ -126,9 +131,6 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return;
}
- // Submit scanners to thread pool
- // TODO(cmy): How to handle this "nice"?
- int nice = 1;
if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
@@ -162,27 +164,13 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
- auto* scan_sched =
+ SimplifiedScanScheduler* scan_sched =
is_local ? ctx->get_simple_scan_scheduler() :
ctx->get_remote_scan_scheduler();
- auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
- if (scan_sched) {
- auto work_func = [scanner_ref = scan_task, ctx]() {
- auto status = [&] {
- RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
- return Status::OK();
- }();
-
- if (!status.ok()) {
- scanner_ref->set_status(status);
- ctx->append_block_to_queue(scanner_ref);
- }
- };
- SimplifiedScanTask simple_scan_task = {work_func, ctx};
- return scan_sched->submit_scan_task(simple_scan_task);
+ if (!scan_sched) { // query without workload group
+ scan_sched =
+ is_local ? _local_scan_thread_pool.get() : _remote_scan_thread_pool.get();
}
-
- PriorityThreadPool::Task task;
- task.work_function = [scanner_ref = scan_task, ctx]() {
+ auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
@@ -193,10 +181,8 @@ void
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
ctx->append_block_to_queue(scanner_ref);
}
};
- task.priority = nice;
- return thread_pool->offer(task)
- ? Status::OK()
- : Status::InternalError("Scan thread pool had shutdown");
+ SimplifiedScanTask simple_scan_task = {work_func, ctx};
+ return scan_sched->submit_scan_task(simple_scan_task);
};
if (auto ret = sumbit_task(); !ret) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 238afc15bf6..ddc61396e23 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -39,6 +39,7 @@ namespace doris::vectorized {
class ScannerDelegate;
class ScanTask;
class ScannerContext;
+class SimplifiedScanScheduler;
// Responsible for the scheduling and execution of all Scanners of a BE node.
// Execution thread pool
@@ -63,7 +64,7 @@ public:
std::unique_ptr<ThreadPoolToken>
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
int
max_concurrency);
- int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }
+ int remote_thread_pool_max_thread_num() const { return _remote_thread_pool_max_thread_num; }
static int get_remote_scan_thread_num();
@@ -81,14 +82,14 @@ private:
// _local_scan_thread_pool is for local scan task(typically, olap scanner)
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs,
etc.)
// _limited_scan_thread_pool is a special pool for queries with resource
limit
- std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
- std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+ std::unique_ptr<vectorized::SimplifiedScanScheduler> _local_scan_thread_pool;
+ std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
- int _remote_thread_pool_max_size;
+ int _remote_thread_pool_max_thread_num;
};
struct SimplifiedScanTask {
@@ -193,6 +194,10 @@ public:
}
}
+ int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
+
+ int get_active_threads() { return _scan_thread_pool->num_active_threads(); }
+
std::vector<int> thread_debug_info() { return
_scan_thread_pool->debug_info(); }
private:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]