This is an automated email from the ASF dual-hosted git repository.
luozenglin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 81799d614e [feature-wip](resource-group) support resource group interface in be. (#18588)
81799d614e is described below
commit 81799d614eedf5036171532f6b800825da1d905b
Author: luozenglin <[email protected]>
AuthorDate: Fri Apr 14 14:00:49 2023 +0800
[feature-wip](resource-group) support resource group interface in be. (#18588)
---
be/src/pipeline/task_queue.cpp | 16 +++++++
be/src/pipeline/task_queue.h | 11 +++++
be/src/pipeline/task_scheduler.cpp | 8 ++++
be/src/pipeline/task_scheduler.h | 3 ++
be/src/runtime/fragment_mgr.cpp | 31 +++++++-----
be/src/runtime/task_group/task_group.cpp | 60 ++++++++++++++++++++++--
be/src/runtime/task_group/task_group.h | 36 ++++++++++++--
be/src/runtime/task_group/task_group_manager.cpp | 35 +++++++-------
be/src/runtime/task_group/task_group_manager.h | 12 +----
9 files changed, 160 insertions(+), 52 deletions(-)
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 0716afa924..23a601d5c0 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -346,5 +346,21 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen
}
}
+void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo&
task_group_info,
+ taskgroup::TaskGroupPtr&
task_group) {
+ std::unique_lock<std::mutex> lock(_rs_mutex);
+ auto* entity = task_group->task_entity();
+ bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
+ if (is_in_queue) {
+ _group_entities.erase(entity);
+ _total_cpu_share -= entity->cpu_share();
+ }
+ task_group->check_and_update(task_group_info);
+ if (is_in_queue) {
+ _group_entities.emplace(entity);
+ _total_cpu_share += entity->cpu_share();
+ }
+}
+
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 6d225bc1a5..39b57815ad 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -44,6 +44,9 @@ public:
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
+ virtual void update_task_group(const taskgroup::TaskGroupInfo&
task_group_info,
+ taskgroup::TaskGroupPtr& task_group) = 0;
+
int cores() const { return _core_size; }
protected:
@@ -126,6 +129,11 @@ public:
// TODO pipeline update NormalWorkTaskQueue by time_spent.
// void update_statistics(PipelineTask* task, int64_t time_spent) override;
+ void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+ taskgroup::TaskGroupPtr& task_group) override {
+ LOG(FATAL) << "update_task_group not implemented";
+ }
+
private:
PipelineTask* _steal_take(size_t core_id);
@@ -151,6 +159,9 @@ public:
void update_statistics(PipelineTask* task, int64_t time_spent) override;
+ void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+ taskgroup::TaskGroupPtr& task_group) override;
+
private:
template <bool from_executor>
Status _push_back(PipelineTask* task);
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index e90fb5f536..70d1bd518f 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -334,4 +334,12 @@ void TaskScheduler::shutdown() {
}
}
+void TaskScheduler::try_update_task_group(const taskgroup::TaskGroupInfo&
task_group_info,
+ taskgroup::TaskGroupPtr& task_group)
{
+ if (!task_group->check_version(task_group_info._version)) {
+ return;
+ }
+ _task_queue->update_task_group(task_group_info, task_group);
+}
+
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 50284664f3..777923dbd3 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -72,6 +72,9 @@ public:
void shutdown();
+ void try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
+ taskgroup::TaskGroupPtr& task_group);
+
private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index aefcf57158..07ef9f3c0a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -35,6 +35,7 @@
#include "io/fs/stream_load_pipe.h"
#include "opentelemetry/trace/scope.h"
#include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/task_scheduler.h"
#include "runtime/client_cache.h"
#include "runtime/datetime_value.h"
#include "runtime/descriptors.h"
@@ -654,19 +655,6 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
fragments_ctx->query_mem_tracker->enable_print_log_usage();
}
- if (pipeline) {
- int ts = fragments_ctx->timeout_second;
- taskgroup::TaskGroupPtr tg;
- auto ts_id = taskgroup::TaskGroupManager::DEFAULT_TG_ID;
- if (ts > 0 && ts <= config::pipeline_short_query_timeout_s) {
- ts_id = taskgroup::TaskGroupManager::SHORT_TG_ID;
- }
- tg =
taskgroup::TaskGroupManager::instance()->get_task_group(ts_id);
- fragments_ctx->set_task_group(tg);
- LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
- << "use task group: " << tg->debug_string();
- }
-
{
// Find _fragments_ctx_map again, in case some other request has
already
// create the query fragments context.
@@ -779,6 +767,23 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true,
fragments_ctx));
+ if (params.__isset.resource_groups && !params.resource_groups.empty()) {
+ taskgroup::TaskGroupInfo task_group_info;
+ auto status =
taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0],
+
&task_group_info);
+ if (status.ok()) {
+ auto tg =
taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
+ task_group_info);
+
_exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info,
tg);
+ fragments_ctx->set_task_group(tg);
+ LOG(INFO) << "Query/load id: " << print_id(fragments_ctx->query_id)
+ << " use task group: " << tg->debug_string();
+ }
+ } else {
+ VLOG_DEBUG << "Query/load id: " << print_id(fragments_ctx->query_id)
+ << " does not use task group.";
+ }
+
for (size_t i = 0; i < params.local_params.size(); i++) {
const auto& local_params = params.local_params[i];
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 63f0a564fa..783470be45 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -17,6 +17,9 @@
#include "task_group.h"
+#include <charconv>
+
+#include "gen_cpp/PaloInternalService_types.h"
#include "pipeline/pipeline_task.h"
namespace doris {
@@ -32,7 +35,7 @@ pipeline::PipelineTask* TaskGroupEntity::take() {
}
void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) {
- auto v_time = runtime_ns / _tg->share();
+ auto v_time = runtime_ns / _tg->cpu_share();
_vruntime_ns += v_time;
}
@@ -46,7 +49,7 @@ void TaskGroupEntity::push_back(pipeline::PipelineTask* task) {
}
uint64_t TaskGroupEntity::cpu_share() const {
- return _tg->share();
+ return _tg->cpu_share();
}
std::string TaskGroupEntity::debug_string() const {
@@ -54,11 +57,58 @@ std::string TaskGroupEntity::debug_string() const {
cpu_share(), _queue.size(), _vruntime_ns);
}
-TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t share)
- : _id(id), _name(name), _share(share), _task_entity(this) {}
+TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share,
int64_t version)
+ : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this),
_version(version) {}
std::string TaskGroup::debug_string() const {
- return fmt::format("TG[id = {}, name = {}, share = {}", _id, _name,
share());
+ std::shared_lock<std::shared_mutex> rl {mutex};
+ return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]",
_id, _name,
+ cpu_share(), _version);
+}
+
+bool TaskGroup::check_version(int64_t version) const {
+ std::shared_lock<std::shared_mutex> rl {mutex};
+ return version > _version;
+}
+
+void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
+ if (tg_info._id != _id) {
+ return;
+ }
+
+ std::lock_guard<std::shared_mutex> wl {mutex};
+ if (tg_info._version > _version) {
+ _name = tg_info._name;
+ _cpu_share = tg_info._cpu_share;
+ _version = tg_info._version;
+ }
+}
+
+Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup&
resource_group,
+ TaskGroupInfo* task_group_info) {
+ if (!check_group_info(resource_group)) {
+ std::stringstream ss;
+ ss << "incomplete resource group parameters: ";
+ resource_group.printTo(ss);
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ auto iter = resource_group.properties.find(CPU_SHARE);
+ uint64_t share = 0;
+ std::from_chars(iter->second.c_str(), iter->second.c_str() +
iter->second.size(), share);
+
+ task_group_info->_id = resource_group.id;
+ task_group_info->_name = resource_group.name;
+ task_group_info->_version = resource_group.version;
+ task_group_info->_cpu_share = share;
+ return Status::OK();
+}
+
+bool TaskGroupInfo::check_group_info(const TPipelineResourceGroup&
resource_group) {
+ return resource_group.__isset.id && resource_group.__isset.version &&
+ resource_group.__isset.name && resource_group.__isset.properties &&
+ resource_group.properties.count(CPU_SHARE) > 0;
}
} // namespace taskgroup
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 62f06eb2dd..d6eed42a19 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -14,8 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
#pragma once
+
+#include <atomic>
#include <queue>
+#include <shared_mutex>
#include "olap/olap_define.h"
@@ -26,10 +30,14 @@ class PipelineTask;
}
class QueryFragmentsCtx;
+class TPipelineResourceGroup;
namespace taskgroup {
class TaskGroup;
+struct TaskGroupInfo;
+
+const static std::string CPU_SHARE = "cpu_share";
class TaskGroupEntity {
public:
@@ -60,23 +68,43 @@ using TGEntityPtr = TaskGroupEntity*;
class TaskGroup {
public:
- TaskGroup(uint64_t id, std::string name, uint64_t cpu_share);
+ TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t
version);
TaskGroupEntity* task_entity() { return &_task_entity; }
- uint64_t share() const { return _share; }
+ uint64_t cpu_share() const { return _cpu_share.load(); }
+
uint64_t id() const { return _id; }
std::string debug_string() const;
+ bool check_version(int64_t version) const;
+
+ void check_and_update(const TaskGroupInfo& tg_info);
+
private:
- uint64_t _id;
+ mutable std::shared_mutex mutex;
+ const uint64_t _id;
std::string _name;
- uint64_t _share;
+ std::atomic<uint64_t> _cpu_share;
TaskGroupEntity _task_entity;
+ int64_t _version;
};
using TaskGroupPtr = std::shared_ptr<TaskGroup>;
+struct TaskGroupInfo {
+ uint64_t _id;
+ std::string _name;
+ uint64_t _cpu_share;
+ int64_t _version;
+
+ static Status parse_group_info(const TPipelineResourceGroup&
resource_group,
+ TaskGroupInfo* task_group_info);
+
+private:
+ static bool check_group_info(const TPipelineResourceGroup& resource_group);
+};
+
} // namespace taskgroup
} // namespace doris
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index c359470579..cb7de92a6e 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -19,10 +19,7 @@
namespace doris::taskgroup {
-TaskGroupManager::TaskGroupManager() {
- _create_default_task_group();
- _create_short_task_group();
-}
+TaskGroupManager::TaskGroupManager() = default;
TaskGroupManager::~TaskGroupManager() = default;
TaskGroupManager* TaskGroupManager::instance() {
@@ -30,23 +27,23 @@ TaskGroupManager* TaskGroupManager::instance() {
return &tgm;
}
-TaskGroupPtr TaskGroupManager::get_task_group(uint64_t id) {
- std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
- if (_task_groups.count(id)) {
- return _task_groups[id];
- } else {
- return _task_groups[DEFAULT_TG_ID];
+TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo&
task_group_info) {
+ {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ if (_task_groups.count(task_group_info._id)) {
+ return _task_groups[task_group_info._id];
+ }
}
-}
-void TaskGroupManager::_create_default_task_group() {
- _task_groups[DEFAULT_TG_ID] =
- std::make_shared<TaskGroup>(DEFAULT_TG_ID, "default_tg",
DEFAULT_TG_CPU_SHARE);
-}
-
-void TaskGroupManager::_create_short_task_group() {
- _task_groups[SHORT_TG_ID] =
- std::make_shared<TaskGroup>(SHORT_TG_ID, "short_tg",
SHORT_TG_CPU_SHARE);
+ auto new_task_group =
+ std::make_shared<TaskGroup>(task_group_info._id,
task_group_info._name,
+ task_group_info._cpu_share,
task_group_info._version);
+ std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+ if (_task_groups.count(task_group_info._id)) {
+ return _task_groups[task_group_info._id];
+ }
+ _task_groups[task_group_info._id] = new_task_group;
+ return new_task_group;
}
} // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 4754e949fd..d5e32ac64e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -29,19 +29,9 @@ public:
~TaskGroupManager();
static TaskGroupManager* instance();
- // TODO pipeline task group
- TaskGroupPtr get_task_group(uint64_t id);
-
- static constexpr uint64_t DEFAULT_TG_ID = 0;
- static constexpr uint64_t DEFAULT_TG_CPU_SHARE = 64;
-
- static constexpr uint64_t SHORT_TG_ID = 1;
- static constexpr uint64_t SHORT_TG_CPU_SHARE = 128;
+ TaskGroupPtr get_or_create_task_group(const TaskGroupInfo&
task_group_info);
private:
- void _create_default_task_group();
- void _create_short_task_group();
-
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]