This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 3e2ae923f7 [dev-1.1.2](cherry-pick) Optimize readability of mem exceed limit error message #11943
3e2ae923f7 is described below
commit 3e2ae923f7aa5f4d3f3e4dc9c64a9982a15c764d
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Aug 22 08:46:34 2022 +0800
[dev-1.1.2](cherry-pick) Optimize readability of mem exceed limit error message #11943
---
be/src/runtime/memory/mem_tracker.cpp | 39 ++++++-------
be/src/runtime/memory/mem_tracker.h | 8 +--
be/src/runtime/memory/mem_tracker_limiter.cpp | 70 +++++++++++++++++++-----
be/src/runtime/memory/mem_tracker_limiter.h | 29 +++++++---
be/src/runtime/memory/mem_tracker_task_pool.cpp | 4 +-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 16 ++----
be/src/runtime/memory/thread_mem_tracker_mgr.h | 4 +-
be/src/runtime/runtime_state.cpp | 4 +-
be/src/runtime/thread_context.h | 9 +--
be/src/service/doris_main.cpp | 1 +
10 files changed, 116 insertions(+), 68 deletions(-)
diff --git a/be/src/runtime/memory/mem_tracker.cpp
b/be/src/runtime/memory/mem_tracker.cpp
index dda2c43102..c4801d6611 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -42,7 +42,7 @@ struct TrackerGroup {
// Multiple groups are used to reduce the impact of locks.
static std::vector<TrackerGroup> mem_tracker_pool(1000);
-NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile*
profile, bool is_limiter) {
+NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile*
profile) {
if (profile == nullptr) {
_consumption =
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
} else {
@@ -58,30 +58,24 @@ NewMemTracker::NewMemTracker(const std::string& label,
RuntimeProfile* profile,
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME,
TUnit::BYTES);
}
- _is_limiter = is_limiter;
- if (!_is_limiter) {
- if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
- _label = fmt::format(
- "{} | {}", label,
-
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
- } else {
- _label = label + " | ";
- }
-
- _bind_group_num =
-
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
- {
- std::lock_guard<std::mutex>
l(mem_tracker_pool[_bind_group_num].group_lock);
- _tracker_group_it =
mem_tracker_pool[_bind_group_num].trackers.insert(
- mem_tracker_pool[_bind_group_num].trackers.end(), this);
- }
+ if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
+ _label = fmt::format(
+ "{} | {}", label,
+
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
} else {
- _label = label;
+ _label = label + " | ";
+ }
+
+ _bind_group_num =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
+ {
+ std::lock_guard<std::mutex>
l(mem_tracker_pool[_bind_group_num].group_lock);
+ _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
+ mem_tracker_pool[_bind_group_num].trackers.end(), this);
}
}
NewMemTracker::~NewMemTracker() {
- if (!_is_limiter) {
+ if (_bind_group_num != -1) {
std::lock_guard<std::mutex>
l(mem_tracker_pool[_bind_group_num].group_lock);
if (_tracker_group_it !=
mem_tracker_pool[_bind_group_num].trackers.end()) {
mem_tracker_pool[_bind_group_num].trackers.erase(_tracker_group_it);
@@ -102,8 +96,9 @@ NewMemTracker::Snapshot NewMemTracker::make_snapshot(size_t
level) const {
return snapshot;
}
-void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>*
snapshots, size_t level,
- int64_t group_num, std::string
related_label) {
+void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>*
snapshots,
+ size_t level, int64_t group_num,
+ std::string related_label) {
std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
for (auto tracker : mem_tracker_pool[group_num].trackers) {
if (split(tracker->label(), " | ")[1] == related_label) {
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index 41d2fe4d1c..258f244aae 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -44,8 +44,9 @@ public:
};
// Creates and adds the tracker to the mem_tracker_pool.
- NewMemTracker(const std::string& label = std::string(), RuntimeProfile*
profile = nullptr,
- bool is_limiter = false);
+ NewMemTracker(const std::string& label, RuntimeProfile* profile = nullptr);
+ // For MemTrackerLimiter
+ NewMemTracker() { _bind_group_num = -1; }
~NewMemTracker();
@@ -93,9 +94,6 @@ protected:
// Tracker is located in group num in mem_tracker_pool
int64_t _bind_group_num;
- // Whether is a MemTrackerLimiter
- bool _is_limiter;
-
// Iterator into mem_tracker_pool for this object. Stored to have O(1)
remove.
std::list<NewMemTracker*>::iterator _tracker_group_it;
};
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index dc7789bfe1..3f9a1786d4 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -32,9 +32,14 @@ namespace doris {
MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string&
label,
const std::shared_ptr<MemTrackerLimiter>&
parent,
- RuntimeProfile* profile)
- : NewMemTracker(label, profile, true) {
+ RuntimeProfile* profile) {
DCHECK_GE(byte_limit, -1);
+ if (profile == nullptr) {
+ _consumption =
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+ } else {
+ _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME,
TUnit::BYTES);
+ }
+ _label = label;
_limit = byte_limit;
_group_num = GetCurrentTimeMicros() % 1000;
_parent = parent ? parent :
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
@@ -143,9 +148,9 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
if (UNLIKELY(gc_memory(_limit - bytes))) {
- return Status::MemoryLimitExceeded(
- fmt::format("label={}, limit={}, used={}, failed consume
size={}", label(), _limit,
- _consumption->current_value(), bytes));
+ return Status::MemoryLimitExceeded(fmt::format(
+ "need_size={}, exceeded_tracker={}, limit={}, peak_used={},
current_used={}", bytes,
+ label(), _limit, _consumption->value(),
_consumption->current_value()));
}
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << _consumption->current_value() << "
limit=" << _limit;
@@ -218,20 +223,19 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth,
return join(usage_strings, "\n");
}
-Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size) {
- STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) {
+ DCHECK(_limit != -1);
std::string detail = fmt::format(
- "{}, failed mem consume:<consume_size={}, mem_limit={},
mem_used={}, tracker_label={}, "
- "in backend={} free memory left={}. details mem usage see
be.INFO.",
- msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES),
_limit,
- _consumption->current_value(), _label,
BackendOptions::get_localhost(),
-
PrettyPrinter::print(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity(),
- TUnit::BYTES));
+ "{}, backend={} memory used={}, free memory left={}. If is query,
can change the limit "
+ "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
+ msg, BackendOptions::get_localhost(),
+ PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
+ PrettyPrinter::print(MemInfo::mem_limit() -
PerfCounters::get_vm_rss(), TUnit::BYTES));
Status status = Status::MemoryLimitExceeded(detail);
// only print the tracker log_usage in be log.
if (_print_log_usage) {
- if
(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() <
failed_consume_size) {
+ if (_label == "Process") {
// Dumping the process MemTracker is expensive. Limiting the
recursive depth to two
// levels limits the level of detail to a one-line summary for
each query MemTracker.
detail += "\n" +
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
@@ -245,6 +249,44 @@ Status MemTrackerLimiter::mem_limit_exceeded(const
std::string& msg, int64_t fai
return status;
}
+Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size) {
+ STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+ DCHECK(!_limited_ancestors.empty());
+ for (const auto& tracker : _limited_ancestors) {
+ if (tracker->has_limit() &&
+ tracker->limit() < tracker->peak_consumption() +
failed_consume_size) {
+ std::string detail;
+ if (failed_consume_size != 0) {
+ detail = fmt::format(
+ "memory limit exceeded:<consumed_tracker={},
need_size={}, "
+ "exceeded_tracker={}, limit={}, peak_used={},
current_used={}>, "
+ "executing:<{}>",
+ _label, PrettyPrinter::print(failed_consume_size,
TUnit::BYTES),
+ tracker->label(), tracker->limit(),
tracker->peak_consumption(),
+ tracker->consumption(), msg);
+ } else {
+ detail = fmt::format(
+ "memory limit exceeded:<exceeded_tracker={}, limit={},
peak_used={}, "
+ "current_used={}>, executing:<{}>",
+ tracker->label(), tracker->limit(),
tracker->peak_consumption(),
+ tracker->consumption(), msg);
+ }
+ return tracker->mem_limit_exceeded_log(detail);
+ }
+ }
+ return Status::MemoryLimitExceeded("no mem tracker exceed limit");
+}
+
+Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
+ MemTrackerLimiter* failed_tracker,
+ Status failed_try_consume_st) {
+ STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+ std::string detail =
+ fmt::format("memory limit exceeded:<consumed_tracker={}, {}>,
executing:<{}>", _label,
+ failed_try_consume_st.get_error_msg(), msg);
+ return failed_tracker->mem_limit_exceeded_log(detail);
+}
+
Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const
std::string& msg,
int64_t failed_alloc_size) {
Status rt = mem_limit_exceeded(msg, failed_alloc_size);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 1dcd655ea4..767e4fdafb 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -122,13 +122,15 @@ public:
Status try_gc_memory(int64_t bytes);
public:
+ // up to (but not including) end_tracker.
+ // This happens when we want to update tracking on a particular mem
tracker but the consumption
+ // against the limit recorded in one of its ancestors already happened.
// It is used for revise mem tracker consumption.
// If the location of memory alloc and free is different, the consumption
value of mem tracker will be inaccurate.
// But the consumption value of the process mem tracker is not affecte
- void consumption_revise(int64_t bytes) {
- DCHECK(_label != "Process");
- _consumption->add(bytes);
- }
+ void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker);
+
+ void enable_print_log_usage() { _print_log_usage = true; }
// Logs the usage of this tracker limiter and optionally its children
(recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value
logged.
@@ -143,9 +145,11 @@ public:
// If 'failed_allocation_size' is greater than zero, logs the allocation
size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is
logged.
// If 'state' is non-nullptr, logs the error to 'state'.
- Status mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size);
- Status mem_limit_exceeded(RuntimeState* state, const std::string& msg =
std::string(),
- int64_t failed_consume_size = -1);
+ Status mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size = 0);
+ Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter*
failed_tracker,
+ Status failed_try_consume_st);
+ Status mem_limit_exceeded(RuntimeState* state, const std::string& msg,
+ int64_t failed_consume_size = 0);
std::string debug_string() {
std::stringstream msg;
@@ -186,6 +190,8 @@ private:
const std::list<MemTrackerLimiter*>& trackers,
int64_t* logged_consumption);
+ Status mem_limit_exceeded_log(const std::string& msg);
+
private:
// Limit on memory consumption, in bytes. If limit_ == -1, there is no
consumption limit. Used in log_usage。
int64_t _limit;
@@ -253,6 +259,15 @@ inline void MemTrackerLimiter::consume_cache(int64_t
bytes) {
}
}
+inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter*
end_tracker) {
+ DCHECK(end_tracker);
+ if (bytes == 0) return;
+ for (auto& tracker : _all_ancestors) {
+ if (tracker->label() == end_tracker->label()) return;
+ tracker->_consumption->add(bytes);
+ }
+}
+
inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
if (bytes <= 0) {
release(-bytes);
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 064fba6930..c080261056 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -93,7 +93,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// In order to ensure that the query pool mem tracker is the sum
of all currently running query mem trackers,
// the effect of the ended query mem tracker on the query pool mem
tracker should be cleared, that is,
// the negative number of the current value of consume.
-
it->second->parent()->consumption_revise(-it->second->consumption());
+ it->second->parent()->consume_local(
+ -it->second->consumption(),
+ ExecEnv::GetInstance()->new_process_mem_tracker().get());
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={},
Limit={}, PeakUsed={}",
it->first, it->second->limit(),
it->second->peak_consumption());
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index aea89acb3b..27e66d3fb8 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -49,21 +49,15 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const
std::string& cancel_details
}
}
-void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) {
+void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) {
if (_cb_func != nullptr) {
_cb_func();
}
if (is_attach_query()) {
- std::string cancel_msg;
- if (!_consumer_tracker_stack.empty()) {
- cancel_msg = fmt::format(
- "exec node:<name={}>, can change the limit by `set
exec_mem_limit=xxx`",
- _consumer_tracker_stack[-1]->label());
- } else {
- cancel_msg = "exec node:unknown, can change the limit by `set
exec_mem_limit=xxx`";
- }
- auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg,
failed_consume_size);
- exceeded_cancel_task(st.to_string());
+ auto st = _limiter_tracker->mem_limit_exceeded(fmt::format("exec
node:<{}>", ""),
+
_limiter_tracker->parent().get(),
+ failed_try_consume_st);
+ exceeded_cancel_task(st.get_error_msg());
_check_limit = false; // Make sure it will only be canceled once
}
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 0ef8eaf646..449a1e070c 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -112,7 +112,7 @@ private:
// If tryConsume fails due to task mem tracker exceeding the limit, the
task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);
- void exceeded(int64_t failed_consume_size);
+ void exceeded(Status failed_try_consume_st);
private:
// Cache untracked mem, only update to _untracked_mems when switching mem
tracker.
@@ -191,7 +191,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// The memory has been allocated, so when TryConsume fails, need
to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker->consume(_untracked_mem);
- exceeded(_untracked_mem);
+ exceeded(st);
}
} else {
_limiter_tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 2107c74d65..791cdd213e 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -246,8 +246,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId&
query_id) {
}
_new_instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
- bytes_limit, "RuntimeState:instance:" +
print_id(_fragment_instance_id),
- _new_query_mem_tracker);
+ -1, "RuntimeState:instance:" + print_id(_fragment_instance_id),
+ _new_query_mem_tracker, &_profile);
/*
// TODO: this is a stopgap until we implement ExprContext
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index e45fe0da78..2f752516c1 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -260,8 +260,9 @@ public:
doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size,
tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size,
tracker)
-#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
- return doris::thread_context() \
- ->_thread_mem_tracker_mgr->limiter_mem_tracker() \
- ->mem_limit_exceeded(state, msg, ##__VA_ARGS__);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...)
\
+ return doris::thread_context()
\
+ ->_thread_mem_tracker_mgr->limiter_mem_tracker()
\
+ ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "",
msg), \
+ ##__VA_ARGS__);
} // namespace doris
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 571911b45d..d80b1bc504 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -462,6 +462,7 @@ int main(int argc, char** argv) {
// 1s clear the expired task mem tracker, a query mem tracker is about
57 bytes.
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
+
doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage();
sleep(1);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]