This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 16bb5cb430 [enhancement](memory) Jemalloc performance optimization and
compatibility with MemTracker #12496
16bb5cb430 is described below
commit 16bb5cb4306836e52acc2b609586e67324acc2ef
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Sep 28 12:04:29 2022 +0800
[enhancement](memory) Jemalloc performance optimization and compatibility
with MemTracker #12496
---
be/CMakeLists.txt | 6 +-
be/src/runtime/CMakeLists.txt | 6 ++
be/src/runtime/exec_env_init.cpp | 2 +-
be/src/runtime/memory/jemalloc_hook.cpp | 141 +++++++++++++++++++++++++
be/src/runtime/memory/tcmalloc_hook.h | 21 +---
be/src/runtime/memory/thread_mem_tracker_mgr.h | 35 ++++--
be/src/runtime/thread_context.h | 26 +++++
bin/start_be.sh | 2 +
8 files changed, 208 insertions(+), 31 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index edcbebdc0e..0674bc7922 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -223,7 +223,10 @@ add_library(leveldb STATIC IMPORTED)
set_target_properties(leveldb PROPERTIES IMPORTED_LOCATION
${THIRDPARTY_DIR}/lib/libleveldb.a)
add_library(jemalloc STATIC IMPORTED)
-set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libjemalloc.a)
+# "jemalloc" now resolves to Doris' own build (libjemalloc_doris.a); its je_* API
+# backs src/runtime/memory/jemalloc_hook.cpp.
+set_target_properties(jemalloc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libjemalloc_doris.a)
+
+# The previous lib64/libjemalloc.a stays linked under a separate target.
+# NOTE(review): presumably still needed by Arrow (per the target name) — confirm.
+add_library(jemalloc_arrow STATIC IMPORTED)
+set_target_properties(jemalloc_arrow PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libjemalloc.a)
add_library(brotlicommon STATIC IMPORTED)
set_target_properties(brotlicommon PROPERTIES IMPORTED_LOCATION
${THIRDPARTY_DIR}/lib64/libbrotlicommon.a)
@@ -681,6 +684,7 @@ set(COMMON_THIRDPARTY
roaring
fmt
jemalloc
+ jemalloc_arrow
brotlicommon
brotlidec
brotlienc
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 210fb844de..cdd77455b4 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -115,6 +115,12 @@ if (WITH_MYSQL)
)
endif()
+# jemalloc_hook.cpp defines malloc/free/realloc/... as aliases of its doris_*
+# wrappers, so it is compiled only for jemalloc builds.
+if (USE_JEMALLOC)
+ set(RUNTIME_FILES ${RUNTIME_FILES}
+ memory/jemalloc_hook.cpp
+ )
+endif()
+
add_library(Runtime STATIC
${RUNTIME_FILES}
)
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 1afb147b05..828830ebeb 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -208,7 +208,7 @@ Status ExecEnv::_init_mem_tracker() {
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes,
"Process");
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan",
_process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
- thread_context()->_thread_mem_tracker_mgr->init();
+ thread_context()->_thread_mem_tracker_mgr->init_impl();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) &&
!defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) &&
!defined(USE_JEMALLOC)
diff --git a/be/src/runtime/memory/jemalloc_hook.cpp
b/be/src/runtime/memory/jemalloc_hook.cpp
new file mode 100644
index 0000000000..11dee8eab6
--- /dev/null
+++ b/be/src/runtime/memory/jemalloc_hook.cpp
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jemalloc/jemalloc.h"
+#include "runtime/thread_context.h"
+
+extern "C" {
+// malloc() replacement. Charges the thread mem tracker with the size class
+// jemalloc will hand out before allocating, and reverts the charge if the
+// allocation fails.
+void* doris_malloc(size_t size) __THROW {
+    // je_nallocx() is documented as undefined for size == 0, but malloc(0) is a
+    // legal call: charge as for a 1-byte request (the smallest size class).
+    // doris_free() later releases whatever usable size the block really has.
+    const size_t charged = je_nallocx(size == 0 ? 1 : size, 0);
+    MEM_MALLOC_HOOK(charged);
+    void* ptr = je_malloc(size);
+    if (UNLIKELY(ptr == nullptr)) {
+        MEM_FREE_HOOK(charged);
+    }
+    return ptr;
+}
+
+// free() replacement: releases the block's usable size from the thread mem
+// tracker, then returns the block to jemalloc.
+void doris_free(void* p) __THROW {
+    const size_t released = je_malloc_usable_size(p);
+    MEM_FREE_HOOK(released);
+    je_free(p);
+}
+
+// realloc() replacement. Charges the tracker with the difference between the
+// new size class and the old block's usable size (negative when shrinking)
+// before calling jemalloc, and reverts that charge if jemalloc fails.
+void* doris_realloc(void* p, size_t size) __THROW {
+    if (UNLIKELY(size == 0)) {
+        // Match glibc: realloc(p, 0) releases p. Returning nullptr without
+        // freeing leaked p together with its tracked bytes.
+        doris_free(p);
+        return nullptr;
+    }
+    const int64_t old_size = je_malloc_usable_size(p);
+    // Signed delta computed once and passed as a single operand, so neither the
+    // size_t arithmetic nor the hook macros' `-size` expansion can mis-associate it.
+    const int64_t delta = static_cast<int64_t>(je_nallocx(size, 0)) - old_size;
+    MEM_MALLOC_HOOK(delta);
+    void* ptr = je_realloc(p, size);
+    if (UNLIKELY(ptr == nullptr)) {
+        // p is still valid and unchanged; undo the provisional charge.
+        MEM_FREE_HOOK(delta);
+    }
+    return ptr;
+}
+
+// calloc() replacement. Charges n * size up front, then tops the charge up to
+// the usable size jemalloc returned (or reverts it if the allocation failed).
+void* doris_calloc(size_t n, size_t size) __THROW {
+    if (UNLIKELY(size == 0)) {
+        return nullptr;
+    }
+
+    size_t total = 0;
+    if (UNLIKELY(__builtin_mul_overflow(n, size, &total))) {
+        // n * size is not representable, so calloc must fail; let jemalloc report
+        // it without charging the tracker a wrapped-around size.
+        return je_calloc(n, size);
+    }
+
+    MEM_MALLOC_HOOK(total);
+    void* ptr = je_calloc(n, size);
+    if (UNLIKELY(ptr == nullptr)) {
+        MEM_FREE_HOOK(total);
+    } else {
+        // Charge (not release) the slack beyond the request: doris_free() later
+        // releases the full usable size, so releasing here made every calloc
+        // leave the tracker under-counted.
+        const size_t slack = je_malloc_usable_size(ptr) - total;
+        MEM_MALLOC_HOOK(slack);
+    }
+    return ptr;
+}
+
+// cfree() replacement: identical to free(), so forward to doris_free() and keep
+// the mem-tracker bookkeeping in one place.
+void doris_cfree(void* ptr) __THROW {
+    doris_free(ptr);
+}
+
+// memalign() replacement backed by je_aligned_alloc(). The requested size is
+// charged first; on failure that charge is reverted, on success the extra
+// usable bytes jemalloc handed out are charged as well.
+void* doris_memalign(size_t align, size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const block = je_aligned_alloc(align, size);
+    if (UNLIKELY(block == nullptr)) {
+        MEM_FREE_HOOK(size);
+        return nullptr;
+    }
+    MEM_MALLOC_HOOK(je_malloc_usable_size(block) - size);
+    return block;
+}
+
+// aligned_alloc() replacement. Reserve `size` on the tracker, allocate, then
+// either roll the reservation back (failure) or add the usable-size slack.
+void* doris_aligned_alloc(size_t align, size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const mem = je_aligned_alloc(align, size);
+    if (UNLIKELY(mem == nullptr)) {
+        MEM_FREE_HOOK(size);
+        return nullptr;
+    }
+    const size_t slack = je_malloc_usable_size(mem) - size;
+    MEM_MALLOC_HOOK(slack);
+    return mem;
+}
+
+// valloc() replacement. Charges `size` before calling je_valloc(); afterwards
+// either undoes that charge or extends it to the block's usable size.
+void* doris_valloc(size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const page = je_valloc(size);
+    if (UNLIKELY(page == nullptr)) {
+        // Nothing was allocated: drop the provisional charge.
+        MEM_FREE_HOOK(size);
+        return nullptr;
+    }
+    const size_t extra = je_malloc_usable_size(page) - size;
+    MEM_MALLOC_HOOK(extra);
+    return page;
+}
+
+// pvalloc() replacement, delegated to je_valloc(). The tracker is charged with
+// `size` up front and then topped up to the usable size jemalloc reports.
+// NOTE(review): glibc's pvalloc rounds the request up to a page multiple, while
+// je_valloc only promises page alignment — confirm callers do not rely on the
+// rounded usable size.
+void* doris_pvalloc(size_t size) __THROW {
+    MEM_MALLOC_HOOK(size);
+    void* const pages = je_valloc(size);
+    if (UNLIKELY(pages == nullptr)) {
+        MEM_FREE_HOOK(size);
+        return nullptr;
+    }
+    const size_t extra = je_malloc_usable_size(pages) - size;
+    MEM_MALLOC_HOOK(extra);
+    return pages;
+}
+
+int doris_posix_memalign(void** r, size_t align, size_t size) __THROW {
+ MEM_MALLOC_HOOK(size);
+ int ret = je_posix_memalign(r, align, size);
+ if (UNLIKELY(ret != 0)) {
+ MEM_FREE_HOOK(size);
+ } else {
+ MEM_MALLOC_HOOK(je_malloc_usable_size(*r) - size);
+ }
+ return ret;
+}
+
+// malloc_usable_size() replacement: a plain pass-through to jemalloc that does
+// not touch the mem tracker.
+size_t doris_malloc_usable_size(void* ptr) __THROW {
+    return je_malloc_usable_size(ptr);
+}
+
+// Define the standard allocator symbols as GCC/Clang aliases of the doris_*
+// wrappers in this file, so calls to malloc/free/... resolve to code that
+// updates the thread mem tracker before reaching jemalloc. The `used`
+// attribute keeps the aliases emitted even though nothing here references them.
+#define ALIAS(doris_fn) __attribute__((alias(#doris_fn), used))
+void* malloc(size_t size) __THROW ALIAS(doris_malloc);
+void free(void* p) __THROW ALIAS(doris_free);
+void* realloc(void* p, size_t size) __THROW ALIAS(doris_realloc);
+void* calloc(size_t n, size_t size) __THROW ALIAS(doris_calloc);
+void cfree(void* ptr) __THROW ALIAS(doris_cfree);
+void* memalign(size_t align, size_t size) __THROW ALIAS(doris_memalign);
+void* aligned_alloc(size_t align, size_t size) __THROW ALIAS(doris_aligned_alloc);
+void* valloc(size_t size) __THROW ALIAS(doris_valloc);
+void* pvalloc(size_t size) __THROW ALIAS(doris_pvalloc);
+int posix_memalign(void** r, size_t a, size_t s) __THROW ALIAS(doris_posix_memalign);
+size_t malloc_usable_size(void* ptr) __THROW ALIAS(doris_malloc_usable_size);
+}
diff --git a/be/src/runtime/memory/tcmalloc_hook.h
b/be/src/runtime/memory/tcmalloc_hook.h
index 627f42795d..6ec9352ad3 100644
--- a/be/src/runtime/memory/tcmalloc_hook.h
+++ b/be/src/runtime/memory/tcmalloc_hook.h
@@ -36,28 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected
behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
void new_hook(const void* ptr, size_t size) {
- if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context !=
nullptr) {
- // Currently in bthread, consume thread context mem tracker in bthread
tls.
- doris::update_bthread_context();
-
doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
- } else if (doris::thread_context_ptr._init) {
-
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size,
0));
- } else {
- doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0));
- }
+ // Charge the size tcmalloc reserves for this request (tc_nallocx); the shared
+ // hook macro picks the bthread, thread, or no-attach tracker.
+ MEM_MALLOC_HOOK(tc_nallocx(size, 0));
}
void delete_hook(const void* ptr) {
- if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context !=
nullptr) {
- doris::update_bthread_context();
- doris::bthread_context->_thread_mem_tracker_mgr->consume(
- -tc_malloc_size(const_cast<void*>(ptr)));
- } else if (doris::thread_context_ptr._init) {
- doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(
- -tc_malloc_size(const_cast<void*>(ptr)));
- } else {
-
doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr)));
- }
+ // Release the allocated size of the block being freed (tc_malloc_size) from
+ // the same tracker the shared hook macro would have charged.
+ MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
}
void init_hook() {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 399265bc51..4c9099528d 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -51,12 +51,15 @@ public:
// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
- ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+ // The malloc hooks (tcmalloc and jemalloc) route here when no thread context
+ // is attached, which can happen before ExecEnv is initialized; bytes seen
+ // before that point are presumably untracked rather than charged to a tracker
+ // that may not exist yet.
+ if (ExecEnv::GetInstance()->initialized()) {
+ ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+ }
}
// After thread initialization, calling `init` again must call
`clear_untracked_mems` first
// to avoid memory tracking loss.
void init();
+ void init_impl();
// After attach, the current thread TCMalloc Hook starts to
consume/release task mem_tracker
void attach_limiter_tracker(const std::string& task_id, const TUniqueId&
fragment_instance_id,
@@ -85,9 +88,13 @@ public:
bool is_attach_query() { return _fragment_instance_id_stack.back() !=
TUniqueId(); }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
+ // Lazily bind the orphan tracker if init() skipped it because ExecEnv was not
+ // yet initialized.
+ // NOTE(review): init_impl() runs here without checking
+ // ExecEnv::GetInstance()->initialized(); confirm this accessor cannot be
+ // reached before ExecEnv init.
+ if (_limiter_tracker_raw == nullptr) init_impl();
return _limiter_tracker_stack.back();
}
- MemTrackerLimiter* limiter_mem_tracker_raw() { return
_limiter_tracker_raw; }
+ MemTrackerLimiter* limiter_mem_tracker_raw() {
+ // Same lazy binding (and same caveat) as limiter_mem_tracker().
+ if (_limiter_tracker_raw == nullptr) init_impl();
+ return _limiter_tracker_raw;
+ }
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -120,7 +127,7 @@ private:
// _limiter_tracker_stack[0] = orphan_mem_tracker
std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
- MemTrackerLimiter* _limiter_tracker_raw;
+ MemTrackerLimiter* _limiter_tracker_raw = nullptr;
std::vector<MemTracker*> _consumer_tracker_stack;
// If true, call memtracker try_consume, otherwise call consume.
@@ -138,12 +145,18 @@ inline void ThreadMemTrackerMgr::init() {
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " <<
_limiter_tracker_stack.size();
- if (_limiter_tracker_stack.size() == 0) {
-
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
- _limiter_tracker_raw =
ExecEnv::GetInstance()->orphan_mem_tracker_raw();
- _task_id_stack.push_back("");
- _fragment_instance_id_stack.push_back(TUniqueId());
+ if (_limiter_tracker_raw == nullptr &&
ExecEnv::GetInstance()->initialized()) {
+ init_impl();
}
+}
+
+// Seeds the per-thread stacks: pushes ExecEnv's orphan tracker as the bottom
+// limiter entry, with an empty task id and a default TUniqueId, caches its raw
+// pointer, and sets _check_limit = true.
+// Unlike init(), this does not check ExecEnv::GetInstance()->initialized().
+// NOTE(review): callers presumably must ensure ExecEnv is initialized — confirm.
+inline void ThreadMemTrackerMgr::init_impl() {
+ DCHECK(_limiter_tracker_stack.size() == 0);
+ DCHECK(_limiter_tracker_raw == nullptr);
+
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
+ _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
+ _task_id_stack.push_back("");
+ _fragment_instance_id_stack.push_back(TUniqueId());
_check_limit = true;
}
@@ -166,9 +179,10 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
// When some threads `0 < _untracked_mem <
config::mem_tracker_consume_min_size_bytes`
// and some threads `_untracked_mem <=
-config::mem_tracker_consume_min_size_bytes` trigger consumption(),
// it will cause tracker->consumption to be temporarily less than 0.
+ // After the jemalloc hook is loaded, before ExecEnv init,
_limiter_tracker=nullptr.
if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
- !_stop_consume) {
+ !_stop_consume && ExecEnv::GetInstance()->initialized()) {
if (_check_limit) {
flush_untracked_mem<true>();
} else {
@@ -182,8 +196,9 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem
tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into
an infinite loop.
_stop_consume = true;
- old_untracked_mem = _untracked_mem;
+ if (_limiter_tracker_raw == nullptr) init_impl();
DCHECK(_limiter_tracker_raw);
+ old_untracked_mem = _untracked_mem;
if (CheckLimit) {
#ifndef BE_TEST
// When all threads are started, `attach_limiter_tracker` is expected
to be called to bind the limiter tracker.
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 670b3ff613..fbfac1be59 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -268,4 +268,30 @@ public:
->_thread_mem_tracker_mgr->last_consumer_tracker(), \
msg),
\
##__VA_ARGS__);
+
+// Mem Hook to consume thread mem tracker.
+//
+// MEM_MALLOC_HOOK(size) charges `size` bytes, converted to int64_t, to the
+// tracker of the current context. A negative value releases bytes. Routing:
+//   1. the bthread context's tracker when a bthread key exists and
+//      bthread_context is set;
+//   2. otherwise the thread context's tracker once it is initialized;
+//   3. otherwise ThreadMemTrackerMgr::consume_no_attach().
+// The argument is parenthesized and evaluated exactly once.
+#define MEM_MALLOC_HOOK(size)                                                              \
+    do {                                                                                   \
+        if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \
+            doris::update_bthread_context();                                               \
+            doris::bthread_context->_thread_mem_tracker_mgr->consume(                      \
+                    static_cast<int64_t>(size));                                           \
+        } else if (LIKELY(doris::thread_context_ptr._init)) {                              \
+            doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(              \
+                    static_cast<int64_t>(size));                                           \
+        } else {                                                                           \
+            doris::ThreadMemTrackerMgr::consume_no_attach(static_cast<int64_t>(size));     \
+        }                                                                                  \
+    } while (0)
+
+// MEM_FREE_HOOK(size) releases `size` bytes from the same tracker. The argument
+// is parenthesized and converted before negation. The previous `-size`
+// expansion turned MEM_FREE_HOOK(a - b) into `-a - b`, which mis-accounted
+// e.g. the realloc rollback `je_nallocx(size, 0) - old_size`.
+#define MEM_FREE_HOOK(size) MEM_MALLOC_HOOK(-static_cast<int64_t>(size))
+
} // namespace doris
diff --git a/bin/start_be.sh b/bin/start_be.sh
index f60795d756..69d36cfaa3 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -207,6 +207,8 @@ export UBSAN_OPTIONS=print_stacktrace=1
## set hdfs conf
export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
+export
MALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16"
+
if [[ "${RUN_DAEMON}" -eq 1 ]]; then
nohup ${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/doris_be" "$@"
>>"${LOG_DIR}/be.out" 2>&1 </dev/null &
else
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]