This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1fd701493 [fix](memtracker) Improve memory tracking accuracy for exec nodes (#11947)
b1fd701493 is described below
commit b1fd701493919e89936662f84f151b3071cc4578
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Aug 22 08:56:05 2022 +0800
[fix](memtracker) Improve memory tracking accuracy for exec nodes (#11947)
---
be/src/exec/blocking_join_node.cpp | 1 +
be/src/exec/hash_join_node.cpp | 1 +
be/src/exec/olap_scan_node.cpp | 2 ++
be/src/exec/tablet_sink.cpp | 4 +++-
be/src/runtime/memory/mem_tracker_limiter.cpp | 8 ++++----
be/src/service/internal_service.cpp | 3 +++
be/src/vec/exec/join/vhash_join_node.cpp | 1 +
be/src/vec/exec/vblocking_join_node.cpp | 1 +
be/src/vec/exec/volap_scan_node.cpp | 2 ++
be/src/vec/sink/vtablet_sink.cpp | 1 +
10 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index 10419a3499..5e5aee3714 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -93,6 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) {
void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
status->set_value(construct_build_side(state));
}
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index cbba3c607c..f5a7fec132 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -179,6 +179,7 @@ Status HashJoinNode::close(RuntimeState* state) {
void HashJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
status->set_value(construct_hash_table(state));
}
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 048a922bae..99caa8afb9 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1480,6 +1480,7 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
void OlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
Status status = Status::OK();
for (auto scanner : _olap_scanners) {
status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
@@ -1663,6 +1664,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
void OlapScanNode::scanner_thread(OlapScanner* scanner) {
SCOPED_ATTACH_TASK(_runtime_state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
Thread::set_self_name("olap_scanner");
if (UNLIKELY(_transfer_done)) {
_scanner_done = true;
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index cdb503551b..a1213d348d 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -468,8 +468,9 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
}
void NodeChannel::try_send_batch(RuntimeState* state) {
- SCOPED_ATTACH_TASK(state);
SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
+ SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
AddBatchReq send_batch;
{
debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
@@ -1321,6 +1322,7 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma
void OlapTableSink::_send_batch_process(RuntimeState* state) {
SCOPED_TIMER(_non_blocking_send_timer);
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
do {
int running_channels_num = 0;
for (auto index_channel : _channels) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 50eb011028..84209888ef 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -226,11 +226,11 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) {
DCHECK(_limit != -1);
std::string detail = fmt::format(
- "{}, backend={} free memory left={}. If is query, can change the limit by `set "
- "exec_mem_limit=xxx`, details mem usage see be.INFO.",
+ "{}, backend={} memory used={}, free memory left={}. If is query, can change the limit "
+ "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(),
- PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(),
- TUnit::BYTES));
+ PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
+ PrettyPrinter::print(MemInfo::mem_limit() - PerfCounters::get_vm_rss(), TUnit::BYTES));
Status status = Status::MemoryLimitExceeded(detail);
// only print the tracker log_usage in be log.
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 02a52ce0e5..87f89f48e0 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -263,6 +263,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
+ SCOPED_SWITCH_BTHREAD_TLS();
auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
if (!st.ok()) {
@@ -320,6 +321,8 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
+ SCOPED_SWITCH_BTHREAD_TLS();
+
// TODO(zxy) delete in 1.2 version
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index d27c0aea06..228e0020e8 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1114,6 +1114,7 @@ Status HashJoinNode::open(RuntimeState* state) {
void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread");
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
status->set_value(_hash_table_build(state));
}
diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp
index f98005ca9c..c6d548de91 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -75,6 +75,7 @@ Status VBlockingJoinNode::close(RuntimeState* state) {
void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
status->set_value(construct_build_side(state));
// Release the thread token as soon as possible (before the main thread joins
// on it). This way, if we had a chain of 10 joins using 1 additional thread,
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index ab2f9eda38..5542eefeb0 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -274,6 +274,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread");
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
Status status = Status::OK();
if (_vconjunct_ctx_ptr) {
@@ -386,6 +387,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
SCOPED_ATTACH_TASK(_runtime_state);
+ SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
Thread::set_self_name("volap_scanner");
int64_t wait_time = scanner->update_wait_worker_timer();
// Do not use ScopedTimer. There is no guarantee that, the counter
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 7a0faee0d6..3cbc73348c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -235,6 +235,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
void VNodeChannel::try_send_block(RuntimeState* state) {
SCOPED_ATTACH_TASK(state);
+ SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
AddBlockReq send_block;
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]