This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f42e7fc2dee [bugfix](memory) Reserve memory print failure reason (#39862)
f42e7fc2dee is described below
commit f42e7fc2dee73f66af49e90070f0f4056d513417
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Aug 27 18:35:53 2024 +0800
[bugfix](memory) Reserve memory print failure reason (#39862)
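1. Reserve memory: try_reserve() / try_reserve_memory() now return a doris::Status that carries the failure reason instead of a bare bool.
2. Fix bugs: guard the workload-group rollback in try_reserve() against a null wg_ptr, correct the inverted wg_ptr check in release_reserved(), and include the requested size in the watermark check of add_wg_refresh_interval_memory_growth().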
---
be/src/runtime/memory/thread_mem_tracker_mgr.h | 29 +++++++++++++------
be/src/runtime/thread_context.h | 2 +-
be/src/runtime/workload_group/workload_group.cpp | 15 ++++++++++
be/src/runtime/workload_group/workload_group.h | 4 ++-
.../runtime/memory/thread_mem_tracker_mgr_test.cpp | 33 ++++++++++++----------
5 files changed, 57 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 73cdd3243da..50a8a5e0f7e 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -95,7 +95,8 @@ public:
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();
- bool try_reserve(int64_t size);
+ doris::Status try_reserve(int64_t size);
+
void release_reserved();
bool is_attach_query() { return _query_id != TUniqueId(); }
@@ -295,7 +296,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = false;
}
-inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
DCHECK(_limiter_tracker_raw);
DCHECK(size >= 0);
CHECK(init());
@@ -303,19 +304,29 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
if (!_limiter_tracker_raw->try_consume(size)) {
- return false;
+ auto err_msg = fmt::format(
+ "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: "
+ "{}",
+ size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit());
+ return doris::Status::MemoryLimitExceeded(err_msg);
}
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
+ auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
+ wg_ptr->memory_debug_string());
_limiter_tracker_raw->release(size); // rollback
- return false;
+ return doris::Status::MemoryLimitExceeded(err_msg);
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
- _limiter_tracker_raw->release(size); // rollback
- wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
- return false;
+ auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
+ GlobalMemoryArbitrator::process_mem_log_str());
+ _limiter_tracker_raw->release(size); // rollback
+ if (wg_ptr) {
+ wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
+ }
+ return doris::Status::MemoryLimitExceeded(err_msg);
}
if (_count_scope_mem) {
_scope_mem += size;
@@ -324,7 +335,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
tracker->consume(size);
}
_reserved_mem += size;
- return true;
+ return doris::Status::OK();
}
inline void ThreadMemTrackerMgr::release_reserved() {
@@ -333,7 +344,7 @@ inline void ThreadMemTrackerMgr::release_reserved() {
_untracked_mem);
_limiter_tracker_raw->release(_reserved_mem);
auto wg_ptr = _wg_wptr.lock();
- if (!wg_ptr) {
+ if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
}
if (_count_scope_mem) {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c54b1a6892b..ff8f2c6b0b5 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -244,7 +244,7 @@ public:
thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
}
- bool try_reserve_memory(const int64_t size) const {
+ doris::Status try_reserve_memory(const int64_t size) const {
#ifdef USE_MEM_TRACKER
DCHECK(doris::k_doris_exit ||
!doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan")
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index e37f83a00e8..e263685d07f 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -96,6 +96,21 @@ std::string WorkloadGroup::debug_string() const {
_scan_bytes_per_second, _remote_scan_bytes_per_second);
}
+std::string WorkloadGroup::memory_debug_string() const {
+ return fmt::format(
+ "TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
+ "{}, weighted_memory_limit = {}, total_mem_used = {}, "
+ "wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
+ "spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
+ _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+ _enable_memory_overcommit ? "true" : "false",
+ PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
+ PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
+ PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES),
+ _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
+ _query_ctxs.size());
+}
+
void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
if (UNLIKELY(tg_info.id != _id)) {
return;
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 3561098b6ce..2fbb4dd3030 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -100,7 +100,8 @@ public:
void set_weighted_memory_ratio(double ratio);
bool add_wg_refresh_interval_memory_growth(int64_t size) {
- auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
+ auto realtime_total_mem_used =
+ _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
@@ -125,6 +126,7 @@ public:
}
std::string debug_string() const;
+ std::string memory_debug_string() const;
void check_and_update(const WorkloadGroupInfo& tg_info);
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
index ab15fce05a7..bab94ace470 100644
--- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -264,7 +264,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
thread_context->consume_memory(size2);
EXPECT_EQ(t->consumption(), size1 + size2);
- thread_context->try_reserve_memory(size3);
+ auto st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@@ -284,14 +285,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size1);
- std::cout << "11111 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
- << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
thread_context->consume_memory(size1);
thread_context->consume_memory(size1);
- std::cout << "2222 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
- << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
- std::cout << "3333 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
- << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
// reserved memory used done
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
@@ -308,7 +303,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
EXPECT_EQ(t->consumption(), size1 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
- thread_context->try_reserve_memory(size3);
+ st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@@ -358,7 +354,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t, workload_group);
- thread_context->try_reserve_memory(size3);
+ auto st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
EXPECT_EQ(t->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@@ -369,15 +366,18 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
EXPECT_EQ(t->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
- thread_context->try_reserve_memory(size2);
+ st = thread_context->try_reserve_memory(size2);
+ EXPECT_TRUE(st.ok());
// ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
// ThreadMemTrackerMgr _untracked_mem = 0
EXPECT_EQ(t->consumption(), size3 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
size3); // size3 - size2 + size2
- thread_context->try_reserve_memory(size3);
- thread_context->try_reserve_memory(size3);
+ st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
+ st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
thread_context->consume_memory(size3);
thread_context->consume_memory(size2);
thread_context->consume_memory(size3);
@@ -411,13 +411,15 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t1, workload_group);
- thread_context->try_reserve_memory(size3);
+ auto st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
- thread_context->try_reserve_memory(size3);
+ st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3);
@@ -428,7 +430,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
- thread_context->try_reserve_memory(size3);
+ st = thread_context->try_reserve_memory(size3);
+ EXPECT_TRUE(st.ok());
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(t3->consumption(), size3);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]