This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4b34f1f6c14 [fix](memory) Fix nested scoped tracker and nested reserve
memory (#35257)
4b34f1f6c14 is described below
commit 4b34f1f6c145d29998cefa3962835f83ee70d79b
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon May 27 18:11:37 2024 +0800
[fix](memory) Fix nested scoped tracker and nested reserve memory (#35257)
SCOPED_ATTACH_TASK cannot be nested, but
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be called, so
attach_limiter_tracker may be nested.
---
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 14 ++++++--
be/src/runtime/memory/thread_mem_tracker_mgr.h | 41 ++++++++++++++++--------
be/src/util/mem_info.cpp | 4 +--
3 files changed, 41 insertions(+), 18 deletions(-)
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 4f879852308..daa49548819 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -44,9 +44,16 @@ private:
void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
- DCHECK(_reserved_mem == 0);
CHECK(init());
flush_untracked_mem();
+ _reserved_mem_stack.push_back(_reserved_mem);
+ if (_reserved_mem != 0) {
+ // _untracked_mem temporary store bytes that not synchronized to process reserved memory,
+ // but bytes have been subtracted from thread _reserved_mem.
+ doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+ _reserved_mem = 0;
+ _untracked_mem = 0;
+ }
_limiter_tracker = mem_tracker;
_limiter_tracker_raw = mem_tracker.get();
}
@@ -54,8 +61,11 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
void ThreadMemTrackerMgr::detach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
CHECK(init());
- release_reserved();
flush_untracked_mem();
+ release_reserved();
+ DCHECK(!_reserved_mem_stack.empty());
+ _reserved_mem = _reserved_mem_stack.back();
+ _reserved_mem_stack.pop_back();
_limiter_tracker = old_mem_tracker;
_limiter_tracker_raw = old_mem_tracker.get();
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 6081b013346..64c2190a149 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -48,6 +48,7 @@ public:
~ThreadMemTrackerMgr() {
// if _init == false, exec env is not initialized when init(). and
never consumed mem tracker once.
if (_init) {
+ DCHECK(_reserved_mem == 0);
flush_untracked_mem();
}
}
@@ -132,6 +133,9 @@ private:
int64_t _old_untracked_mem = 0;
int64_t _reserved_mem = 0;
+ // SCOPED_ATTACH_TASK cannot be nested, but
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used,
+ // so `attach_limiter_tracker` may be nested.
+ std::vector<int64_t> _reserved_mem_stack;
bool _count_scope_mem = false;
int64_t _scope_mem = 0;
@@ -178,6 +182,7 @@ inline bool ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
DCHECK(!_consumer_tracker_stack.empty());
+ flush_untracked_mem();
_consumer_tracker_stack.back()->consume(_untracked_mem);
_consumer_tracker_stack.back()->release(_reserved_mem);
_consumer_tracker_stack.pop_back();
@@ -191,9 +196,13 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_che
// subtract size from process global reserved memory,
// because this part of the reserved memory has already been used
by BE process.
_reserved_mem -= size;
- // store bytes that not synchronized to process reserved memory.
+ // temporary store bytes that not synchronized to process reserved memory.
_untracked_mem += size;
- if (_untracked_mem >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
+ // If _untracked_mem > 0, reserved memory that has been used, if _untracked_mem greater than
+ // SYNC_PROC_RESERVED_INTERVAL_BYTES, release process reserved memory.
+ // If _untracked_mem < 0, used reserved memory is returned, will increase reserved memory,
+ // if _untracked_mem less than -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory.
+ if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
_untracked_mem = 0;
}
@@ -209,7 +218,9 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_che
_untracked_mem = 0;
}
}
+ // store bytes that not consumed by thread mem tracker.
_untracked_mem += size;
+ DCHECK(_reserved_mem == 0);
if (!_init && !ExecEnv::ready()) {
return;
}
@@ -217,9 +228,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_che
// and some threads `_untracked_mem <=
-config::mem_tracker_consume_min_size_bytes` trigger consumption(),
// it will cause tracker->consumption to be temporarily less than 0.
// After the jemalloc hook is loaded, before ExecEnv init,
_limiter_tracker=nullptr.
- if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
- _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
- !_stop_consume) {
+ if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes && !_stop_consume) {
flush_untracked_mem();
}
@@ -238,6 +247,12 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_che
}
inline void ThreadMemTrackerMgr::flush_untracked_mem() {
+ // if during reserve memory, _untracked_mem temporary store bytes that not synchronized
+ // to process reserved memory, but bytes have been subtracted from thread _reserved_mem.
+ // so not need flush untracked_mem to consume mem tracker.
+ if (_reserved_mem != 0) {
+ return;
+ }
// Temporary memory may be allocated during the consumption of the mem
tracker, which will lead to entering
// the Memory Hook again, so suspend consumption to avoid falling into an
infinite loop.
if (_untracked_mem == 0 || !init()) {
@@ -264,9 +279,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
CHECK(init());
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved
memory.
- if (_reserved_mem == 0) {
- flush_untracked_mem();
- }
+ flush_untracked_mem();
if (!_limiter_tracker_raw->try_consume(size)) {
return false;
}
@@ -281,21 +294,21 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t
size) {
tracker->consume(size);
}
_reserved_mem += size;
- DCHECK(_untracked_mem == 0);
return true;
}
inline void ThreadMemTrackerMgr::release_reserved() {
- flush_untracked_mem();
- if (_reserved_mem > 0) {
- doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem);
- _limiter_tracker_raw->consume(-_reserved_mem);
+ if (_reserved_mem != 0) {
+ doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
+ _untracked_mem);
+ _limiter_tracker_raw->release(_reserved_mem);
if (_count_scope_mem) {
_scope_mem -= _reserved_mem;
}
for (auto* tracker : _consumer_tracker_stack) {
- tracker->consume(-_reserved_mem);
+ tracker->release(_reserved_mem);
}
+ _untracked_mem = 0;
_reserved_mem = 0;
}
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index d8a2ffbbf6c..3196115ef06 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -75,8 +75,8 @@ int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;
static std::unordered_map<std::string, int64_t> _mem_info_bytes;
std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
-int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
-int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
+int64_t MemInfo::_s_sys_mem_available_low_water_mark = std::numeric_limits<int64_t>::min();
+int64_t MemInfo::_s_sys_mem_available_warning_water_mark = std::numeric_limits<int64_t>::min();
std::atomic<int64_t> MemInfo::_s_process_minor_gc_size = -1;
std::atomic<int64_t> MemInfo::_s_process_full_gc_size = -1;
std::mutex MemInfo::je_purge_dirty_pages_lock;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]