This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 19b34c09b1 [fix] (mem tracker) Fix runtime instance tracker null pointer (#11272)
19b34c09b1 is described below
commit 19b34c09b1d9301daf5ed76eea89bd031eb12bbe
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Jul 28 14:58:13 2022 +0800
[fix] (mem tracker) Fix runtime instance tracker null pointer (#11272)
---
be/src/runtime/data_stream_mgr.cpp | 6 +++---
be/src/runtime/data_stream_recvr.cc | 8 +++++---
be/src/runtime/data_stream_recvr.h | 5 +++--
be/src/runtime/memory/mem_tracker_limiter.h | 1 +
be/src/runtime/memory/mem_tracker_task_pool.cpp | 2 +-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 10 ++--------
be/src/runtime/memory/thread_mem_tracker_mgr.h | 4 ++--
be/src/runtime/runtime_state.cpp | 1 +
be/src/vec/runtime/vdata_stream_mgr.cpp | 4 ++--
be/src/vec/runtime/vdata_stream_recvr.cpp | 9 ++++++---
be/src/vec/runtime/vdata_stream_recvr.h | 6 +++---
11 files changed, 29 insertions(+), 27 deletions(-)
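A minimal sketch of the idea behind this change: the receiver's MemTracker used to be created with a nullptr parent, which (per the comments added in the diff below) ends up tied to the per-instance tracker; that tracker can already be a null pointer when the receiver is destructed, so remove_child fails. The fix parents the receiver tracker to the query-level tracker returned by state->query_mem_tracker(), which lives as long as the query. The self-contained C++ sketch below illustrates only the destruction-order idea; ParentTracker and ChildTracker are hypothetical stand-ins, not the Doris MemTrackerLimiter/MemTracker API.

    // Minimal, self-contained sketch of the destruction-order problem and the fix.
    // ParentTracker / ChildTracker are hypothetical stand-ins, NOT the Doris classes.
    #include <iostream>
    #include <string>
    #include <utility>

    // Stand-in for a long-lived, query-scoped MemTrackerLimiter.
    class ParentTracker {
    public:
        explicit ParentTracker(std::string label) : _label(std::move(label)) {}
        void consume(long bytes) { _consumption += bytes; }
        void release(long bytes) { _consumption -= bytes; }
        long consumption() const { return _consumption; }

    private:
        std::string _label;
        long _consumption = 0;
    };

    // Stand-in for a receiver-scoped MemTracker: it must reach its parent in the
    // destructor (the remove_child step the new comments refer to).
    class ChildTracker {
    public:
        ChildTracker(std::string label, ParentTracker* parent)
                : _label(std::move(label)), _parent(parent) {}
        ~ChildTracker() {
            // If the parent were the already-destroyed instance tracker, this would
            // be the null/dangling pointer the commit fixes.
            if (_parent != nullptr) _parent->release(_consumption);
        }
        void consume(long bytes) {
            _consumption += bytes;
            if (_parent != nullptr) _parent->consume(bytes);
        }

    private:
        std::string _label;
        ParentTracker* _parent;
        long _consumption = 0;
    };

    int main() {
        // Query-level tracker: alive for the whole query, like state->query_mem_tracker().
        ParentTracker query_tracker("Query#...");
        {
            // Receiver tracker, labelled like "VDataStreamRecvr:" + print_id(id).
            ChildTracker recvr_tracker("VDataStreamRecvr:instance-1", &query_tracker);
            recvr_tracker.consume(4096);
        } // Destructor detaches from a parent that is guaranteed to still be alive.
        std::cout << "query consumption after recvr teardown: " << query_tracker.consumption() << "\n";
        return 0;
    }

The same reasoning applies to both the row-based DataStreamRecvr and the vectorized VDataStreamRecvr, each of which now takes the query tracker in its constructor.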
diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp
index b0d1dbd8f2..3e519f8987 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
DCHECK(profile != nullptr);
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
- shared_ptr<DataStreamRecvr> recvr(
- new DataStreamRecvr(this, row_desc, fragment_instance_id, dest_node_id, num_senders,
- is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
+ shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
+ this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
+ num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
lock_guard<mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index a89c39cea9..ffd20c58d7 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -447,8 +447,9 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
DataStreamRecvr::DataStreamRecvr(
DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
- bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
+ MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
+ RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
@@ -459,7 +460,8 @@ DataStreamRecvr::DataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
- _mem_tracker = std::make_unique<MemTracker>("DataStreamRecvr", nullptr, _profile);
+ _mem_tracker = std::make_unique<MemTracker>(
+ "DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h
index efb036b5dd..31af9f2e37 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -116,8 +116,9 @@ private:
class SenderQueue;
DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
- bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
+ MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging,
+ int total_buffer_limit, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
// If receive queue is full, done is enqueue pending, and return with *done is nullptr
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 786ad945bf..3852cbe52d 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -111,6 +111,7 @@ public:
// Note that 'f' must be valid for the lifetime of this tracker limiter.
void add_gc_function(GcFunction f) { _gc_functions.push_back(f); }
+ // TODO Should be managed in a separate process_mem_mgr, not in MemTracker
// If consumption is higher than max_consumption, attempts to free memory by calling
// any added GC functions. Returns true if max_consumption is still exceeded. Takes gc_lock.
// Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index a4831114cf..02b38acdb5 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -34,7 +34,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
bool new_emplace = _task_mem_trackers.lazy_emplace_l(
task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
[&](const auto& ctor) {
- ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent));
+ ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
});
if (new_emplace) {
LOG(INFO) << "Register query/load memory tracker, query/load id: " <<
task_id
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 53a47202b1..7841a7cb6a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -37,14 +37,8 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
}
void ThreadMemTrackerMgr::detach_limiter_tracker() {
-#ifndef BE_TEST
- // Unexpectedly, the runtime state is destructed before the end of the query sub-thread,
- // (_hash_table_build_thread has appeared) which is not a graceful exit.
- // consider replacing CHECK with a conditional statement and checking for runtime state survival.
- CHECK(_task_id == "" ||
- ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id));
-#endif
- flush_untracked_mem<false>();
+ // Do not flush untracked mem, instance executor thread may exit after instance fragment executor thread,
+ // `instance_mem_tracker` will be null pointer, which is not a graceful exit.
_task_id = "";
_fragment_instance_id = TUniqueId();
_exceed_cb.cancel_msg = "";
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 8ccb6f70b1..1862c2830a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -210,8 +210,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// If you do not want this check, set_check_attach=true
// TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
// so disable it and try to open it when memory tracking is not on time.
- DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
- _limiter_tracker->label() != "Process");
+ // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
+ // _limiter_tracker->label() != "Process");
#endif
Status st = _limiter_tracker->try_consume(_untracked_mem);
if (!st) {
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 02660730c3..f3284d05f6 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -265,6 +265,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
}
Status RuntimeState::init_instance_mem_tracker() {
+ _query_mem_tracker = nullptr;
_instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, "RuntimeState:instance");
return Status::OK();
}
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 511fbbe19d..bee1fcf10c 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
- this, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
- buffer_size, profile, sub_plan_query_statistics_recvr));
+ this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
+ num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::lock_guard<std::mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 19393b4499..bce11043ac 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -249,8 +249,9 @@ void VDataStreamRecvr::SenderQueue::close() {
VDataStreamRecvr::VDataStreamRecvr(
VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
- bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
+ MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
+ RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
@@ -262,8 +263,10 @@ VDataStreamRecvr::VDataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
+ // DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker`
+ // will be a null pointer, and remove_child fails when _mem_tracker is destructed.
_mem_tracker = std::make_unique<MemTracker>(
- "VDataStreamRecvr:" + print_id(_fragment_instance_id), nullptr, _profile);
+ "VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index bedd18bbce..87024a917a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -51,9 +51,9 @@ class VExprContext;
class VDataStreamRecvr {
public:
VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
- int num_senders, bool is_merging, int total_buffer_limit,
- RuntimeProfile* profile,
+ MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging,
+ int total_buffer_limit, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
~VDataStreamRecvr();
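A related detail from the runtime_state.cpp hunk: init_instance_mem_tracker() now explicitly sets _query_mem_tracker = nullptr, and both create_recvr() paths pass state->query_mem_tracker() straight through, so on that path the receiver's tracker presumably has to tolerate a null parent, just as the old code passed nullptr explicitly. A tiny hypothetical helper (again not the Doris API) showing that null-tolerant pattern:

    // Hypothetical null-tolerant forwarding, mirroring a possibly-null query tracker.
    #include <cstdint>

    struct Limiter {                       // stand-in for MemTrackerLimiter
        int64_t consumption = 0;
        void consume(int64_t bytes) { consumption += bytes; }
    };

    // Account locally; forward to the parent only when one exists.
    inline void consume_with_parent(Limiter* parent, int64_t& local, int64_t bytes) {
        local += bytes;
        if (parent != nullptr) parent->consume(bytes);
    }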