This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e41d091a7147c37a3f234d12944c2c11d063aa18
Author: wangbo <[email protected]>
AuthorDate: Mon Feb 26 19:01:46 2024 +0800

    [Improvement](executor)add remote scan thread pool (#31376)
    
    * add remote scan thread pool
    
    * +1
---
 be/src/agent/cgroup_cpu_ctl.cpp                    |   2 +-
 be/src/agent/cgroup_cpu_ctl.h                      |   5 +-
 be/src/agent/workload_group_listener.cpp           |   2 +-
 be/src/runtime/query_context.cpp                   |   4 +-
 be/src/runtime/query_context.h                     |   5 +
 be/src/runtime/task_group/task_group.cpp           | 172 ++++++++++++++++++++-
 be/src/runtime/task_group/task_group.h             |  28 ++++
 be/src/runtime/task_group/task_group_manager.cpp   | 171 +-------------------
 be/src/runtime/task_group/task_group_manager.h     |  23 +--
 be/src/vec/exec/scan/scanner_context.cpp           |   1 +
 be/src/vec/exec/scan/scanner_context.h             |   3 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  24 ++-
 be/src/vec/exec/scan/scanner_scheduler.h           |  34 +++-
 .../resource/workloadgroup/WorkloadGroup.java      |  47 +++++-
 .../resource/workloadgroup/WorkloadGroupMgr.java   |   3 +-
 .../doris/tablefunction/MetadataGenerator.java     |  10 +-
 .../WorkloadGroupsTableValuedFunction.java         |   2 +
 gensrc/thrift/BackendService.thrift                |   2 +
 gensrc/thrift/FrontendService.thrift               |   2 +-
 19 files changed, 322 insertions(+), 218 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index c94a3c05f1e..5263c060530 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -172,7 +172,7 @@ Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int 
cpu_shares) {
 
 Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
     int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * 
cpu_hard_limit / 100)
-                                 : CPU_HARD_LIMIT_DEFAULT_VALUE;
+                                 : CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE;
     std::string msg = "modify cpu quota value to " + std::to_string(val);
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, 
msg, false);
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 94514c8e2e0..1289f26307b 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -29,10 +29,7 @@
 namespace doris {
 
 // cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
-const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
-
-// cgroup cpu.shares default value
-const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
+const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 
 class CgroupCpuCtl {
 public:
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index 5f9da64bd2a..1d5a8544e11 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -50,7 +50,7 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
                 task_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        
_exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, 
_exec_env);
+        tg->upsert_task_scheduler(&task_group_info, _exec_env);
 
         LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
                   << ", enable_cpu_hard_limit="
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index dadb6ada172..2c25d37d14b 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -210,8 +210,8 @@ Status 
QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) {
     // see task_group_manager::delete_task_group_by_ids
     RETURN_IF_ERROR(_task_group->add_query(_query_id));
     _task_group->add_mem_tracker_limiter(query_mem_tracker);
-    _exec_env->task_group_manager()->get_query_scheduler(
-            _task_group->id(), &_task_scheduler, &_scan_task_scheduler, 
&_non_pipe_thread_pool);
+    _task_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
+                                     &_non_pipe_thread_pool, 
&_remote_scan_task_scheduler);
     return Status::OK();
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d7b3813dcef..a639268c552 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -196,6 +196,10 @@ public:
 
     vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return 
_scan_task_scheduler; }
 
+    vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
+        return _remote_scan_task_scheduler;
+    }
+
     pipeline::Dependency* get_execution_dependency() { return 
_execution_dependency.get(); }
 
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
@@ -283,6 +287,7 @@ private:
     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
     ThreadPool* _non_pipe_thread_pool = nullptr;
+    vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/task_group/task_group.cpp 
b/be/src/runtime/task_group/task_group.cpp
index ddddf39dbc8..53534444a5d 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -34,6 +34,7 @@
 #include "util/mem_info.h"
 #include "util/parse_util.h"
 #include "util/runtime_profile.h"
+#include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris {
@@ -43,6 +44,7 @@ const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
 const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
 const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
 const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
+const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
 
 TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
         : _id(tg_info.id),
@@ -53,16 +55,19 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _cpu_share(tg_info.cpu_share),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
           _cpu_hard_limit(tg_info.cpu_hard_limit),
-          _scan_thread_num(tg_info.scan_thread_num) {}
+          _scan_thread_num(tg_info.scan_thread_num),
+          _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
+          _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num) {}
 
 std::string TaskGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     return fmt::format(
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = {}]",
+            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
+            "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, 
TUnit::BYTES),
             _enable_memory_overcommit ? "true" : "false", _version, 
cpu_hard_limit(),
-            _scan_thread_num);
+            _scan_thread_num, _max_remote_scan_thread_num, 
_min_remote_scan_thread_num);
 }
 
 void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -85,6 +90,8 @@ void TaskGroup::check_and_update(const TaskGroupInfo& 
tg_info) {
             _cpu_share = tg_info.cpu_share;
             _cpu_hard_limit = tg_info.cpu_hard_limit;
             _scan_thread_num = tg_info.scan_thread_num;
+            _max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num;
+            _min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
         } else {
             return;
         }
@@ -266,8 +273,167 @@ Status TaskGroupInfo::parse_topic_info(const 
TWorkloadGroupInfo& workload_group_
         task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
 
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = 
config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = 
config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
 
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, 
ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+    int scan_thread_num = tg_info->scan_thread_num;
+    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " 
<< tg_id
+                      << ", reason=" << ret.to_string();
+        }
+    }
+
+    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
+
+    if (_task_sched == nullptr) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+        std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
+                std::make_unique<pipeline::TaskScheduler>(
+                        exec_env, exec_env->get_global_block_scheduler(), 
std::move(task_queue),
+                        "Pipe_" + tg_name, cg_cpu_ctl_ptr);
+        Status ret = pipeline_task_scheduler->start();
+        if (ret.ok()) {
+            _task_sched = std::move(pipeline_task_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << tg_id;
+        }
+    }
+
+    if (_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ tg_name,
+                                                                      
cg_cpu_ctl_ptr);
+        Status ret = scan_scheduler->start();
+        if (ret.ok()) {
+            _scan_task_sched = std::move(scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << tg_id;
+        }
+    }
+    if (scan_thread_num > 0 && _scan_task_sched) {
+        _scan_task_sched->reset_thread_num(scan_thread_num);
+    }
+
+    if (_remote_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ tg_name,
+                                                                      
cg_cpu_ctl_ptr);
+        Status ret = remote_scan_scheduler->start();
+        if (ret.ok()) {
+            _remote_scan_task_sched = std::move(remote_scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start 
failed, gid="
+                      << tg_id;
+        }
+    }
+    if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        
_remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num);
+    }
+    if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        
_remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num);
+    }
+
+    if (_non_pipe_thread_pool == nullptr) {
+        std::unique_ptr<ThreadPool> thread_pool = nullptr;
+        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
+                           .set_min_threads(1)
+                           
.set_max_threads(config::fragment_pool_thread_num_max)
+                           
.set_max_queue_size(config::fragment_pool_queue_size)
+                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                           .build(&thread_pool);
+        if (!ret.ok()) {
+            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread 
pool failed, gid="
+                      << tg_id;
+        } else {
+            _non_pipe_thread_pool = std::move(thread_pool);
+        }
+    }
+
+    // step 6: update cgroup cpu if needed
+    if (_cgroup_cpu_ctl) {
+        if (enable_cpu_hard_limit) {
+            if (cpu_hard_limit > 0) {
+                _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
+                
_cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+            } else {
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit 
but value is illegal: "
+                          << cpu_hard_limit << ", gid=" << tg_id;
+            }
+        } else {
+            if (config::enable_cgroup_cpu_soft_limit) {
+                _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
+                _cgroup_cpu_ctl->update_cpu_hard_limit(
+                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard 
limit
+            }
+        }
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+                                             
&(tg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** 
exec_sched,
+                                    vectorized::SimplifiedScanScheduler** 
scan_sched,
+                                    ThreadPool** non_pipe_thread_pool,
+                                    vectorized::SimplifiedScanScheduler** 
remote_scan_sched) {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    *exec_sched = _task_sched.get();
+    *scan_sched = _scan_task_sched.get();
+    *remote_scan_sched = _remote_scan_task_sched.get();
+    *non_pipe_thread_pool = _non_pipe_thread_pool.get();
+}
+
+void TaskGroup::try_stop_schedulers() {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    if (_task_sched) {
+        _task_sched->stop();
+    }
+    if (_scan_task_sched) {
+        _scan_task_sched->stop();
+    }
+    if (_remote_scan_task_sched) {
+        _remote_scan_task_sched->stop();
+    }
+    if (_non_pipe_thread_pool) {
+        _non_pipe_thread_pool->shutdown();
+        _non_pipe_thread_pool->wait();
+    }
+}
+
 } // namespace taskgroup
 } // namespace doris
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index 7604ee45121..c54fce2ab65 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -36,9 +36,17 @@ namespace doris {
 
 class MemTrackerLimiter;
 class RuntimeProfile;
+class ThreadPool;
+class ExecEnv;
+class CgroupCpuCtl;
+
+namespace vectorized {
+class SimplifiedScanScheduler;
+}
 
 namespace pipeline {
 class PipelineTask;
+class TaskScheduler;
 } // namespace pipeline
 
 namespace taskgroup {
@@ -121,6 +129,15 @@ public:
 
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
 
+    void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* 
exec_env);
+
+    void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
+                             vectorized::SimplifiedScanScheduler** scan_sched,
+                             ThreadPool** non_pipe_thread_pool,
+                             vectorized::SimplifiedScanScheduler** 
remote_scan_sched);
+
+    void try_stop_schedulers();
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
@@ -132,12 +149,21 @@ private:
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
     std::atomic<int> _cpu_hard_limit;
     std::atomic<int> _scan_thread_num;
+    std::atomic<int> _max_remote_scan_thread_num;
+    std::atomic<int> _min_remote_scan_thread_num;
 
     // means task group is mark dropped
     // new query can not submit
     // waiting running query to be cancelled or finish
     bool _is_shutdown = false;
     std::unordered_set<TUniqueId> _query_id_set;
+
+    std::shared_mutex _task_sched_lock;
+    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+    std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched = nullptr;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched = 
nullptr;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> 
_remote_scan_task_sched = nullptr;
+    std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
@@ -152,6 +178,8 @@ struct TaskGroupInfo {
     int cpu_hard_limit;
     bool enable_cpu_hard_limit;
     int scan_thread_num;
+    int max_remote_scan_thread_num;
+    int min_remote_scan_thread_num;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
     int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index b0b84a0eb89..819e63c855d 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -21,9 +21,10 @@
 #include <mutex>
 
 #include "pipeline/task_scheduler.h"
-#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/task_group/task_group.h"
+#include "util/threadpool.h"
+#include "util/time.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris::taskgroup {
@@ -68,126 +69,6 @@ TaskGroupPtr 
TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
     return nullptr;
 }
 
-void TaskGroupManager::get_query_scheduler(uint64_t tg_id,
-                                           doris::pipeline::TaskScheduler** 
exec_sched,
-                                           
vectorized::SimplifiedScanScheduler** scan_sched,
-                                           ThreadPool** non_pipe_thread_pool) {
-    std::shared_lock<std::shared_mutex> r_lock(_task_scheduler_lock);
-    auto tg_sche_it = _tg_sche_map.find(tg_id);
-    if (tg_sche_it != _tg_sche_map.end()) {
-        *exec_sched = tg_sche_it->second.get();
-    }
-
-    auto tg_scan_sche_it = _tg_scan_sche_map.find(tg_id);
-    if (tg_scan_sche_it != _tg_scan_sche_map.end()) {
-        *scan_sched = tg_scan_sche_it->second.get();
-    }
-
-    auto non_pipe_thread_pool_iter = _non_pipe_thread_pool_map.find(tg_id);
-    if (non_pipe_thread_pool_iter != _non_pipe_thread_pool_map.end()) {
-        *non_pipe_thread_pool = non_pipe_thread_pool_iter->second.get();
-    }
-}
-
-void TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* 
tg_info,
-                                                ExecEnv* exec_env) {
-    uint64_t tg_id = tg_info->id;
-    std::string tg_name = tg_info->name;
-    int cpu_hard_limit = tg_info->cpu_hard_limit;
-    uint64_t cpu_shares = tg_info->cpu_share;
-    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
-    int scan_thread_num = tg_info->scan_thread_num;
-
-    std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
-    // step 1: init cgroup cpu controller
-    CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
-    if (config::doris_cgroup_cpu_path != "" &&
-        _cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
-        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_unique<CgroupV1CpuCtl>(tg_id);
-        Status ret = cgroup_cpu_ctl->init();
-        if (ret.ok()) {
-            cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
-            _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
-            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
-        } else {
-            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " 
<< tg_id
-                      << ", reason=" << ret.to_string();
-        }
-    }
-
-    // step 2: init task scheduler
-    if (_tg_sche_map.find(tg_id) == _tg_sche_map.end()) {
-        int32_t executors_size = config::pipeline_executor_size;
-        if (executors_size <= 0) {
-            executors_size = CpuInfo::num_cores();
-        }
-        auto task_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
-
-        auto pipeline_task_scheduler = 
std::make_unique<pipeline::TaskScheduler>(
-                exec_env, exec_env->get_global_block_scheduler(), 
std::move(task_queue),
-                "Exec_" + tg_name, cg_cu_ctl_ptr);
-        Status ret = pipeline_task_scheduler->start();
-        if (ret.ok()) {
-            _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
-        } else {
-            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << tg_id;
-        }
-    }
-
-    // step 3: init scan scheduler
-    if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
-        auto scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name, 
cg_cu_ctl_ptr);
-        Status ret = scan_scheduler->start();
-        if (ret.ok()) {
-            _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
-        } else {
-            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << tg_id;
-        }
-    }
-    if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != 
_tg_scan_sche_map.end()) {
-        _tg_scan_sche_map.at(tg_id)->reset_thread_num(scan_thread_num);
-    }
-
-    // step 4: init non-pipe scheduler
-    if (_non_pipe_thread_pool_map.find(tg_id) == 
_non_pipe_thread_pool_map.end()) {
-        std::unique_ptr<ThreadPool> thread_pool = nullptr;
-        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
-                           .set_min_threads(1)
-                           
.set_max_threads(config::fragment_pool_thread_num_max)
-                           
.set_max_queue_size(config::fragment_pool_queue_size)
-                           .set_cgroup_cpu_ctl(cg_cu_ctl_ptr)
-                           .build(&thread_pool);
-        if (!ret.ok()) {
-            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread 
pool failed, gid="
-                      << tg_id;
-        } else {
-            _non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool));
-        }
-    }
-
-    // step 5: update cgroup cpu if needed
-    if (_cgroup_ctl_map.find(tg_id) != _cgroup_ctl_map.end()) {
-        if (enable_cpu_hard_limit) {
-            if (cpu_hard_limit > 0) {
-                
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
-                
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
-            } else {
-                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit 
but value is illegal: "
-                          << cpu_hard_limit << ", gid=" << tg_id;
-            }
-        } else {
-            if (config::enable_cgroup_cpu_soft_limit) {
-                _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
-                _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
-                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard 
limit
-            }
-        }
-        
_cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
-                                                       
&(tg_info->cgroup_cpu_hard_limit));
-    }
-}
-
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) 
{
     int64_t begin_time = MonotonicMillis();
     // 1 get delete group without running queries
@@ -209,45 +90,11 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     }
 
     // 2 stop active thread
-    std::vector<doris::pipeline::TaskScheduler*> task_sched_to_stop;
-    std::vector<vectorized::SimplifiedScanScheduler*> scan_task_sched_to_stop;
-    std::vector<ThreadPool*> non_pip_thread_pool_to_stop;
-    {
-        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
-        for (uint64_t tg_id : deleted_tg_ids) {
-            if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
-                task_sched_to_stop.emplace_back(_tg_sche_map.at(tg_id).get());
-            }
-            if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
-                
scan_task_sched_to_stop.emplace_back(_tg_scan_sche_map.at(tg_id).get());
-            }
-            if (_non_pipe_thread_pool_map.find(tg_id) != 
_non_pipe_thread_pool_map.end()) {
-                
non_pip_thread_pool_to_stop.emplace_back(_non_pipe_thread_pool_map.at(tg_id).get());
-            }
-        }
-    }
-    for (auto* ptr1 : task_sched_to_stop) {
-        ptr1->stop();
-    }
-    for (auto* ptr2 : scan_task_sched_to_stop) {
-        ptr2->stop();
-    }
-    for (auto& ptr3 : non_pip_thread_pool_to_stop) {
-        ptr3->shutdown();
-        ptr3->wait();
+    for (uint64_t tg_id : deleted_tg_ids) {
+        _task_groups.at(tg_id)->try_stop_schedulers();
     }
 
     // 3 release resource in memory
-    {
-        std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
-        for (uint64_t tg_id : deleted_tg_ids) {
-            _tg_sche_map.erase(tg_id);
-            _tg_scan_sche_map.erase(tg_id);
-            _cgroup_ctl_map.erase(tg_id);
-            _non_pipe_thread_pool_map.erase(tg_id);
-        }
-    }
-
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
         for (uint64_t tg_id : deleted_tg_ids) {
@@ -286,14 +133,8 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
 }
 
 void TaskGroupManager::stop() {
-    for (auto& task_sche : _tg_sche_map) {
-        task_sche.second->stop();
-    }
-    for (auto& task_sche : _tg_scan_sche_map) {
-        task_sche.second->stop();
-    }
-    for (auto& no_pip_sche : _non_pipe_thread_pool_map) {
-        no_pip_sche.second->shutdown();
+    for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) 
{
+        iter->second->try_stop_schedulers();
     }
 }
 
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 29e5c30a596..21772bd3bca 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -21,18 +21,11 @@
 #include <shared_mutex>
 #include <unordered_map>
 
-#include "pipeline/task_queue.h"
-#include "pipeline/task_scheduler.h"
 #include "task_group.h"
 
 namespace doris {
-class ExecEnv;
-class QueryContext;
-class CgroupCpuCtl;
 
-namespace vectorized {
-class SimplifiedScanScheduler;
-}
+class CgroupCpuCtl;
 
 namespace pipeline {
 class TaskScheduler;
@@ -51,8 +44,6 @@ public:
     void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& 
ptr)>& pred,
                                 std::vector<TaskGroupPtr>* task_groups);
 
-    void upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* 
exec_env);
-
     void delete_task_group_by_ids(std::set<uint64_t> id_set);
 
     TaskGroupPtr get_task_group_by_id(uint64_t tg_id);
@@ -65,22 +56,10 @@ public:
 
     bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
 
-    void get_query_scheduler(uint64_t tg_id, doris::pipeline::TaskScheduler** 
exec_sched,
-                             vectorized::SimplifiedScanScheduler** scan_sched,
-                             ThreadPool** non_pipe_thread_pool);
-
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
 
-    // map for workload group id and task scheduler pool
-    // used for cpu hard limit
-    std::shared_mutex _task_scheduler_lock;
-    std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> 
_tg_sche_map;
-    std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> 
_tg_scan_sche_map;
-    std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
-    std::map<uint64_t, std::unique_ptr<ThreadPool>> _non_pipe_thread_pool_map;
-
     std::shared_mutex _init_cg_ctl_lock;
     std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
     bool _is_init_succ = false;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 1edf77798d0..82e95708ef8 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -128,6 +128,7 @@ Status ScannerContext::init() {
         if (_simple_scan_scheduler) {
             _should_reset_thread_name = false;
         }
+        _remote_scan_task_scheduler = 
_state->get_query_ctx()->get_remote_scan_scheduler();
     }
 #endif
 
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 393920a7462..1052e77ae0a 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -147,6 +147,8 @@ public:
 
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return 
_simple_scan_scheduler; }
 
+    SimplifiedScanScheduler* get_remote_scan_scheduler() { return 
_remote_scan_task_scheduler; }
+
     void stop_scanners(RuntimeState* state);
 
     int32_t get_max_thread_num() const { return _max_thread_num; }
@@ -205,6 +207,7 @@ protected:
     int64_t _max_bytes_in_queue;
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
+    SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
     int32_t _num_scheduled_scanners = 0;
     int32_t _num_finished_scanners = 0;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 75bd04b0e7a..dd3be9f8f3f 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -157,12 +157,12 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         TabletStorageType type = 
scanner_delegate->_scanner->get_storage_type();
         bool ret = false;
         if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-            if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+            if (auto* scan_sched = ctx->get_simple_scan_scheduler()) {
                 auto work_func = [this, scanner_ref = scan_task, ctx]() {
                     this->_scanner_scan(ctx, scanner_ref);
                 };
                 SimplifiedScanTask simple_scan_task = {work_func, ctx};
-                ret = scan_sche->submit_scan_task(simple_scan_task);
+                ret = scan_sched->submit_scan_task(simple_scan_task);
             } else {
                 PriorityThreadPool::Task task;
                 task.work_function = [this, scanner_ref = scan_task, ctx]() {
@@ -172,12 +172,20 @@ void 
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
                 ret = _local_scan_thread_pool->offer(task);
             }
         } else {
-            PriorityThreadPool::Task task;
-            task.work_function = [this, scanner_ref = scan_task, ctx]() {
-                this->_scanner_scan(ctx, scanner_ref);
-            };
-            task.priority = nice;
-            ret = _remote_scan_thread_pool->offer(task);
+            if (auto* remote_scan_sched = ctx->get_remote_scan_scheduler()) {
+                auto work_func = [this, scanner_ref = scan_task, ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
+                };
+                SimplifiedScanTask simple_scan_task = {work_func, ctx};
+                ret = remote_scan_sched->submit_scan_task(simple_scan_task);
+            } else {
+                PriorityThreadPool::Task task;
+                task.work_function = [this, scanner_ref = scan_task, ctx]() {
+                    this->_scanner_scan(ctx, scanner_ref);
+                };
+                task.priority = nice;
+                ret = _remote_scan_thread_pool->offer(task);
+            }
         }
         if (!ret) {
             scan_task->set_status(
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 746aa34ff9a..f3f9caaa4d3 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -101,15 +101,15 @@ struct SimplifiedScanTask {
 
 class SimplifiedScanScheduler {
 public:
-    SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) 
{
+    SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* 
cgroup_cpu_ctl) {
         _is_stop.store(false);
         _cgroup_cpu_ctl = cgroup_cpu_ctl;
-        _wg_name = wg_name;
+        _sched_name = sched_name;
     }
 
     ~SimplifiedScanScheduler() {
         stop();
-        LOG(INFO) << "Scanner sche " << _wg_name << " shutdown";
+        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
     }
 
     void stop() {
@@ -119,7 +119,7 @@ public:
     }
 
     Status start() {
-        RETURN_IF_ERROR(ThreadPoolBuilder("Scan_" + _wg_name)
+        RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
                                 
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
                                 
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
                                 .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
@@ -131,7 +131,7 @@ public:
         if (!_is_stop) {
             return _scan_thread_pool->submit_func([scan_task] { 
scan_task.scan_func(); });
         } else {
-            return Status::InternalError<false>("scanner pool {} is 
shutdown.", _wg_name);
+            return Status::InternalError<false>("scanner pool {} is 
shutdown.", _sched_name);
         }
     }
 
@@ -148,11 +148,33 @@ public:
         }
     }
 
+    void reset_max_thread_num(int thread_num) {
+        int max_thread_num = _scan_thread_pool->max_threads();
+
+        if (max_thread_num != thread_num) {
+            Status st = _scan_thread_pool->set_max_threads(thread_num);
+            if (!st.ok()) {
+                LOG(INFO) << "reset max thread num failed, sche name=" << 
_sched_name;
+            }
+        }
+    }
+
+    void reset_min_thread_num(int thread_num) {
+        int min_thread_num = _scan_thread_pool->min_threads();
+
+        if (min_thread_num != thread_num) {
+            Status st = _scan_thread_pool->set_min_threads(thread_num);
+            if (!st.ok()) {
+                LOG(INFO) << "reset min thread num failed, sche name=" << 
_sched_name;
+            }
+        }
+    }
+
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
-    std::string _wg_name;
+    std::string _sched_name;
 };
 
 } // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 0def933c8cf..b14b3afec03 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -63,12 +63,17 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public static final String SCAN_THREAD_NUM = "scan_thread_num";
 
+    public static final String MAX_REMOTE_SCAN_THREAD_NUM = 
"max_remote_scan_thread_num";
+
+    public static final String MIN_REMOTE_SCAN_THREAD_NUM = 
"min_remote_scan_thread_num";
+
     // NOTE(wb): all property is not required, some properties default value 
is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), 
enable_memory_overcommit=true
     private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new 
ImmutableSet.Builder<String>()
             
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
-            
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();
+            
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
+            
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM).build();
 
     @SerializedName(value = "id")
     private long id;
@@ -225,6 +230,32 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             }
         }
 
+        if (properties.containsKey(MAX_REMOTE_SCAN_THREAD_NUM)) {
+            String value = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
+            try {
+                int intValue = Integer.parseInt(value);
+                if (intValue <= 0 && intValue != -1) {
+                    throw new NumberFormatException();
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive 
integer or -1. but input value is " + value);
+            }
+        }
+
+        if (properties.containsKey(MIN_REMOTE_SCAN_THREAD_NUM)) {
+            String value = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
+            try {
+                int intValue = Integer.parseInt(value);
+                if (intValue <= 0 && intValue != -1) {
+                    throw new NumberFormatException();
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive 
integer or -1. but input value is " + value);
+            }
+        }
+
         // check queue property
         if (properties.containsKey(MAX_CONCURRENCY)) {
             try {
@@ -309,6 +340,10 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                 row.add("true");
             } else if (SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
                 row.add("-1");
+            } else if (MAX_REMOTE_SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
+                row.add("-1");
+            }  else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
+                row.add("-1");
             } else {
                 row.add(properties.get(key));
             }
@@ -369,6 +404,16 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             
tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
         }
 
+        String maxRemoteScanThreadNumStr = 
properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
+        if (maxRemoteScanThreadNumStr != null) {
+            
tWorkloadGroupInfo.setMaxRemoteScanThreadNum(Integer.parseInt(maxRemoteScanThreadNumStr));
+        }
+
+        String minRemoteScanThreadNumStr = 
properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
+        if (minRemoteScanThreadNumStr != null) {
+            
tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
+        }
+
         TopicInfo topicInfo = new TopicInfo();
         topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 60d7e296ed3..a7c26f7cec5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -70,7 +70,8 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
             
.add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
             .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
-            .add(WorkloadGroup.SCAN_THREAD_NUM)
+            
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
+            .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
             
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
             .build();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 6feb8fa94f4..8a3df743e24 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -378,14 +378,18 @@ public class MetadataGenerator {
             trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(1)));             // name
             trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
             trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(3)));             // mem_limit
-            trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(4)));             //mem overcommit
+            trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(4)));             // mem overcommit
             trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
             trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
             trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
             trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(8)));             // cpu hard limit
             trow.addToColumnValue(new 
TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(9)))); // scan thread num
-            trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // running query num
-            trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); // waiting query num
+            // max remote scan thread num
+            trow.addToColumnValue(new 
TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(10))));
+            // min remote scan thread num
+            trow.addToColumnValue(new 
TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(11))));
+            trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(12)))); // running query num
+            trow.addToColumnValue(new 
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(13)))); // waiting query num
             dataBatch.add(trow);
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
index a50fb7ca853..27e011d60d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
@@ -50,6 +50,8 @@ public class WorkloadGroupsTableValuedFunction extends 
MetadataTableValuedFuncti
             new Column(WorkloadGroup.QUEUE_TIMEOUT, 
ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(WorkloadGroup.CPU_HARD_LIMIT, 
ScalarType.createStringType()),
             new Column(WorkloadGroup.SCAN_THREAD_NUM, 
ScalarType.createType(PrimitiveType.INT)),
+            new Column(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM, 
ScalarType.createType(PrimitiveType.INT)),
+            new Column(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM, 
ScalarType.createType(PrimitiveType.INT)),
             new Column(QueryQueue.RUNNING_QUERY_NUM, 
ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(QueryQueue.WAITING_QUERY_NUM, 
ScalarType.createType(PrimitiveType.BIGINT)));
 
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 24edaefc103..b803618af49 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -177,6 +177,8 @@ struct TWorkloadGroupInfo {
   7: optional bool enable_memory_overcommit
   8: optional bool enable_cpu_hard_limit
   9: optional i32 scan_thread_num
+  10: optional i32 max_remote_scan_thread_num
+  11: optional i32 min_remote_scan_thread_num
 }
 
 enum TWorkloadMetricType {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 88cf25b9c67..1e433d42f5d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -412,7 +412,7 @@ struct TQueryStatistics {
 
 struct TReportWorkloadRuntimeStatusParams {
     1: optional i64 backend_id
-    2: map<string, TQueryStatistics> query_statistics_map
+    2: optional map<string, TQueryStatistics> query_statistics_map
 }
 
 // The results of an INSERT query, sent to the coordinator as part of


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to