wangbo commented on code in PR #19526:
URL: https://github.com/apache/doris/pull/19526#discussion_r1192956627
##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -63,36 +72,105 @@ std::string TaskGroupEntity::debug_string() const {
cpu_share(), _queue.size(), _vruntime_ns);
}
-TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share,
int64_t version)
- : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this),
_version(version) {}
+TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
+ : _id(tg_info.id),
+ _name(tg_info.name),
+ _cpu_share(tg_info.cpu_share),
+ _memory_limit(tg_info.memory_limit),
+ _version(tg_info.version),
+ _task_entity(this),
+ _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
std::string TaskGroup::debug_string() const {
- std::shared_lock<std::shared_mutex> rl {mutex};
- return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]",
_id, _name,
- cpu_share(), _version);
-}
-
-bool TaskGroup::check_version(int64_t version) const {
- std::shared_lock<std::shared_mutex> rl {mutex};
- return version > _version;
+ std::shared_lock<std::shared_mutex> rl {_mutex};
+ return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit =
{}, version = {}]",
+ _id, _name, cpu_share(),
PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+ _version);
}
void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
- if (tg_info._id != _id) {
+ if (UNLIKELY(tg_info.id != _id)) {
return;
}
+ {
+ std::shared_lock<std::shared_mutex> rl {_mutex};
+ if (LIKELY(tg_info.version <= _version)) {
+ return;
+ }
+ }
- std::lock_guard<std::shared_mutex> wl {mutex};
- if (tg_info._version > _version) {
- _name = tg_info._name;
- _cpu_share = tg_info._cpu_share;
- _version = tg_info._version;
+ std::lock_guard<std::shared_mutex> wl {_mutex};
+ if (tg_info.version > _version) {
+ _name = tg_info.name;
+ _version = tg_info.version;
+ _memory_limit = tg_info.memory_limit;
+ if (_cpu_share != tg_info.cpu_share) {
+
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
+ tg_info, shared_from_this());
+ }
}
}
+void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) {
+ _cpu_share = tg_info.cpu_share;
+}
+
+std::list<MemTrackerLimiter*>::iterator TaskGroup::add_mem_tracker_limiter(
+ MemTrackerLimiter* mem_tracker_ptr, int64_t group_num) {
+ std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
+ return _mem_tracker_limiter_pool[group_num].trackers.insert(
+ _mem_tracker_limiter_pool[group_num].trackers.end(),
mem_tracker_ptr);
+}
+
+void TaskGroup::remove_mem_tracker_limiter(int64_t group_num,
+ const
std::list<MemTrackerLimiter*>::iterator& iter) {
+ std::lock_guard<std::mutex>
l(_mem_tracker_limiter_pool[group_num].group_lock);
+ _mem_tracker_limiter_pool[group_num].trackers.erase(iter);
+}
+
+int64_t TaskGroup::memory_limit_gc() {
Review Comment:
From the perspective of code style, maybe it's better to move this method to
mem_tracker_limiter.
RG GC logic can be more complex later, I think we can sperate it to another
place, it could be current mem_tracker_limiter, or a
ResourceGroupMemTrackerLimiter.
TaskGroup just need to hold TaskGroup's info.
We can discuss it later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]