This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new af2d93c9b80 [Fix]add min scan thread num for workload group's scan 
thread (#38096)
af2d93c9b80 is described below

commit af2d93c9b8015eccba7a5fbd4e99a9cb90d7617f
Author: wangbo <[email protected]>
AuthorDate: Fri Jul 19 18:05:17 2024 +0800

    [Fix]add min scan thread num for workload group's scan thread (#38096)
    
    ## Proposed changes
    Set a minimum remote scan thread count for both workload-group and
    non-workload-group scans to reduce the total thread count and prevent
    BE crashes caused by thread exhaustion.
    before:
    <img width="582" alt="image"
    src="https://github.com/user-attachments/assets/3a861191-c5a9-4b73-8a08-0aec0bed1cd5">
    after:
    <img width="522" alt="image"
    src="https://github.com/user-attachments/assets/4024bbc8-d9d3-45bd-a895-07a6d87a6fd8">
---
 be/src/common/config.cpp                         |  1 +
 be/src/common/config.h                           |  1 +
 be/src/runtime/workload_group/workload_group.cpp |  5 +-
 be/src/util/s3_util.cpp                          |  2 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp       | 60 +++++++++---------------
 be/src/vec/exec/scan/scanner_scheduler.h         | 13 +++--
 6 files changed, 38 insertions(+), 44 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b152111011e..5222100170e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -250,6 +250,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, 
[](const int config) -> b
     }
     return true;
 });
+DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
 DEFINE_Int32(remote_split_source_batch_size, "10240");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f4ed1decaa0..53261ab2fb9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -299,6 +299,7 @@ 
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
 // number of scanner thread pool size for olap table
 // and the min thread num of remote scanner thread pool
 DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
+DECLARE_mInt32(doris_scanner_min_thread_pool_thread_num);
 // number of batch size to fetch the remote split source
 DECLARE_mInt32(remote_split_source_batch_size);
 // max number of remote scanner thread pool size
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index fd45093758e..f4d1e0d4f7e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -316,7 +316,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
     }
 
     // 11 min remote scan thread num
-    int min_remote_scan_thread_num = 
vectorized::ScannerScheduler::get_remote_scan_thread_num();
+    int min_remote_scan_thread_num = 
config::doris_scanner_min_thread_pool_thread_num;
     if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
         tworkload_group_info.min_remote_scan_thread_num > 0) {
         min_remote_scan_thread_num = 
tworkload_group_info.min_remote_scan_thread_num;
@@ -415,7 +415,8 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ tg_name,
                                                                       
cg_cpu_ctl_ptr);
-        Status ret = remote_scan_scheduler->start(remote_max_thread_num, 
remote_max_thread_num,
+        Status ret = remote_scan_scheduler->start(remote_max_thread_num,
+                                                  
config::doris_scanner_min_thread_pool_thread_num,
                                                   
remote_scan_thread_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index d7a83fa2cff..ffb93c2d9d9 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -257,7 +257,7 @@ std::shared_ptr<io::ObjStorageClient> 
S3ClientFactory::_create_s3_client(
         aws_config.maxConnections = 
config::doris_scanner_thread_pool_thread_num;
 #else
         aws_config.maxConnections =
-                
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_size();
+                
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
 #endif
     }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 4d07e66917d..351f5d4e275 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -80,28 +80,33 @@ void ScannerScheduler::stop() {
 
     _is_closed = true;
 
-    _local_scan_thread_pool->shutdown();
-    _remote_scan_thread_pool->shutdown();
     _limited_scan_thread_pool->shutdown();
-
-    _local_scan_thread_pool->join();
-    _remote_scan_thread_pool->join();
     _limited_scan_thread_pool->wait();
 
+    _local_scan_thread_pool->stop();
+    _remote_scan_thread_pool->stop();
+
     LOG(INFO) << "ScannerScheduler stopped";
 }
 
 Status ScannerScheduler::init(ExecEnv* env) {
     // 1. local scan thread pool
-    _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
-            config::doris_scanner_thread_pool_thread_num,
-            config::doris_scanner_thread_pool_queue_size, "local_scan");
+    _local_scan_thread_pool =
+            
std::make_unique<vectorized::SimplifiedScanScheduler>("local_scan", nullptr);
+    Status ret1 = 
_local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num,
+                                                 
config::doris_scanner_thread_pool_thread_num,
+                                                 
config::doris_scanner_thread_pool_queue_size);
+    RETURN_IF_ERROR(ret1);
 
     // 2. remote scan thread pool
-    _remote_thread_pool_max_size = 
ScannerScheduler::get_remote_scan_thread_num();
+    _remote_thread_pool_max_thread_num = 
ScannerScheduler::get_remote_scan_thread_num();
     int remote_scan_pool_queue_size = 
ScannerScheduler::get_remote_scan_thread_queue_size();
-    _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
-            _remote_thread_pool_max_size, remote_scan_pool_queue_size, 
"RemoteScanThreadPool");
+    _remote_scan_thread_pool =
+            
std::make_unique<vectorized::SimplifiedScanScheduler>("RemoteScanThreadPool", 
nullptr);
+    Status ret2 = 
_remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num,
+                                                  
config::doris_scanner_min_thread_pool_thread_num,
+                                                  remote_scan_pool_queue_size);
+    RETURN_IF_ERROR(ret2);
 
     // 3. limited scan thread pool
     RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
@@ -127,9 +132,6 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         return;
     }
 
-    // Submit scanners to thread pool
-    // TODO(cmy): How to handle this "nice"?
-    int nice = 1;
     if (ctx->thread_token != nullptr) {
         std::shared_ptr<ScannerDelegate> scanner_delegate = 
scan_task->scanner.lock();
         if (scanner_delegate == nullptr) {
@@ -163,27 +165,13 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
         auto sumbit_task = [&]() {
             bool is_local = type == TabletStorageType::STORAGE_TYPE_LOCAL;
-            auto* scan_sched =
+            SimplifiedScanScheduler* scan_sched =
                     is_local ? ctx->get_simple_scan_scheduler() : 
ctx->get_remote_scan_scheduler();
-            auto& thread_pool = is_local ? _local_scan_thread_pool : 
_remote_scan_thread_pool;
-            if (scan_sched) {
-                auto work_func = [scanner_ref = scan_task, ctx]() {
-                    auto status = [&] {
-                        RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, 
scanner_ref));
-                        return Status::OK();
-                    }();
-
-                    if (!status.ok()) {
-                        scanner_ref->set_status(status);
-                        ctx->append_block_to_queue(scanner_ref);
-                    }
-                };
-                SimplifiedScanTask simple_scan_task = {work_func, ctx};
-                return scan_sched->submit_scan_task(simple_scan_task);
+            if (!scan_sched) { // query without workload group
+                scan_sched =
+                        is_local ? _local_scan_thread_pool.get() : 
_remote_scan_thread_pool.get();
             }
-
-            PriorityThreadPool::Task task;
-            task.work_function = [scanner_ref = scan_task, ctx]() {
+            auto work_func = [scanner_ref = scan_task, ctx]() {
                 auto status = [&] {
                     RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
                     return Status::OK();
@@ -194,10 +182,8 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                     ctx->append_block_to_queue(scanner_ref);
                 }
             };
-            task.priority = nice;
-            return thread_pool->offer(task)
-                           ? Status::OK()
-                           : Status::InternalError("Scan thread pool had 
shutdown");
+            SimplifiedScanTask simple_scan_task = {work_func, ctx};
+            return scan_sched->submit_scan_task(simple_scan_task);
         };
 
         if (auto ret = sumbit_task(); !ret) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 238afc15bf6..ddc61396e23 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -39,6 +39,7 @@ namespace doris::vectorized {
 class ScannerDelegate;
 class ScanTask;
 class ScannerContext;
+class SimplifiedScanScheduler;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
 // Execution thread pool
@@ -63,7 +64,7 @@ public:
     std::unique_ptr<ThreadPoolToken> 
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int 
max_concurrency);
 
-    int remote_thread_pool_max_size() const { return 
_remote_thread_pool_max_size; }
+    int remote_thread_pool_max_thread_num() const { return 
_remote_thread_pool_max_thread_num; }
 
     static int get_remote_scan_thread_num();
 
@@ -81,14 +82,14 @@ private:
     // _local_scan_thread_pool is for local scan task(typically, olap scanner)
     // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, 
etc.)
     // _limited_scan_thread_pool is a special pool for queries with resource 
limit
-    std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool;
-    std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> 
_local_scan_thread_pool;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> 
_remote_scan_thread_pool;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
     bool _is_init = false;
-    int _remote_thread_pool_max_size;
+    int _remote_thread_pool_max_thread_num;
 };
 
 struct SimplifiedScanTask {
@@ -193,6 +194,10 @@ public:
         }
     }
 
+    int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
+
+    int get_active_threads() { return _scan_thread_pool->num_active_threads(); 
}
+
     std::vector<int> thread_debug_info() { return 
_scan_thread_pool->debug_info(); }
 
 private:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to