This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e801e3b737 [fix](memory) Fix crash at `bthread_setspecific` in
`brpc::Socket::CheckHealth()` (#20450)
e801e3b737 is described below
commit e801e3b7377bdf1c30e2a62e433830444ab993a7
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Jun 8 19:48:19 2023 +0800
[fix](memory) Fix crash at `bthread_setspecific` in
`brpc::Socket::CheckHealth()` (#20450)
Only switch to bthread local when modifying the mem tracker in the thread
context; no longer switch to bthread local by default when a bthread starts.
Add a mem tracker that records brpc IOBufBlockMemory memory.
Remove the thread mem tracker metrics.
---
be/src/common/daemon.cpp | 4 ++
be/src/runtime/exec_env.h | 2 +
be/src/runtime/exec_env_init.cpp | 2 +
be/src/runtime/load_channel_mgr.cpp | 3 +-
be/src/runtime/thread_context.cpp | 23 +++++-----
be/src/runtime/thread_context.h | 89 +++++++++++++++++++++++++------------
be/src/service/internal_service.cpp | 1 -
be/src/util/doris_metrics.cpp | 10 -----
be/src/util/doris_metrics.h | 6 ---
be/src/util/mem_info.cpp | 1 -
10 files changed, 82 insertions(+), 59 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index af5628f4b9..29eabf0470 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -22,6 +22,7 @@
#include <gflags/gflags.h>
#include <gperftools/malloc_extension.h> // IWYU pragma: keep
// IWYU pragma: no_include <bits/std_abs.h>
+#include <butil/iobuf.h>
#include <math.h>
#include <signal.h>
#include <stdint.h>
@@ -210,6 +211,9 @@ void Daemon::memory_maintenance_thread() {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
}
#endif
+
+
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
+ butil::IOBuf::block_memory());
LOG(INFO) << MemTrackerLimiter::
process_mem_log_str(); // print mem log when
memory state by 256M
}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index aab93e1d99..0bd463ffb3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -118,6 +118,7 @@ public:
MemTrackerLimiter* orphan_mem_tracker_raw() { return
_orphan_mem_tracker_raw; }
MemTrackerLimiter* experimental_mem_tracker() { return
_experimental_mem_tracker.get(); }
MemTracker* page_no_cache_mem_tracker() { return
_page_no_cache_mem_tracker.get(); }
+ MemTracker* brpc_iobuf_block_memory_tracker() { return
_brpc_iobuf_block_memory_tracker.get(); }
ThreadPool* send_batch_thread_pool() { return
_send_batch_thread_pool.get(); }
ThreadPool* download_cache_thread_pool() { return
_download_cache_thread_pool.get(); }
@@ -211,6 +212,7 @@ private:
std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker;
// page size not in cache, data page/index page/etc.
std::shared_ptr<MemTracker> _page_no_cache_mem_tracker;
+ std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 944dad9cf5..fb14db11f3 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -338,6 +338,8 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet");
_page_no_cache_mem_tracker =
std::make_shared<MemTracker>("PageNoCache",
_orphan_mem_tracker_raw);
+ _brpc_iobuf_block_memory_tracker =
+ std::make_shared<MemTracker>("IOBufBlockMemory",
_orphan_mem_tracker_raw);
}
void ExecEnv::init_download_cache_buf() {
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 3fe4c7986d..f99d7413ad 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -433,8 +433,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
<< PrettyPrinter::print_bytes(process_soft_mem_limit)
<< ", total load mem consumption: "
<< PrettyPrinter::print_bytes(_mem_tracker->consumption())
- << ", vm_rss: " << PerfCounters::get_vm_rss_str()
- << ", tc/jemalloc allocator cache: " <<
MemInfo::allocator_cache_mem_str();
+ << ", vm_rss: " << PerfCounters::get_vm_rss_str();
}
LOG(INFO) << oss.str();
}
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index 15efa37b88..c2dd13770c 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -19,7 +19,6 @@
#include "common/signal_handler.h"
#include "runtime/runtime_state.h"
-#include "util/doris_metrics.h" // IWYU pragma: keep
namespace doris {
class MemTracker;
@@ -33,10 +32,12 @@ ThreadContextPtr::ThreadContextPtr() {
AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const TUniqueId& task_id, const TUniqueId&
fragment_instance_id) {
+ SwitchBthreadLocal::switch_to_bthread_local();
thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
}
AttachTask::AttachTask(RuntimeState* runtime_state) {
+ SwitchBthreadLocal::switch_to_bthread_local();
doris::signal::query_id_hi = runtime_state->query_id().hi;
doris::signal::query_id_lo = runtime_state->query_id().lo;
thread_context()->attach_task(runtime_state->query_id(),
runtime_state->fragment_instance_id(),
@@ -45,29 +46,31 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
AttachTask::~AttachTask() {
thread_context()->detach_task();
-#ifndef NDEBUG
- DorisMetrics::instance()->attach_task_thread_count->increment(1);
-#endif // NDEBUG
+ SwitchBthreadLocal::switch_back_pthread_local();
}
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker*
mem_tracker) {
- if (mem_tracker)
+ SwitchBthreadLocal::switch_to_bthread_local();
+ if (mem_tracker) {
_need_pop =
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+ }
}
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
const std::shared_ptr<MemTracker>& mem_tracker)
: _mem_tracker(mem_tracker) {
- if (_mem_tracker)
+ SwitchBthreadLocal::switch_to_bthread_local();
+ if (_mem_tracker) {
_need_pop =
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
+ }
}
AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
-#ifndef NDEBUG
-
DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
-#endif // NDEBUG
- if (_need_pop)
thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+ if (_need_pop) {
+ thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+ }
+ SwitchBthreadLocal::switch_back_pthread_local();
}
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 80178a8b68..4071947171 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -37,7 +37,7 @@
#include "util/defer_op.h" // IWYU pragma: keep
// Used to observe the memory usage of the specified code segment
-#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
+#if defined(USE_MEM_TRACKER)
// Count a code segment memory (memory malloc - memory free) to int64_t
// Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx;
xxx; }
#define SCOPED_MEM_COUNT(scope_mem) \
@@ -56,7 +56,7 @@
#endif
// Used to observe query/load/compaction/e.g. execution thread memory usage
and respond when memory exceeds the limit.
-#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
+#if defined(USE_MEM_TRACKER)
// Attach to query/load/compaction/e.g. when thread starts.
// This will save some info about a working thread in the thread context.
// And count the memory during thread execution (is actually also the code
segment that executes the function)
@@ -192,51 +192,80 @@ public:
return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
}
+ int switch_bthread_local_count = 0;
+
private:
TUniqueId _task_id;
TUniqueId _fragment_instance_id;
};
-#if defined(UNDEFINED_BEHAVIOR_SANITIZER)
-static ThreadContext* thread_context() {
- return thread_context_ptr._ptr;
-}
-#else
+// Switch thread context from pthread local to bthread local context.
// Cache the pointer of bthread local in pthead local,
// Avoid calling bthread_getspecific frequently to get bthread local, which
has performance problems.
-static void pthread_attach_bthread() {
- bthread_id = bthread_self();
- bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
- if (bthread_context == nullptr) {
- // A new bthread starts, two scenarios:
- // 1. First call to bthread_getspecific (and before any
bthread_setspecific) returns NULL
- // 2. There are not enough reusable btls in btls pool.
- // else, two scenarios:
- // 1. A new bthread starts, but get a reuses btls.
- // 2. A pthread switch occurs. Because the pthread switch cannot be
accurately identified at the moment.
- // So tracker call reset 0 like reuses btls.
- bthread_context = new ThreadContext;
- // The brpc server should respond as quickly as possible.
- bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
- // set the data so that next time bthread_getspecific in the thread
returns the data.
- CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+class SwitchBthreadLocal {
+public:
+ static void switch_to_bthread_local() {
+ if (bthread_self() != 0) {
+ // Calling bthread_getspecific very frequently is slow, but
switch_to_bthread_local is not expected to be called often.
+ bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+ if (bthread_context == nullptr) {
+ // A new bthread starts, two scenarios:
+ // 1. First call to bthread_getspecific (and before any
bthread_setspecific) returns NULL
+ // 2. There are not enough reusable btls in btls pool.
+ // else, two scenarios:
+ // 1. A new bthread starts, but get a reuses btls.
+ // 2. A pthread switch occurs. Because the pthread switch
cannot be accurately identified at the moment.
+ // So tracker call reset 0 like reuses btls.
+ // during this period, stop the use of thread_context.
+ thread_context_ptr.init = false;
+ bthread_id = bthread_self();
+ bthread_context = new ThreadContext;
+ // The brpc server should respond as quickly as possible.
+ bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
+ // set the data so that next time bthread_getspecific in the
thread returns the data.
+ CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+ thread_context_ptr.init = true;
+ }
+ bthread_context->switch_bthread_local_count++;
+ }
}
-}
+
+ // `switch_to_bthread_local` and `switch_back_pthread_local` should be
used in pairs,
+ // `switch_back_pthread_local` should only be called after a matching
`switch_to_bthread_local` call.
+ static void switch_back_pthread_local() {
+ if (bthread_self() != 0) {
+ if (bthread_self() != bthread_id) {
+ bthread_id = bthread_self();
+ bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+ DCHECK(bthread_context != nullptr);
+ }
+ bthread_context->switch_bthread_local_count--;
+ if (bthread_context->switch_bthread_local_count == 0) {
+ bthread_context = thread_context_ptr._ptr;
+ }
+ }
+ }
+};
static ThreadContext* thread_context() {
if (bthread_self() != 0) {
+ // in bthread
if (bthread_self() != bthread_id) {
- // A new bthread starts or pthread switch occurs, during this
period, stop the use of thread_context.
- thread_context_ptr.init = false;
- pthread_attach_bthread();
- thread_context_ptr.init = true;
+ // bthread switching pthread may be very frequent, remember not to
use lock or other time-consuming operations.
+ bthread_id = bthread_self();
+ bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+ // if nullptr, a new bthread task started, or the bthread switched pthread
without calling switch_to_bthread_local: use the pthread local context.
+ // else, the bthread switched pthread after calling
switch_to_bthread_local: use the bthread local context.
+ if (bthread_context == nullptr) {
+ bthread_context = thread_context_ptr._ptr;
+ }
}
return bthread_context;
} else {
+ // in pthread
return thread_context_ptr._ptr;
}
}
-#endif
class ScopeMemCount {
public:
@@ -267,12 +296,14 @@ public:
class SwitchThreadMemTrackerLimiter {
public:
explicit SwitchThreadMemTrackerLimiter(const
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+ SwitchBthreadLocal::switch_to_bthread_local();
_old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker,
TUniqueId());
}
~SwitchThreadMemTrackerLimiter() {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
+ SwitchBthreadLocal::switch_back_pthread_local();
}
private:
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index dc28571ce1..971302b810 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1625,7 +1625,6 @@ void
PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcContr
const
PGetTabletVersionsRequest* request,
PGetTabletVersionsResponse* response,
google::protobuf::Closure* done) {
- //SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
VLOG_DEBUG << "receive get tablet versions request: " <<
request->DebugString();
ExecEnv::GetInstance()->storage_engine()->get_tablet_rowset_versions(request,
response);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 97ca3de938..6b46c0cab6 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -135,11 +135,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes,
MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us,
MetricUnit::MICROSECONDS);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count,
MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(add_thread_mem_tracker_consumer_count,
MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_mem_tracker_exceed_call_back_count,
MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_bthread_count, MetricUnit::NOUNIT);
-
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);
DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num,
MetricUnit::NOUNIT);
DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_fd_num_used,
MetricUnit::NOUNIT);
@@ -298,11 +293,6 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
attach_task_thread_count);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
add_thread_mem_tracker_consumer_count);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
thread_mem_tracker_exceed_call_back_count);
- INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_bthread_count);
-
INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, upload_total_byte);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, upload_rowset_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, upload_fail_count);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 86b52e32be..646a4449c0 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -121,12 +121,6 @@ public:
IntCounter* memtable_flush_total;
IntCounter* memtable_flush_duration_us;
- IntCounter* attach_task_thread_count;
- IntCounter* add_thread_mem_tracker_consumer_count;
- IntCounter* thread_mem_tracker_exceed_call_back_count;
- // brpc server response count
- IntCounter* switch_bthread_count;
-
IntGauge* memory_pool_bytes_total;
IntGauge* process_thread_num;
IntGauge* process_fd_num_used;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 12038f7c16..8d81089323 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -77,7 +77,6 @@ int64_t MemInfo::_s_process_full_gc_size = -1;
void MemInfo::refresh_allocator_mem() {
#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) ||
defined(THREAD_SANITIZER)
- LOG(INFO) << "Memory tracking is not available with address sanitizer
builds.";
#elif defined(USE_JEMALLOC)
uint64_t epoch = 0;
size_t sz = sizeof(epoch);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]