This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c6b2cc7067b [fix](memory) Fix Allocator release memory to correct
MemTracker after TLS attach task ends (#39908)
c6b2cc7067b is described below
commit c6b2cc7067bb7868f5d1d766714f8ecbde1ad42f
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Aug 28 01:12:55 2024 +0800
[fix](memory) Fix Allocator release memory to correct MemTracker after TLS
attach task ends (#39908)
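Allocator saves the TLS MemTracker during its first alloc; that saved tracker is
used to release memory after the TLS-attached task has ended. Previously, memory
freed outside SCOPED_ATTACH_TASK (e.g. while ~HttpRequest destroys a
StreamLoadContext) was released against the Orphan tracker and tripped
enable_memory_orphan_check, as in the following trace: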
```
23:00:15 F20240824 22:56:49.773799 66432 thread_context.h:238] Check
failed: doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan" If you crash here, it means that
SCOPED_ATTACH_TASK and SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used
correctly. starting position of each thread is expected to use
SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging to
Query/Load/Compaction/Other Tasks, otherwise memory alloc using Do [...]
23:00:15 *** Check failure stack trace: ***
23:00:15 @ 0x55645d3388e6
google::LogMessageFatal::~LogMessageFatal()
23:00:15 @ 0x5564439637c2 doris::ThreadContext::consume_memory()
23:00:15 @ 0x5564439914fe Allocator<>::release_memory()
23:00:15 @ 0x5564354be11e std::_Sp_counted_ptr<>::_M_dispose()
23:00:15 @ 0x55643557bc3b std::deque<>::pop_front()
23:00:15 @ 0x5564355756b1
doris::io::StreamLoadPipe::~StreamLoadPipe()
23:00:15 @ 0x5564354bfa77
doris::StreamLoadContext::~StreamLoadContext()
23:00:15 @ 0x556436ee5114 doris::HttpRequest::~HttpRequest()
```
---
be/src/common/config.cpp | 6 ++++-
be/src/common/config.h | 3 +++
be/src/http/action/http_stream.cpp | 4 +--
be/src/io/file_factory.cpp | 6 ++---
be/src/olap/page_cache.cpp | 10 +++----
be/src/olap/page_cache.h | 1 -
be/src/runtime/stream_load/stream_load_context.h | 17 +++++++-----
be/src/runtime/thread_context.h | 28 ++++++++++----------
be/src/service/internal_service.cpp | 3 ++-
be/src/util/byte_buffer.h | 6 ++++-
be/src/vec/common/allocator.cpp | 33 ++++++++++++++++++++++--
be/src/vec/common/allocator.h | 8 +++++-
12 files changed, 86 insertions(+), 39 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 216c2e133c8..b477247c669 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -142,7 +142,11 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes,
"2147483648");
DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");
-DEFINE_mBool(enable_memory_orphan_check, "false");
+// default is true. if any memory tracking in Orphan mem tracker will report
error.
+// !! not modify the default value of this conf!! otherwise memory errors
cannot be detected in time.
+// allocator free memory not need to check, because when the thread memory
tracker label is Orphan,
+// use the tracker saved in Allocator.
+DEFINE_mBool(enable_memory_orphan_check, "true");
// The maximum time a thread waits for full GC. Currently only query will wait
for full gc.
DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 78f94577d81..67048bc615e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -197,6 +197,9 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);
// default is true. if any memory tracking in Orphan mem tracker will report
error.
+// !! not modify the default value of this conf!! otherwise memory errors
cannot be detected in time.
+// allocator free memory not need to check, because when the thread memory
tracker label is Orphan,
+// use the tracker saved in Allocator.
DECLARE_mBool(enable_memory_orphan_check);
// The maximum time a thread waits for a full GC. Currently only query will
wait for full gc.
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index afeb251ca41..7dbae6df731 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -247,8 +247,8 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
- if (ctx->schema_buffer->pos + remove_bytes <
config::stream_tvf_buffer_size) {
- ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
+ if (ctx->schema_buffer()->pos + remove_bytes <
config::stream_tvf_buffer_size) {
+ ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to
obtain column information";
ctx->is_read_schema = false;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 7f64ea50710..f4ce573c535 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -209,9 +209,9 @@ Status FileFactory::create_pipe_reader(const TUniqueId&
load_id, io::FileReaderS
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024
/* min_chunk_size */,
- stream_load_ctx->schema_buffer->pos /* total_length */);
- stream_load_ctx->schema_buffer->flip();
- RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
+ stream_load_ctx->schema_buffer()->pos /* total_length */);
+ stream_load_ctx->schema_buffer()->flip();
+ RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
RETURN_IF_ERROR(pipe->finish());
*file_reader = std::move(pipe);
} else {
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 1f0556f4642..b70dadc5b43 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -28,12 +28,10 @@ template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache,
segment_v2::PageTypePB page_type)
: LRUCacheValueBase(), _size(b), _capacity(b) {
if (use_cache) {
- _mem_tracker_by_allocator =
StoragePageCache::instance()->mem_tracker(page_type);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ StoragePageCache::instance()->mem_tracker(page_type));
+ _data = reinterpret_cast<char*>(TAllocator::alloc(_capacity,
ALLOCATOR_ALIGNMENT_16));
} else {
- _mem_tracker_by_allocator =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
- }
- {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity,
ALLOCATOR_ALIGNMENT_16));
}
}
@@ -42,7 +40,7 @@ template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_);
TAllocator::free(_data, _capacity);
}
}
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 09fc689959c..ef25de7bc30 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -60,7 +60,6 @@ private:
// Effective size, smaller than capacity, such as data page remove
checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
- std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
};
using DataPage = PageBase<Allocator<false>>;
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index f7c4a0d474f..95e56e0b3fa 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -96,14 +96,9 @@ class StreamLoadContext {
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()),
_exec_env(exec_env) {
start_millis = UnixMillis();
- SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
- schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
}
~StreamLoadContext() {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->stream_load_pipe_tracker());
- schema_buffer.reset();
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
@@ -126,6 +121,15 @@ public:
bool is_mow_table() const;
+ ByteBufferPtr schema_buffer() {
+ if (_schema_buffer == nullptr) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->stream_load_pipe_tracker());
+ _schema_buffer =
ByteBuffer::allocate(config::stream_tvf_buffer_size);
+ }
+ return _schema_buffer;
+ }
+
public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
@@ -190,8 +194,6 @@ public:
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;
- ByteBufferPtr schema_buffer;
-
TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;
@@ -253,6 +255,7 @@ public:
private:
ExecEnv* _exec_env = nullptr;
+ ByteBufferPtr _schema_buffer;
};
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index ff8f2c6b0b5..6158f0535be 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -50,7 +50,7 @@
// Used after SCOPED_ATTACH_TASK, in order to count the memory into another
// MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task.
#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \
- auto VARNAME_LINENUM(switch_mem_tracker) =
SwitchThreadMemTrackerLimiter(arg1)
+ auto VARNAME_LINENUM(switch_mem_tracker) =
doris::SwitchThreadMemTrackerLimiter(arg1)
// Looking forward to tracking memory during thread execution into MemTracker.
// Usually used to record query more detailed memory, including ExecNode
operators.
@@ -170,8 +170,7 @@ static std::string memory_orphan_check_msg =
"each thread is expected to use SCOPED_ATTACH_TASK to bind a
MemTrackerLimiter belonging "
"to Query/Load/Compaction/Other Tasks, otherwise memory alloc using
Doris Allocator in the "
"thread will crash. If you want to switch MemTrackerLimiter during
thread execution, "
- "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat
Attach. Of course, you "
- "can modify enable_memory_orphan_check=false in be.conf to avoid this
crash.";
+ "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat
Attach.";
// The thread context saves some info about a working thread.
// 2 required info:
@@ -222,9 +221,9 @@ public:
ss << std::this_thread::get_id();
return ss.str();
}
- // After thread_mem_tracker_mgr is initialized, the current thread Hook
starts to
- // consume/release mem_tracker.
- // Note that the use of shared_ptr will cause a crash. The guess is that
there is an
+ // Note that if set global Memory Hook, After thread_mem_tracker_mgr is
initialized,
+ // the current thread Hook starts to consume/release mem_tracker.
+ // the use of shared_ptr will cause a crash. The guess is that there is an
// intermediate state during the copy construction of shared_ptr.
Shared_ptr is not equal
// to nullptr, but the object it points to is not initialized. At this
time, when the memory
// is released somewhere, the hook is triggered to cause the crash.
@@ -318,7 +317,7 @@ public:
// The brpc server should respond as quickly as possible.
bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
// set the data so that next time bthread_getspecific in the
thread returns the data.
- CHECK(0 == bthread_setspecific(btls_key, bthread_context) ||
k_doris_exit);
+ CHECK(0 == bthread_setspecific(btls_key, bthread_context) ||
doris::k_doris_exit);
}
DCHECK(bthread_context != nullptr);
bthread_context->thread_local_handle_count++;
@@ -360,7 +359,7 @@ static ThreadContext* thread_context(bool allow_return_null
= false) {
// in bthread
// bthread switching pthread may be very frequent, remember not to use
lock or other time-consuming operations.
auto* bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
- DCHECK(bthread_context != nullptr);
+ DCHECK(bthread_context != nullptr &&
bthread_context->thread_local_handle_count > 0);
return bthread_context;
}
if (allow_return_null) {
@@ -449,15 +448,16 @@ public:
class SwitchThreadMemTrackerLimiter {
public:
- explicit SwitchThreadMemTrackerLimiter(const
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+ explicit SwitchThreadMemTrackerLimiter(
+ const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
- ThreadLocalHandle::create_thread_local_if_not_exits();
+ doris::ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
}
- explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext&
query_thread_context) {
- ThreadLocalHandle::create_thread_local_if_not_exits();
+ explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext&
query_thread_context) {
+ doris::ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() ==
query_thread_context.query_id); // workload group alse not
change
DCHECK(query_thread_context.query_mem_tracker);
@@ -468,11 +468,11 @@ public:
~SwitchThreadMemTrackerLimiter() {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
- ThreadLocalHandle::del_thread_local_if_count_is_zero();
+ doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
}
private:
- std::shared_ptr<MemTrackerLimiter> _old_mem_tracker;
+ std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker;
};
class AddThreadMemTrackerConsumer {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index c2251c240ae..744024c940c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -767,7 +767,8 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
std::shared_ptr<MemTrackerLimiter> mem_tracker =
MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
- fmt::format("{}#{}", params.format_type, params.file_type));
+ fmt::format("InternalService::fetch_table_schema:{}#{}",
params.format_type,
+ params.file_type));
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
// make sure profile is desctructed after reader cause
PrefetchBufferedReader
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index e8eadf69e02..1499f51c053 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -24,6 +24,7 @@
#include "common/logging.h"
#include "common/status.h"
+#include "runtime/thread_context.h"
#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"
@@ -43,7 +44,10 @@ struct ByteBuffer : private Allocator<false> {
return Status::OK();
}
- ~ByteBuffer() { Allocator<false>::free(ptr, capacity); }
+ ~ByteBuffer() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
+ Allocator<false>::free(ptr, capacity);
+ }
void put_bytes(const char* data, size_t size) {
memcpy(ptr + pos, data, size);
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 82cd78a7fc1..ae5f27989b2 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -211,14 +211,43 @@ void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::memory_
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::consume_memory(
- size_t size) const {
+ size_t size) {
+ // Usually, an object that inherits Allocator has the same TLS tracker for
each alloc.
+ // If an object that inherits Allocator needs to be reused by multiple
queries,
+ // it is necessary to switch the same tracker to TLS when calling alloc.
+ // However, in ORC Reader, ORC DataBuffer will be reused, but we cannot
switch TLS tracker,
+ // so we update the Allocator tracker when the TLS tracker changes.
+ // note that the tracker in thread context when object that inherit
Allocator is constructed may be
+ // no attach memory tracker in tls. usually the memory tracker is attached
in tls only during the first alloc.
+ if (mem_tracker_ == nullptr ||
+ mem_tracker_->label() !=
doris::thread_context()->thread_mem_tracker()->label()) {
+ mem_tracker_ =
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+ }
CONSUME_THREAD_MEM_TRACKER(size);
}
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::release_memory(
size_t size) const {
- RELEASE_THREAD_MEM_TRACKER(size);
+ doris::ThreadContext* thread_context = doris::thread_context(true);
+ if ((thread_context && thread_context->thread_mem_tracker()->label() !=
"Orphan") ||
+ mem_tracker_ == nullptr) {
+ // If thread_context exist and the label of thread_mem_tracker not
equal to `Orphan`,
+ // this means that in the scope of SCOPED_ATTACH_TASK,
+ // so thread_mem_tracker should be used to release memory.
+ // If mem_tracker_ is nullptr there is a scenario where an object that
inherits Allocator
+ // has never called alloc, but free memory.
+ // in phmap, the memory alloced by an object may be transferred to
another object and then free.
+ // in this case, thread context must attach a memory tracker other
than Orphan,
+ // otherwise memory tracking will be wrong.
+ RELEASE_THREAD_MEM_TRACKER(size);
+ } else {
+ // if thread_context does not exist or the label of thread_mem_tracker
is equal to
+ // `Orphan`, it usually happens during object destruction. This means
that
+ // the scope of SCOPED_ATTACH_TASK has been left, so release memory
using Allocator tracker.
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
+ RELEASE_THREAD_MEM_TRACKER(size);
+ }
}
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2bcce9a9c68..f393886cf0b 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -87,6 +87,10 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
// is always a multiple of sixteen.
(https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html)
static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
+namespace doris {
+class MemTrackerLimiter;
+}
+
class DefaultMemoryAllocator {
public:
static void* malloc(size_t size) __THROW { return std::malloc(size); }
@@ -228,7 +232,7 @@ public:
// alloc will continue to execute, so the consume memtracker is forced.
void memory_check(size_t size) const;
// Increases consumption of this tracker by 'bytes'.
- void consume_memory(size_t size) const;
+ void consume_memory(size_t size);
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
@@ -400,6 +404,8 @@ protected:
static constexpr bool clear_memory = clear_memory_;
+ std::shared_ptr<doris::MemTrackerLimiter> mem_tracker_ {nullptr};
+
// Freshly mmapped pages are copy-on-write references to a global zero
page.
// On the first write, a page fault occurs, and an actual writable page is
// allocated. If we are going to use this memory soon, such as when
resizing
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]