This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 80c65f2ce0c [fix](memory) When Load ends, check memory tracker value
returns is equal to 0 (#40016)
80c65f2ce0c is described below
commit 80c65f2ce0cfacf82f5b9715f8ac8ececcdbacb3
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Sep 3 12:43:46 2024 +0800
[fix](memory) When Load ends, check memory tracker value returns is equal
to 0 (#40016)
Check all memory is freed when Load is finished.
---
be/src/pipeline/pipeline_fragment_context.cpp | 8 +++
be/src/runtime/memory/mem_tracker_limiter.cpp | 71 ++++++++++++++++-----------
be/src/runtime/memory/mem_tracker_limiter.h | 2 +
3 files changed, 52 insertions(+), 29 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 51cfd73e549..3bf54ed7ece 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1031,6 +1031,10 @@ Status
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
+#ifndef NDEBUG
+ DCHECK(state->get_query_ctx() != nullptr);
+ state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
+#endif
_sink.reset(
new GroupCommitBlockSinkOperatorX(next_sink_operator_id(),
row_desc, output_exprs));
break;
@@ -1177,6 +1181,10 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
+#ifndef NDEBUG
+ DCHECK(_query_ctx != nullptr);
+ _query_ctx->query_mem_tracker->is_group_commit_load = true;
+#endif
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(),
descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
if (request.__isset.parallel_instances) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index cc695a6fdd5..6df577f8a50 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() {
"mem tracker not equal to 0 when mem tracker destruct, this
usually means that "
"memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
+ "If the log is truncated, search for `Address Sanitizer` in the
be.INFO log to see "
+ "more information."
"1. For query and load, memory leaks may have occurred, it is
expected that the query "
"mem tracker will be bound to the thread context using
SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc
and free. "
@@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_consumption->current_value() != 0) {
// TODO, expect mem tracker equal to 0 at the load/compaction/etc.
task end.
#ifndef NDEBUG
- if (_type == Type::QUERY) {
+ if (_type == Type::QUERY || (_type == Type::LOAD &&
!is_group_commit_load)) {
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak
consumption: {}, {}.",
label(), _consumption->current_value(),
_consumption->peak_value(),
@@ -140,11 +142,11 @@ MemTrackerLimiter::~MemTrackerLimiter() {
}
_consumption->set(0);
#ifndef NDEBUG
- } else if (!_address_sanitizers.empty()) {
- LOG(INFO) << "[Address Sanitizer] consumption is 0, but address
sanitizers not empty. "
- << ", mem tracker label: " << _label
- << ", peak consumption: " << _consumption->peak_value()
- << print_address_sanitizers();
+ } else if (!_address_sanitizers.empty() && !is_group_commit_load) {
+ LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address
sanitizers not empty. "
+ << ", mem tracker label: " << _label
+ << ", peak consumption: " << _consumption->peak_value()
+ << print_address_sanitizers();
#endif
}
memory_memtrackerlimiter_cnt << -1;
@@ -152,17 +154,17 @@ MemTrackerLimiter::~MemTrackerLimiter() {
#ifndef NDEBUG
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
- if (_type == Type::QUERY) {
+ if (_type == Type::QUERY || (_type == Type::LOAD &&
!is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
- LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem
tracker label: " << _label
- << ", consumption: " << _consumption->current_value()
- << ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf
- << ", size: " << size << ", old buf: " << it->first
- << ", old size: " << it->second.size
- << ", new stack_trace: " << get_stack_trace(1,
"DISABLED")
- << ", old stack_trace: " << it->second.stack_trace;
+ _error_address_sanitizers.emplace_back(
+ fmt::format("[Address Sanitizer] memory buf repeat add,
mem tracker label: {}, "
+ "consumption: {}, peak consumption: {}, buf:
{}, size: {}, old "
+ "buf: {}, old size: {}, new stack_trace: {},
old stack_trace: {}.",
+ _label, _consumption->current_value(),
_consumption->peak_value(),
+ buf, size, it->first, it->second.size,
+ get_stack_trace(1, "FULL_WITH_INLINE"),
it->second.stack_trace));
}
// if alignment not equal to 0, maybe usable_size > size.
@@ -174,26 +176,26 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf,
size_t size) {
}
void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
- if (_type == Type::QUERY) {
+ if (_type == Type::QUERY || (_type == Type::LOAD &&
!is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
if (it->second.size != size) {
- LOG(INFO) << "[Address Sanitizer] free memory buf size
inaccurate, mem tracker "
- "label: "
- << _label << ", consumption: " <<
_consumption->current_value()
- << ", peak consumption: " <<
_consumption->peak_value()
- << ", buf: " << buf << ", size: " << size << ", old
buf: " << it->first
- << ", old size: " << it->second.size
- << ", new stack_trace: " << get_stack_trace(1,
"DISABLED")
- << ", old stack_trace: " << it->second.stack_trace;
+ _error_address_sanitizers.emplace_back(fmt::format(
+ "[Address Sanitizer] free memory buf size inaccurate,
mem tracker label: "
+ "{}, consumption: {}, peak consumption: {}, buf: {},
size: {}, old buf: "
+ "{}, old size: {}, new stack_trace: {}, old
stack_trace: {}.",
+ _label, _consumption->current_value(),
_consumption->peak_value(), buf,
+ size, it->first, it->second.size, get_stack_trace(1,
"FULL_WITH_INLINE"),
+ it->second.stack_trace));
}
_address_sanitizers.erase(buf);
} else {
- LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem
tracker label: " << _label
- << ", consumption: " << _consumption->current_value()
- << ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf
- << ", size: " << size << ", stack_trace: " <<
get_stack_trace(1, "DISABLED");
+ _error_address_sanitizers.emplace_back(fmt::format(
+ "[Address Sanitizer] memory buf not exist, mem tracker
label: {}, consumption: "
+ "{}, peak consumption: {}, buf: {}, size: {}, stack_trace:
{}.",
+ _label, _consumption->current_value(),
_consumption->peak_value(), buf, size,
+ get_stack_trace(1, "FULL_WITH_INLINE")));
}
}
}
@@ -201,9 +203,20 @@ void MemTrackerLimiter::remove_address_sanitizers(void*
buf, size_t size) {
std::string MemTrackerLimiter::print_address_sanitizers() {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
std::string detail = "[Address Sanitizer]:";
+ detail += "\n memory not be freed:";
for (const auto& it : _address_sanitizers) {
- detail += fmt::format("\n {}, size {}, strack trace: {}", it.first,
it.second.size,
- it.second.stack_trace);
+ auto msg = fmt::format(
+ "\n [Address Sanitizer] buf not be freed, mem tracker
label: {}, consumption: "
+ "{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
+ _label, _consumption->current_value(),
_consumption->peak_value(), it.first,
+ it.second.size, it.second.stack_trace);
+ LOG(INFO) << msg;
+ detail += msg;
+ }
+ detail += "\n incorrect memory alloc and free:";
+ for (const auto& err_msg : _error_address_sanitizers) {
+ LOG(INFO) << err_msg;
+ detail += fmt::format("\n {}", err_msg);
}
return detail;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index e5c5cb1bc03..344f3dc92b6 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -209,6 +209,7 @@ public:
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
std::string print_address_sanitizers();
+ bool is_group_commit_load {false};
#endif
std::string debug_string() override {
@@ -260,6 +261,7 @@ private:
std::mutex _address_sanitizers_mtx;
std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
+ std::vector<std::string> _error_address_sanitizers;
#endif
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]