This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 5d28575ee5 [enhancement](memtracker) Improve performance of tracking
real physical memory of PODArray #12168 (#12260)
5d28575ee5 is described below
commit 5d28575ee5f563f0f877a9fe722b7ca84ecd7020
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Sep 1 18:47:10 2022 +0800
[enhancement](memtracker) Improve performance of tracking real physical
memory of PODArray #12168 (#12260)
---
be/src/runtime/exec_env.h | 3 +++
be/src/runtime/exec_env_init.cpp | 1 +
be/src/runtime/mem_pool.cpp | 6 +++---
be/src/runtime/mem_pool.h | 4 ++--
be/src/runtime/memory/mem_tracker.cpp | 15 ++++++---------
be/src/runtime/memory/mem_tracker_limiter.cpp | 2 +-
be/src/runtime/memory/mem_tracker_limiter.h | 2 +-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 4 +++-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 11 +++++++----
be/src/runtime/runtime_state.cpp | 9 +++++----
be/src/runtime/thread_context.h | 12 ++++++------
be/src/service/doris_main.cpp | 2 +-
be/src/vec/common/pod_array.h | 10 +++++-----
13 files changed, 44 insertions(+), 37 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f9338b448a..e73eaa1c1f 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -121,6 +121,8 @@ public:
PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; }
std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return
_process_mem_tracker; }
+ MemTrackerLimiter* process_mem_tracker_raw() { return
_process_mem_tracker_raw; }
+
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return
_query_pool_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return
_load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return
_task_pool_mem_tracker_registry; }
@@ -197,6 +199,7 @@ private:
// The ancestor for all trackers. Every tracker is visible from the process down.
// Not limit total memory by process tracker, and it's just used to track virtual memory of process.
std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
+ MemTrackerLimiter* _process_mem_tracker_raw;
// The ancestor for all querys tracker.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor for all load tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 00c453e91c..54640a6e70 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -205,6 +205,7 @@ Status ExecEnv::_init_mem_tracker() {
_process_mem_tracker =
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes,
"Process");
+ _process_mem_tracker_raw = _process_mem_tracker.get();
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) &&
!defined(ADDRESS_SANITIZER) && \
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index f108f2266b..96a0daad4d 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -60,7 +60,7 @@ MemPool::~MemPool() {
}
mem_tracker_->Release(total_bytes_released);
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released -
peak_allocated_bytes_,
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
}
@@ -81,7 +81,7 @@ void MemPool::free_all() {
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released -
peak_allocated_bytes_,
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
chunks_.clear();
next_chunk_size_ = INITIAL_CHUNK_SIZE;
current_chunk_idx_ = -1;
@@ -146,7 +146,7 @@ bool MemPool::find_chunk(size_t min_size, bool
check_limits) {
mem_tracker_->Release(chunk_size);
return false;
}
- THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size,
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+ THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
// Put it before the first free chunk. If no free chunks, it goes at the end.
if (first_free_idx == static_cast<int>(chunks_.size())) {
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 128916007b..4c93e8568f 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -208,9 +208,9 @@ private:
bool check_integrity(bool check_current_chunk_empty);
void reset_peak() {
- if (total_allocated_bytes_ - peak_allocated_bytes_ > 4096) {
+ if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ -
peak_allocated_bytes_,
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
peak_allocated_bytes_ = total_allocated_bytes_;
}
}
diff --git a/be/src/runtime/memory/mem_tracker.cpp
b/be/src/runtime/memory/mem_tracker.cpp
index c4801d6611..8c0ae6ebba 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -58,15 +58,12 @@ NewMemTracker::NewMemTracker(const std::string& label,
RuntimeProfile* profile)
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME,
TUnit::BYTES);
}
- if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
- _label = fmt::format(
- "{} | {}", label,
-
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
- } else {
- _label = label + " | ";
- }
-
- _bind_group_num =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
+ DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() !=
nullptr);
+ _label = fmt::format(
+ "{} | {}", label,
+
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label());
+ _bind_group_num =
+
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num();
{
std::lock_guard<std::mutex>
l(mem_tracker_pool[_bind_group_num].group_lock);
_tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 1eb6166b9f..dccfb9db45 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -301,7 +301,7 @@ Status MemTrackerLimiter::mem_limit_exceeded(const
std::string& msg,
// The limit of the current tracker and parents is less than 0, the consume will not fail,
// and the current process memory has no excess limit.
detail += fmt::format("unknown exceed reason, executing_msg:<{}>",
msg);
- print_log_usage_tracker =
ExecEnv::GetInstance()->new_process_mem_tracker().get();
+ print_log_usage_tracker =
ExecEnv::GetInstance()->process_mem_tracker_raw();
}
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
if (print_log_usage_tracker != nullptr)
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 275e52375e..0543a992b1 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -73,7 +73,7 @@ public:
auto st = Status::MemoryLimitExceeded(
fmt::format("process memory used {} exceed limit {},
failed_alloc_size={}",
PerfCounters::get_vm_rss(), MemInfo::mem_limit(), bytes));
-
ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(st.get_error_msg());
+
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg());
return st;
}
return Status::OK();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 27e66d3fb8..30f7e7f10b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -32,6 +32,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
_limiter_tracker = mem_tracker;
+ _limiter_tracker_raw = mem_tracker.get();
}
void ThreadMemTrackerMgr::detach_limiter_tracker() {
@@ -39,6 +40,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() {
_task_id = "";
_fragment_instance_id = TUniqueId();
_limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
+ _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
}
void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string&
cancel_details) {
@@ -54,7 +56,7 @@ void ThreadMemTrackerMgr::exceeded(Status
failed_try_consume_st) {
_cb_func();
}
if (is_attach_query()) {
- auto st = _limiter_tracker->mem_limit_exceeded(fmt::format("exec
node:<{}>", ""),
+ auto st = _limiter_tracker_raw->mem_limit_exceeded(fmt::format("exec
node:<{}>", ""),
_limiter_tracker->parent().get(),
failed_try_consume_st);
exceeded_cancel_task(st.get_error_msg());
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 0cd371598a..5135d6222b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -50,7 +50,7 @@ public:
// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
- ExecEnv::GetInstance()->new_process_mem_tracker()->consume(size);
+ ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size);
}
// After thread initialization, calling `init` again must call `clear_untracked_mems` first
@@ -81,6 +81,7 @@ public:
bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return
_limiter_tracker; }
+ MemTrackerLimiter* limiter_mem_tracker_raw() { return
_limiter_tracker_raw; }
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -111,6 +112,7 @@ private:
std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
std::vector<NewMemTracker*> _consumer_tracker_stack;
+ MemTrackerLimiter* _limiter_tracker_raw;
// If true, call memtracker try_consume, otherwise call consume.
bool _check_limit = false;
@@ -126,6 +128,7 @@ inline void ThreadMemTrackerMgr::init() {
DCHECK(_consumer_tracker_stack.empty());
_task_id = "";
_limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
+ _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
_check_limit = true;
}
@@ -176,15 +179,15 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
// _limiter_tracker->label() != "Process");
#endif
- Status st = _limiter_tracker->try_consume(_untracked_mem);
+ Status st = _limiter_tracker_raw->try_consume(_untracked_mem);
if (!st) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
// the consume to ensure the accuracy of the statistics.
- _limiter_tracker->consume(_untracked_mem);
+ _limiter_tracker_raw->consume(_untracked_mem);
exceeded(st);
}
} else {
- _limiter_tracker->consume(_untracked_mem);
+ _limiter_tracker_raw->consume(_untracked_mem);
}
for (auto tracker : _consumer_tracker_stack) {
tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index e98467f95b..556281e993 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -193,13 +193,14 @@ Status RuntimeState::init(const TUniqueId&
fragment_instance_id, const TQueryOpt
Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
bool has_query_mem_tracker = _query_options.__isset.mem_limit &&
(_query_options.mem_limit > 0);
int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit :
-1;
- if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) {
+ if (bytes_limit >
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) {
VLOG_NOTICE << "Query memory limit " <<
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
- <<
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(),
- TUnit::BYTES)
+ << PrettyPrinter::print(
+
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(),
+ TUnit::BYTES)
<< ". Using process memory limit instead";
- bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit();
+ bytes_limit =
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit();
}
// we do not use global query-map for now, to avoid mem-exceeded different fragments
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 5f524996ae..c5a4566c73 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -136,7 +136,7 @@ public:
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) &&
_task_id == "")
<< ",new tracker label: " << mem_tracker->label() << ",old
tracker label: "
- << _thread_mem_tracker_mgr->limiter_mem_tracker()->label();
+ << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
DCHECK(type != TaskType::UNKNOWN);
_type = type;
_task_id = task_id;
@@ -256,15 +256,15 @@ public:
doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
#define RELEASE_THREAD_MEM_TRACKER(size) \
doris::thread_context()->_thread_mem_tracker_mgr->consume(-size)
-#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)
\
-
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(size,
\
-
tracker)
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)
\
+
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to(
\
+ size, tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
tracker->transfer_to( \
- size,
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker().get())
+ size,
doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw())
#define RETURN_LIMIT_EXCEEDED(state, msg, ...)
\
return doris::thread_context()
\
- ->_thread_mem_tracker_mgr->limiter_mem_tracker()
\
+ ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()
\
->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "",
msg), \
##__VA_ARGS__);
} // namespace doris
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index d80b1bc504..37d08a2a38 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -462,7 +462,7 @@ int main(int argc, char** argv) {
// 1s clear the expired task mem tracker, a query mem tracker is about 57 bytes.
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
-
doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage();
+
doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage();
sleep(1);
}
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 5d2961ccea..5deb16d106 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -113,9 +113,9 @@ protected:
}
inline void reset_peak() {
- if (UNLIKELY(c_end - c_end_peak > 4096)) {
+ if (UNLIKELY(c_end - c_end_peak > 65536)) {
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak,
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
c_end_peak = c_end;
}
}
@@ -127,7 +127,7 @@ protected:
template <typename... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams&&... allocator_params) {
THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left,
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
c_start = c_end = c_end_peak =
reinterpret_cast<char*>(TAllocator::alloc(
bytes,
std::forward<TAllocatorParams>(allocator_params)...)) +
@@ -144,7 +144,7 @@ protected:
TAllocator::free(c_start - pad_left, allocated_bytes());
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak,
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
}
template <typename... TAllocatorParams>
@@ -157,7 +157,7 @@ protected:
unprotect();
THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(),
-
ExecEnv::GetInstance()->new_process_mem_tracker().get());
+
ExecEnv::GetInstance()->process_mem_tracker_raw());
ptrdiff_t end_diff = c_end - c_start;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]