This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 02e3de35f0d [Improvement] add internal workload group (#42006)
02e3de35f0d is described below
commit 02e3de35f0d1a2ae6014c3056548639eb951920f
Author: wangbo <[email protected]>
AuthorDate: Thu Nov 14 13:12:02 2024 +0800
[Improvement] add internal workload group (#42006)
## Proposed changes
Add an internal workload group when Doris starts; currently it is mainly
used to manage the CPU usage of compaction workloads.
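
For context, the shape of this change in one picture: the BE creates a process-wide internal workload group at startup and hands its cgroup CPU controller to the compaction thread pools. The following is a minimal, self-contained C++ sketch of that shape, using stand-in types (CpuController, Group) rather than the actual Doris classes:

#include <cstdint>
#include <memory>
#include <string>

// Stand-in for the cgroup CPU controller owned by a workload group.
struct CpuController {
    explicit CpuController(uint64_t wg_id) : wg_id(wg_id) {}
    uint64_t wg_id;
};

// Stand-in for a workload group: it owns the controller via shared_ptr
// and exposes it to outside observers only as a weak_ptr.
struct Group {
    uint64_t id;
    std::string name;
    std::shared_ptr<CpuController> cpu_ctl;
    std::weak_ptr<CpuController> ctl_wptr() const { return cpu_ctl; }
};

int main() {
    // Created once at startup, before the storage engine starts its threads.
    Group internal_wg {0, "_internal", std::make_shared<CpuController>(0)};

    // A background (compaction) pool keeps only a weak reference and binds
    // its worker threads to the cgroup only while the controller is alive.
    std::weak_ptr<CpuController> pool_ctl = internal_wg.ctl_wptr();
    if (auto ctl = pool_ctl.lock()) {
        // the real code would call add_thread_to_cgroup() here
    }
}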
---
be/src/agent/cgroup_cpu_ctl.cpp | 6 +-
be/src/agent/cgroup_cpu_ctl.h | 2 +-
be/src/agent/topic_subscriber.cpp | 6 +-
be/src/agent/workload_group_listener.cpp | 2 +-
be/src/cloud/cloud_storage_engine.cpp | 31 ++-
be/src/cloud/cloud_storage_engine.h | 2 +-
be/src/olap/olap_server.cpp | 77 ++++---
be/src/olap/storage_engine.h | 5 +-
be/src/pipeline/task_scheduler.h | 4 +-
be/src/runtime/exec_env_init.cpp | 4 +-
be/src/runtime/workload_group/workload_group.cpp | 85 +++++---
be/src/runtime/workload_group/workload_group.h | 25 ++-
.../workload_group/workload_group_manager.cpp | 23 +++
.../workload_group/workload_group_manager.h | 12 ++
be/src/util/threadpool.cpp | 7 +-
be/src/util/threadpool.h | 6 +-
be/src/vec/exec/scan/scanner_scheduler.h | 9 +-
be/src/vec/sink/writer/async_result_writer.cpp | 11 +-
.../doris/analysis/AlterWorkloadGroupStmt.java | 20 +-
.../doris/analysis/CreateWorkloadGroupStmt.java | 18 +-
.../doris/analysis/DropWorkloadGroupStmt.java | 3 -
.../main/java/org/apache/doris/catalog/Env.java | 2 +
.../doris/catalog/InternalSchemaInitializer.java | 5 +-
.../java/org/apache/doris/common/FeConstants.java | 3 +
.../CreateInternalWorkloadGroupThread.java | 55 +++++
.../resource/workloadgroup/WorkloadGroup.java | 59 +++++-
.../resource/workloadgroup/WorkloadGroupMgr.java | 109 +++++++---
.../workloadgroup/WorkloadGroupMgrTest.java | 222 +++++++++++++++++++++
.../apache/doris/utframe/TestWithFeService.java | 1 +
gensrc/thrift/BackendService.thrift | 4 +
.../workload_manager_p0/test_curd_wlg.groovy | 29 +++
31 files changed, 698 insertions(+), 149 deletions(-)
diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index e68535a708c..76b72f2c9d0 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
return _is_enable_cgroup_v2_in_env ? 100 : 1024;
}
-std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
+std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
if (_is_enable_cgroup_v2_in_env) {
- return std::make_unique<CgroupV2CpuCtl>(wg_id);
+ return std::make_shared<CgroupV2CpuCtl>(wg_id);
} else if (_is_enable_cgroup_v1_in_env) {
- return std::make_unique<CgroupV1CpuCtl>(wg_id);
+ return std::make_shared<CgroupV1CpuCtl>(wg_id);
}
return nullptr;
}
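
The switch from unique_ptr to shared_ptr above is what lets thread pools outside the WorkloadGroup observe the controller through weak_ptr without owning it. A minimal sketch of that ownership split (illustrative type Ctl, not the Doris API):

#include <cassert>
#include <memory>

struct Ctl {};

int main() {
    auto owner = std::make_shared<Ctl>();   // WorkloadGroup's owning handle
    std::weak_ptr<Ctl> observer = owner;    // a thread pool's non-owning handle

    assert(observer.lock() != nullptr);     // controller alive: safe to use
    owner.reset();                          // group drops the controller
    assert(observer.lock() == nullptr);     // observer sees expiry, no dangling pointer
}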
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 84e191159f1..b23f1f4dd9c 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -52,7 +52,7 @@ public:
static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
- static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
+ static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
static bool is_a_valid_cgroup_path(std::string cg_path);
diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp
index f62bdaef099..b470e1534e1 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques
     // eg, update workload info may delay other listener, then we need add a thread here
     // to handle_topic_info asynchronous
std::shared_lock lock(_listener_mtx);
- LOG(INFO) << "[topic_publish]begin handle topic info";
    for (auto& listener_pair : _registered_listeners) {
        if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) {
-            LOG(INFO) << "[topic_publish]begin handle topic " << listener_pair.first
-                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
            listener_pair.second->handle_topic_info(
                    topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first;
+            LOG(INFO) << "[topic_publish]finish handle topic " << listener_pair.first
+                      << ", size=" << topic_request.topic_map.at(listener_pair.first).size();
        }
    }
}
}
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index f0f57869f25..7b688b7dcdf 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -59,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
            workload_group_info.enable_cpu_hard_limit);
// 4 create and update task scheduler
- wg->upsert_task_scheduler(&workload_group_info, _exec_env);
+ wg->upsert_task_scheduler(&workload_group_info);
// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 5d7b445917a..dc6abbac31b 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -231,7 +231,7 @@ Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id) {
});
}
-Status CloudStorageEngine::start_bg_threads() {
+Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
RETURN_IF_ERROR(Thread::create(
"CloudStorageEngine", "refresh_s3_info_thread",
[this]() { this->_refresh_storage_vault_info_thread_callback(); },
@@ -266,14 +266,27 @@ Status CloudStorageEngine::start_bg_threads() {
// compaction tasks producer thread
int base_thread_num = get_base_thread_num();
int cumu_thread_num = get_cumu_thread_num();
- RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
- .set_min_threads(base_thread_num)
- .set_max_threads(base_thread_num)
- .build(&_base_compaction_thread_pool));
- RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
- .set_min_threads(cumu_thread_num)
- .set_max_threads(cumu_thread_num)
- .build(&_cumu_compaction_thread_pool));
+    if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_thread_num)
+                                .set_max_threads(base_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_thread_num)
+                                .set_max_threads(cumu_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_thread_num)
+                                .set_max_threads(base_thread_num)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_thread_num)
+                                .set_max_threads(cumu_thread_num)
+                                .build(&_cumu_compaction_thread_pool));
+    }
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "compaction_tasks_producer_thread",
[this]() { this->_compaction_tasks_producer_callback(); },
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index 92d2917a916..072b8366542 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -57,7 +57,7 @@ public:
     Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) override;
-    Status start_bg_threads() override;
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
Status set_cluster_id(int32_t cluster_id) override {
_effective_cluster_id = cluster_id;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index a0c5a05636b..736bdaa9930 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -210,7 +210,7 @@ static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) {
    return threads_num;
}
-Status StorageEngine::start_bg_threads() {
+Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "unused_rowset_monitor_thread",
[this]() { this->_unused_rowset_monitor_thread_callback(); },
@@ -243,29 +243,60 @@ Status StorageEngine::start_bg_threads() {
auto single_replica_compaction_threads =
get_single_replica_compaction_threads_num(data_dirs.size());
- RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
- .set_min_threads(base_compaction_threads)
- .set_max_threads(base_compaction_threads)
- .build(&_base_compaction_thread_pool));
- RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
- .set_min_threads(cumu_compaction_threads)
- .set_max_threads(cumu_compaction_threads)
- .build(&_cumu_compaction_thread_pool));
- RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
- .set_min_threads(single_replica_compaction_threads)
- .set_max_threads(single_replica_compaction_threads)
- .build(&_single_replica_compaction_thread_pool));
-
-    if (config::enable_segcompaction) {
-        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                                .set_min_threads(config::segcompaction_num_threads)
-                                .set_max_threads(config::segcompaction_num_threads)
-                                .build(&_seg_compaction_thread_pool));
+    if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cold_data_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .build(&_cumu_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
+                                .set_min_threads(single_replica_compaction_threads)
+                                .set_max_threads(single_replica_compaction_threads)
+                                .build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
+                                    .set_min_threads(config::segcompaction_num_threads)
+                                    .set_max_threads(config::segcompaction_num_threads)
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+                                .set_min_threads(config::cold_data_compaction_thread_num)
+                                .set_max_threads(config::cold_data_compaction_thread_num)
+                                .build(&_cold_data_compaction_thread_pool));
    }
-    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
-                            .set_min_threads(config::cold_data_compaction_thread_num)
-                            .set_max_threads(config::cold_data_compaction_thread_num)
-                            .build(&_cold_data_compaction_thread_pool));
// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
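
The two branches above differ only in the extra .set_cgroup_cpu_ctl(...) call; the fluent builder makes each pool configuration a single chained expression. A hypothetical miniature of the idiom (this Builder is illustrative, not Doris's ThreadPoolBuilder):

#include <memory>
#include <string>

struct Ctl {};

// Each setter returns *this so calls can be chained; optional settings can
// also be applied behind a branch on a named builder object.
class Builder {
public:
    explicit Builder(std::string name) : _name(std::move(name)) {}
    Builder& set_min_threads(int n) { _min = n; return *this; }
    Builder& set_max_threads(int n) { _max = n; return *this; }
    Builder& set_cgroup_cpu_ctl(std::weak_ptr<Ctl> c) { _ctl = std::move(c); return *this; }
    void build() const { /* construct the pool from the collected settings */ }

private:
    std::string _name;
    int _min = 0;
    int _max = 0;
    std::weak_ptr<Ctl> _ctl;
};

int main() {
    std::shared_ptr<Ctl> ctl = std::make_shared<Ctl>();
    Builder b("BaseCompactionTaskThreadPool");
    b.set_min_threads(4).set_max_threads(4);
    if (ctl) {
        b.set_cgroup_cpu_ctl(ctl);  // applied only when a controller exists
    }
    b.build();
}

Naming the builder first, as sketched, would avoid duplicating the chain; the diff instead keeps each RETURN_IF_ERROR expression intact, which is why both chains are spelled out.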
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 421c0eb352d..a2201589898 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -72,6 +72,7 @@ class ReportWorker;
class CreateTabletRRIdxCache;
struct DirInfo;
class SnapshotManager;
+class WorkloadGroup;
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
using SegCompactionCandidatesSharedPtr =
std::shared_ptr<SegCompactionCandidates>;
@@ -105,7 +106,7 @@ public:
virtual bool stopped() = 0;
// start all background threads. This should be call after env is ready.
-    virtual Status start_bg_threads() = 0;
+    virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) = 0;
virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) = 0;
@@ -278,7 +279,7 @@ public:
return _default_rowset_type;
}
-    Status start_bg_threads() override;
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
// clear trash and snapshot file
// option: update disk usage after sweep
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index bdb5bec1776..3c1b08063df 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -43,7 +43,7 @@ namespace doris::pipeline {
class TaskScheduler {
public:
-    TaskScheduler(int core_num, std::string name, CgroupCpuCtl* cgroup_cpu_ctl)
+    TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
: _task_queue(core_num),
_shutdown(false),
_name(std::move(name)),
@@ -65,7 +65,7 @@ private:
std::vector<bool> _markers;
bool _shutdown;
std::string _name;
- CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+ std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
void _do_work(int index);
};
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 75ec588aa50..0f0b677bb1c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -276,6 +276,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
    _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
+ _workload_group_manager->init_internal_workload_group();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -364,7 +365,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
return st;
}
_storage_engine->set_heartbeat_flags(this->heartbeat_flags());
-    if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+    WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
+    if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
        LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st;
return st;
}
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index c6a3c07adda..f62179273cf 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
-WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : WorkloadGroup(wg_info, true) {}
+
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool)
         : _id(tg_info.id),
           _name(tg_info.name),
           _version(tg_info.version),
@@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
- _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+ _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+ _need_create_query_thread_pool(need_create_query_thread_pool) {
std::vector<DataDirInfo>& data_dir_list =
io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
@@ -434,54 +437,60 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.remote_read_bytes_per_second = remote_read_bytes_per_second};
}
-void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
- uint64_t tg_id = tg_info->id;
- std::string tg_name = tg_info->name;
- int cpu_hard_limit = tg_info->cpu_hard_limit;
- uint64_t cpu_shares = tg_info->cpu_share;
- bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
- int scan_thread_num = tg_info->scan_thread_num;
- int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
- int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+std::weak_ptr<CgroupCpuCtl> WorkloadGroup::get_cgroup_cpu_ctl_wptr() {
+ std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+ return _cgroup_cpu_ctl;
+}
+void WorkloadGroup::create_cgroup_cpu_ctl() {
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+ create_cgroup_cpu_ctl_no_lock();
+}
+
+void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
-        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
+        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(_id);
if (cgroup_cpu_ctl) {
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
- LOG(INFO) << "[upsert wg thread pool] cgroup init success,
wg_id=" << tg_id;
+ LOG(INFO) << "[upsert wg thread pool] cgroup init success,
wg_id=" << _id;
} else {
- LOG(INFO) << "[upsert wg thread pool] cgroup init failed,
wg_id=" << tg_id
+ LOG(INFO) << "[upsert wg thread pool] cgroup init failed,
wg_id=" << _id
<< ", reason=" << ret.to_string();
}
} else {
- LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for "
<< tg_id << " failed";
+ LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl
wg_id=" << _id << " failed";
}
}
+}
-    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
-
+void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                               std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr) {
+ uint64_t wg_id = wg_info->id;
+ std::string wg_name = wg_info->name;
+ int scan_thread_num = wg_info->scan_thread_num;
+ int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
+ int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
if (_task_sched == nullptr) {
int32_t executors_size = config::pipeline_executor_size;
if (executors_size <= 0) {
executors_size = CpuInfo::num_cores();
}
        std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(executors_size, "Pipe_" + tg_name,
+                std::make_unique<pipeline::TaskScheduler>(executors_size, "Pipe_" + wg_name,
                                                          cg_cpu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
if (ret.ok()) {
_task_sched = std::move(pipeline_task_scheduler);
} else {
- LOG(INFO) << "[upsert wg thread pool] task scheduler start failed,
gid= " << tg_id;
+ LOG(INFO) << "[upsert wg thread pool] task scheduler start failed,
gid= " << wg_id;
}
}
if (_scan_task_sched == nullptr) {
        std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name,
                                                                      cg_cpu_ctl_ptr);
        Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
                                           config::doris_scanner_thread_pool_thread_num,
@@ -489,7 +498,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
if (ret.ok()) {
_scan_task_sched = std::move(scan_scheduler);
} else {
- LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed,
gid=" << tg_id;
+ LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed,
gid=" << wg_id;
}
}
if (scan_thread_num > 0 && _scan_task_sched) {
@@ -501,7 +510,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
int remote_scan_thread_queue_size =
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
        std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name,
                                                                      cg_cpu_ctl_ptr);
Status ret = remote_scan_scheduler->start(remote_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
@@ -510,7 +519,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
_remote_scan_task_sched = std::move(remote_scan_scheduler);
} else {
LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start
failed, gid="
- << tg_id;
+ << wg_id;
}
}
    if (max_remote_scan_thread_num >= min_remote_scan_thread_num && _remote_scan_task_sched) {
@@ -532,7 +541,7 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                          : std::min(num_disk * min_threads,
                                     num_cpus * config::wg_flush_thread_num_per_cpu);
- std::string pool_name = "wg_flush_" + tg_name;
+ std::string pool_name = "wg_flush_" + wg_name;
auto ret = ThreadPoolBuilder(pool_name)
.set_min_threads(min_threads)
.set_max_threads(max_threads)
@@ -540,17 +549,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
.build(&thread_pool);
if (!ret.ok()) {
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + "
failed, gid="
- << tg_id;
+ << wg_id;
} else {
_memtable_flush_pool = std::move(thread_pool);
- LOG(INFO) << "[upsert wg thread pool] create " + pool_name + "
succ, gid=" << tg_id
+ LOG(INFO) << "[upsert wg thread pool] create " + pool_name + "
succ, gid=" << wg_id
<< ", max thread num=" << max_threads
<< ", min thread num=" << min_threads;
}
}
}
+}
+
+void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
+ uint64_t wg_id = wg_info->id;
+ int cpu_hard_limit = wg_info->cpu_hard_limit;
+ uint64_t cpu_shares = wg_info->cpu_share;
+ bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
+ create_cgroup_cpu_ctl_no_lock();
- // step 6: update cgroup cpu if needed
if (_cgroup_cpu_ctl) {
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
@@ -560,15 +576,24 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is "
"illegal: "
- << cpu_hard_limit << ", gid=" << tg_id;
+ << cpu_hard_limit << ", gid=" << wg_id;
}
} else {
_cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
_cgroup_cpu_ctl->update_cpu_hard_limit(
CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
}
-        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
-                                             &(tg_info->cgroup_cpu_hard_limit));
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
+                                             &(wg_info->cgroup_cpu_hard_limit));
+ }
+}
+
+void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info) {
+ std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+ upsert_cgroup_cpu_ctl_no_lock(wg_info);
+
+ if (_need_create_query_thread_pool) {
+ upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl);
}
}
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 2ba84ce982b..96b8a36df1c 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -58,6 +58,8 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
public:
    explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
+    explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool);
+
int64_t version() const { return _version; }
uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -165,7 +167,7 @@ public:
    int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
-    void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
+    void upsert_task_scheduler(WorkloadGroupInfo* tg_info);
void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
vectorized::SimplifiedScanScheduler** scan_sched,
@@ -198,18 +200,21 @@ public:
}
int64_t get_remote_scan_bytes_per_second();
- CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() {
- std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
- return _cgroup_cpu_ctl.get();
- }
-
ThreadPool* get_memtable_flush_pool_ptr() {
// no lock here because this is called by memtable flush,
// to avoid lock competition with the workload thread pool's update
return _memtable_flush_pool.get();
}
+ void create_cgroup_cpu_ctl();
+
+ std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
private:
+ void create_cgroup_cpu_ctl_no_lock();
+ void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
+    void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                    std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr);
+
    mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
std::string _name;
@@ -240,7 +245,10 @@ private:
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
std::shared_mutex _task_sched_lock;
- std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
+    // _cgroup_cpu_ctl not only used by threadpool which managed by WorkloadGroup,
+    // but also some global background threadpool which not owned by WorkloadGroup,
+    // so it should be shared ptr;
+    std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
    std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
    std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
@@ -249,6 +257,9 @@ private:
std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
+    // for some background workload, it doesn't need to create query thread pool
+ const bool _need_create_query_thread_pool;
+
// bvar metric
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 927d4d13814..4d32fc8700e 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,6 +34,25 @@
namespace doris {
+void WorkloadGroupMgr::init_internal_workload_group() {
+ WorkloadGroupPtr internal_wg = nullptr;
+ {
+ std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+        if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == _workload_groups.end()) {
+            WorkloadGroupInfo internal_wg_info {
+                    .id = INTERNAL_WORKLOAD_GROUP_ID,
+                    .name = INTERNAL_WORKLOAD_GROUP_NAME,
+                    .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
+            internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, false);
+ _workload_groups[internal_wg_info.id] = internal_wg;
+ }
+ }
+ DCHECK(internal_wg != nullptr);
+ if (internal_wg) {
+ internal_wg->create_cgroup_cpu_ctl();
+ }
+}
+
WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
const WorkloadGroupInfo& workload_group_info) {
{
@@ -86,6 +105,10 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
        old_wg_size = _workload_groups.size();
        for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
            uint64_t wg_id = iter->first;
+ // internal workload group created by BE can not be dropped
+ if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
+ continue;
+ }
auto workload_group_ptr = iter->second;
if (used_wg_id.find(wg_id) == used_wg_id.end()) {
workload_group_ptr->shutdown();
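
init_internal_workload_group above is an idempotent, lock-guarded insert: membership is checked and recorded under the write lock, while the heavier cgroup setup runs after the lock is released. A minimal sketch of that shape (simplified Group/Mgr types, not the Doris manager):

#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

struct Group {
    void create_cgroup_cpu_ctl() { /* heavy setup, done outside the map lock */ }
};

class Mgr {
public:
    void init_internal_group() {
        std::shared_ptr<Group> created;
        {
            std::lock_guard<std::shared_mutex> wlock(_mu);
            if (_groups.find(kInternalId) == _groups.end()) {
                created = std::make_shared<Group>();
                _groups[kInternalId] = created;  // insert-if-absent under the lock
            }
        }
        if (created) {
            created->create_cgroup_cpu_ctl();  // no lock held during cgroup setup
        }
    }

private:
    static constexpr uint64_t kInternalId = 0;  // stand-in for INTERNAL_WORKLOAD_GROUP_ID
    std::shared_mutex _mu;
    std::unordered_map<uint64_t, std::shared_ptr<Group>> _groups;
};

int main() {
    Mgr mgr;
    mgr.init_internal_group();
    mgr.init_internal_group();  // second call is a no-op
}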
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index f76e98d2606..18a0687b373 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -36,11 +36,18 @@ class TaskScheduler;
class MultiCoreTaskQueue;
} // namespace pipeline
+// internal_group is used for doris internal workload, currently is mainly compaction
+const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
+        static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
+const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
+
class WorkloadGroupMgr {
public:
WorkloadGroupMgr() = default;
~WorkloadGroupMgr() = default;
+ void init_internal_workload_group();
+
    WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
    void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
@@ -64,6 +71,11 @@ public:
void get_wg_resource_usage(vectorized::Block* block);
+ WorkloadGroupPtr get_internal_wg() {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
+ }
+
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 15fb36181d4..f5ea38515de 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
return *this;
}
-ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
+ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
+ std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
_cgroup_cpu_ctl = cgroup_cpu_ctl;
return *this;
}
@@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() {
_num_threads++;
_num_threads_pending_start--;
-    if (_cgroup_cpu_ctl != nullptr) {
-        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
+    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = _cgroup_cpu_ctl.lock()) {
+        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
    }
    // Owned by this worker thread and added/removed from _idle_threads as needed.
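
The change above replaces a raw-pointer null check with weak_ptr::lock(), which promotes the weak reference to a temporary shared_ptr for the duration of the call; if the owning WorkloadGroup has already released the controller, the promotion fails and the worker simply skips the cgroup attach. A minimal sketch of the guard (add_thread_to_cgroup is a stand-in here):

#include <memory>

struct CgroupCpuCtl {
    void add_thread_to_cgroup() { /* attach the calling thread to the cgroup */ }
};

// What each pool worker does on startup: promote the weak reference and
// only touch the controller while holding the resulting shared_ptr.
void on_worker_start(const std::weak_ptr<CgroupCpuCtl>& ctl_wptr) {
    if (std::shared_ptr<CgroupCpuCtl> ctl = ctl_wptr.lock()) {
        ctl->add_thread_to_cgroup();  // controller guaranteed alive here
    }
    // expired -> controller already gone; run unconstrained
}

int main() {
    auto ctl = std::make_shared<CgroupCpuCtl>();
    on_worker_start(ctl);  // attaches
    std::weak_ptr<CgroupCpuCtl> w = ctl;
    ctl.reset();
    on_worker_start(w);    // safely skips
}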
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 5ce27e2f27b..9bd4a7246fb 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -107,7 +107,7 @@ public:
ThreadPoolBuilder& set_min_threads(int min_threads);
ThreadPoolBuilder& set_max_threads(int max_threads);
ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
-    ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl);
+    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl);
template <class Rep, class Period>
ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep,
Period>& idle_timeout) {
_idle_timeout =
std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
@@ -133,7 +133,7 @@ private:
int _min_threads;
int _max_threads;
int _max_queue_size;
- CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+ std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
std::chrono::milliseconds _idle_timeout;
ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
@@ -345,7 +345,7 @@ private:
// Protected by _lock.
int _total_queued_tasks;
- CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+ std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
// All allocated tokens.
//
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 56c49368598..7731b3ba8f9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -114,11 +114,8 @@ struct SimplifiedScanTask {
class SimplifiedScanScheduler {
public:
-    SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) {
-        _is_stop.store(false);
-        _cgroup_cpu_ctl = cgroup_cpu_ctl;
-        _sched_name = sched_name;
-    }
+    SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {}
~SimplifiedScanScheduler() {
stop();
@@ -217,7 +214,7 @@ public:
private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
std::atomic<bool> _is_stop;
- CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+ std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
std::string _sched_name;
};
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 432ec1c54b5..c17b84b2dbe 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -107,12 +107,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
force_close(status);
}
-    if (state && state->get_query_ctx()) {
-        WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group();
-        if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) {
-            Status ret = wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup();
+    if (state && state->get_query_ctx() && state->get_query_ctx()->workload_group()) {
+        if (auto cg_ctl_sptr =
+                    state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
+            Status ret = cg_ctl_sptr->add_thread_to_cgroup();
            if (ret.ok()) {
-                std::string wg_tname = "asyc_wr_" + wg_ptr->name();
+                std::string wg_tname =
+                        "asyc_wr_" + state->get_query_ctx()->workload_group()->name();
                Thread::set_self_name(wg_tname);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
index 8cba792dd39..4405da6ce13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
@@ -25,6 +25,10 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@@ -55,14 +59,26 @@ public class AlterWorkloadGroupStmt extends DdlStmt implements NotFallbackInPars
}
        if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be null");
+            throw new AnalysisException("Workload Group properties can't be empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
+        }
+
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
        }
}
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
- sb.append("ALTER RESOURCE GROUP
'").append(workloadGroupName).append("' ");
+ sb.append("ALTER WORKLOAD GROUP
'").append(workloadGroupName).append("' ");
sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ",
true, false)).append(")");
return sb.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
index fc4f99046d5..dd13542a836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
@@ -27,6 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
import java.util.Map;
@@ -68,12 +71,19 @@ public class CreateWorkloadGroupStmt extends DdlStmt implements NotFallbackInPar
FeNameFormat.checkWorkloadGroupName(workloadGroupName);
        if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be null");
+            throw new AnalysisException("Workload Group properties can't be empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can not be create or modified ");
        }
-        String wgTag = properties.get(WorkloadGroup.TAG);
-        if (wgTag != null) {
-            FeNameFormat.checkCommonName("workload group tag", wgTag);
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && (WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
index e4e3055f128..9356c6b5c4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
@@ -20,7 +20,6 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -50,8 +49,6 @@ public class DropWorkloadGroupStmt extends DdlStmt implements NotFallbackInParse
        if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
        }
}
-
- FeNameFormat.checkWorkloadGroupName(workloadGroupName);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 12ac201eb65..7fb12b9621a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -250,6 +250,7 @@ import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
@@ -1874,6 +1875,7 @@ public class Env {
        WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
+ new CreateInternalWorkloadGroupThread().start();
// auto analyze related threads.
statisticsCleaner.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 2dc6a90d593..cfc9d85c6a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -244,10 +244,11 @@ public class InternalSchemaInitializer extends Thread {
// statistics
        Env.getCurrentEnv().getInternalCatalog().createTable(
                buildStatisticsTblStmt(StatisticConstants.TABLE_STATISTIC_TBL_NAME,
-                    Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id")));
+                        Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", "idx_id", "col_id", "part_id")));
        Env.getCurrentEnv().getInternalCatalog().createTable(
                buildStatisticsTblStmt(StatisticConstants.PARTITION_STATISTIC_TBL_NAME,
-                    Lists.newArrayList("catalog_id", "db_id", "tbl_id", "idx_id", "part_name", "part_id", "col_id")));
+                        Lists.newArrayList("catalog_id", "db_id", "tbl_id", "idx_id", "part_name", "part_id",
+                                "col_id")));
// audit table
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 1c24ca69d4f..214fbe0e410 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -50,6 +50,9 @@ public class FeConstants {
// set to false to disable internal schema db
public static boolean enableInternalSchemaDb = true;
+    // for UT, create internal workload group thread can not start
+
// default scheduler interval is 10 seconds
public static int default_scheduler_interval_millisecond = 10000;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
new file mode 100644
index 00000000000..7c6d0e3a080
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadgroup;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CreateInternalWorkloadGroupThread extends Thread {
+
+    private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
+
+ public CreateInternalWorkloadGroupThread() {
+ super("CreateInternalWorkloadGroupThread");
+ }
+
+ public void run() {
+ if (!FeConstants.shouldCreateInternalWorkloadGroup) {
+ return;
+ }
+ try {
+ Env env = Env.getCurrentEnv();
+ while (!env.isReady()) {
+ Thread.sleep(5000);
+ }
+            if (!env.getWorkloadGroupMgr()
+                    .isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
+ env.getWorkloadGroupMgr().createInternalWorkloadGroup();
+ LOG.info("create internal workload group succ");
+ } else {
+ LOG.info("internal workload group already exists.");
+ }
+ } catch (Throwable t) {
+ LOG.warn("create internal workload group failed. ", t);
+ }
+ }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 44fb98e10ef..7d5e792ef71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TWorkloadGroupInfo;
+import org.apache.doris.thrift.TWorkloadType;
import org.apache.doris.thrift.TopicInfo;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
@@ -43,8 +45,11 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
public class WorkloadGroup implements Writable, GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(WorkloadGroup.class);
@@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
    public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";
+ // it's used to define Doris's internal workload group,
+ // currently it is internal, only contains compaction
+ // later more type and workload may be included in the future.
+ public static final String INTERNAL_TYPE = "internal_type";
+
    // NOTE(wb): all property is not required, some properties default value is set in be
    // default value is as followed
    // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
@@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
            .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
            .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
+
+    public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new ImmutableMap.Builder<String, Integer>()
+            .put(TWorkloadType.INTERNAL.toString().toLowerCase(), TWorkloadType.INTERNAL.getValue()).build();
public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@@ -420,13 +433,31 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
String[] tagArr = tagStr.split(",");
for (String tag : tagArr) {
try {
- FeNameFormat.checkCommonName("workload group tag name",
tag);
+ FeNameFormat.checkCommonName("workload group tag", tag);
} catch (AnalysisException e) {
- throw new DdlException("workload group tag name format is
illegal, " + tagStr);
+ throw new DdlException("tag format is illegal, " + tagStr);
}
}
}
+ // internal workload group is usually created by Doris.
+        // If exception happens here, it means thrift not match WORKLOAD_TYPE_MAP.
+        String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
+        if (!StringUtils.isEmpty(interTypeId)) {
+            int wid = Integer.valueOf(interTypeId);
+            if (TWorkloadType.findByValue(wid) == null) {
+                throw new DdlException("error internal type id: " + wid + ", current id map:" + WORKLOAD_TYPE_MAP);
+ }
+ }
+
+ }
+
+ Optional<Integer> getInternalTypeId() {
+ String typeIdStr = this.properties.get(INTERNAL_TYPE);
+ if (StringUtils.isEmpty(typeIdStr)) {
+ return Optional.empty();
+ }
+ return Optional.of(Integer.valueOf(typeIdStr));
}
public long getId() {
@@ -535,8 +566,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
return cpuHardLimit;
}
- public String getTag() {
- return properties.get(TAG);
+ public Optional<Set<String>> getTag() {
+ String tagStr = properties.get(TAG);
+ if (StringUtils.isEmpty(tagStr)) {
+ return Optional.empty();
+ }
+
+ Set<String> tagSet = new HashSet<>();
+ String[] ss = tagStr.split(",");
+ for (String str : ss) {
+ tagSet.add(str);
+ }
+ return Optional.of(tagSet);
}
@Override
@@ -550,7 +591,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
public TopicInfo toTopicInfo() {
TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
- tWorkloadGroupInfo.setId(id);
+ long wgId = this.id;
+ Optional<Integer> internalTypeId = getInternalTypeId();
+ if (internalTypeId.isPresent()) {
+ wgId = internalTypeId.get();
+ }
+ tWorkloadGroupInfo.setId(wgId);
+
tWorkloadGroupInfo.setName(name);
tWorkloadGroupInfo.setVersion(version);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 8464d83bdbc..26798bb1ec3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -42,6 +42,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TWorkloadType;
import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Strings;
@@ -49,7 +50,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -62,6 +62,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
public static final Long DEFAULT_GROUP_ID = 1L;
+ public static final String INTERNAL_GROUP_NAME = "_internal";
+
+    // internal_type_id could be converted to workload group id when Workload published to BE
+    // refer WorkloadGroup.toTopicInfo
+    public static final Long INTERNAL_TYPE_ID = Long.valueOf(TWorkloadType.INTERNAL.getValue());
+
    public static final ImmutableList<String> WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
.add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
@@ -375,44 +382,84 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
LOG.info("Create workload group success: {}", workloadGroup);
}
+ public void createInternalWorkloadGroup() {
+ Map<String, String> properties = Maps.newHashMap();
+ // 100 is cgroup v2 default cpu_share value
+ properties.put(WorkloadGroup.CPU_SHARE, "100");
+        properties.put(WorkloadGroup.INTERNAL_TYPE, String.valueOf(INTERNAL_TYPE_ID));
+        WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), INTERNAL_GROUP_NAME, properties);
+ writeLock();
+ try {
+ if (!nameToWorkloadGroup.containsKey(wg.getName())) {
+ nameToWorkloadGroup.put(wg.getName(), wg);
+ idToWorkloadGroup.put(wg.getId(), wg);
+ Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
    // NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
    // when create/alter workload group with same tag.
    // when oldWg is null it means caller is an alter stmt.
    private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
-        String wgTag = newWg.getTag();
-        double sumOfAllMemLimit = 0;
-        int sumOfAllCpuHardLimit = 0;
-        for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
-            WorkloadGroup wg = entry.getValue();
-            if (!StringUtils.equals(wgTag, wg.getTag())) {
-                continue;
-            }
+        Optional<Set<String>> newWgTag = newWg.getTag();
+        Set<String> newWgTagSet = null;
+        if (newWgTag.isPresent()) {
+            newWgTagSet = newWgTag.get();
+        } else {
+            newWgTagSet = new HashSet<>();
+            newWgTagSet.add(null);
+        }
-            if (oldWg != null && entry.getKey() == oldWg.getId()) {
-                continue;
-            }
+        for (String newWgOneTag : newWgTagSet) {
+            double sumOfAllMemLimit = 0;
+            int sumOfAllCpuHardLimit = 0;
-            if (wg.getCpuHardLimit() > 0) {
-                sumOfAllCpuHardLimit += wg.getCpuHardLimit();
-            }
-            if (wg.getMemoryLimitPercent() > 0) {
-                sumOfAllMemLimit += wg.getMemoryLimitPercent();
+            // 1 get sum value of all wg which has same tag without current wg
+            for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
+                WorkloadGroup wg = entry.getValue();
+                Optional<Set<String>> wgTag = wg.getTag();
+
+                if (oldWg != null && entry.getKey() == oldWg.getId()) {
+                    continue;
+                }
+
+                if (newWgOneTag == null) {
+                    if (wgTag.isPresent()) {
+                        continue;
+                    }
+                } else if (!wgTag.isPresent() || (!wgTag.get().contains(newWgOneTag))) {
+                    continue;
+                }
+
+                if (wg.getCpuHardLimit() > 0) {
+                    sumOfAllCpuHardLimit += wg.getCpuHardLimit();
+                }
+                if (wg.getMemoryLimitPercent() > 0) {
+                    sumOfAllMemLimit += wg.getMemoryLimitPercent();
+                }
            }
-        }
-        sumOfAllMemLimit += newWg.getMemoryLimitPercent();
-        sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
+            // 2 sum current wg value
+            sumOfAllMemLimit += newWg.getMemoryLimitPercent();
+            sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
-        if (sumOfAllMemLimit > 100.0 + 1e-6) {
-            throw new DdlException(
-                    "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
-                            + " cannot be greater than 100.0%.");
-        }
+            // 3 check total sum
+            if (sumOfAllMemLimit > 100.0 + 1e-6) {
+                throw new DdlException(
+                        "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + (
+                                newWgTag.isPresent() ? newWgTag.get() : "")
+                                + " cannot be greater than 100.0%. current sum val:" + sumOfAllMemLimit);
+            }
-        if (sumOfAllCpuHardLimit > 100) {
-            throw new DdlException(
-                    "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag "
-                            + wgTag + " can not be greater than 100% ");
+            if (sumOfAllCpuHardLimit > 100) {
+                throw new DdlException(
+                        "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + (
+                                newWgTag.isPresent()
+                                        ? newWgTag.get() : "") + " can not be greater than 100% ");
+            }
}
}
@@ -446,8 +493,8 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
    public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws DdlException {
        String workloadGroupName = stmt.getWorkloadGroupName();
-        if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
-            throw new DdlException("Dropping default workload group " + workloadGroupName + " is not allowed");
+        if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
+            throw new DdlException("Dropping workload group " + workloadGroupName + " is not allowed");
        }
        // if a workload group exists in user property, it should not be dropped
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 5f1e3565966..d729881358e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -235,4 +235,226 @@ public class WorkloadGroupMgrTest {
}
        Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5);
}
+
+ @Test
+ public void testMultiTagCreateWorkloadGroup() throws UserException {
+ Config.enable_workload_group = true;
+ WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+ {
+ String name = "empty_g1";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+ properties.put(WorkloadGroup.TAG, "");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "empty_g2";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+            CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g1";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn1,cn2");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g2";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn3,cn2");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+
+ {
+ String name = "not_empty_g3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+ properties.put(WorkloadGroup.TAG, "cn2,cn100");
+ try {
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ } catch (DdlException e) {
+ Assert.assertTrue(e.getMessage().contains("The sum of all
workload group " + WorkloadGroup.MEMORY_LIMIT));
+ }
+ }
+
+ {
+ String name = "not_empty_g3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+ properties.put(WorkloadGroup.TAG, "cn3,cn100");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g5";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+ properties.put(WorkloadGroup.TAG, "cn5");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g6";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn5");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g7";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+ properties.put(WorkloadGroup.TAG, "cn5");
+ try {
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ } catch (DdlException e) {
+ Assert.assertTrue(e.getMessage().contains("The sum of all
workload group " + WorkloadGroup.MEMORY_LIMIT));
+ }
+ }
+
+ }
+
+
+ @Test
+ public void testMultiTagAlterWorkloadGroup() throws UserException {
+ Config.enable_workload_group = true;
+ WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+ {
+ String name = "empty_g1";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+ properties.put(WorkloadGroup.TAG, "");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "empty_g2";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g1";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn1,cn2");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g2";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn3,cn2");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn2,cn100");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+ properties.put(WorkloadGroup.TAG, "cn2,cn100");
+ AlterWorkloadGroupStmt alterStmt = new AlterWorkloadGroupStmt(name, properties);
+ try {
+ workloadGroupMgr.alterWorkloadGroup(alterStmt);
+ } catch (DdlException e) {
+ Assert.assertTrue(e.getMessage().contains("The sum of all
workload group " + WorkloadGroup.MEMORY_LIMIT));
+ }
+ }
+ }
+
+
+ @Test
+ public void testMultiTagCreateWorkloadGroupWithNoTag() throws UserException {
+ Config.enable_workload_group = true;
+ WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+ {
+ String name = "not_empty_g1";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn1,cn2");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "not_empty_g2";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ properties.put(WorkloadGroup.TAG, "cn3,cn2");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ // create workload groups without a tag
+ {
+ String name = "no_tag_g1";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+ properties.put(WorkloadGroup.TAG, "");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "no_tag_g2";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+
+ {
+ String name = "no_tag_g3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ try {
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ } catch (DdlException e) {
+ Assert.assertTrue(e.getMessage().contains("The sum of all
workload group " + WorkloadGroup.MEMORY_LIMIT));
+ }
+ }
+
+ {
+ String name = "no_tag_g3";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+ CreateWorkloadGroupStmt createStmt = new CreateWorkloadGroupStmt(false, name, properties);
+ workloadGroupMgr.createWorkloadGroup(createStmt);
+ }
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 8e25efdfada..70adbbd7f99 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -153,6 +153,7 @@ public abstract class TestWithFeService {
@BeforeAll
public final void beforeAll() throws Exception {
FeConstants.enableInternalSchemaDb = false;
+ FeConstants.shouldCreateInternalWorkloadGroup = false;
beforeCreatingConnectContext();
connectContext = createDefaultCtx();
beforeCluster();
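The new FeConstants flag above lets FE unit tests opt out of the internal-group bootstrap. A minimal sketch of the assumed startup wiring (the actual Env.java call site is not part of this excerpt; CreateInternalWorkloadGroupThread is the class this commit adds):

    // Assumed wiring, not verbatim from the patch: skip creating the
    // internal workload group when tests have disabled it.
    if (FeConstants.shouldCreateInternalWorkloadGroup) {
        new CreateInternalWorkloadGroupThread().start();
    }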
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index ed0ae243a1d..533999a853f 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -327,6 +327,10 @@ struct TPublishTopicResult {
1: required Status.TStatus status
}
+enum TWorkloadType {
+ INTERNAL = 2
+}
+
struct TGetRealtimeExecStatusRequest {
// maybe query id or other unique id
1: optional Types.TUniqueId id
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 41cc190a017..7807578ea81 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -176,6 +176,30 @@ suite("test_crud_wlg") {
exception "can not be greater than 100%"
}
+ // test alter tag and type
+ test {
+ sql "alter workload group test_group properties ( 'internal_type'='13'
);"
+
+ exception "internal_type can not be create or modified"
+ }
+
+ test {
+ sql "create workload group inter_wg properties('internal_type'='123');"
+ exception "internal_type can not be create or modified"
+ }
+
+ test {
+ sql "alter workload group normal properties ('tag'='123')"
+
+ exception "_internal and normal group can not set tag"
+ }
+
+ test {
+ sql "alter workload group _internal properties ('tag'='123')"
+
+ exception "_internal and normal group can not set tag"
+ }
+
sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%'
);"
qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
qt_cpu_hard_limit_2 "select
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
from information_schema.workload_groups where name in ('normal','test_group')
order by name;"
@@ -492,6 +516,11 @@ suite("test_crud_wlg") {
// test workload group's tag property, cpu_hard_limit
+ test {
+ sql "create workload group tag_test properties('tag'=' a, b , c ');"
+ exception "tag format is illegal"
+ }
+
test {
sql "create workload group if not exists tag1_wg1 properties (
'cpu_hard_limit'='101%', 'tag'='tag1')"
exception "must be a positive integer"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]