This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 31b3be456c5 add workload scheduler in be (#29116)
31b3be456c5 is described below
commit 31b3be456c53c8c0ec7bb64ecca42866b71e86e0
Author: wangbo <[email protected]>
AuthorDate: Thu Dec 28 15:04:22 2023 +0800
add workload scheduler in be (#29116)
---
be/src/agent/agent_server.cpp | 8 ++
be/src/agent/workload_sched_policy_listener.cpp | 78 ++++++++++++
.../src/agent/workload_sched_policy_listener.h | 30 ++---
be/src/runtime/exec_env.h | 4 +
be/src/runtime/exec_env_init.cpp | 8 ++
be/src/runtime/fragment_mgr.cpp | 20 ++++
be/src/runtime/fragment_mgr.h | 3 +
be/src/runtime/query_context.h | 2 +
.../workload_management/workload_action.cpp | 28 ++---
.../runtime/workload_management/workload_action.h | 68 +++++++++++
.../workload_management/workload_comparator.h | 77 ++++++++++++
.../workload_management/workload_condition.cpp | 59 ++++++++++
.../workload_management/workload_condition.h | 97 +++++++++++++++
.../workload_management/workload_query_info.h | 26 ++--
.../workload_management/workload_sched_policy.cpp | 73 ++++++++++++
.../workload_management/workload_sched_policy.h | 59 ++++++++++
.../workload_sched_policy_mgr.cpp | 131 +++++++++++++++++++++
.../workload_sched_policy_mgr.h | 43 ++++---
.../main/java/org/apache/doris/catalog/Env.java | 3 +
.../doris/common/publish/TopicPublisherThread.java | 7 +-
.../common/publish/WorkloadGroupPublisher.java | 9 +-
.../workloadschedpolicy/WorkloadSchedPolicy.java | 67 +++++++++++
.../WorkloadSchedPolicyMgr.java | 17 +++
.../WorkloadSchedPolicyPublisher.java} | 17 ++-
gensrc/thrift/BackendService.thrift | 44 ++++++-
25 files changed, 907 insertions(+), 71 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 9902a0ad726..a3b18c53567 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -31,6 +31,7 @@
#include "agent/topic_subscriber.h"
#include "agent/utils.h"
#include "agent/workload_group_listener.h"
+#include "agent/workload_sched_policy_listener.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -72,6 +73,13 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
LOG(INFO) << "Register workload group listener";
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP,
std::move(wg_listener));
+
+ std::unique_ptr<TopicListener> policy_listener =
+ std::make_unique<WorkloadschedPolicyListener>(exec_env);
+ LOG(INFO) << "Register workload scheduler policy listener";
+
_topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_SCHED_POLICY,
+ std::move(policy_listener));
+
#endif
}
diff --git a/be/src/agent/workload_sched_policy_listener.cpp
b/be/src/agent/workload_sched_policy_listener.cpp
new file mode 100644
index 00000000000..461fd2cbb0f
--- /dev/null
+++ b/be/src/agent/workload_sched_policy_listener.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "agent/workload_sched_policy_listener.h"
+
+#include "runtime/workload_management/workload_action.h"
+#include "runtime/workload_management/workload_condition.h"
+#include "runtime/workload_management/workload_sched_policy.h"
+#include "runtime/workload_management/workload_sched_policy_mgr.h"
+
+namespace doris {
+
+void WorkloadschedPolicyListener::handle_topic_info(const
std::vector<TopicInfo>& topic_info_list) {
+ std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> policy_map;
+ for (const TopicInfo& topic_info : topic_info_list) {
+ if (!topic_info.__isset.workload_sched_policy) {
+ continue;
+ }
+
+ TWorkloadSchedPolicy tpolicy = topic_info.workload_sched_policy;
+ // some metric or action can not exec in be, then need skip
+ bool need_skip_current_policy = false;
+
+ std::vector<std::unique_ptr<WorkloadCondition>> cond_ptr_list;
+ for (TWorkloadCondition& cond : tpolicy.condition_list) {
+ std::unique_ptr<WorkloadCondition> cond_ptr =
+ WorkloadConditionFactory::create_workload_condition(&cond);
+ if (cond_ptr == nullptr) {
+ need_skip_current_policy = true;
+ break;
+ }
+ cond_ptr_list.push_back(std::move(cond_ptr));
+ }
+ if (need_skip_current_policy) {
+ continue;
+ }
+
+ std::vector<std::unique_ptr<WorkloadAction>> action_ptr_list;
+ for (TWorkloadAction& action : tpolicy.action_list) {
+ std::unique_ptr<WorkloadAction> action_ptr =
+ WorkloadActionFactory::create_workload_action(&action);
+ if (action_ptr == nullptr) {
+ need_skip_current_policy = true;
+ break;
+ }
+ action_ptr_list.push_back(std::move(action_ptr));
+ }
+ if (need_skip_current_policy) {
+ continue;
+ }
+
+ std::shared_ptr<WorkloadSchedPolicy> policy_ptr =
std::make_shared<WorkloadSchedPolicy>();
+ policy_ptr->init(tpolicy.id, tpolicy.name, tpolicy.version,
tpolicy.enabled,
+ tpolicy.priority, std::move(cond_ptr_list),
std::move(action_ptr_list));
+ policy_map.emplace(tpolicy.id, std::move(policy_ptr));
+ }
+ size_t new_policy_size = policy_map.size();
+ if (new_policy_size > 0) {
+
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
+ }
+ LOG(INFO) << "[workload_schedule]finish update workload schedule policy,
size="
+ << new_policy_size;
+}
+} // namespace doris
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/be/src/agent/workload_sched_policy_listener.h
similarity index 59%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/agent/workload_sched_policy_listener.h
index 6c5ce9e4c11..f1410b502fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/agent/workload_sched_policy_listener.h
@@ -15,23 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.publish;
+#pragma once
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include <glog/logging.h>
-public class WorkloadGroupPublisher implements TopicPublisher {
+#include "agent/topic_listener.h"
+#include "runtime/exec_env.h"
- private Env env;
+namespace doris {
- public WorkloadGroupPublisher(Env env) {
- this.env = env;
- }
+class WorkloadschedPolicyListener : public TopicListener {
+public:
+ WorkloadschedPolicyListener(ExecEnv* exec_env) : _exec_env(exec_env) {}
- @Override
- public void getTopicInfo(TPublishTopicRequest req) {
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
- env.getWorkloadGroupMgr().getPublishTopicInfo());
- }
-}
+ void handle_topic_info(const std::vector<TopicInfo>& topic_info_list)
override;
+
+private:
+ ExecEnv* _exec_env = nullptr;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4c1060891f5..f8a52ad1ec4 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -63,6 +63,7 @@ class InvertedIndexSearcherCache;
class InvertedIndexQueryCache;
} // namespace segment_v2
+class WorkloadSchedPolicyMgr;
class BfdParser;
class BrokerMgr;
template <class T>
@@ -153,6 +154,7 @@ public:
pipeline::TaskScheduler* pipeline_task_scheduler() { return
_without_group_task_scheduler; }
pipeline::TaskScheduler* pipeline_task_group_scheduler() { return
_with_group_task_scheduler; }
taskgroup::TaskGroupManager* task_group_manager() { return
_task_group_manager; }
+ WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return
_workload_sched_mgr; }
// using template to simplify client cache management
template <typename T>
@@ -378,6 +380,8 @@ private:
std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
_with_group_block_scheduler;
doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue =
nullptr;
+
+ WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 4df69d67c03..48d229b9e21 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -76,6 +76,7 @@
#include "runtime/task_group/task_group_manager.h"
#include "runtime/thread_context.h"
#include "runtime/user_function_cache.h"
+#include "runtime/workload_management/workload_sched_policy_mgr.h"
#include "service/backend_options.h"
#include "service/backend_service.h"
#include "service/point_query_executor.h"
@@ -194,6 +195,10 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
.set_max_threads(1)
.set_max_queue_size(1000000)
.build(&_lazy_release_obj_pool));
+
+ _workload_sched_mgr = new WorkloadSchedPolicyMgr();
+ _workload_sched_mgr->start(this);
+
init_file_cache_factory();
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_task_group_manager = new taskgroup::TaskGroupManager();
@@ -519,6 +524,8 @@ void ExecEnv::destroy() {
SAFE_STOP(_group_commit_mgr);
// _routine_load_task_executor should be stopped before
_new_load_stream_mgr.
SAFE_STOP(_routine_load_task_executor);
+ // stop workload scheduler
+ SAFE_STOP(_workload_sched_mgr);
// stop pipline step 1, non-cgroup execution
SAFE_SHUTDOWN(_without_group_block_scheduler.get());
SAFE_STOP(_without_group_task_scheduler);
@@ -593,6 +600,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_bfd_parser);
SAFE_DELETE(_result_cache);
SAFE_DELETE(_fragment_mgr);
+ SAFE_DELETE(_workload_sched_mgr);
SAFE_DELETE(_task_group_manager);
SAFE_DELETE(_with_group_task_scheduler);
SAFE_DELETE(_without_group_task_scheduler);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 65e1ba475ae..9256c3bccfd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -73,6 +73,7 @@
#include "runtime/task_group/task_group_manager.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
+#include "runtime/workload_management/workload_query_info.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
@@ -1567,4 +1568,23 @@ void
FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
}
}
+void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>*
query_info_list) {
+ {
+ std::lock_guard<std::mutex> lock(_lock);
+ // todo: use monotonic time
+ VecDateTimeValue now = VecDateTimeValue::local_time();
+ for (const auto& q : _query_ctx_map) {
+ WorkloadQueryInfo workload_query_info;
+ workload_query_info.query_id = print_id(q.first);
+ workload_query_info.tquery_id = q.first;
+
+ uint64_t query_time_millisecond = q.second->query_time(now) * 1000;
+
workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME,
+
std::to_string(query_time_millisecond));
+ // todo, add scan rows, scan bytes
+ query_info_list->push_back(workload_query_info);
+ }
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 21d85503803..f9a6d4f72ed 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -64,6 +64,7 @@ class TPipelineInstanceParams;
class TScanColumnDesc;
class TScanOpenParams;
class Thread;
+class WorkloadQueryInfo;
std::string to_load_error_http_path(const std::string& file_name);
@@ -153,6 +154,8 @@ public:
std::string dump_pipeline_tasks();
+ void get_runtime_query_info(std::vector<WorkloadQueryInfo>*
_query_info_list);
+
private:
void cancel_unlocked_impl(const TUniqueId& id, const
PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock,
bool is_pipeline,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a230fd653e8..203c5b6e3f4 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -93,6 +93,8 @@ public:
return false;
}
+ int64_t query_time(VecDateTimeValue& now) { return
now.second_diff(_start_time); }
+
void set_thread_token(int concurrency, bool is_serial) {
_thread_token =
_exec_env->scanner_scheduler()->new_limited_scan_pool_token(
is_serial ? ThreadPool::ExecutionMode::SERIAL
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/be/src/runtime/workload_management/workload_action.cpp
similarity index 55%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/runtime/workload_management/workload_action.cpp
index 6c5ce9e4c11..39916bc7cc1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -15,23 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.publish;
+#include "runtime/workload_management/workload_action.h"
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include "runtime/fragment_mgr.h"
-public class WorkloadGroupPublisher implements TopicPublisher {
+namespace doris {
- private Env env;
+void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
+ LOG(INFO) << "[workload_schedule]workload scheduler cancel query " <<
query_info->query_id;
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ query_info->tquery_id, PPlanFragmentCancelReason::INTERNAL_ERROR,
+ std::string("query canceled by workload scheduler"));
+}
- public WorkloadGroupPublisher(Env env) {
- this.env = env;
- }
+void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
+ LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name;
+};
- @Override
- public void getTopicInfo(TPublishTopicRequest req) {
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
- env.getWorkloadGroupMgr().getPublishTopicInfo());
- }
-}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_action.h
b/be/src/runtime/workload_management/workload_action.h
new file mode 100644
index 00000000000..29c01320b78
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_action.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/workload_management/workload_query_info.h"
+
+namespace doris {
+
+enum WorkloadActionType { MOVE_QUERY_TO_GROUP = 0, CANCEL_QUERY = 1 };
+
+class WorkloadAction {
+public:
+ WorkloadAction() = default;
+ virtual ~WorkloadAction() = default;
+
+ virtual void exec(WorkloadQueryInfo* query_info) = 0;
+
+ virtual WorkloadActionType get_action_type() = 0;
+};
+
+class WorkloadActionCancelQuery : public WorkloadAction {
+public:
+ void exec(WorkloadQueryInfo* query_info) override;
+
+ WorkloadActionType get_action_type() override { return CANCEL_QUERY; }
+};
+
+//todo(wb) implement it
+class WorkloadActionMoveQuery : public WorkloadAction {
+public:
+ WorkloadActionMoveQuery(std::string wg_name) : _wg_name(wg_name) {}
+ void exec(WorkloadQueryInfo* query_info) override;
+
+ WorkloadActionType get_action_type() override { return
MOVE_QUERY_TO_GROUP; }
+
+private:
+ std::string _wg_name;
+};
+
+class WorkloadActionFactory {
+public:
+ static std::unique_ptr<WorkloadAction>
create_workload_action(TWorkloadAction* action) {
+ if (TWorkloadActionType::type::MOVE_QUERY_TO_GROUP == action->action) {
+ return
std::make_unique<WorkloadActionMoveQuery>(action->action_args);
+ } else if (TWorkloadActionType::type::CANCEL_QUERY == action->action) {
+ return std::make_unique<WorkloadActionCancelQuery>();
+ }
+ LOG(ERROR) << "not find a action " << action->action;
+ return nullptr;
+ }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_comparator.h
b/be/src/runtime/workload_management/workload_comparator.h
new file mode 100644
index 00000000000..00981a89dc8
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_comparator.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+enum WorkloadCompareOperator { EQUAL, GREATER, GREATER_EQUAL, LESS,
LESS_EQUAL, INVALID_OP };
+
+class WorkloadCompareUtils {
+public:
+ static WorkloadCompareOperator
get_workload_compare_operator(TCompareOperator::type t_op) {
+ if (TCompareOperator::type::EQUAL == t_op) {
+ return EQUAL;
+ } else if (TCompareOperator::type::GREATER == t_op) {
+ return GREATER;
+ } else if (TCompareOperator::type::GREATER_EQUAL == t_op) {
+ return GREATER_EQUAL;
+ } else if (TCompareOperator::type::LESS == t_op) {
+ return LESS;
+ } else if (TCompareOperator::type::LESS_EQUAL == t_op) {
+ return LESS_EQUAL;
+ }
+ LOG(ERROR) << "can not find a valid op ";
+ return INVALID_OP;
+ }
+
+ static bool compare_signed_integer(WorkloadCompareOperator op, int64_t
first_val,
+ int64_t second_val) {
+ switch (op) {
+ case EQUAL:
+ return first_val == second_val;
+ case GREATER:
+ return first_val > second_val;
+ case GREATER_EQUAL:
+ return first_val >= second_val;
+ case LESS:
+ return first_val < second_val;
+ case LESS_EQUAL:
+ return first_val <= second_val;
+ default:
+ LOG(ERROR) << "unexpected signed integer compare operator " << op;
+ return false;
+ }
+ }
+
+ static bool compare_string(WorkloadCompareOperator op, std::string
first_val,
+ std::string second_val) {
+ switch (op) {
+ case EQUAL:
+ return first_val == second_val;
+ default:
+ LOG(ERROR) << "unexpected string compare operator " << op;
+ return false;
+ }
+ }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.cpp
b/be/src/runtime/workload_management/workload_condition.cpp
new file mode 100644
index 00000000000..dff6f2adc24
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_condition.cpp
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_condition.h"
+
+namespace doris {
+
+// query time
+WorkloadConditionQueryTime::WorkloadConditionQueryTime(WorkloadCompareOperator
op,
+ std::string str_val) {
+ _op = op;
+ _query_time = std::stol(str_val);
+}
+
+bool WorkloadConditionQueryTime::eval(std::string str_val) {
+ int64_t query_time_args = std::stol(str_val);
+ return WorkloadCompareUtils::compare_signed_integer(_op, query_time_args,
_query_time);
+}
+
+// scan rows
+WorkloadConditionScanRows::WorkloadConditionScanRows(WorkloadCompareOperator
op,
+ std::string str_val) {
+ _op = op;
+ _scan_rows = std::stol(str_val);
+}
+
+bool WorkloadConditionScanRows::eval(std::string str_val) {
+ int64_t scan_rows_args = std::stol(str_val);
+ return WorkloadCompareUtils::compare_signed_integer(_op, scan_rows_args,
_scan_rows);
+}
+
+// scan bytes
+WorkloadConditionScanBytes::WorkloadConditionScanBytes(WorkloadCompareOperator
op,
+ std::string str_val) {
+ _op = op;
+ _scan_bytes = std::stol(str_val);
+}
+
+// todo(wb): need handle invalid input value
+bool WorkloadConditionScanBytes::eval(std::string str_val) {
+ int64_t scan_bytes_args = std::stol(str_val);
+ return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args,
_scan_bytes);
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.h
b/be/src/runtime/workload_management/workload_condition.h
new file mode 100644
index 00000000000..3486742c968
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/BackendService_types.h>
+
+#include "runtime/workload_management/workload_comparator.h"
+
+namespace doris {
+
+enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES };
+
+class WorkloadCondition {
+public:
+ WorkloadCondition() = default;
+ virtual ~WorkloadCondition() = default;
+
+ virtual bool eval(std::string str_val) = 0;
+
+ virtual WorkloadMetricType get_workload_metric_type() = 0;
+};
+
+class WorkloadConditionQueryTime : public WorkloadCondition {
+public:
+ WorkloadConditionQueryTime(WorkloadCompareOperator op, std::string
str_val);
+
+ bool eval(std::string str_val) override;
+
+ WorkloadMetricType get_workload_metric_type() override {
+ return WorkloadMetricType::QUERY_TIME;
+ }
+
+private:
+ int64_t _query_time;
+ WorkloadCompareOperator _op;
+};
+
+class WorkloadConditionScanRows : public WorkloadCondition {
+public:
+ WorkloadConditionScanRows(WorkloadCompareOperator op, std::string str_val);
+ bool eval(std::string str_val) override;
+ WorkloadMetricType get_workload_metric_type() override { return
WorkloadMetricType::SCAN_ROWS; }
+
+private:
+ int64_t _scan_rows;
+ WorkloadCompareOperator _op;
+};
+
+class WorkloadConditionScanBytes : public WorkloadCondition {
+public:
+ WorkloadConditionScanBytes(WorkloadCompareOperator op, std::string
str_val);
+ bool eval(std::string str_val) override;
+ WorkloadMetricType get_workload_metric_type() override {
+ return WorkloadMetricType::SCAN_BYTES;
+ }
+
+private:
+ int64_t _scan_bytes;
+ WorkloadCompareOperator _op;
+};
+
+class WorkloadConditionFactory {
+public:
+ static std::unique_ptr<WorkloadCondition> create_workload_condition(
+ TWorkloadCondition* t_cond) {
+ WorkloadCompareOperator op =
+
WorkloadCompareUtils::get_workload_compare_operator(t_cond->op);
+ std::string str_val = t_cond->value;
+ TWorkloadMetricType::type metric_name = t_cond->metric_name;
+ if (TWorkloadMetricType::type::QUERY_TIME == metric_name) {
+ return std::make_unique<WorkloadConditionQueryTime>(op, str_val);
+ } else if (TWorkloadMetricType::type::SCAN_ROWS == metric_name) {
+ return std::make_unique<WorkloadConditionScanRows>(op, str_val);
+ } else if (TWorkloadMetricType::type::SCAN_BYTES == metric_name) {
+ return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
+ }
+ LOG(ERROR) << "not find a metric name " << metric_name;
+ return nullptr;
+ }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/be/src/runtime/workload_management/workload_query_info.h
similarity index 59%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/runtime/workload_management/workload_query_info.h
index 6c5ce9e4c11..9c24e9dee8a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/runtime/workload_management/workload_query_info.h
@@ -15,23 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.publish;
+#pragma once
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include <map>
-public class WorkloadGroupPublisher implements TopicPublisher {
+#include "runtime/workload_management/workload_condition.h"
- private Env env;
+namespace doris {
- public WorkloadGroupPublisher(Env env) {
- this.env = env;
- }
+class WorkloadQueryInfo {
+public:
+ std::map<WorkloadMetricType, std::string> metric_map;
+ TUniqueId tquery_id;
+ std::string query_id;
+};
- @Override
- public void getTopicInfo(TPublishTopicRequest req) {
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
- env.getWorkloadGroupMgr().getPublishTopicInfo());
- }
-}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp
b/be/src/runtime/workload_management/workload_sched_policy.cpp
new file mode 100644
index 00000000000..bc543a7d770
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_sched_policy.h"
+
+namespace doris {
+
+void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool
enabled,
+ int priority,
+ std::vector<std::unique_ptr<WorkloadCondition>>
condition_list,
+ std::vector<std::unique_ptr<WorkloadAction>>
action_list) {
+ _id = id;
+ _name = name;
+ _version = version;
+ _enabled = enabled;
+ _priority = priority;
+ _condition_list = std::move(condition_list);
+ _action_list = std::move(action_list);
+
+ _first_action_type = _action_list[0]->get_action_type();
+ if (_first_action_type != WorkloadActionType::MOVE_QUERY_TO_GROUP &&
+ _first_action_type != WorkloadActionType::CANCEL_QUERY) {
+ for (int i = 1; i < _action_list.size(); i++) {
+ WorkloadActionType cur_action_type =
_action_list[i]->get_action_type();
+ // one policy can not both contains move and cancel
+ if (cur_action_type == WorkloadActionType::MOVE_QUERY_TO_GROUP ||
+ cur_action_type == WorkloadActionType::CANCEL_QUERY) {
+ _first_action_type = cur_action_type;
+ break;
+ }
+ }
+ }
+}
+
+bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {
+ if (!_enabled) {
+ return false;
+ }
+ auto& metric_val_map = query_info_ptr->metric_map;
+ for (auto& cond : _condition_list) {
+ if (metric_val_map.find(cond->get_workload_metric_type()) ==
metric_val_map.end()) {
+ return false;
+ }
+
+ std::string val = metric_val_map.at(cond->get_workload_metric_type());
+ if (!cond->eval(val)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
+ for (int i = 0; i < _action_list.size(); i++) {
+ _action_list[i]->exec(query_info);
+ }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.h
b/be/src/runtime/workload_management/workload_sched_policy.h
new file mode 100644
index 00000000000..82f42b9a0b4
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_sched_policy.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "runtime/workload_management/workload_action.h"
+#include "runtime/workload_management/workload_condition.h"
+
+namespace doris {
+
+class WorkloadSchedPolicy {
+public:
+ WorkloadSchedPolicy() = default;
+ ~WorkloadSchedPolicy() = default;
+
+ void init(int64_t id, std::string name, int version, bool enabled, int
priority,
+ std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
+ std::vector<std::unique_ptr<WorkloadAction>> action_list);
+
+ bool enabled() { return _enabled; }
+ int priority() { return _priority; }
+
+ bool is_match(WorkloadQueryInfo* query_info);
+
+ WorkloadActionType get_first_action_type() { return _first_action_type; }
+
+ void exec_action(WorkloadQueryInfo* query_info);
+
+ int version() { return _version; }
+
+private:
+ int64_t _id;
+ std::string _name;
+ int _version;
+ bool _enabled;
+ int _priority;
+
+ std::vector<std::unique_ptr<WorkloadCondition>> _condition_list;
+ std::vector<std::unique_ptr<WorkloadAction>> _action_list;
+
+ WorkloadActionType _first_action_type;
+};
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
new file mode 100644
index 00000000000..8a30b5395eb
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_management/workload_sched_policy_mgr.h"
+
+#include "runtime/fragment_mgr.h"
+
+namespace doris {
+
+void WorkloadSchedPolicyMgr::start(ExecEnv* exec_env) {
+ _stop_latch.reset(1);
+ _exec_env = exec_env;
+
+ Status st;
+ st = Thread::create(
+ "workload", "workload_scheduler", [this]() {
this->_schedule_workload(); }, &_thread);
+ if (!st.ok()) {
+ LOG(WARNING) << "create workload scheduler thread failed";
+ } else {
+ LOG(INFO) << "start workload scheduler ";
+ }
+}
+
+void WorkloadSchedPolicyMgr::stop() {
+ std::lock_guard<std::shared_mutex> write_lock(_stop_lock);
+ if (_stop_latch.count() == 0) {
+ LOG(INFO) << "workload schedule manager is already stopped. ";
+ return;
+ }
+ _stop_latch.count_down();
+ if (_thread) {
+ _thread->join();
+ }
+ LOG(INFO) << "workload schedule manager stopped, thread is finished. ";
+}
+
+void WorkloadSchedPolicyMgr::update_workload_sched_policy(
+ std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>>
new_policy_map) {
+ std::lock_guard<std::shared_mutex> write_lock(_policy_lock);
+ // 1 upsert
+ for (const auto& [id, policy] : new_policy_map) {
+ if (_id_policy_map.find(id) == _id_policy_map.end()) {
+ _id_policy_map.emplace(id, policy);
+ } else {
+ if (policy->version() > _id_policy_map.at(id)->version()) {
+ _id_policy_map[id] = policy;
+ }
+ }
+ }
+
+ // 2 delete
+ for (auto iter = _id_policy_map.begin(); iter != _id_policy_map.end();) {
+ if (new_policy_map.find(iter->first) == new_policy_map.end()) {
+ iter = _id_policy_map.erase(iter);
+ } else {
+ iter++;
+ }
+ }
+}
+
+void WorkloadSchedPolicyMgr::_schedule_workload() {
+ while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
+ // 1 get query info
+ std::vector<WorkloadQueryInfo> list;
+ _exec_env->fragment_mgr()->get_runtime_query_info(&list);
+ // todo: add timer
+ LOG(INFO) << "[workload_schedule] get query list size=" << list.size();
+
+ for (int i = 0; i < list.size(); i++) {
+ WorkloadQueryInfo* query_info_ptr = &(list[i]);
+ // 2 get matched policy
+ std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>>
matched_policy_map;
+ {
+ std::shared_lock<std::shared_mutex> read_lock(_policy_lock);
+ for (auto& entity : _id_policy_map) {
+ auto& new_policy = entity.second;
+ if (new_policy->is_match(query_info_ptr)) {
+ WorkloadActionType new_policy_type =
new_policy->get_first_action_type();
+ if (matched_policy_map.find(new_policy_type) ==
matched_policy_map.end() ||
+ new_policy->priority() >
+
matched_policy_map.at(new_policy_type)->priority()) {
+ matched_policy_map[new_policy_type] = new_policy;
+ }
+ }
+ }
+ }
+
+ if (matched_policy_map.size() == 0) {
+ continue;
+ }
+ LOG(INFO) << "[workload_schedule] matched policy size=" <<
matched_policy_map.size();
+ // 3 check action conflicts
+ if
(matched_policy_map.find(WorkloadActionType::MOVE_QUERY_TO_GROUP) !=
+ matched_policy_map.end() &&
+ matched_policy_map.find(WorkloadActionType::CANCEL_QUERY) !=
+ matched_policy_map.end()) {
+ // compare priority
+ int move_prio =
+
matched_policy_map.at(WorkloadActionType::MOVE_QUERY_TO_GROUP)->priority();
+ int cancel_prio =
+
matched_policy_map.at(WorkloadActionType::CANCEL_QUERY)->priority();
+ if (cancel_prio >= move_prio) {
+
matched_policy_map.erase(WorkloadActionType::MOVE_QUERY_TO_GROUP);
+ } else {
+ matched_policy_map.erase(WorkloadActionType::CANCEL_QUERY);
+ }
+ }
+
+ // 4 exec policy action
+ for (const auto& [key, value] : matched_policy_map) {
+ value->exec_action(query_info_ptr);
+ }
+ }
+ }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/be/src/runtime/workload_management/workload_sched_policy_mgr.h
similarity index 51%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to be/src/runtime/workload_management/workload_sched_policy_mgr.h
index 6c5ce9e4c11..52b41eacf4b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.h
@@ -15,23 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.publish;
+#pragma once
-import org.apache.doris.catalog.Env;
-import org.apache.doris.thrift.TPublishTopicRequest;
-import org.apache.doris.thrift.TTopicInfoType;
+#include "runtime/exec_env.h"
+#include "runtime/workload_management/workload_sched_policy.h"
+#include "util/countdown_latch.h"
-public class WorkloadGroupPublisher implements TopicPublisher {
+namespace doris {
- private Env env;
+class WorkloadSchedPolicyMgr {
+public:
+ WorkloadSchedPolicyMgr() : _stop_latch(0) {}
+ ~WorkloadSchedPolicyMgr() = default;
- public WorkloadGroupPublisher(Env env) {
- this.env = env;
- }
+ void start(ExecEnv* exec_env);
- @Override
- public void getTopicInfo(TPublishTopicRequest req) {
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
- env.getWorkloadGroupMgr().getPublishTopicInfo());
- }
-}
+ void stop();
+
+ void update_workload_sched_policy(
+ std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>>
policy_map);
+
+private:
+ void _schedule_workload();
+
+ std::shared_mutex _policy_lock;
+ std::map<uint64_t, std::shared_ptr<WorkloadSchedPolicy>> _id_policy_map;
+
+ std::shared_mutex _stop_lock;
+ CountDownLatch _stop_latch;
+ scoped_refptr<Thread> _thread;
+ ExecEnv* _exec_env;
+};
+
+}; // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index cd77f70f59c..1ad131abb0a 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -230,6 +230,7 @@ import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
+import
org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
@@ -998,6 +999,8 @@ public class Env {
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
topicPublisherThread.addToTopicPublisherList(wgPublisher);
+ WorkloadSchedPolicyPublisher wpPublisher = new
WorkloadSchedPolicyPublisher(this);
+ topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
workloadGroupMgr.startUpdateThread();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index 616c8a30b56..db5158e24f5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -59,9 +59,6 @@ public class TopicPublisherThread extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- if (!Config.enable_workload_group) {
- return;
- }
LOG.info("begin publish topic info");
// step 1: get all publish topic info
TPublishTopicRequest request = new TPublishTopicRequest();
@@ -69,6 +66,10 @@ public class TopicPublisherThread extends MasterDaemon {
topicPublisher.getTopicInfo(request);
}
+ if (request.getTopicMap().size() == 0) {
+ return;
+ }
+
// step 2: publish topic info to all be
Collection<Backend> nodesToPublish =
clusterInfoService.getIdToBackend().values();
AckResponseHandler handler = new AckResponseHandler(nodesToPublish);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
index 6c5ce9e4c11..e81ccb06d3e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
@@ -20,6 +20,9 @@ package org.apache.doris.common.publish;
import org.apache.doris.catalog.Env;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
+
+import java.util.List;
public class WorkloadGroupPublisher implements TopicPublisher {
@@ -31,7 +34,9 @@ public class WorkloadGroupPublisher implements TopicPublisher
{
@Override
public void getTopicInfo(TPublishTopicRequest req) {
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
- env.getWorkloadGroupMgr().getPublishTopicInfo());
+ List<TopicInfo> list = env.getWorkloadGroupMgr().getPublishTopicInfo();
+ if (list.size() > 0) {
+ req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, list);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 827c2367133..d514ea62d2d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -22,8 +22,16 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCompareOperator;
+import org.apache.doris.thrift.TWorkloadAction;
+import org.apache.doris.thrift.TWorkloadActionType;
+import org.apache.doris.thrift.TWorkloadCondition;
+import org.apache.doris.thrift.TWorkloadMetricType;
+import org.apache.doris.thrift.TWorkloadSchedPolicy;
+import org.apache.doris.thrift.TopicInfo;
import com.esotericsoftware.minlog.Log;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
@@ -41,6 +49,23 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
public static final ImmutableSet<String> POLICY_PROPERTIES = new
ImmutableSet.Builder<String>()
.add(ENABLED).add(PRIORITY).build();
+ // used for convert fe type to thrift type
+ private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType>
METRIC_MAP
+ = new ImmutableMap.Builder<WorkloadMetricType,
TWorkloadMetricType>()
+ .put(WorkloadMetricType.QUERY_TIME,
TWorkloadMetricType.QUERY_TIME).build();
+ private static ImmutableMap<WorkloadActionType, TWorkloadActionType>
ACTION_MAP
+ = new ImmutableMap.Builder<WorkloadActionType,
TWorkloadActionType>()
+ .put(WorkloadActionType.MOVE_QUERY_TO_GROUP,
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
+ .put(WorkloadActionType.CANCEL_QUERY,
TWorkloadActionType.CANCEL_QUERY).build();
+
+ private static ImmutableMap<WorkloadConditionOperator, TCompareOperator>
OP_MAP
+ = new ImmutableMap.Builder<WorkloadConditionOperator,
TCompareOperator>()
+ .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
+ .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
+ .put(WorkloadConditionOperator.GREATER_EQUAL,
TCompareOperator.GREATER_EQUAL)
+ .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
+ .put(WorkloadConditionOperator.LESS_EQUAl,
TCompareOperator.LESS_EQUAL).build();
+
@SerializedName(value = "id")
long id;
@SerializedName(value = "name")
@@ -173,6 +198,48 @@ public class WorkloadSchedPolicy implements Writable,
GsonPostProcessable {
return actionMetaList;
}
+ public TopicInfo toTopicInfo() {
+ TWorkloadSchedPolicy tPolicy = new TWorkloadSchedPolicy();
+ tPolicy.setId(id);
+ tPolicy.setName(name);
+ tPolicy.setVersion(version);
+ tPolicy.setPriority(priority);
+ tPolicy.setEnabled(enabled);
+
+ List<TWorkloadCondition> condList = new ArrayList();
+ for (WorkloadConditionMeta cond : conditionMetaList) {
+ TWorkloadCondition tCond = new TWorkloadCondition();
+ TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
+ if (metricType == null) {
+ return null;
+ }
+ tCond.setMetricName(metricType);
+ tCond.setOp(OP_MAP.get(cond.op));
+ tCond.setValue(cond.value);
+ condList.add(tCond);
+ }
+
+ List<TWorkloadAction> actionList = new ArrayList();
+ for (WorkloadActionMeta action : actionMetaList) {
+ TWorkloadAction tAction = new TWorkloadAction();
+ TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
+ if (tActionType == null) {
+ return null;
+ }
+ tAction.setAction(tActionType);
+ tAction.setActionArgs(action.actionArgs);
+ actionList.add(tAction);
+ }
+
+ tPolicy.setConditionList(condList);
+ tPolicy.setActionList(actionList);
+
+ TopicInfo topicInfo = new TopicInfo();
+ topicInfo.setWorkloadSchedPolicy(tPolicy);
+
+ return topicInfo;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 346e34796c7..45ba3a35de4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -35,6 +35,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TopicInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -412,6 +413,22 @@ public class WorkloadSchedPolicyMgr implements Writable,
GsonPostProcessable {
lock.writeLock().unlock();
}
+ public List<TopicInfo> getPublishTopicInfoList() {
+ List<TopicInfo> topicInfoList = new ArrayList();
+ readLock();
+ try {
+ for (Map.Entry<Long, WorkloadSchedPolicy> entry :
idToPolicy.entrySet()) {
+ TopicInfo tInfo = entry.getValue().toTopicInfo();
+ if (tInfo != null) {
+ topicInfoList.add(tInfo);
+ }
+ }
+ } finally {
+ readUnlock();
+ }
+ return topicInfoList;
+ }
+
public void replayCreateWorkloadSchedPolicy(WorkloadSchedPolicy policy) {
insertWorkloadSchedPolicy(policy);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java
similarity index 68%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
copy to
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java
index 6c5ce9e4c11..5083d183e6c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyPublisher.java
@@ -15,23 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.publish;
+package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TopicInfo;
-public class WorkloadGroupPublisher implements TopicPublisher {
+import java.util.List;
+
+public class WorkloadSchedPolicyPublisher implements TopicPublisher {
private Env env;
- public WorkloadGroupPublisher(Env env) {
+ public WorkloadSchedPolicyPublisher(Env env) {
this.env = env;
}
@Override
public void getTopicInfo(TPublishTopicRequest req) {
- req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP,
- env.getWorkloadGroupMgr().getPublishTopicInfo());
+ List<TopicInfo> list =
env.getWorkloadSchedPolicyMgr().getPublishTopicInfoList();
+ if (list.size() > 0) {
+ req.putToTopicMap(TTopicInfoType.WORKLOAD_SCHED_POLICY, list);
+ }
}
+
}
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index 855b7489711..2c5b199fc17 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -163,6 +163,7 @@ struct TQueryIngestBinlogResult {
enum TTopicInfoType {
WORKLOAD_GROUP
MOVE_QUERY_TO_GROUP
+ WORKLOAD_SCHED_POLICY
}
struct TWorkloadGroupInfo {
@@ -178,12 +179,53 @@ struct TWorkloadGroupInfo {
struct TWorkloadMoveQueryToGroupAction {
1: optional Types.TUniqueId query_id
- 2: optional i64 workload_group_id;
+ 2: optional i64 workload_group_id
+}
+
+enum TWorkloadMetricType {
+ QUERY_TIME
+ SCAN_ROWS
+ SCAN_BYTES
+}
+
+enum TCompareOperator {
+ EQUAL
+ GREATER
+ GREATER_EQUAL
+ LESS
+ LESS_EQUAL
+}
+
+struct TWorkloadCondition {
+ 1: optional TWorkloadMetricType metric_name
+ 2: optional TCompareOperator op
+ 3: optional string value
+}
+
+enum TWorkloadActionType {
+ MOVE_QUERY_TO_GROUP
+ CANCEL_QUERY
+}
+
+struct TWorkloadAction {
+ 1: optional TWorkloadActionType action
+ 2: optional string action_args
+}
+
+struct TWorkloadSchedPolicy {
+ 1: optional i64 id
+ 2: optional string name
+ 3: optional i32 version
+ 4: optional i32 priority
+ 5: optional bool enabled
+ 6: optional list<TWorkloadCondition> condition_list
+ 7: optional list<TWorkloadAction> action_list
}
struct TopicInfo {
1: optional TWorkloadGroupInfo workload_group_info
2: optional TWorkloadMoveQueryToGroupAction move_action
+ 3: optional TWorkloadSchedPolicy workload_sched_policy
}
struct TPublishTopicRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]