This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1f3de0eae3 [fix](memory) fix invalid large memory check && fix memory info thread safety (#22027)
1f3de0eae3 is described below
commit 1f3de0eae3f56068a3ea23a066df25454fe67cf5
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Jul 26 12:18:31 2023 +0800
[fix](memory) fix invalid large memory check && fix memory info thread safety (#22027)
fix invalid large memory check
fix memory info thread safety
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 5 +++
be/src/runtime/memory/thread_mem_tracker_mgr.h | 17 ++++++----
be/src/runtime/thread_context.h | 27 ++++++++-------
be/src/util/mem_info.cpp | 46 +++++++++++++++-----------
be/src/util/mem_info.h | 31 +++++++++++------
be/src/vec/common/allocator.cpp | 20 +++++++++--
be/src/vec/common/allocator.h | 7 ++--
8 files changed, 102 insertions(+), 53 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9a0b2a0e3a..cb7f0f3698 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -128,6 +128,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");
DEFINE_mBool(disable_memory_gc, "false");
+DEFINE_mInt64(large_memory_check_bytes, "1073741824");
+
// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 169bbbac32..63cb352618 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -169,6 +169,11 @@ DECLARE_mBool(enable_query_memory_overcommit);
// default gc strategy is conservative, if you want to exclude the interference of gc, let it be true
DECLARE_mBool(disable_memory_gc);
+// When malloc or new allocates more than large_memory_check_bytes without going through the Doris
+// Allocator, a warning containing the stacktrace is printed; the allocation itself is not blocked.
+// Large memory allocations are expected to use Allocator instead.
+DECLARE_mInt64(large_memory_check_bytes);
+
// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
DECLARE_mInt32(thread_wait_gc_max_milliseconds);
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index ca4334ad45..2bf866cb9d 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -76,7 +76,7 @@ public:
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion,
otherwise it may cause crash or stuck,
// Returns whether the memory exceeds limit, and will consume mem trcker
no matter whether the limit is exceeded.
- void consume(int64_t size);
+ void consume(int64_t size, bool large_memory_check = false);
void flush_untracked_mem();
bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
@@ -160,23 +160,28 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
_consumer_tracker_stack.pop_back();
}
-inline void ThreadMemTrackerMgr::consume(int64_t size) {
+inline void ThreadMemTrackerMgr::consume(int64_t size, bool
large_memory_check) {
_untracked_mem += size;
+ if (!ExecEnv::GetInstance()->initialized()) {
+ return;
+ }
// When some threads `0 < _untracked_mem <
config::mem_tracker_consume_min_size_bytes`
// and some threads `_untracked_mem <=
-config::mem_tracker_consume_min_size_bytes` trigger consumption(),
// it will cause tracker->consumption to be temporarily less than 0.
// After the jemalloc hook is loaded, before ExecEnv init,
_limiter_tracker=nullptr.
if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
- !_stop_consume && ExecEnv::GetInstance()->initialized()) {
+ !_stop_consume) {
flush_untracked_mem();
}
// Large memory alloc should use allocator.h
// Direct malloc or new large memory, unable to catch std::bad_alloc, BE
may OOM.
- if (size > 1024l * 1024 * 1024 && !doris::config::disable_memory_gc) { //
1G
+ if (large_memory_check && size > doris::config::large_memory_check_bytes) {
_stop_consume = true;
- LOG(WARNING) << fmt::format("MemHook alloc large memory: {},
stacktrace:\n{}", size,
- get_stack_trace());
+ LOG(WARNING) << fmt::format(
+ "malloc or new large memory: {}, looking forward to using
Allocator, this is just "
+ "a warning, not prevent memory alloc, stacktrace:\n{}",
+ size, get_stack_trace());
_stop_consume = false;
}
}
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 2a46e22a18..c32a9d5653 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -86,11 +86,11 @@
#define SCOPED_TRACK_MEMORY_TO_UNKNOWN() (void)0
#endif
-#define SKIP_MEMORY_CHECK(...) \
- do { \
- doris::skip_memory_check++; \
- DEFER({ doris::skip_memory_check--; }); \
- __VA_ARGS__; \
+#define SKIP_MEMORY_CHECK(...) \
+ do { \
+ doris::thread_context()->skip_memory_check++; \
+ DEFER({ doris::thread_context()->skip_memory_check--; }); \
+ __VA_ARGS__; \
} while (0)
namespace doris {
@@ -137,7 +137,6 @@ public:
};
inline thread_local ThreadContextPtr thread_context_ptr;
-inline thread_local int skip_memory_check = 0;
// To avoid performance problems caused by frequently calling
`bthread_getspecific` to obtain bthread TLS
// in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS.
@@ -201,7 +200,13 @@ public:
return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
}
+ void consume_memory(const int64_t size) const {
+ thread_mem_tracker_mgr->consume(size, large_memory_check);
+ }
+
int switch_bthread_local_count = 0;
+ int skip_memory_check = 0;
+ bool large_memory_check = true;
private:
TUniqueId _task_id;
@@ -364,10 +369,8 @@ private:
// Basic macros for mem tracker, usually do not need to be modified and used.
#ifdef USE_MEM_TRACKER
// For the memory that cannot be counted by mem hook, manually count it into
the mem tracker, such as mmap.
-#define CONSUME_THREAD_MEM_TRACKER(size) \
- doris::thread_context()->thread_mem_tracker_mgr->consume(size)
-#define RELEASE_THREAD_MEM_TRACKER(size) \
- doris::thread_context()->thread_mem_tracker_mgr->consume(-size)
+#define CONSUME_THREAD_MEM_TRACKER(size)
doris::thread_context()->consume_memory(size)
+#define RELEASE_THREAD_MEM_TRACKER(size)
doris::thread_context()->consume_memory(-size)
// used to fix the tracking accuracy of caches.
#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)
\
@@ -385,7 +388,7 @@ private:
#define CONSUME_MEM_TRACKER(size)
\
do {
\
if (doris::thread_context_ptr.init) {
\
- doris::thread_context()->thread_mem_tracker_mgr->consume(size);
\
+ doris::thread_context()->consume_memory(size);
\
} else if (doris::ExecEnv::GetInstance()->initialized()) {
\
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size);
\
}
\
@@ -393,7 +396,7 @@ private:
#define RELEASE_MEM_TRACKER(size)
\
do {
\
if (doris::thread_context_ptr.init) {
\
- doris::thread_context()->thread_mem_tracker_mgr->consume(-size);
\
+ doris::thread_context()->consume_memory(-size);
\
} else if (doris::ExecEnv::GetInstance()->initialized()) {
\
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(
\
-size);
\
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 29c58ca23e..79341c1e77 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -60,14 +60,14 @@ std::string MemInfo::_s_mem_limit_str = "";
int64_t MemInfo::_s_soft_mem_limit = -1;
std::string MemInfo::_s_soft_mem_limit_str = "";
-int64_t MemInfo::_s_allocator_cache_mem = 0;
+std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0;
std::string MemInfo::_s_allocator_cache_mem_str = "";
-int64_t MemInfo::_s_virtual_memory_used = 0;
-int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
+std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
+std::atomic<int64_t> MemInfo::_s_proc_mem_no_allocator_cache = -1;
std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
static std::unordered_map<std::string, int64_t> _mem_info_bytes;
-int64_t MemInfo::_s_sys_mem_available = -1;
+std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
std::string MemInfo::_s_sys_mem_available_str = "";
int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
@@ -86,21 +86,26 @@ void MemInfo::refresh_allocator_mem() {
// https://jemalloc.net/jemalloc.3.html
// https://www.bookstack.cn/read/aliyun-rds-core/4a0cdf677f62feb3.md
- _s_allocator_cache_mem = get_je_all_arena_metrics("tcache_bytes") +
- get_je_metrics("stats.metadata") +
- get_je_all_arena_metrics("pdirty") *
get_page_size();
- _s_allocator_cache_mem_str =
-
PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem),
TUnit::BYTES);
- _s_virtual_memory_used = get_je_metrics("stats.mapped");
+ _s_allocator_cache_mem.store(get_je_all_arena_metrics("tcache_bytes") +
+ get_je_metrics("stats.metadata") +
+ get_je_all_arena_metrics("pdirty") *
get_page_size(),
+ std::memory_order_relaxed);
+ _s_allocator_cache_mem_str = PrettyPrinter::print(
+
static_cast<uint64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)),
+ TUnit::BYTES);
+ _s_virtual_memory_used.store(get_je_metrics("stats.mapped"),
std::memory_order_relaxed);
#else
- _s_allocator_cache_mem = get_tc_metrics("tcmalloc.pageheap_free_bytes") +
-
get_tc_metrics("tcmalloc.central_cache_free_bytes") +
-
get_tc_metrics("tcmalloc.transfer_cache_free_bytes") +
-
get_tc_metrics("tcmalloc.thread_cache_free_bytes");
- _s_allocator_cache_mem_str =
-
PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem),
TUnit::BYTES);
- _s_virtual_memory_used = get_tc_metrics("generic.total_physical_bytes") +
-
get_tc_metrics("tcmalloc.pageheap_unmapped_bytes");
+
_s_allocator_cache_mem.store(get_tc_metrics("tcmalloc.pageheap_free_bytes") +
+
get_tc_metrics("tcmalloc.central_cache_free_bytes") +
+
get_tc_metrics("tcmalloc.transfer_cache_free_bytes") +
+
get_tc_metrics("tcmalloc.thread_cache_free_bytes"),
+ std::memory_order_relaxed);
+ _s_allocator_cache_mem_str = PrettyPrinter::print(
+
static_cast<uint64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)),
+ TUnit::BYTES);
+
_s_virtual_memory_used.store(get_tc_metrics("generic.total_physical_bytes") +
+
get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"),
+ std::memory_order_relaxed);
#endif
}
@@ -307,8 +312,9 @@ void MemInfo::refresh_proc_meminfo() {
if (meminfo.is_open()) meminfo.close();
if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
- _s_sys_mem_available = _mem_info_bytes["MemAvailable"];
- _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available,
TUnit::BYTES);
+ _s_sys_mem_available.store(_mem_info_bytes["MemAvailable"],
std::memory_order_relaxed);
+ _s_sys_mem_available_str = PrettyPrinter::print(
+ _s_sys_mem_available.load(std::memory_order_relaxed),
TUnit::BYTES);
}
}
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 179295fd69..98179dda2c 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -71,7 +71,8 @@ public:
static void refresh_proc_meminfo();
static inline int64_t sys_mem_available() {
- return _s_sys_mem_available - refresh_interval_memory_growth;
+ return _s_sys_mem_available.load(std::memory_order_relaxed) -
+ refresh_interval_memory_growth;
}
static inline std::string sys_mem_available_str() { return
_s_sys_mem_available_str; }
static inline int64_t sys_mem_available_low_water_mark() {
@@ -121,11 +122,16 @@ public:
#endif
}
- static inline size_t allocator_virtual_mem() { return
_s_virtual_memory_used; }
- static inline size_t allocator_cache_mem() { return
_s_allocator_cache_mem; }
+ static inline size_t allocator_virtual_mem() {
+ return _s_virtual_memory_used.load(std::memory_order_relaxed);
+ }
+ static inline size_t allocator_cache_mem() {
+ return _s_allocator_cache_mem.load(std::memory_order_relaxed);
+ }
static inline std::string allocator_cache_mem_str() { return
_s_allocator_cache_mem_str; }
static inline int64_t proc_mem_no_allocator_cache() {
- return _s_proc_mem_no_allocator_cache + refresh_interval_memory_growth;
+ return _s_proc_mem_no_allocator_cache.load(std::memory_order_relaxed) +
+ refresh_interval_memory_growth;
}
// Tcmalloc property `generic.total_physical_bytes` records the total
length of the virtual memory
@@ -140,8 +146,10 @@ public:
* that can be used at anytime via jemalloc.
*/
static inline void refresh_proc_mem_no_allocator_cache() {
- _s_proc_mem_no_allocator_cache =
- PerfCounters::get_vm_rss() -
static_cast<int64_t>(_s_allocator_cache_mem);
+ _s_proc_mem_no_allocator_cache.store(
+ PerfCounters::get_vm_rss() -
static_cast<int64_t>(_s_allocator_cache_mem.load(
+
std::memory_order_relaxed)),
+ std::memory_order_relaxed);
refresh_interval_memory_growth = 0;
}
@@ -162,7 +170,8 @@ public:
return _s_soft_mem_limit_str;
}
static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
- return proc_mem_no_allocator_cache() + bytes > soft_mem_limit();
+ return proc_mem_no_allocator_cache() + bytes >= soft_mem_limit() ||
+ sys_mem_available() < sys_mem_available_warning_water_mark();
}
static std::string debug_string();
@@ -185,12 +194,12 @@ private:
static int64_t _s_soft_mem_limit;
static std::string _s_soft_mem_limit_str;
- static int64_t _s_allocator_cache_mem;
+ static std::atomic<int64_t> _s_allocator_cache_mem;
static std::string _s_allocator_cache_mem_str;
- static int64_t _s_virtual_memory_used;
- static int64_t _s_proc_mem_no_allocator_cache;
+ static std::atomic<int64_t> _s_virtual_memory_used;
+ static std::atomic<int64_t> _s_proc_mem_no_allocator_cache;
- static int64_t _s_sys_mem_available;
+ static std::atomic<int64_t> _s_sys_mem_available;
static std::string _s_sys_mem_available_str;
static int64_t _s_sys_mem_available_low_water_mark;
static int64_t _s_sys_mem_available_warning_water_mark;
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 9167c7df9f..ed3a2440ee 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -38,7 +38,7 @@
template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t size) const {
- if (doris::skip_memory_check) return;
+ if (doris::thread_context()->skip_memory_check) return;
if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
// Only thread attach query, and has not completely waited for
thread_wait_gc_max_milliseconds,
// will wait for gc, asynchronous cancel or throw bad::alloc.
@@ -116,7 +116,7 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t
template <bool clear_memory_, bool mmap_populate, bool use_mmap>
void Allocator<clear_memory_, mmap_populate,
use_mmap>::memory_tracker_check(size_t size) const {
- if (doris::skip_memory_check) return;
+ if (doris::thread_context()->skip_memory_check) return;
auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size);
if (!st) {
auto err_msg =
@@ -175,6 +175,22 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::throw_bad_alloc(
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
}
+template <bool clear_memory_, bool mmap_populate, bool use_mmap>
+void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size,
size_t alignment) {
+ doris::thread_context()->large_memory_check = false;
+ DEFER({ doris::thread_context()->large_memory_check = true; });
+ return alloc_impl(size, alignment);
+}
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap>
+void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf,
size_t old_size,
+ size_t
new_size,
+ size_t
alignment) {
+ doris::thread_context()->large_memory_check = false;
+ DEFER({ doris::thread_context()->large_memory_check = true; });
+ return realloc_impl(buf, old_size, new_size, alignment);
+}
+
template class Allocator<true, true, true>;
template class Allocator<true, true, false>;
template class Allocator<true, false, true>;
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index ae29eb916c..24fef16290 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -92,8 +92,11 @@ public:
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
+ void* alloc(size_t size, size_t alignment = 0);
+ void* realloc(void* buf, size_t old_size, size_t new_size, size_t
alignment = 0);
+
/// Allocate memory range.
- void* alloc(size_t size, size_t alignment = 0) {
+ void* alloc_impl(size_t size, size_t alignment = 0) {
memory_check(size);
void* buf;
@@ -155,7 +158,7 @@ public:
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
*/
- void* realloc(void* buf, size_t old_size, size_t new_size, size_t
alignment = 0) {
+ void* realloc_impl(void* buf, size_t old_size, size_t new_size, size_t
alignment = 0) {
if (old_size == new_size) {
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]