This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9ceceb4d78f [fix](memory) Fix erase invalid MemTrackerLimiter from tracker pool (#33074)
9ceceb4d78f is described below
commit 9ceceb4d78f8524c0f0eac723cfe60063dd1e0e3
Author: Xinyi Zou <[email protected]>
AuthorDate: Sun Mar 31 13:28:34 2024 +0800
[fix](memory) Fix erase invalid MemTrackerLimiter from tracker pool (#33074)
---
be/src/runtime/memory/mem_tracker_limiter.cpp | 41 +++++++++++++++------------
be/src/runtime/memory/mem_tracker_limiter.h | 3 ++
2 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index ebe874834c5..57864c3b07b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -159,12 +159,19 @@ void MemTrackerLimiter::refresh_global_counter() {
{Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0},
{Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::OTHER, 0}};
// always ExecEnv::ready(), because Daemon::_stop_background_threads_latch
- for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
+ for (unsigned i = 0; i <
ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
+ TrackerLimiterGroup& group =
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i];
std::lock_guard<std::mutex> l(group.group_lock);
- for (auto trackerWptr : group.trackers) {
- auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- type_mem_sum[tracker->type()] += tracker->consumption();
+ for (auto it = group.trackers.begin(); it != group.trackers.end();) {
+ auto tracker = (*it).lock();
+ if (tracker == nullptr) {
+ LOG(WARNING) << "abnormally invalid MemTrackerLimiter, env
tracking memory: "
+ << ExecEnv::tracking_memory() << ", group num: "
<< i;
+ it = group.trackers.erase(it);
+ } else {
+ type_mem_sum[tracker->type()] += tracker->consumption();
+ ++it;
+ }
}
}
for (auto it : type_mem_sum) {
@@ -223,9 +230,10 @@ void
MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s
ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) {
auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- (*snapshots).emplace_back(tracker->make_snapshot());
- MemTracker::make_group_snapshot(snapshots, tracker->group_num(),
tracker->label());
+ if (tracker != nullptr) {
+ (*snapshots).emplace_back(tracker->make_snapshot());
+ MemTracker::make_group_snapshot(snapshots,
tracker->group_num(), tracker->label());
+ }
}
} else {
for (unsigned i = 1; i <
ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
@@ -233,8 +241,7 @@ void
MemTrackerLimiter::make_type_snapshots(std::vector<MemTracker::Snapshot>* s
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- if (tracker->type() == type) {
+ if (tracker != nullptr && tracker->type() == type) {
(*snapshots).emplace_back(tracker->make_snapshot());
MemTracker::make_group_snapshot(snapshots,
tracker->group_num(),
tracker->label());
@@ -253,8 +260,9 @@ void
MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::S
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- max_pq.emplace(tracker->make_snapshot());
+ if (tracker != nullptr) {
+ max_pq.emplace(tracker->make_snapshot());
+ }
}
}
@@ -286,8 +294,7 @@ std::string MemTrackerLimiter::type_detail_usage(const
std::string& msg, Type ty
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- if (tracker->type() == type) {
+ if (tracker != nullptr && tracker->type() == type) {
detail += "\n " +
MemTrackerLimiter::log_usage(tracker->make_snapshot());
}
}
@@ -468,8 +475,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto trackerWptr : tracker_groups[i].trackers) {
auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- if (tracker->type() == type) {
+ if (tracker != nullptr && tracker->type() == type) {
seek_num++;
if (tracker->is_query_cancelled()) {
canceling_task.push_back(fmt::format("{}:{} Bytes",
tracker->label(),
@@ -593,8 +599,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
for (auto trackerWptr : tracker_groups[i].trackers) {
auto tracker = trackerWptr.lock();
- CHECK(tracker != nullptr);
- if (tracker->type() == type) {
+ if (tracker != nullptr && tracker->type() == type) {
seek_num++;
// 32M small query does not cancel
if (tracker->consumption() <= 33554432 ||
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 164a9aa9e52..d22a79c73f7 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -46,6 +46,9 @@ class RuntimeProfile;
constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
struct TrackerLimiterGroup {
+ // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support resize,
+ // the copy construction of TrackerLimiterGroup is disabled.
+ // so cannot copy TrackerLimiterGroup anywhere, should use reference.
TrackerLimiterGroup() = default;
TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {}
TrackerLimiterGroup(const TrackerLimiterGroup&) {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]