This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 487fd3c1d7 [bugfix](memtracker)fix exceed memory limit log (#11485)
487fd3c1d7 is described below
commit 487fd3c1d715477c3d3f321b6e8ce0addc30bc12
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Aug 4 10:22:20 2022 +0800
[bugfix](memtracker)fix exceed memory limit log (#11485)
---
be/src/exec/cross_join_node.cpp | 1 -
be/src/exec/except_node.cpp | 1 -
be/src/exec/hash_join_node.cpp | 2 -
be/src/exec/intersect_node.cpp | 1 -
be/src/exec/set_operation_node.cpp | 1 -
be/src/runtime/memory/mem_tracker_limiter.cpp | 56 ++++++++++--------------
be/src/runtime/memory/mem_tracker_limiter.h | 9 ++--
be/src/runtime/memory/mem_tracker_task_pool.cpp | 8 ++--
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 29 ++++++------
be/src/runtime/memory/thread_mem_tracker_mgr.h | 41 +++--------------
be/src/runtime/plan_fragment_executor.cpp | 2 +-
be/src/runtime/thread_context.cpp | 15 -------
be/src/runtime/thread_context.h | 40 +++++------------
be/src/vec/exec/join/vhash_join_node.cpp | 1 -
be/src/vec/exec/vaggregation_node.cpp | 2 -
be/src/vec/exec/vcross_join_node.cpp | 1 -
be/src/vec/exec/vset_operation_node.cpp | 1 -
17 files changed, 64 insertions(+), 147 deletions(-)
diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index b26f4f2cd4..0743fe04c4 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -52,7 +52,6 @@ Status CrossJoinNode::close(RuntimeState* state) {
Status CrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Cross join, while getting next from
child 1");
while (true) {
RowBatch* batch = _build_batch_pool->add(
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 58a9b67f2f..dd92859671 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -40,7 +40,6 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState*
state) {
Status ExceptNode::open(RuntimeState* state) {
RETURN_IF_ERROR(SetOperationNode::open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Except Node, while probing the hash
table.");
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 7c572ff95d..02f52d2124 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -186,7 +186,6 @@ Status HashJoinNode::construct_hash_table(RuntimeState*
state) {
// The hash join node needs to keep in memory all build tuples, including
the tuple
// row ptrs. The row ptrs are copied into the hash table's internal
structure so they
// don't need to be stored in the _build_pool.
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash
table.");
RowBatch build_batch(child(1)->row_desc(), state->batch_size(),
mem_tracker().get());
RETURN_IF_ERROR(child(1)->open(state));
@@ -304,7 +303,6 @@ Status HashJoinNode::get_next(RuntimeState* state,
RowBatch* out_batch, bool* eo
// In most cases, no additional memory overhead will be applied for at
this stage,
// but if the expression calculation in this node needs to apply for
additional memory,
// it may cause the memory to exceed the limit.
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
if (reached_limit()) {
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index 9f5eb3ece1..a79810734d 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -43,7 +43,6 @@ Status IntersectNode::init(const TPlanNode& tnode,
RuntimeState* state) {
// 2 probe with child(1), then filter the hash table and find the matched
item, use them to rebuild a hash table
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Intersect Node, while probing the hash
table.");
RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
diff --git a/be/src/exec/set_operation_node.cpp
b/be/src/exec/set_operation_node.cpp
index e0bfbb199c..7a9d3a334f 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -138,7 +138,6 @@ Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("SetOperation, while constructing the
hash table.");
RETURN_IF_CANCELLED(state);
// open result expr lists.
for (const std::vector<ExprContext*>& exprs : _child_expr_lists) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 722495aee7..f5c825155b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -143,7 +143,8 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
if (UNLIKELY(gc_memory(_limit - bytes))) {
return Status::MemoryLimitExceeded(
- fmt::format("label={}, limit={}, used={}, failed consume
size={}", label(), _limit, _consumption->current_value(), bytes));
+ fmt::format("label={}, limit={}, used={}, failed consume
size={}", label(), _limit,
+ _consumption->current_value(), bytes));
}
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << _consumption->current_value() << "
limit=" << _limit;
@@ -196,9 +197,9 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth, int64_t* logge
std::vector<NewMemTracker::Snapshot> snapshots;
NewMemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label);
for (const auto& snapshot : snapshots) {
- child_trackers_usage += NewMemTracker::log_usage(snapshot);
+ child_trackers_usage += "\n " + NewMemTracker::log_usage(snapshot);
}
- if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage;
+ if (!child_trackers_usage.empty()) detail += child_trackers_usage;
return detail;
}
@@ -216,47 +217,36 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth,
return join(usage_strings, "\n");
}
-Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size, Status failed_try_consume_rt, const TUniqueId&
fragment_instance_id) {
+Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
- std::string detail;
- if (!failed_try_consume_rt) {
- DCHECK(failed_try_consume_rt.is_mem_limit_exceeded());
- detail += "in {} mem tracker consume log=<{}>.";
- // failed_try_consume_rt.to_string() starts with `Memory exceed limit:
`
- detail = fmt::format(detail, msg, failed_try_consume_rt.to_string());
- } else {
- detail += "in {} mem tracker consume log=<label={}, limit={}, used={},
failed consume size={}>.";
- detail = fmt::format(detail, msg, _label,
_consumption->current_value(), _limit,
- PrettyPrinter::print(failed_consume_size,
TUnit::BYTES));
- }
- detail += " fragment={}, backend={} free memory left={}.";
- detail = fmt::format(
- detail, print_id(fragment_instance_id),
- BackendOptions::get_localhost(),
+ std::string detail = fmt::format(
+ "{}, failed mem consume:<consume_size={}, mem_limit={},
mem_used={}, tracker_label={}, "
+ "in backend={} free memory left={}. details mem usage see
be.INFO.",
+ msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES),
_limit,
+ _consumption->current_value(), _label,
BackendOptions::get_localhost(),
PrettyPrinter::print(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity(),
TUnit::BYTES));
- detail += " If this is a query, can change the limit by session variable
exec_mem_limit.";
Status status = Status::MemoryLimitExceeded(detail);
- detail += "\n" +
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
// only print the tracker log_usage in be log.
- if (ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() <
failed_consume_size) {
- // Dumping the process NewMemTracker is expensive. Limiting the
recursive depth to two
- // levels limits the level of detail to a one-line summary for each
query NewMemTracker.
- detail += "\n" +
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
- } else {
- detail += "\n" + log_usage();
+ if (_print_log_usage) {
+ if
(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() <
failed_consume_size) {
+ // Dumping the process MemTracker is expensive. Limiting the
recursive depth to two
+ // levels limits the level of detail to a one-line summary for
each query MemTracker.
+ detail += "\n" +
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
+ } else {
+ detail += "\n" + log_usage();
+ }
+ detail += "\n" +
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
+ LOG(WARNING) << detail;
+ _print_log_usage = false;
}
-
- LOG(WARNING) << detail;
return status;
}
Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const
std::string& msg,
- int64_t failed_alloc_size,
- Status failed_try_consume_rt) {
- Status rt = mem_limit_exceeded(msg, failed_alloc_size,
failed_try_consume_rt,
- state->fragment_instance_id());
+ int64_t failed_alloc_size) {
+ Status rt = mem_limit_exceeded(msg, failed_alloc_size);
state->log_error(rt.to_string());
return rt;
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 104a1c20ca..0c003d6f3a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -143,12 +143,9 @@ public:
// If 'failed_allocation_size' is greater than zero, logs the allocation
size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is
logged.
// If 'state' is non-nullptr, logs the error to 'state'.
- Status mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size,
- Status failed_try_consume_rt,
- const TUniqueId& fragment_instance_id =
TUniqueId());
+ Status mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size);
Status mem_limit_exceeded(RuntimeState* state, const std::string& msg =
std::string(),
- int64_t failed_consume_size = -1,
- Status failed_try_consume_rt = Status::OK());
+ int64_t failed_consume_size = -1);
std::string debug_string() {
std::stringstream msg;
@@ -208,6 +205,8 @@ private:
// The number of child trackers that have been added.
std::atomic_size_t _had_child_count = 0;
+ bool _print_log_usage = true;
+
// Lock to protect gc_memory(). This prevents many GCs from occurring at
once.
std::mutex _gc_lock;
// Functions to call after the limit is reached to free memory.
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 3c775db5ec..b63b25df17 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -31,15 +31,13 @@ std::shared_ptr<MemTrackerLimiter>
MemTrackerTaskPool::register_task_mem_tracker
// Combine new tracker and emplace into one operation to avoid the use of
locks
// Name for task MemTrackers. '$0' is replaced with the task id.
std::shared_ptr<MemTrackerLimiter> tracker;
- bool new_emplace = _task_mem_trackers.lazy_emplace_l(
+ bool new_emplace = _task_mem_trackers.try_emplace_l(
task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) {
tracker = v; },
- [&](const auto& ctor) {
- tracker = std::make_shared<MemTrackerLimiter>(mem_limit,
label, parent);
- ctor(task_id, tracker);
- });
+ std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
if (new_emplace) {
LOG(INFO) << "Register query/load memory tracker, query/load id: " <<
task_id
<< " limit: " << PrettyPrinter::print(mem_limit,
TUnit::BYTES);
+ return get_task_mem_tracker(task_id);
}
return tracker;
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index ab2d1fc1c0..aea89acb3b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -25,14 +25,12 @@
namespace doris {
void ThreadMemTrackerMgr::attach_limiter_tracker(
- const std::string& cancel_msg, const std::string& task_id,
- const TUniqueId& fragment_instance_id,
+ const std::string& task_id, const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _exceed_cb.cancel_msg = cancel_msg;
_limiter_tracker = mem_tracker;
}
@@ -40,7 +38,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() {
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
- _exceed_cb.cancel_msg = "";
_limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
}
@@ -52,18 +49,22 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const
std::string& cancel_details
}
}
-void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size, Status
failed_try_consume_rt) {
- if (_exceed_cb.cb_func != nullptr) {
- _exceed_cb.cb_func();
+void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) {
+ if (_cb_func != nullptr) {
+ _cb_func();
}
- if (is_attach_task()) {
- if (_exceed_cb.cancel_task) {
- auto st = _limiter_tracker->mem_limit_exceeded(
- fmt::format("query mem limit exceeded and cancel it, {}",
_exceed_cb.cancel_msg),
- failed_consume_size, failed_try_consume_rt,
_fragment_instance_id);
- exceeded_cancel_task(st.to_string());
- _exceed_cb.cancel_task = false; // Make sure it will only be
canceled once
+ if (is_attach_query()) {
+ std::string cancel_msg;
+ if (!_consumer_tracker_stack.empty()) {
+ cancel_msg = fmt::format(
+ "exec node:<name={}>, can change the limit by `set
exec_mem_limit=xxx`",
+ _consumer_tracker_stack[-1]->label());
+ } else {
+ cancel_msg = "exec node:unknown, can change the limit by `set
exec_mem_limit=xxx`";
}
+ auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg,
failed_consume_size);
+ exceeded_cancel_task(st.to_string());
+ _check_limit = false; // Make sure it will only be canceled once
}
}
} // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 13eed946ba..2f5bfd2d9c 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -32,22 +32,6 @@ extern bthread_key_t btls_key;
static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
using ExceedCallBack = void (*)();
-struct MemExceedCallBackInfo {
- std::string cancel_msg;
- bool cancel_task; // Whether to cancel the task when the current tracker
exceeds the limit.
- ExceedCallBack cb_func;
-
- MemExceedCallBackInfo() { init(); }
-
- MemExceedCallBackInfo(const std::string& cancel_msg, bool cancel_task,
ExceedCallBack cb_func)
- : cancel_msg(cancel_msg), cancel_task(cancel_task),
cb_func(cb_func) {}
-
- void init() {
- cancel_msg = "";
- cancel_task = true;
- cb_func = nullptr;
- }
-};
// TCMalloc new/delete Hook is counted in the memory_tracker of the current
thread.
//
@@ -61,7 +45,6 @@ public:
~ThreadMemTrackerMgr() {
flush_untracked_mem<false>();
- _exceed_cb.init();
DCHECK(_consumer_tracker_stack.empty());
}
@@ -75,8 +58,7 @@ public:
void init();
// After attach, the current thread TCMalloc Hook starts to
consume/release task mem_tracker
- void attach_limiter_tracker(const std::string& cancel_msg, const
std::string& task_id,
- const TUniqueId& fragment_instance_id,
+ void attach_limiter_tracker(const std::string& task_id, const TUniqueId&
fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>&
mem_tracker);
void detach_limiter_tracker();
@@ -86,16 +68,7 @@ public:
void push_consumer_tracker(NewMemTracker* mem_tracker);
void pop_consumer_tracker();
- MemExceedCallBackInfo update_exceed_call_back(const std::string&
cancel_msg, bool cancel_task,
- ExceedCallBack cb_func) {
- _temp_exceed_cb = _exceed_cb;
- _exceed_cb.cancel_msg = cancel_msg;
- _exceed_cb.cancel_task = cancel_task;
- _exceed_cb.cb_func = cb_func;
- return _temp_exceed_cb;
- }
-
- void update_exceed_call_back(const MemExceedCallBackInfo& exceed_cb) {
_exceed_cb = exceed_cb; }
+ void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; }
// Note that, If call the memory allocation operation in TCMalloc
new/delete Hook,
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
@@ -114,7 +87,7 @@ public:
template <bool CheckLimit>
void flush_untracked_mem();
- bool is_attach_task() { return _task_id != ""; }
+ bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return
_limiter_tracker; }
@@ -138,7 +111,7 @@ private:
// If tryConsume fails due to task mem tracker exceeding the limit, the
task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);
- void exceeded(int64_t failed_consume_size, Status failed_try_consume_rt);
+ void exceeded(int64_t failed_consume_size);
private:
// Cache untracked mem, only update to _untracked_mems when switching mem
tracker.
@@ -155,14 +128,12 @@ private:
bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
- MemExceedCallBackInfo _exceed_cb;
- MemExceedCallBackInfo _temp_exceed_cb;
+ ExceedCallBack _cb_func = nullptr;
};
inline void ThreadMemTrackerMgr::init() {
DCHECK(_consumer_tracker_stack.empty());
_task_id = "";
- _exceed_cb.init();
_limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker();
_check_limit = true;
}
@@ -219,7 +190,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// The memory has been allocated, so when TryConsume fails, need
to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker->consume(_untracked_mem);
- exceeded(_untracked_mem, st);
+ exceeded(_untracked_mem);
}
} else {
_limiter_tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index fcce7fa43c..8f8cdd62b7 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -271,7 +271,7 @@ Status PlanFragmentExecutor::open() {
if (_cancel_reason == PPlanFragmentCancelReason::CALL_RPC_ERROR) {
status = Status::RuntimeError(_cancel_msg);
} else if (_cancel_reason ==
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED) {
- status = Status::MemoryAllocFailed(_cancel_msg);
+ status = Status::MemoryLimitExceeded(_cancel_msg);
}
}
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index 6ae9c644a0..e62074ae62 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -72,21 +72,6 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
#endif // USE_MEM_TRACKER
}
-UpdateMemExceedCallBack::UpdateMemExceedCallBack(const std::string&
cancel_msg, bool cancel_task,
- ExceedCallBack cb_func) {
-#ifdef USE_MEM_TRACKER
- DCHECK(cancel_msg != std::string());
- _old_cb =
thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(
- cancel_msg, cancel_task, cb_func);
-#endif
-}
-
-UpdateMemExceedCallBack::~UpdateMemExceedCallBack() {
-#ifdef USE_MEM_TRACKER
-
thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(_old_cb);
-#endif // USE_MEM_TRACKER
-}
-
SwitchBthread::SwitchBthread() {
#ifdef USE_MEM_TRACKER
_bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 254b465b00..d657e3b063 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -33,9 +33,11 @@
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) =
doris::AddThreadMemTrackerConsumer(mem_tracker)
-#define SCOPED_UPDATE_MEM_EXCEED_CALL_BACK(cancel_msg, ...) \
- auto VARNAME_LINENUM(update_exceed_cb) = \
- doris::UpdateMemExceedCallBack(cancel_msg, ##__VA_ARGS__)
+// Attach to task when thread starts
+#define SCOPED_ATTACH_TASK(arg1, ...) \
+ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
+
+#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) =
SwitchBthread()
namespace doris {
@@ -104,8 +106,8 @@ public:
BRPC = 5
// to be added ...
};
- inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY",
"LOAD", "COMPACTION",
- "STORAGE"};
+ inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY",
"LOAD",
+ "COMPACTION", "STORAGE",
"BRPC"};
public:
ThreadContext() {
@@ -139,8 +141,7 @@ public:
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _thread_mem_tracker_mgr->attach_limiter_tracker(TaskTypeStr[_type],
task_id,
- fragment_instance_id,
mem_tracker);
+ _thread_mem_tracker_mgr->attach_limiter_tracker(task_id,
fragment_instance_id, mem_tracker);
}
void detach_task() {
@@ -225,19 +226,6 @@ public:
~AddThreadMemTrackerConsumer();
};
-class UpdateMemExceedCallBack {
-public:
- explicit UpdateMemExceedCallBack(const std::string& cancel_msg, bool
cancel_task = true,
- ExceedCallBack cb_func = nullptr);
-
- ~UpdateMemExceedCallBack();
-
-private:
-#ifdef USE_MEM_TRACKER
- MemExceedCallBackInfo _old_cb;
-#endif
-};
-
class SwitchBthread {
public:
explicit SwitchBthread();
@@ -261,15 +249,8 @@ public:
}
};
-#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) =
SwitchBthread()
-
-// Attach to task when thread starts
-#define SCOPED_ATTACH_TASK(arg1, ...) \
- auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
-
#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \
auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
-
#define CONSUME_THREAD_MEM_TRACKER(size) \
doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
#define RELEASE_THREAD_MEM_TRACKER(size) \
@@ -278,5 +259,8 @@ public:
doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size,
tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size,
tracker)
-
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
+ return doris::thread_context() \
+ ->_thread_mem_tracker_mgr->limiter_mem_tracker() \
+ ->mem_limit_exceeded(state, msg, ##__VA_ARGS__);
} // namespace doris
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index e6aa53b1da..98ffa85654 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1077,7 +1077,6 @@ Status HashJoinNode::open(RuntimeState* state) {
Status HashJoinNode::_hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash
table.");
SCOPED_TIMER(_build_timer);
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 5907d3c802..5acfc1ff6b 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -351,7 +351,6 @@ Status AggregationNode::prepare(RuntimeState* state) {
Status AggregationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute open.");
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@@ -386,7 +385,6 @@ Status AggregationNode::get_next(RuntimeState* state,
RowBatch* row_batch, bool*
Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos)
{
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute get_next.");
if (_is_streaming_preagg) {
bool child_eos = false;
diff --git a/be/src/vec/exec/vcross_join_node.cpp
b/be/src/vec/exec/vcross_join_node.cpp
index 4c79355326..a8564a6310 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -52,7 +52,6 @@ Status VCrossJoinNode::close(RuntimeState* state) {
Status VCrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Cross join, while getting next
from the child 1");
bool eos = false;
while (true) {
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index 44c1638463..a20b88b767 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -227,7 +227,6 @@ void VSetOperationNode::hash_table_init() {
//build a hash table from child(0)
Status VSetOperationNode::hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Set Operation Node, while
constructing the hash table");
Block block;
MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]