This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2b1ec70afd0 [Feature](executor)Workload Group support Non-Pipeline Execution (#30164)
2b1ec70afd0 is described below
commit 2b1ec70afd045ed421fcb346c1597d3e4931840b
Author: wangbo <[email protected]>
AuthorDate: Mon Jan 22 20:25:28 2024 +0800
[Feature](executor)Workload Group support Non-Pipeline Execution (#30164)
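In short: a query's workload group now supplies the execution resources for both engines. FragmentMgr binds the group's schedulers and thread pool to the QueryContext, pipeline fragments pick their TaskScheduler via QueryContext::get_pipe_exec_scheduler(), and non-pipeline fragments are submitted to a per-group ThreadPool created by TaskGroupManager, falling back to FragmentMgr's default pool when the group has none. The sketch below is a minimal, self-contained illustration of that look-up-with-fallback pattern only; ThreadPoolLike and GroupManagerSketch are hypothetical stand-ins, not Doris types (the real code is in the diff that follows).

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <memory>

// Hypothetical stand-in for the Doris ThreadPool; runs the task inline for the sketch.
struct ThreadPoolLike {
    void submit_func(const std::function<void()>& fn) { fn(); }
};

// Hypothetical stand-in for the non-pipeline part of TaskGroupManager.
class GroupManagerSketch {
public:
    // Like TaskGroupManager::get_query_scheduler: hand out the group's pool, or nullptr.
    ThreadPoolLike* get_non_pipe_pool(uint64_t tg_id) {
        auto it = _non_pipe_pools.find(tg_id);
        return it == _non_pipe_pools.end() ? nullptr : it->second.get();
    }
    // Like the "step 4: init non-pipe scheduler" branch of upsert_cg_task_scheduler.
    void upsert_group(uint64_t tg_id) {
        _non_pipe_pools.emplace(tg_id, std::make_unique<ThreadPoolLike>());
    }
private:
    std::map<uint64_t, std::unique_ptr<ThreadPoolLike>> _non_pipe_pools;
};

int main() {
    GroupManagerSketch mgr;
    ThreadPoolLike default_pool;   // counterpart of FragmentMgr::_thread_pool
    mgr.upsert_group(10001);       // group 10001 gets its own pool; 10002 does not

    for (uint64_t tg_id : {uint64_t(10001), uint64_t(10002)}) {
        ThreadPoolLike* pool = mgr.get_non_pipe_pool(tg_id);
        if (pool == nullptr) {
            pool = &default_pool;  // fallback, mirroring FragmentMgr::exec_plan_fragment
        }
        pool->submit_func([tg_id] { std::cout << "run fragment of group " << tg_id << "\n"; });
    }
    return 0;
}

Note that enable_cgroup_cpu_soft_limit also defaults to true after this change, so cgroup-backed schedulers are preferred whenever doris_cgroup_cpu_path is configured.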
---
be/src/agent/workload_group_listener.cpp | 6 ++-
be/src/common/config.cpp | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 7 +--
.../pipeline_x/pipeline_x_fragment_context.cpp | 5 +-
be/src/runtime/fragment_mgr.cpp | 61 +++++++++-------------
be/src/runtime/query_context.cpp | 25 +++++++++
be/src/runtime/query_context.h | 13 +++--
be/src/runtime/task_group/task_group_manager.cpp | 57 ++++++++++++++++----
be/src/runtime/task_group/task_group_manager.h | 9 ++--
.../workload_sched_policy_mgr.cpp | 1 -
be/src/vec/exec/scan/scanner_context.h | 2 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 3 ++
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
gensrc/thrift/PaloInternalService.thrift | 16 +++---
14 files changed, 132 insertions(+), 77 deletions(-)
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index 6ea7c28669c..237d6c77274 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -26,6 +26,7 @@ namespace doris {
void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::set<uint64_t> current_wg_ids;
+ bool is_set_cgroup_path = config::doris_cgroup_cpu_path != "";
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
@@ -52,7 +53,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 4 create and update task scheduler
Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, _exec_env);
- if (!ret2.ok()) {
+ if (is_set_cgroup_path && !ret2.ok()) {
LOG(INFO) << "upsert task sche failed, tg_id=" <<
task_group_info.id
<< ", reason=" << ret2.to_string();
}
@@ -63,7 +64,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
<< ", cgroup cpu_shares=" <<
task_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" <<
task_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
- << (config::enable_cgroup_cpu_soft_limit ? "true" : "false");
+ << (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
+ << ", is set cgroup path=" << (is_set_cgroup_path ? "true" :
"flase");
}
_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5468ac29a02..271ad72410d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1133,7 +1133,7 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");
-DEFINE_mBool(enable_cgroup_cpu_soft_limit, "false");
+DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 538a2ce1bdb..909039b23fb 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -718,12 +718,7 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
- auto* scheduler = _exec_env->pipeline_task_scheduler();
- if (_query_ctx->get_task_scheduler()) {
- scheduler = _query_ctx->get_task_scheduler();
- } else if (_task_group_entity && _query_ctx->use_task_group_for_cpu_limit.load()) {
- scheduler = _exec_env->pipeline_task_group_scheduler();
- }
+ auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
st = scheduler->schedule_task(task.get());
if (!st) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a44db667450..4a16b97b2f3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1219,10 +1219,7 @@ Status PipelineXFragmentContext::submit() {
int submit_tasks = 0;
Status st;
- auto* scheduler = _exec_env->pipeline_task_scheduler();
- if (_task_group_entity) {
- scheduler = _exec_env->pipeline_task_group_scheduler();
- }
+ auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
for (auto& task : _tasks) {
for (auto& t : task) {
st = scheduler->schedule_task(t.get());
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0fbaacb4fa2..a76c7687f02 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -677,41 +677,27 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->register_memory_statistics();
query_ctx->register_cpu_statistics();
+ bool is_pipeline = false;
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
- if (params.__isset.workload_groups && !params.workload_groups.empty()) {
- uint64_t tg_id = params.workload_groups[0].id;
- auto* tg_mgr = _exec_env->task_group_manager();
- if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
- task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
- // set task group to queryctx for memory tracker can be removed, see QueryContext's destructor
- query_ctx->set_task_group(task_group_ptr);
- stringstream ss;
- ss << "Query/load id: " << print_id(query_ctx->query_id())
- << ", use task group:" << task_group_ptr->debug_string()
- << ", enable cpu hard limit:"
- << (tg_mgr->enable_cpu_hard_limit() ? "true" : "false");
- bool ret = false;
- if (tg_mgr->enable_cgroup()) {
- ret = tg_mgr->set_cg_task_sche_for_query_ctx(tg_id, query_ctx.get());
- if (ret) {
- ss << ", use cgroup for cpu limit.";
- } else {
- ss << ", not found cgroup sche, no limit for cpu.";
- }
- } else {
- ss << ", use doris sche for cpu limit.";
- query_ctx->use_task_group_for_cpu_limit.store(true);
- }
- LOG(INFO) << ss.str();
- _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), tg_id);
- } else {
- VLOG_DEBUG << "Query/load id: " <<
print_id(query_ctx->query_id())
- << " no task group found, does not use task
group.";
- }
- } else {
- VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id())
- << " does not use task group.";
+ is_pipeline = true;
+ }
+
+ if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+ uint64_t tg_id = params.workload_groups[0].id;
+ auto* tg_mgr = _exec_env->task_group_manager();
+ if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
+ task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
+ // set task group to queryctx for memory tracker can be removed, see QueryContext's destructor
+ query_ctx->set_task_group(task_group_ptr);
+ _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), tg_id);
+ query_ctx->set_query_scheduler(tg_id);
+
+ LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id())
+ << ", use task group: " <<
task_group_ptr->debug_string()
+ << ", is pipeline: " << ((int)is_pipeline)
+ << ", enable cgroup soft limit: "
+ << ((int)config::enable_cgroup_cpu_soft_limit);
}
}
@@ -795,7 +781,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
std::make_pair(params.params.fragment_instance_id, fragment_executor));
_cv.notify_all();
}
- auto st = _thread_pool->submit_func(
+
+ auto* current_thread_pool = query_ctx->get_non_pipe_exec_thread_pool();
+ if (!current_thread_pool) {
+ current_thread_pool = _thread_pool.get();
+ }
+ auto st = current_thread_pool->submit_func(
[this, fragment_executor, cb] { _exec_actual(fragment_executor, cb); });
if (!st.ok()) {
{
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index fffb5ad57a9..a70bf6695ac 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -20,6 +20,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/task_group/task_group_manager.h"
namespace doris {
@@ -152,4 +153,28 @@ void QueryContext::register_cpu_statistics() {
}
}
+void QueryContext::set_query_scheduler(uint64_t tg_id) {
+ auto* tg_mgr = _exec_env->task_group_manager();
+ tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler,
+ &_non_pipe_thread_pool);
+}
+
+doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
+ if (!config::enable_cgroup_cpu_soft_limit) {
+ return _exec_env->pipeline_task_group_scheduler();
+ } else if (_task_scheduler) {
+ return _task_scheduler;
+ } else {
+ return _exec_env->pipeline_task_scheduler();
+ }
+}
+
+ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
+ if (_task_group) {
+ return _non_pipe_thread_pool;
+ } else {
+ return nullptr;
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dd24206c415..d5a8f12cee1 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -218,6 +218,12 @@ public:
std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; }
+ void set_query_scheduler(uint64_t wg_id);
+
+ doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
+
+ ThreadPool* get_non_pipe_exec_thread_pool();
+
public:
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
@@ -247,8 +253,6 @@ public:
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;
- std::atomic<bool> use_task_group_for_cpu_limit = false;
-
private:
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
@@ -272,7 +276,7 @@ private:
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
vectorized::RuntimePredicate _runtime_predicate;
- taskgroup::TaskGroupPtr _task_group;
+ taskgroup::TaskGroupPtr _task_group = nullptr;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;
@@ -281,8 +285,9 @@ private:
// to report the real message if failed.
Status _exec_status = Status::OK();
- pipeline::TaskScheduler* _task_scheduler = nullptr;
+ doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
+ ThreadPool* _non_pipe_thread_pool = nullptr;
std::unique_ptr<pipeline::Dependency> _execution_dependency;
std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index cbd7b5f73f6..74694baa9fc 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -67,18 +67,25 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
return nullptr;
}
-bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) {
- std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
+void TaskGroupManager::get_query_scheduler(uint64_t tg_id,
+ doris::pipeline::TaskScheduler** exec_sched,
+ vectorized::SimplifiedScanScheduler** scan_sched,
+ ThreadPool** non_pipe_thread_pool) {
+ std::shared_lock<std::shared_mutex> r_lock(_task_scheduler_lock);
auto tg_sche_it = _tg_sche_map.find(tg_id);
if (tg_sche_it != _tg_sche_map.end()) {
- query_ctx_ptr->set_task_scheduler(tg_sche_it->second.get());
- auto _tg_scan_sche_it = _tg_scan_sche_map.find(tg_id);
- if (_tg_scan_sche_it != _tg_scan_sche_map.end()) {
- query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_it->second.get());
- return true;
- }
+ *exec_sched = tg_sche_it->second.get();
+ }
+
+ auto tg_scan_sche_it = _tg_scan_sche_map.find(tg_id);
+ if (tg_scan_sche_it != _tg_scan_sche_map.end()) {
+ *scan_sched = tg_scan_sche_it->second.get();
+ }
+
+ auto non_pipe_thread_pool_iter = _non_pipe_thread_pool_map.find(tg_id);
+ if (non_pipe_thread_pool_iter != _non_pipe_thread_pool_map.end()) {
+ *non_pipe_thread_pool = non_pipe_thread_pool_iter->second.get();
}
- return false;
}
Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
@@ -135,7 +142,23 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
}
}
- // step 4 update cgroup cpu if needed
+ // step 4: init non-pipe scheduler
+ if (_non_pipe_thread_pool_map.find(tg_id) == _non_pipe_thread_pool_map.end()) {
+ std::unique_ptr<ThreadPool> thread_pool = nullptr;
+ auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
+ .set_min_threads(1)
+ .set_max_threads(config::fragment_pool_thread_num_max)
+ .set_max_queue_size(config::fragment_pool_queue_size)
+ .set_cgroup_cpu_ctl(cg_cu_ctl_ptr)
+ .build(&thread_pool);
+ if (!ret.ok()) {
+ LOG(INFO) << "create non-pipline thread pool failed";
+ } else {
+ _non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool));
+ }
+ }
+
+ // step 5: update cgroup cpu if needed
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
@@ -160,6 +183,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
// stop task sche may cost some time, so it should not be locked
std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
+ std::set<ThreadPool*> non_pip_thread_pool_to_del;
std::set<uint64_t> deleted_tg_ids;
{
std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
@@ -177,6 +201,13 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
}
}
+ for (auto iter = _non_pipe_thread_pool_map.begin(); iter != _non_pipe_thread_pool_map.end(); iter++) {
+ uint64_t tg_id = iter->first;
+ if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+ non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get());
+ }
+ }
}
// 1 stop all threads
for (auto* ptr1 : task_sche_to_del) {
@@ -185,6 +216,9 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
for (auto* ptr2 : scan_task_sche_to_del) {
ptr2->stop();
}
+ for (auto& ptr3 : non_pip_thread_pool_to_del) {
+ ptr3->shutdown();
+ }
// 2 release resource in memory
{
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
@@ -242,6 +276,9 @@ void TaskGroupManager::stop() {
for (auto& task_sche : _tg_scan_sche_map) {
task_sche.second->stop();
}
+ for (auto& no_pip_sche : _non_pipe_thread_pool_map) {
+ no_pip_sche.second->shutdown();
+ }
}
} // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 08968b6fe99..a7ccb52f00e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -65,11 +65,9 @@ public:
bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
- bool set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr);
-
- // currently cgroup both support cpu soft limit and cpu hard limit
- // doris task group only support cpu soft limit
- bool enable_cgroup() { return enable_cpu_hard_limit() || config::enable_cgroup_cpu_soft_limit; }
+ void get_query_scheduler(uint64_t tg_id, doris::pipeline::TaskScheduler** exec_sched,
+ vectorized::SimplifiedScanScheduler** scan_sched,
+ ThreadPool** non_pipe_thread_pool);
private:
std::shared_mutex _group_mutex;
@@ -81,6 +79,7 @@ private:
std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
+ std::map<uint64_t, std::unique_ptr<ThreadPool>> _non_pipe_thread_pool_map;
std::shared_mutex _init_cg_ctl_lock;
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
index 2398bff465c..731dd0c8661 100644
--- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -81,7 +81,6 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
if (list.size() == 0) {
continue;
}
- LOG(INFO) << "[workload_schedule] get query list size=" << list.size();
for (int i = 0; i < list.size(); i++) {
WorkloadQueryInfo* query_info_ptr = &(list[i]);
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 1dd9966b162..28aec83d6a2 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -253,7 +253,7 @@ protected:
const int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
- SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // used for cpu hard limit
+ SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
// List "scanners" saves all "unfinished" scanners.
// The scanner scheduler will pop scanners from this list, run scanner,
// and then if the scanner is not finished, will be pushed back to this list.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 586a27ee103..8581980da2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3639,6 +3639,9 @@ public class Coordinator implements CoordInterface {
params.params.setPerNodeScanRanges(scanRanges);
params.params.setPerExchNumSenders(perExchNumSenders);
+ if (tWorkloadGroups != null) {
+ params.setWorkloadGroups(tWorkloadGroups);
+ }
params.params.setDestinations(destinations);
params.params.setSenderId(i);
params.params.setNumSenders(instanceExecParams.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 767d03f43d7..8b72ce3eb2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1519,7 +1519,7 @@ public class StmtExecutor {
coordBase = new PointQueryExec(planner, analyzer);
} else {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
- if (Config.enable_workload_group && context.sessionVariable.getEnablePipelineEngine()) {
+ if (Config.enable_workload_group) {
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
} else {
context.setWorkloadGroupName("");
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index fb9a1888f6c..10f70bd7f86 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -400,6 +400,13 @@ struct TGlobalDict {
2: optional map<i32, i32> slot_dicts // map from slot id to column dict id, because 2 or more column may share the dict
}
+struct TPipelineWorkloadGroup {
+ 1: optional i64 id
+ 2: optional string name
+ 3: optional map<string, string> properties
+ 4: optional i64 version
+}
+
// ExecPlanFragment
struct TExecPlanFragmentParams {
1: required PaloInternalServiceVersion protocol_version
@@ -483,6 +490,8 @@ struct TExecPlanFragmentParams {
29: optional i64 content_length
+ 30: optional list<TPipelineWorkloadGroup> workload_groups
+
// For cloud
1000: optional bool is_mow_table;
}
@@ -670,13 +679,6 @@ struct TPipelineInstanceParams {
7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
}
-struct TPipelineWorkloadGroup {
- 1: optional i64 id
- 2: optional string name
- 3: optional map<string, string> properties
- 4: optional i64 version
-}
-
// ExecPlanFragment
struct TPipelineFragmentParams {
1: required PaloInternalServiceVersion protocol_version
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]