This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 85362a907e [fix](mem tracker) Fix some memory leaks, inaccurate
statistics, core dump, deadlock bugs (#10072)
85362a907e is described below
commit 85362a907e2c55f0a856daaaa34dd9e0d461d736
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Jun 14 21:38:37 2022 +0800
[fix](mem tracker) Fix some memory leaks, inaccurate statistics, core dump,
deadlock bugs (#10072)
1. Fix a memory leak: when a load task is canceled, the `IndexChannel`
and `NodeChannel` mem trackers are not destructed in time.
2. Fix load tasks being frequently canceled due to OOM and the inaccurate
`LoadChannel` mem tracker limit, and rename the `mem limit` variables
in `LoadChannel` (now `load_mem_limit` / `channel_mem_limit`).
3. Fix a core dump: when logging out a task mem tracker, the phmap erase fails,
resulting in repeated logout of the same tracker.
4. Fix a deadlock: when the mem limit is exceeded during `add_child_tracker`,
calling `log_usage` deadlocks on `_child_trackers_lock`.
5. Fix frequent log printing when the thread mem tracker limit is exceeded,
which hurts both log readability and performance.
6. Improve some details of the mem tracker display.
---
be/src/exec/tablet_sink.cpp | 1 -
be/src/exec/tablet_sink.h | 1 -
be/src/gutil/strings/numbers.cc | 2 +-
be/src/olap/task/engine_alter_tablet_task.cpp | 2 +-
be/src/olap/task/engine_batch_load_task.cpp | 2 +-
be/src/olap/task/engine_checksum_task.cpp | 2 +-
be/src/olap/task/engine_clone_task.cpp | 2 +-
.../olap/task/engine_storage_migration_task_v2.cpp | 2 +-
be/src/runtime/load_channel.cpp | 9 +++--
be/src/runtime/load_channel.h | 5 ++-
be/src/runtime/load_channel_mgr.cpp | 44 +++++++++++-----------
be/src/runtime/load_channel_mgr.h | 7 ++--
be/src/runtime/mem_tracker.cpp | 6 ++-
be/src/runtime/mem_tracker.h | 3 +-
be/src/runtime/mem_tracker_task_pool.cpp | 30 ++++++++++++---
be/src/runtime/thread_context.cpp | 8 ++--
be/src/runtime/thread_context.h | 3 +-
be/src/runtime/thread_mem_tracker_mgr.cpp | 17 ++++-----
be/src/runtime/thread_mem_tracker_mgr.h | 17 ++++++---
19 files changed, 98 insertions(+), 65 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index e1d60f694c..c15f303200 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -558,7 +558,6 @@ Status NodeChannel::none_of(std::initializer_list<bool>
vars) {
}
void NodeChannel::clear_all_batches() {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
std::lock_guard<std::mutex> lg(_pending_batches_lock);
std::queue<AddBatchReq> empty;
std::swap(_pending_batches, empty);
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8b3186e0c1..d797a1b80c 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -330,7 +330,6 @@ public:
void for_each_node_channel(
const std::function<void(const std::shared_ptr<NodeChannel>&)>&
func) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
for (auto& it : _node_channels) {
func(it.second);
}
diff --git a/be/src/gutil/strings/numbers.cc b/be/src/gutil/strings/numbers.cc
index 46be289fb8..24c993b86a 100644
--- a/be/src/gutil/strings/numbers.cc
+++ b/be/src/gutil/strings/numbers.cc
@@ -1488,7 +1488,7 @@ string AccurateItoaKMGT(int64 i) {
i = -i;
}
- string ret = std::to_string(i) + " = " + StringPrintf("%s", sign);
+ string ret = StringPrintf("%s", sign) + std::to_string(i) + " = " +
StringPrintf("%s", sign);
int64 val;
if ((val = (i >> 40)) > 1) {
ret += StringPrintf("%" PRId64
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp
b/be/src/olap/task/engine_alter_tablet_task.cpp
index 7686a632ae..24496822d3 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const
TAlterTabletReqV2& request)
: _alter_tablet_req(request) {
_mem_tracker = MemTracker::create_tracker(
config::memory_limitation_per_thread_for_schema_change_bytes,
- fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}",
+ fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(_alter_tablet_req.base_tablet_id),
std::to_string(_alter_tablet_req.new_tablet_id)),
StorageEngine::instance()->schema_change_mem_tracker(),
MemTrackerLevel::TASK);
diff --git a/be/src/olap/task/engine_batch_load_task.cpp
b/be/src/olap/task/engine_batch_load_task.cpp
index 9c507d362d..7add215823 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -55,7 +55,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req,
std::vector<TTablet
_download_status = Status::OK();
_mem_tracker = MemTracker::create_tracker(
-1,
- fmt::format("EngineBatchLoadTask:pushType={}:tabletId={}",
_push_req.push_type,
+ fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}",
_push_req.push_type,
std::to_string(_push_req.tablet_id)),
StorageEngine::instance()->batch_load_mem_tracker(),
MemTrackerLevel::TASK);
}
diff --git a/be/src/olap/task/engine_checksum_task.cpp
b/be/src/olap/task/engine_checksum_task.cpp
index 30ef01bc7e..37efd52e1d 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -27,7 +27,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id,
TSchemaHash schema_h
TVersion version, uint32_t* checksum)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version),
_checksum(checksum) {
_mem_tracker = MemTracker::create_tracker(
- -1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id),
+ -1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id),
StorageEngine::instance()->consistency_mem_tracker(),
MemTrackerLevel::TASK);
}
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 5fcdd86e1f..04a7df7199 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req,
const TMasterInfo&
_signature(signature),
_master_info(master_info) {
_mem_tracker = MemTracker::create_tracker(
- -1, "EngineCloneTask:tabletId=" +
std::to_string(_clone_req.tablet_id),
+ -1, "EngineCloneTask#tabletId=" +
std::to_string(_clone_req.tablet_id),
StorageEngine::instance()->clone_mem_tracker(),
MemTrackerLevel::TASK);
}
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp
b/be/src/olap/task/engine_storage_migration_task_v2.cpp
index 5e865c9849..fe00536662 100644
--- a/be/src/olap/task/engine_storage_migration_task_v2.cpp
+++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp
@@ -28,7 +28,7 @@
EngineStorageMigrationTaskV2::EngineStorageMigrationTaskV2(const TStorageMigrati
: _storage_migration_req(request) {
_mem_tracker = MemTracker::create_tracker(
config::memory_limitation_per_thread_for_storage_migration_bytes,
- fmt::format("EngineStorageMigrationTaskV2: {}-{}",
+
fmt::format("EngineStorageMigrationTaskV2#baseTabletId{}:newTabletId{}",
std::to_string(_storage_migration_req.base_tablet_id),
std::to_string(_storage_migration_req.new_tablet_id)),
StorageEngine::instance()->storage_migration_mem_tracker(),
MemTrackerLevel::TASK);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 1c270a3473..42fbe9dac5 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,17 +25,18 @@
namespace doris {
-LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t
timeout_s,
- bool is_high_priority, const std::string& sender_ip,
bool is_vec)
+LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit,
int64_t channel_mem_limit,
+ int64_t timeout_s, bool is_high_priority, const
std::string& sender_ip,
+ bool is_vec)
: _load_id(load_id),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
_is_vec(is_vec) {
_mem_tracker = MemTracker::create_tracker(
- mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(),
+ channel_mem_limit, "LoadChannel#senderIp=" + sender_ip,
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker(
- _load_id.to_string(), mem_limit),
+ _load_id.to_string(), load_mem_limit),
MemTrackerLevel::TASK);
// _last_updated_time should be set before being inserted to
// _load_channels in load_channel_mgr, or it may be erased
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 644e546524..38cc2ac89f 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,8 +39,9 @@ class Cache;
// corresponding to a certain load job
class LoadChannel {
public:
- LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s,
- bool is_high_priority, const std::string& sender_ip, bool
is_vec);
+ LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t
channel_mem_limit,
+ int64_t timeout_s, bool is_high_priority, const std::string&
sender_ip,
+ bool is_vec);
~LoadChannel();
// open a new load channel if not exist
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 2259eb2403..aea5479aa6 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -42,22 +42,22 @@ static int64_t calc_process_max_load_memory(int64_t
process_mem_limit) {
return std::min<int64_t>(max_load_memory_bytes,
config::load_process_max_memory_limit_bytes);
}
-// Calculate the memory limit for a single load job.
-static int64_t calc_job_max_load_memory(int64_t mem_limit_in_req, int64_t
total_mem_limit) {
+// Calculate the memory limit for a single load channel.
+static int64_t calc_channel_max_load_memory(int64_t load_mem_limit, int64_t
total_mem_limit) {
// default mem limit is used to be compatible with old request.
// new request should be set load_mem_limit.
- constexpr int64_t default_load_mem_limit = 2 * 1024 * 1024 * 1024L; // 2GB
- int64_t load_mem_limit = default_load_mem_limit;
- if (mem_limit_in_req != -1) {
+ constexpr int64_t default_channel_mem_limit = 2 * 1024 * 1024 * 1024L; //
2GB
+ int64_t channel_mem_limit = default_channel_mem_limit;
+ if (load_mem_limit != -1) {
// mem-limit of a certain load should between config::write_buffer_size
// and total-memory-limit
- load_mem_limit = std::max<int64_t>(mem_limit_in_req,
config::write_buffer_size);
- load_mem_limit = std::min<int64_t>(load_mem_limit, total_mem_limit);
+ channel_mem_limit = std::max<int64_t>(load_mem_limit,
config::write_buffer_size);
+ channel_mem_limit = std::min<int64_t>(channel_mem_limit,
total_mem_limit);
}
- return load_mem_limit;
+ return channel_mem_limit;
}
-static int64_t calc_job_timeout_s(int64_t timeout_in_req_s) {
+static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) {
int64_t load_channel_timeout_s =
config::streaming_load_rpc_max_alive_time_sec;
if (timeout_in_req_s > 0) {
load_channel_timeout_s = std::max<int64_t>(load_channel_timeout_s,
timeout_in_req_s);
@@ -83,8 +83,8 @@ LoadChannelMgr::~LoadChannelMgr() {
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
- int64_t load_mem_limit = calc_process_max_load_memory(process_mem_limit);
- _mem_tracker = MemTracker::create_tracker(load_mem_limit, "LoadChannelMgr",
+ int64_t load_mgr_mem_limit =
calc_process_max_load_memory(process_mem_limit);
+ _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit,
"LoadChannelMgr",
MemTracker::get_process_tracker(),
MemTrackerLevel::OVERVIEW);
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
@@ -95,10 +95,12 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
return Status::OK();
}
-LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id,
int64_t mem_limit,
- int64_t timeout_s, bool
is_high_priority,
+LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id,
int64_t load_mem_limit,
+ int64_t channel_mem_limit,
int64_t timeout_s,
+ bool is_high_priority,
const std::string&
sender_ip, bool is_vec) {
- return new LoadChannel(load_id, mem_limit, timeout_s, is_high_priority,
sender_ip, is_vec);
+ return new LoadChannel(load_id, load_mem_limit, channel_mem_limit,
timeout_s, is_high_priority,
+ sender_ip, is_vec);
}
Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
@@ -112,18 +114,18 @@ Status LoadChannelMgr::open(const
PTabletWriterOpenRequest& params) {
channel = it->second;
} else {
// create a new load channel
- int64_t mem_limit_in_req = params.has_load_mem_limit() ?
params.load_mem_limit() : -1;
- int64_t job_max_memory =
- calc_job_max_load_memory(mem_limit_in_req,
_mem_tracker->limit());
+ int64_t load_mem_limit = params.has_load_mem_limit() ?
params.load_mem_limit() : -1;
+ int64_t channel_mem_limit =
+ calc_channel_max_load_memory(load_mem_limit,
_mem_tracker->limit());
int64_t timeout_in_req_s =
params.has_load_channel_timeout_s() ?
params.load_channel_timeout_s() : -1;
- int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s);
+ int64_t channel_timeout_s =
calc_channel_timeout_s(timeout_in_req_s);
bool is_high_priority = (params.has_is_high_priority() &&
params.is_high_priority());
- channel.reset(_create_load_channel(load_id, job_max_memory,
job_timeout_s,
- is_high_priority,
params.sender_ip(),
- params.is_vectorized()));
+ channel.reset(_create_load_channel(load_id, load_mem_limit,
channel_mem_limit,
+ channel_timeout_s,
is_high_priority,
+ params.sender_ip(),
params.is_vectorized()));
_load_channels.insert({load_id, channel});
}
}
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 7e1c4450f0..65d72534f4 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -61,9 +61,10 @@ public:
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
private:
- static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t
mem_limit,
- int64_t timeout_s, bool
is_high_priority,
- const std::string& sender_ip,
bool is_vec);
+ static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t
load_mem_limit,
+ int64_t channel_mem_limit,
int64_t timeout_s,
+ bool is_high_priority, const
std::string& sender_ip,
+ bool is_vec);
template <typename Request>
Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool&
is_eof,
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index 359b0f5368..824c8b8530 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -124,14 +124,17 @@ std::shared_ptr<MemTracker>
MemTracker::create_tracker_impl(
std::string reset_label;
MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker();
if (task_parent_tracker) {
- reset_label = fmt::format("{}:{}", label,
split(task_parent_tracker->label(), ":")[1]);
+ reset_label = fmt::format("{}#{}", label,
split(task_parent_tracker->label(), "#")[1]);
} else {
reset_label = label;
}
+ if (byte_limit == -1) byte_limit = reset_parent->limit();
std::shared_ptr<MemTracker> tracker(
new MemTracker(byte_limit, reset_label, reset_parent,
level > reset_parent->_level ? level :
reset_parent->_level, profile));
+ // Do not check limit exceed when add_child_tracker, otherwise it will
cause deadlock when log_usage is called.
+ STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
reset_parent->add_child_tracker(tracker);
return tracker;
}
@@ -285,6 +288,7 @@ std::string MemTracker::log_usage(int max_recursive_depth,
Status MemTracker::mem_limit_exceeded(RuntimeState* state, const std::string&
details,
int64_t failed_allocation_size, Status
failed_alloc) {
+ STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
MemTracker* process_tracker = MemTracker::get_raw_process_tracker();
std::string detail =
"Memory exceed limit. fragment={}, details={}, on backend={}.
Memory left in process "
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index c21d6d3db5..85a6550f7a 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -401,8 +401,7 @@ public:
/// 'failed_allocation_size' is zero, nothing about the allocation size is
logged.
/// If 'state' is non-nullptr, logs the error to 'state'.
Status mem_limit_exceeded(RuntimeState* state, const std::string& details
= std::string(),
- int64_t failed_allocation = -1,
- Status failed_alloc = Status::OK())
WARN_UNUSED_RESULT;
+ int64_t failed_allocation = -1, Status
failed_alloc = Status::OK());
// Usually, a negative values means that the statistics are not accurate,
// 1. The released memory is not consumed.
diff --git a/be/src/runtime/mem_tracker_task_pool.cpp
b/be/src/runtime/mem_tracker_task_pool.cpp
index 551d904111..84f1a951e0 100644
--- a/be/src/runtime/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/mem_tracker_task_pool.cpp
@@ -41,16 +41,17 @@ std::shared_ptr<MemTracker>
MemTrackerTaskPool::register_query_mem_tracker(
VLOG_FILE << "Register Query memory tracker, query id: " << query_id
<< " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
return register_task_mem_tracker_impl(query_id, mem_limit,
- fmt::format("Query:queryId={}",
query_id),
+ fmt::format("Query#queryId={}",
query_id),
ExecEnv::GetInstance()->query_pool_mem_tracker());
}
std::shared_ptr<MemTracker> MemTrackerTaskPool::register_load_mem_tracker(
const std::string& load_id, int64_t mem_limit) {
+ // In load, the query id of the fragment is executed, which is the same as
the load id of the load channel.
VLOG_FILE << "Register Load memory tracker, load id: " << load_id
<< " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
return register_task_mem_tracker_impl(load_id, mem_limit,
- fmt::format("Load:loadId={}",
load_id),
+ fmt::format("Load#loadId={}",
load_id),
ExecEnv::GetInstance()->load_pool_mem_tracker());
}
@@ -66,8 +67,13 @@ std::shared_ptr<MemTracker>
MemTrackerTaskPool::get_task_mem_tracker(const std::
void MemTrackerTaskPool::logout_task_mem_tracker() {
std::vector<std::string> expired_tasks;
for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();
it++) {
- // No RuntimeState uses this task MemTracker, it is only referenced by
this map, delete it
- if (it->second.use_count() == 1) {
+ if (!it->second) {
+ // when parallel querying, after phmap _task_mem_trackers.erase,
+ // there have been cases where the key still exists in
_task_mem_trackers.
+ // https://github.com/apache/incubator-doris/issues/10006
+ expired_tasks.emplace_back(it->first);
+ } else if (it->second.use_count() == 1) {
+ // No RuntimeState uses this task MemTracker, it is only
referenced by this map, delete it
if (config::memory_leak_detection && it->second->consumption() !=
0) {
// If consumption is not equal to 0 before query mem tracker
is destructed,
// there are two possibilities in theory.
@@ -86,6 +92,14 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
it->second->parent()->consume_local(-it->second->consumption(),
MemTracker::get_process_tracker().get());
expired_tasks.emplace_back(it->first);
+ } else {
+ // Log limit exceeded query tracker.
+ if (it->second->limit_exceeded()) {
+ it->second->mem_limit_exceeded(
+ nullptr,
+ fmt::format("Task mem limit exceeded but no cancel,
queryId:{}", it->first),
+ 0, Status::OK());
+ }
}
}
for (auto tid : expired_tasks) {
@@ -93,9 +107,13 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// there are still task mem trackers that are get or register.
// The only known case: after an load task ends all fragments on a
BE,`tablet_writer_open` is still
// called to create a channel, and the load task tracker will be
re-registered in the channel open.
- if (_task_mem_trackers[tid].use_count() == 1) {
+ // https://github.com/apache/incubator-doris/issues/9905
+ if (!_task_mem_trackers[tid]) {
+ _task_mem_trackers.erase(tid);
+ VLOG_FILE << "Deregister null task mem tracker, task id: " << tid;
+ } else if (_task_mem_trackers[tid].use_count() == 1) {
_task_mem_trackers.erase(tid);
- VLOG_FILE << "Deregister task memory tracker, task id: " << tid;
+ VLOG_FILE << "Deregister not used task mem tracker, task id: " <<
tid;
}
}
}
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index b08642bf12..2139469393 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -136,12 +136,14 @@
SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() {
#endif // USE_MEM_TRACKER
}
-SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(
- const std::string& action_type, bool cancel_work, ERRCALLBACK
err_call_back_func) {
+SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(const
std::string& action_type,
+ bool
cancel_work,
+
ERRCALLBACK err_call_back_func,
+ bool
log_limit_exceeded) {
#ifdef USE_MEM_TRACKER
DCHECK(action_type != std::string());
_old_tracker_cb =
tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(
- action_type, cancel_work, err_call_back_func);
+ action_type, cancel_work, err_call_back_func, log_limit_exceeded);
#endif
}
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 0572c2b08d..30952692d6 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -300,7 +300,8 @@ class SwitchThreadMemTrackerErrCallBack {
public:
explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type,
bool cancel_work = true,
- ERRCALLBACK err_call_back_func
= nullptr);
+ ERRCALLBACK err_call_back_func
= nullptr,
+ bool log_limit_exceeded = true);
~SwitchThreadMemTrackerErrCallBack();
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp
b/be/src/runtime/thread_mem_tracker_mgr.cpp
index e9768bf86b..d3715b58df 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -60,25 +60,24 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const
std::string& cancel_details
ExecEnv::GetInstance()->fragment_mgr()->cancel(
_fragment_instance_id,
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_details);
- _fragment_instance_id = TUniqueId(); // Make sure it will only be
canceled once
}
}
void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) {
- auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
- nullptr, fmt::format("In TCMalloc Hook, {}",
_consume_err_cb.cancel_msg), mem_usage,
- st);
if (_consume_err_cb.cb_func != nullptr) {
_consume_err_cb.cb_func();
}
if (is_attach_task()) {
- if (_consume_err_cb.cancel_task == true) {
+ if (_consume_err_cb.cancel_task) {
+ auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
+ nullptr,
+ fmt::format("Task mem limit exceeded and cancel it,
msg:{}",
+ _consume_err_cb.cancel_msg),
+ mem_usage, st);
exceeded_cancel_task(rst.to_string());
- } else {
- // TODO(zxy) Need other processing, or log (not too often).
+ _consume_err_cb.cancel_task = false; // Make sure it will only be
canceled once
+ _consume_err_cb.log_limit_exceeded = false;
}
- } else {
- // TODO(zxy) Need other processing, or log (not too often).
}
}
} // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h
b/be/src/runtime/thread_mem_tracker_mgr.h
index 754e231747..b476715612 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -28,18 +28,24 @@ typedef void (*ERRCALLBACK)();
struct ConsumeErrCallBackInfo {
std::string cancel_msg;
- bool cancel_task; // Whether to cancel the task when the current tracker
exceeds the limit
+ bool cancel_task; // Whether to cancel the task when the current tracker
exceeds the limit.
ERRCALLBACK cb_func;
+ bool log_limit_exceeded; // Whether to print log_usage of mem tracker when
mem limit exceeded.
ConsumeErrCallBackInfo() { init(); }
- ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task,
ERRCALLBACK cb_func)
- : cancel_msg(cancel_msg), cancel_task(cancel_task),
cb_func(cb_func) {}
+ ConsumeErrCallBackInfo(const std::string& cancel_msg, bool cancel_task,
ERRCALLBACK cb_func,
+ bool log_limit_exceeded)
+ : cancel_msg(cancel_msg),
+ cancel_task(cancel_task),
+ cb_func(cb_func),
+ log_limit_exceeded(log_limit_exceeded) {}
void init() {
cancel_msg = "";
- cancel_task = false;
+ cancel_task = true;
cb_func = nullptr;
+ log_limit_exceeded = true;
}
};
@@ -94,11 +100,12 @@ public:
void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
ConsumeErrCallBackInfo update_consume_err_cb(const std::string&
cancel_msg, bool cancel_task,
- ERRCALLBACK cb_func) {
+ ERRCALLBACK cb_func, bool
log_limit_exceeded) {
_temp_consume_err_cb = _consume_err_cb;
_consume_err_cb.cancel_msg = cancel_msg;
_consume_err_cb.cancel_task = cancel_task;
_consume_err_cb.cb_func = cb_func;
+ _consume_err_cb.log_limit_exceeded = log_limit_exceeded;
return _temp_consume_err_cb;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]