This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new ff3b091964 [fix](memtracker) Improve performance of tracking real
physical memory of PodArray #12021 (#12055)
ff3b091964 is described below
commit ff3b0919646fa2cc80441917ab6f34ad9d3c7a6e
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Aug 25 10:13:46 2022 +0800
[fix](memtracker) Improve performance of tracking real physical memory of
PodArray #12021 (#12055)
---
be/src/runtime/mem_pool.h | 2 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 9 ++++----
be/src/runtime/memory/mem_tracker_limiter.h | 30 ++++++++++++-------------
be/src/runtime/memory/mem_tracker_task_pool.cpp | 4 +---
be/src/runtime/memory/thread_mem_tracker_mgr.h | 10 ---------
be/src/runtime/thread_context.h | 8 ++++---
be/src/vec/common/pod_array.h | 2 +-
7 files changed, 28 insertions(+), 37 deletions(-)
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 1f8f5aaf67..128916007b 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -208,7 +208,7 @@ private:
bool check_integrity(bool check_current_chunk_empty);
void reset_peak() {
- if (total_allocated_bytes_ - peak_allocated_bytes_ > 1024) {
+ if (total_allocated_bytes_ - peak_allocated_bytes_ > 4096) {
THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ -
peak_allocated_bytes_,
ExecEnv::GetInstance()->new_process_mem_tracker().get());
peak_allocated_bytes_ = total_allocated_bytes_;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index b64d2899cb..64f19873c2 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -249,20 +249,21 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const
std::string& msg) {
return status;
}
-Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size) {
+Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
+ int64_t failed_allocation_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
DCHECK(!_limited_ancestors.empty());
std::string detail = fmt::format("memory limit
exceeded:<consumed_tracker={}, ", _label);
- if (failed_consume_size != 0)
+ if (failed_allocation_size != 0)
detail += fmt::format("need_size={}, ",
- PrettyPrinter::print(failed_consume_size,
TUnit::BYTES));
+ PrettyPrinter::print(failed_allocation_size,
TUnit::BYTES));
MemTrackerLimiter* exceeded_tracker = this;
int64_t free_size = INT_MAX;
for (const auto& tracker : _limited_ancestors) {
int64_t max_consumption = tracker->peak_consumption() >
tracker->consumption()
? tracker->peak_consumption()
: tracker->consumption();
- if (tracker->has_limit() && tracker->limit() < max_consumption +
failed_consume_size) {
+ if (tracker->has_limit() && tracker->limit() < max_consumption +
failed_allocation_size) {
exceeded_tracker = tracker;
break;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 767e4fdafb..885acffc11 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -128,7 +128,13 @@ public:
// It is used to revise mem tracker consumption.
// If the location of memory alloc and free is different, the consumption
value of mem tracker will be inaccurate.
// But the consumption value of the process mem tracker is not affected.
- void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker);
+ void cache_consume_local(int64_t bytes);
+
+ // Will not change the value of process_mem_tracker, even though
mem_tracker == process_mem_tracker.
+ void transfer_to(int64_t size, MemTrackerLimiter* dst) {
+ cache_consume_local(-size);
+ dst->cache_consume_local(size);
+ }
void enable_print_log_usage() { _print_log_usage = true; }
@@ -145,11 +151,11 @@ public:
// If 'failed_allocation_size' is greater than zero, logs the allocation
size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is
logged.
// If 'state' is non-nullptr, logs the error to 'state'.
- Status mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size = 0);
+ Status mem_limit_exceeded(const std::string& msg, int64_t
failed_allocation_size = 0);
Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter*
failed_tracker,
Status failed_try_consume_st);
Status mem_limit_exceeded(RuntimeState* state, const std::string& msg,
- int64_t failed_consume_size = 0);
+ int64_t failed_allocation_size = 0);
std::string debug_string() {
std::stringstream msg;
@@ -181,7 +187,6 @@ private:
// the current value is returned and set to 0.
// Thread safety.
int64_t add_untracked_mem(int64_t bytes);
- void consume_cache(int64_t bytes);
// Log consumption of all the trackers provided. Returns the sum of
consumption in
// 'logged_consumption'. 'max_recursive_depth' specifies the maximum
number of levels
@@ -252,19 +257,14 @@ inline int64_t
MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
return 0;
}
-inline void MemTrackerLimiter::consume_cache(int64_t bytes) {
+inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) {
+ if (bytes == 0) return;
int64_t consume_bytes = add_untracked_mem(bytes);
if (consume_bytes != 0) {
- consume(consume_bytes);
- }
-}
-
-inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter*
end_tracker) {
- DCHECK(end_tracker);
- if (bytes == 0) return;
- for (auto& tracker : _all_ancestors) {
- if (tracker->label() == end_tracker->label()) return;
- tracker->_consumption->add(bytes);
+ for (auto& tracker : _all_ancestors) {
+ if (tracker->label() == "Process") return;
+ tracker->_consumption->add(bytes);
+ }
}
}
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index c080261056..a08d876370 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -93,9 +93,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// In order to ensure that the query pool mem tracker is the sum
of all currently running query mem trackers,
// the effect of the ended query mem tracker on the query pool mem
tracker should be cleared, that is,
// the negative number of the current value of consume.
- it->second->parent()->consume_local(
- -it->second->consumption(),
- ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
it->second->parent()->cache_consume_local(-it->second->consumption());
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={},
Limit={}, PeakUsed={}",
it->first, it->second->limit(),
it->second->peak_consumption());
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 449a1e070c..0cd371598a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -75,16 +75,6 @@ public:
// must increase the control to avoid entering infinite recursion,
otherwise it may cause crash or stuck,
void consume(int64_t size);
- // Will not change the value of process_mem_tracker, even though
mem_tracker == process_mem_tracker.
- void transfer_to(int64_t size, MemTrackerLimiter* mem_tracker) {
- consume(-size);
- mem_tracker->consume_cache(size);
- }
- void transfer_from(int64_t size, MemTrackerLimiter* mem_tracker) {
- mem_tracker->consume_cache(-size);
- consume(size);
- }
-
template <bool CheckLimit>
void flush_untracked_mem();
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 2f752516c1..5f524996ae 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -256,10 +256,12 @@ public:
doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
#define RELEASE_THREAD_MEM_TRACKER(size) \
doris::thread_context()->_thread_mem_tracker_mgr->consume(-size)
-#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \
- doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size,
tracker)
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)
\
+
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(size,
\
+
tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
- doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size,
tracker)
+ tracker->transfer_to( \
+ size,
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker().get())
#define RETURN_LIMIT_EXCEEDED(state, msg, ...)
\
return doris::thread_context()
\
->_thread_mem_tracker_mgr->limiter_mem_tracker()
\
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 9b80473032..5d2961ccea 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -113,7 +113,7 @@ protected:
}
inline void reset_peak() {
- if (UNLIKELY(c_end - c_end_peak > 1024)) {
+ if (UNLIKELY(c_end - c_end_peak > 4096)) {
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak,
ExecEnv::GetInstance()->new_process_mem_tracker().get());
c_end_peak = c_end;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]