This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 44d9aa43154 [opt](resource) step3: Remove WorkloadQueryInfo, replaced
by ResourceContext (#47960)
44d9aa43154 is described below
commit 44d9aa43154b4c740a4d3667c75627eb5a256b71
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Feb 17 18:55:19 2025 +0800
[opt](resource) step3: Remove WorkloadQueryInfo, replaced by
ResourceContext (#47960)
### What problem does this PR solve?
Modify `WorkloadSchedPolicy`, `WorkloadSchedPolicyMgr`, `WorkloadAction`
---
be/src/olap/delta_writer.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 12 +---
be/src/runtime/fragment_mgr.h | 2 +-
be/src/runtime/load_channel.cpp | 2 +-
be/src/runtime/query_context.cpp | 2 +-
be/src/runtime/query_context.h | 4 +-
be/src/runtime/runtime_query_statistics_mgr.cpp | 22 ------
be/src/runtime/runtime_query_statistics_mgr.h | 5 --
be/src/runtime/thread_context.cpp | 3 +-
be/src/runtime/thread_context.h | 84 +++++++++-------------
be/src/runtime/workload_management/cpu_context.cpp | 5 +-
.../workload_management/resource_context.cpp | 6 +-
.../runtime/workload_management/resource_context.h | 10 +--
.../workload_management/workload_action.cpp | 15 ++--
.../runtime/workload_management/workload_action.h | 29 ++++++--
.../workload_management/workload_group_context.h | 45 ------------
.../workload_management/workload_query_info.h | 37 ----------
.../workload_management/workload_sched_policy.cpp | 52 ++++++++++----
.../workload_management/workload_sched_policy.h | 10 +--
.../workload_sched_policy_mgr.cpp | 26 ++++---
.../runtime/memory/thread_mem_tracker_mgr_test.cpp | 12 ++--
21 files changed, 149 insertions(+), 236 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 69c2256a5eb..992dac27ec8 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -106,7 +106,7 @@ Status BaseDeltaWriter::init() {
auto* t_ctx = doris::thread_context(true);
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (t_ctx && t_ctx->is_attach_task()) {
- wg_sptr =
t_ctx->resource_ctx()->workload_group_context()->workload_group();
+ wg_sptr = t_ctx->resource_ctx()->workload_group();
}
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_memtable_writer->init(
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 4dfa966b20b..009dc34bb96 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -79,7 +79,6 @@
#include "runtime/types.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
-#include "runtime/workload_management/workload_query_info.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
@@ -1399,18 +1398,13 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
return merge_status;
}
-void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>*
query_info_list) {
+void FragmentMgr::get_runtime_query_info(
+ std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) {
_query_ctx_map.apply(
[&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>&
map) -> Status {
for (auto iter = map.begin(); iter != map.end();) {
if (auto q_ctx = iter->second.lock()) {
- WorkloadQueryInfo workload_query_info;
- workload_query_info.query_id = print_id(iter->first);
- workload_query_info.tquery_id = iter->first;
- workload_query_info.wg_id = q_ctx->workload_group() ==
nullptr
- ? -1
- :
q_ctx->workload_group()->id();
- query_info_list->push_back(workload_query_info);
+ _resource_ctx_list->push_back(q_ctx->resource_ctx());
iter++;
} else {
iter = map.erase(iter);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 78f117a15db..9e37b7811f0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -176,7 +176,7 @@ public:
std::string dump_pipeline_tasks(int64_t duration = 0);
std::string dump_pipeline_tasks(TUniqueId& query_id);
- void get_runtime_query_info(std::vector<WorkloadQueryInfo>*
_query_info_list);
+ void get_runtime_query_info(std::vector<std::weak_ptr<ResourceContext>>*
_resource_ctx_list);
Status get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 99f41899215..ce10666d84c 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -62,7 +62,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t
timeout_s, bool is_hig
wg_ptr =
ExecEnv::GetInstance()->workload_group_mgr()->get_group(wg_id);
if (wg_ptr != nullptr) {
wg_ptr->add_mem_tracker_limiter(mem_tracker);
-
_resource_ctx->workload_group_context()->set_workload_group(wg_ptr);
+ _resource_ctx->set_workload_group(wg_ptr);
}
}
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a6cccc22091..9334db576ff 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -351,7 +351,7 @@ ThreadPool* QueryContext::get_memtable_flush_pool() {
}
void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
- _resource_ctx->workload_group_context()->set_workload_group(wg);
+ _resource_ctx->set_workload_group(wg);
// Should add query first, then the workload group will not be deleted.
// see task_group_manager::delete_workload_group_by_ids
workload_group()->add_mem_tracker_limiter(query_mem_tracker());
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c02cf97bf1b..f8515f762d3 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -260,9 +260,7 @@ public:
bool is_nereids() const { return _is_nereids; }
- WorkloadGroupPtr workload_group() const {
- return _resource_ctx->workload_group_context()->workload_group();
- }
+ WorkloadGroupPtr workload_group() const { return
_resource_ctx->workload_group(); }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
return _resource_ctx->memory_context()->mem_tracker();
}
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index f09558f7b9c..aa1e8546e02 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -453,28 +453,6 @@ void
RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
}
}
-void RuntimeQueryStatisticsMgr::get_metric_map(
- std::string query_id, std::map<WorkloadMetricType, std::string>&
metric_map) {
- std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
- if (_resource_contexts_map.find(query_id) != _resource_contexts_map.end())
{
- auto* resource_ctx = _resource_contexts_map.at(query_id).get();
- metric_map.emplace(
- WorkloadMetricType::QUERY_TIME,
- std::to_string(MonotonicMillis() -
resource_ctx->task_controller()->finish_time()));
- metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
-
std::to_string(resource_ctx->io_context()->scan_rows()));
- metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
-
std::to_string(resource_ctx->io_context()->scan_bytes()));
- metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
-
std::to_string(resource_ctx->memory_context()->current_memory_bytes()));
- } else {
- metric_map.emplace(WorkloadMetricType::QUERY_TIME, "-1");
- metric_map.emplace(WorkloadMetricType::SCAN_ROWS, "-1");
- metric_map.emplace(WorkloadMetricType::SCAN_BYTES, "-1");
- metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, "-1");
- }
-}
-
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block*
block) {
std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.h
b/be/src/runtime/runtime_query_statistics_mgr.h
index 71a93ee9220..0bcdd647373 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.h
+++ b/be/src/runtime/runtime_query_statistics_mgr.h
@@ -56,11 +56,6 @@ public:
void report_runtime_query_statistics();
- // used for workload scheduler policy
- // TODO: save ResourceContext in WorkloadGroupMgr, put get_metric_map into
WorkloadGroupMgr.
- void get_metric_map(std::string query_id,
- std::map<WorkloadMetricType, std::string>& metric_map);
-
// used for backend_active_tasks
void get_active_be_tasks_block(vectorized::Block* block);
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index 09394c8a721..266515ae2a6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -64,8 +64,7 @@ SwitchResourceContext::SwitchResourceContext(const
std::shared_ptr<ResourceConte
old_resource_ctx_ = thread_context()->resource_ctx();
thread_context()->resource_ctx_ = rc;
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
- rc->memory_context()->mem_tracker(),
- rc->workload_group_context()->workload_group());
+ rc->memory_context()->mem_tracker(), rc->workload_group());
}
}
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 05cef5d5f9b..fbdbe397d40 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -111,54 +111,41 @@
__VA_ARGS__;
\
} while (0)
-#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)
\
- std::shared_ptr<IOThrottle> iot = nullptr;
\
- auto* t_ctx = doris::thread_context(true);
\
- if (t_ctx && t_ctx->is_attach_task() &&
\
- t_ctx->resource_ctx()->workload_group_context()->workload_group() !=
nullptr) { \
- iot = t_ctx->resource_ctx()
\
- ->workload_group_context()
\
- ->workload_group()
\
- ->get_local_scan_io_throttle(data_dir);
\
- }
\
- if (iot) {
\
- iot->acquire(-1);
\
- }
\
- Defer defer {
\
- [&]() {
\
- if (iot) {
\
- iot->update_next_io_time(*bytes_read);
\
- t_ctx->resource_ctx()
\
- ->workload_group_context()
\
- ->workload_group()
\
- ->update_local_scan_io(data_dir, *bytes_read);
\
- }
\
- }
\
+#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read)
\
+ std::shared_ptr<IOThrottle> iot = nullptr;
\
+ auto* t_ctx = doris::thread_context(true);
\
+ if (t_ctx && t_ctx->is_attach_task() &&
t_ctx->resource_ctx()->workload_group() != nullptr) { \
+ iot =
t_ctx->resource_ctx()->workload_group()->get_local_scan_io_throttle(data_dir);
\
+ }
\
+ if (iot) {
\
+ iot->acquire(-1);
\
+ }
\
+ Defer defer {
\
+ [&]() {
\
+ if (iot) {
\
+ iot->update_next_io_time(*bytes_read);
\
+
t_ctx->resource_ctx()->workload_group()->update_local_scan_io(data_dir,
\
+
*bytes_read); \
+ }
\
+ }
\
}
-#define LIMIT_REMOTE_SCAN_IO(bytes_read)
\
- std::shared_ptr<IOThrottle> iot = nullptr;
\
- auto* t_ctx = doris::thread_context(true);
\
- if (t_ctx && t_ctx->is_attach_task() &&
\
- t_ctx->resource_ctx()->workload_group_context()->workload_group() !=
nullptr) { \
- iot = t_ctx->resource_ctx()
\
- ->workload_group_context()
\
- ->workload_group()
\
- ->get_remote_scan_io_throttle();
\
- }
\
- if (iot) {
\
- iot->acquire(-1);
\
- }
\
- Defer defer {
\
- [&]() {
\
- if (iot) {
\
- iot->update_next_io_time(*bytes_read);
\
- t_ctx->resource_ctx()
\
- ->workload_group_context()
\
- ->workload_group()
\
- ->update_remote_scan_io(*bytes_read);
\
- }
\
- }
\
+#define LIMIT_REMOTE_SCAN_IO(bytes_read)
\
+ std::shared_ptr<IOThrottle> iot = nullptr;
\
+ auto* t_ctx = doris::thread_context(true);
\
+ if (t_ctx && t_ctx->is_attach_task() &&
t_ctx->resource_ctx()->workload_group() != nullptr) { \
+ iot =
t_ctx->resource_ctx()->workload_group()->get_remote_scan_io_throttle();
\
+ }
\
+ if (iot) {
\
+ iot->acquire(-1);
\
+ }
\
+ Defer defer {
\
+ [&]() {
\
+ if (iot) {
\
+ iot->update_next_io_time(*bytes_read);
\
+
t_ctx->resource_ctx()->workload_group()->update_remote_scan_io(*bytes_read);
\
+ }
\
+ }
\
}
namespace doris {
@@ -195,9 +182,8 @@ public:
// will only attach_task at the beginning of the thread function,
there should be no duplicate attach_task.
DCHECK(resource_ctx_ == nullptr);
resource_ctx_ = rc;
- thread_mem_tracker_mgr->attach_limiter_tracker(
- rc->memory_context()->mem_tracker(),
- rc->workload_group_context()->workload_group());
+
thread_mem_tracker_mgr->attach_limiter_tracker(rc->memory_context()->mem_tracker(),
+ rc->workload_group());
thread_mem_tracker_mgr->enable_wait_gc();
}
diff --git a/be/src/runtime/workload_management/cpu_context.cpp
b/be/src/runtime/workload_management/cpu_context.cpp
index b6bbcc0da8a..ee7a0024b0c 100644
--- a/be/src/runtime/workload_management/cpu_context.cpp
+++ b/be/src/runtime/workload_management/cpu_context.cpp
@@ -25,9 +25,8 @@ namespace doris {
void CPUContext::update_cpu_cost_ms(int64_t delta) const {
stats_.cpu_cost_ms_counter_->update(delta);
- if (resource_ctx_ != nullptr &&
- resource_ctx_->workload_group_context()->workload_group() != nullptr) {
-
resource_ctx_->workload_group_context()->workload_group()->update_cpu_time(delta);
+ if (resource_ctx_ != nullptr && resource_ctx_->workload_group() !=
nullptr) {
+ resource_ctx_->workload_group()->update_cpu_time(delta);
}
}
diff --git a/be/src/runtime/workload_management/resource_context.cpp
b/be/src/runtime/workload_management/resource_context.cpp
index 765e4615505..f78ac56e9f4 100644
--- a/be/src/runtime/workload_management/resource_context.cpp
+++ b/be/src/runtime/workload_management/resource_context.cpp
@@ -48,7 +48,11 @@ void
ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
statistics->__set_scan_bytes_from_remote_storage(
io_context()->scan_bytes_from_remote_storage());
statistics->__set_scan_bytes_from_local_storage(io_context()->scan_bytes_from_local_storage());
-
statistics->__set_workload_group_id(workload_group_context()->workload_group_id());
+ if (workload_group() != nullptr) {
+ statistics->__set_workload_group_id(workload_group()->id());
+ } else {
+ statistics->__set_workload_group_id(-1);
+ }
}
} // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h
b/be/src/runtime/workload_management/resource_context.h
index 0c163f515cb..64d59289db8 100644
--- a/be/src/runtime/workload_management/resource_context.h
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -28,7 +28,6 @@
#include "runtime/workload_management/io_context.h"
#include "runtime/workload_management/memory_context.h"
#include "runtime/workload_management/task_controller.h"
-#include "runtime/workload_management/workload_group_context.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -46,7 +45,6 @@ public:
cpu_context_ = CPUContext::create_unique();
memory_context_ = MemoryContext::create_unique();
io_context_ = IOContext::create_unique();
- workload_group_context_ = WorkloadGroupContext::create_unique();
task_controller_ = TaskController::create_unique();
cpu_context_->set_resource_ctx(this);
@@ -59,8 +57,8 @@ public:
CPUContext* cpu_context() const { return cpu_context_.get(); }
MemoryContext* memory_context() const { return memory_context_.get(); }
IOContext* io_context() const { return io_context_.get(); }
- WorkloadGroupContext* workload_group_context() const { return
workload_group_context_.get(); }
TaskController* task_controller() const { return task_controller_.get(); }
+ WorkloadGroupPtr workload_group() const { return _workload_group; }
void set_cpu_context(std::unique_ptr<CPUContext> cpu_context) {
cpu_context_ = std::move(cpu_context);
@@ -74,12 +72,10 @@ public:
io_context_ = std::move(io_context);
io_context_->set_resource_ctx(this);
}
- void set_workload_group_context(std::unique_ptr<WorkloadGroupContext>
wg_context) {
- workload_group_context_ = std::move(wg_context);
- }
void set_task_controller(std::unique_ptr<TaskController> task_controller) {
task_controller_ = std::move(task_controller);
}
+ void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
RuntimeProfile* profile() { return
const_cast<RuntimeProfile*>(resource_profile_.get().get()); }
@@ -109,9 +105,9 @@ private:
std::unique_ptr<CPUContext> cpu_context_ = nullptr;
std::unique_ptr<MemoryContext> memory_context_ = nullptr;
std::unique_ptr<IOContext> io_context_ = nullptr;
- std::unique_ptr<WorkloadGroupContext> workload_group_context_ = nullptr;
std::unique_ptr<TaskController> task_controller_ = nullptr;
+ WorkloadGroupPtr _workload_group = nullptr;
MultiVersion<RuntimeProfile> resource_profile_;
};
diff --git a/be/src/runtime/workload_management/workload_action.cpp
b/be/src/runtime/workload_management/workload_action.cpp
index 77042b074fd..f9e42b43afa 100644
--- a/be/src/runtime/workload_management/workload_action.cpp
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -21,18 +21,19 @@
namespace doris {
-void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
+void WorkloadActionCancelQuery::exec(WorkloadAction::RuntimeContext*
action_runtime_ctx) {
std::stringstream msg;
- msg << "query " << query_info->query_id
- << " cancelled by workload policy: " << query_info->policy_name
- << ", id:" << query_info->policy_id << ", " <<
query_info->cond_eval_msg;
+ msg << "query " <<
print_id(action_runtime_ctx->resource_ctx->task_controller()->task_id())
+ << " cancelled by workload policy: " << action_runtime_ctx->policy_name
+ << ", id:" << action_runtime_ctx->policy_id << ", " <<
action_runtime_ctx->cond_eval_msg;
std::string msg_str = msg.str();
LOG(INFO) << "[workload_schedule]" << msg_str;
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_info->tquery_id,
-
Status::InternalError<false>(msg_str));
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ action_runtime_ctx->resource_ctx->task_controller()->task_id(),
+ Status::InternalError<false>(msg_str));
}
-void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
+void WorkloadActionMoveQuery::exec(WorkloadAction::RuntimeContext*
action_runtime_ctx) {
LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name;
};
diff --git a/be/src/runtime/workload_management/workload_action.h
b/be/src/runtime/workload_management/workload_action.h
index 785acc73c3a..4943b13a5d5 100644
--- a/be/src/runtime/workload_management/workload_action.h
+++ b/be/src/runtime/workload_management/workload_action.h
@@ -17,36 +17,53 @@
#pragma once
-#include "runtime/workload_management/workload_query_info.h"
+#include <gen_cpp/BackendService_types.h>
+#include <glog/logging.h>
namespace doris {
+class ResourceContext;
+
enum WorkloadActionType { MOVE_QUERY_TO_GROUP = 0, CANCEL_QUERY = 1 };
class WorkloadAction {
public:
+ // only used as a temporary variable in
`WorkloadSchedPolicyMgr::_schedule_workload`.
+ struct RuntimeContext {
+ public:
+ RuntimeContext(const std::shared_ptr<ResourceContext>& ctx) :
resource_ctx(ctx) {}
+
+ int64_t policy_id;
+ std::string policy_name;
+ std::string cond_eval_msg;
+
+ std::shared_ptr<ResourceContext> resource_ctx;
+ };
+
WorkloadAction() = default;
virtual ~WorkloadAction() = default;
- virtual void exec(WorkloadQueryInfo* query_info) = 0;
+ virtual void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) = 0;
virtual WorkloadActionType get_action_type() = 0;
};
class WorkloadActionCancelQuery : public WorkloadAction {
public:
- void exec(WorkloadQueryInfo* query_info) override;
+ void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override;
- WorkloadActionType get_action_type() override { return CANCEL_QUERY; }
+ WorkloadActionType get_action_type() override { return
WorkloadActionType::CANCEL_QUERY; }
};
//todo(wb) implement it
class WorkloadActionMoveQuery : public WorkloadAction {
public:
WorkloadActionMoveQuery(std::string wg_name) : _wg_name(wg_name) {}
- void exec(WorkloadQueryInfo* query_info) override;
+ void exec(WorkloadAction::RuntimeContext* action_runtime_ctx) override;
- WorkloadActionType get_action_type() override { return
MOVE_QUERY_TO_GROUP; }
+ WorkloadActionType get_action_type() override {
+ return WorkloadActionType::MOVE_QUERY_TO_GROUP;
+ }
private:
std::string _wg_name;
diff --git a/be/src/runtime/workload_management/workload_group_context.h
b/be/src/runtime/workload_management/workload_group_context.h
deleted file mode 100644
index 38cad310925..00000000000
--- a/be/src/runtime/workload_management/workload_group_context.h
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/factory_creator.h"
-#include "runtime/workload_group/workload_group.h"
-
-namespace doris {
-
-class WorkloadGroupContext {
- ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
-
-public:
- WorkloadGroupContext() = default;
- virtual ~WorkloadGroupContext() = default;
-
- int64_t workload_group_id() {
- if (workload_group() != nullptr) {
- return workload_group()->id();
- }
- return -1;
- }
- WorkloadGroupPtr workload_group() { return _workload_group; }
- void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
-
-protected:
- WorkloadGroupPtr _workload_group = nullptr;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/workload_management/workload_query_info.h
b/be/src/runtime/workload_management/workload_query_info.h
deleted file mode 100644
index 16151eec390..00000000000
--- a/be/src/runtime/workload_management/workload_query_info.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <map>
-
-#include "runtime/workload_management/workload_condition.h"
-
-namespace doris {
-
-class WorkloadQueryInfo {
-public:
- std::map<WorkloadMetricType, std::string> metric_map;
- TUniqueId tquery_id;
- std::string query_id;
- int64_t wg_id;
- int64_t policy_id;
- std::string policy_name {""};
- std::string cond_eval_msg {""};
-};
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index 63b9362bc21..597daafa6bc 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -17,6 +17,9 @@
#include "runtime/workload_management/workload_sched_policy.h"
+#include "runtime/workload_management/resource_context.h"
+#include "util/time.h"
+
namespace doris {
void WorkloadSchedPolicy::init(int64_t id, std::string name, int version, bool
enabled,
@@ -47,26 +50,47 @@ void WorkloadSchedPolicy::init(int64_t id, std::string
name, int version, bool e
}
}
-bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) {
+bool WorkloadSchedPolicy::is_match(WorkloadAction::RuntimeContext*
action_runtime_ctx) const {
if (!_enabled) {
return false;
}
// 1 when policy has no group(_wg_id_set.size() < 0), it should match all
query
// 2 when policy has group, it can only match the query which has the same
group
- if (_wg_id_set.size() > 0 && (query_info_ptr->wg_id <= 0 ||
- _wg_id_set.find(query_info_ptr->wg_id) ==
_wg_id_set.end())) {
+ if (!_wg_id_set.empty() &&
+ (action_runtime_ctx->resource_ctx->workload_group() == nullptr ||
+
_wg_id_set.find(action_runtime_ctx->resource_ctx->workload_group()->id()) ==
+ _wg_id_set.end())) {
return false;
}
- auto& metric_val_map = query_info_ptr->metric_map;
- std::string cond_eval_msg = "";
- for (auto& cond : _condition_list) {
- if (metric_val_map.find(cond->get_workload_metric_type()) ==
metric_val_map.end()) {
+ std::string cond_eval_msg;
+ for (const auto& cond : _condition_list) {
+ std::string val;
+ switch (cond->get_workload_metric_type()) {
+ case WorkloadMetricType::QUERY_TIME: {
+ val = std::to_string(
+ MonotonicMillis() -
+
action_runtime_ctx->resource_ctx->task_controller()->finish_time());
+ break;
+ }
+ case WorkloadMetricType::SCAN_BYTES: {
+ val =
std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_bytes());
+ break;
+ }
+ case WorkloadMetricType::SCAN_ROWS: {
+ val =
std::to_string(action_runtime_ctx->resource_ctx->io_context()->scan_rows());
+ break;
+ }
+ case WorkloadMetricType::QUERY_MEMORY_BYTES: {
+ val = std::to_string(
+
action_runtime_ctx->resource_ctx->memory_context()->current_memory_bytes());
+ break;
+ }
+ default:
return false;
}
- std::string val = metric_val_map.at(cond->get_workload_metric_type());
if (!cond->eval(val)) {
return false;
}
@@ -74,15 +98,15 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo*
query_info_ptr) {
cond->get_metric_value_string() + "), ";
}
cond_eval_msg = cond_eval_msg.substr(0, cond_eval_msg.size() - 2);
- query_info_ptr->cond_eval_msg = cond_eval_msg;
+ action_runtime_ctx->cond_eval_msg = cond_eval_msg;
return true;
}
-void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
- for (int i = 0; i < _action_list.size(); i++) {
- query_info->policy_id = this->_id;
- query_info->policy_name = this->_name;
- _action_list[i]->exec(query_info);
+void WorkloadSchedPolicy::exec_action(WorkloadAction::RuntimeContext*
action_runtime_ctx) {
+ for (auto& action : _action_list) {
+ action_runtime_ctx->policy_id = this->_id;
+ action_runtime_ctx->policy_name = this->_name;
+ action->exec(action_runtime_ctx);
}
}
diff --git a/be/src/runtime/workload_management/workload_sched_policy.h
b/be/src/runtime/workload_management/workload_sched_policy.h
index 6554634d9af..ce1e4b6655e 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.h
+++ b/be/src/runtime/workload_management/workload_sched_policy.h
@@ -34,16 +34,16 @@ public:
std::vector<std::unique_ptr<WorkloadCondition>> condition_list,
std::vector<std::unique_ptr<WorkloadAction>> action_list);
- bool enabled() { return _enabled; }
- int priority() { return _priority; }
+ bool enabled() const { return _enabled; }
+ int priority() const { return _priority; }
- bool is_match(WorkloadQueryInfo* query_info);
+ bool is_match(WorkloadAction::RuntimeContext* action_runtime_ctx) const;
WorkloadActionType get_first_action_type() { return _first_action_type; }
- void exec_action(WorkloadQueryInfo* query_info);
+ void exec_action(WorkloadAction::RuntimeContext* action_runtime_ctx);
- int version() { return _version; }
+ int version() const { return _version; }
private:
int64_t _id;
diff --git a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
index 4690ed1d4f2..4802704c136 100644
--- a/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy_mgr.cpp
@@ -17,8 +17,10 @@
#include "runtime/workload_management/workload_sched_policy_mgr.h"
+#include <memory>
+
#include "runtime/fragment_mgr.h"
-#include "runtime/runtime_query_statistics_mgr.h"
+#include "runtime/workload_management/resource_context.h"
namespace doris {
@@ -75,18 +77,20 @@ void WorkloadSchedPolicyMgr::update_workload_sched_policy(
void WorkloadSchedPolicyMgr::_schedule_workload() {
while (!_stop_latch.wait_for(std::chrono::milliseconds(500))) {
- // 1 get query info
- std::vector<WorkloadQueryInfo> list;
+ // 1 get query resource context
+ std::vector<std::weak_ptr<ResourceContext>> list;
_exec_env->fragment_mgr()->get_runtime_query_info(&list);
// todo: add timer
- if (list.size() == 0) {
+ if (list.empty()) {
continue;
}
- for (int i = 0; i < list.size(); i++) {
- WorkloadQueryInfo* query_info_ptr = &(list[i]);
-
_exec_env->runtime_query_statistics_mgr()->get_metric_map(query_info_ptr->query_id,
-
query_info_ptr->metric_map);
+ for (const auto& i : list) {
+ auto resource_ctx = i.lock();
+ if (resource_ctx == nullptr) {
+ continue;
+ }
+ WorkloadAction::RuntimeContext action_runtime_ctx(resource_ctx);
// 2 get matched policy
std::map<WorkloadActionType, std::shared_ptr<WorkloadSchedPolicy>>
matched_policy_map;
@@ -94,7 +98,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
std::shared_lock<std::shared_mutex> read_lock(_policy_lock);
for (auto& entity : _id_policy_map) {
auto& new_policy = entity.second;
- if (new_policy->is_match(query_info_ptr)) {
+ if (new_policy->is_match(&action_runtime_ctx)) {
WorkloadActionType new_policy_type =
new_policy->get_first_action_type();
if (matched_policy_map.find(new_policy_type) ==
matched_policy_map.end() ||
new_policy->priority() >
@@ -105,7 +109,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
}
}
- if (matched_policy_map.size() == 0) {
+ if (matched_policy_map.empty()) {
continue;
}
LOG(INFO) << "[workload_schedule] matched policy size=" <<
matched_policy_map.size();
@@ -128,7 +132,7 @@ void WorkloadSchedPolicyMgr::_schedule_workload() {
// 4 exec policy action
for (const auto& [key, value] : matched_policy_map) {
- value->exec_action(query_info_ptr);
+ value->exec_action(&action_runtime_ctx);
}
}
}
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
index 94e32f67810..2049bb29043 100644
--- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -43,7 +43,7 @@ TEST_F(ThreadMemTrackerMgrTest, ConsumeMemory) {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"UT-ConsumeMemory");
std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
rc->memory_context()->set_mem_tracker(t);
- rc->workload_group_context()->set_workload_group(workload_group);
+ rc->set_workload_group(workload_group);
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
@@ -104,7 +104,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3");
std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
rc->memory_context()->set_mem_tracker(t1);
- rc->workload_group_context()->set_workload_group(workload_group);
+ rc->set_workload_group(workload_group);
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
@@ -177,7 +177,7 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) {
std::shared_ptr<MemTracker> t3 =
std::make_shared<MemTracker>("UT-MultiMemTracker3");
std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
rc->memory_context()->set_mem_tracker(t1);
- rc->workload_group_context()->set_workload_group(workload_group);
+ rc->set_workload_group(workload_group);
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
@@ -239,7 +239,7 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"UT-ReserveMemory");
std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
rc->memory_context()->set_mem_tracker(t);
- rc->workload_group_context()->set_workload_group(workload_group);
+ rc->set_workload_group(workload_group);
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
@@ -337,7 +337,7 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory");
std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
rc->memory_context()->set_mem_tracker(t);
- rc->workload_group_context()->set_workload_group(workload_group);
+ rc->set_workload_group(workload_group);
int64_t size2 = 4 * 1024 * 1024;
int64_t size3 = size2 * 2;
@@ -396,7 +396,7 @@ TEST_F(ThreadMemTrackerMgrTest,
NestedSwitchMemTrackerReserveMemory) {
MemTrackerLimiter::Type::OTHER,
"UT-NestedSwitchMemTrackerReserveMemory3");
std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
rc->memory_context()->set_mem_tracker(t1);
- rc->workload_group_context()->set_workload_group(workload_group);
+ rc->set_workload_group(workload_group);
int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]