This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new a7c0d5f7b2 [fix](streamload&sink) release and allocate memory in the
same tracker (#12929)
a7c0d5f7b2 is described below
commit a7c0d5f7b2b652285b3cfe86c3ed338dfaf1e27a
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Sep 24 00:03:09 2022 +0800
[fix](streamload&sink) release and allocate memory in the same tracker
(#12929)
1. HttpServer threads allocate byte buffers and put them into the stream load pipe,
but scanner threads release them under the query tracker.
2. We can assume brpc allocates memory in a Doris thread.
The above problems lead to incorrect memtracker results.
---
be/src/exec/tablet_sink.cpp | 17 +++++++++++++----
be/src/exec/tablet_sink.h | 3 +++
be/src/runtime/stream_load/stream_load_pipe.h | 8 +++++++-
be/src/runtime/thread_context.h | 4 ----
be/src/vec/common/pod_array.h | 19 +++++++++++++++++--
5 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 021540cecb..b33bb8b1f8 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -184,6 +184,7 @@ Status NodeChannel::open_wait() {
// add batch closure
_add_batch_closure =
ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
+ SCOPED_ATTACH_TASK(_state);
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call `mark_as_failed`,
@@ -206,6 +207,7 @@ Status NodeChannel::open_wait() {
_add_batch_closure->addSuccessHandler([this](const
PTabletWriterAddBatchResult& result,
bool is_last_rpc) {
+ SCOPED_ATTACH_TASK(_state);
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call the following
logic,
@@ -567,12 +569,19 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
brpc_url +
"/PInternalServiceImpl/tablet_writer_add_batch_by_http";
_add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
_add_batch_closure->cntl.http_request().set_content_type("application/json");
- _brpc_http_stub->tablet_writer_add_batch_by_http(
- &_add_batch_closure->cntl, NULL, &_add_batch_closure->result,
_add_batch_closure);
+ {
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+
_brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl,
NULL,
+
&_add_batch_closure->result,
+
_add_batch_closure);
+ }
} else {
_add_batch_closure->cntl.http_request().Clear();
- _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
- &_add_batch_closure->result,
_add_batch_closure);
+ {
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+ _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+ &_add_batch_closure->result,
_add_batch_closure);
+ }
}
_next_packet_seq++;
}
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index ac1dfe2c80..48c73ababc 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -95,6 +95,8 @@ public:
~ReusableClosure() {
// shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
join();
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+ cntl.Reset();
}
static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
@@ -121,6 +123,7 @@ public:
// plz follow this order: reset() -> set_in_flight() -> send brpc batch
void reset() {
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
cid = cntl.call_id();
watch.reset();
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h
b/be/src/runtime/stream_load/stream_load_pipe.h
index c0e573b078..ed41a63b05 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -24,6 +24,7 @@
#include "exec/file_reader.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/message_body_sink.h"
+#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/byte_buffer.h"
@@ -43,7 +44,11 @@ public:
_min_chunk_size(min_chunk_size),
_total_length(total_length),
_use_proto(use_proto) {}
- virtual ~StreamLoadPipe() {}
+
+ virtual ~StreamLoadPipe() {
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+ while (!_buf_queue.empty()) _buf_queue.pop_front();
+ }
Status open() override { return Status::OK(); }
@@ -115,6 +120,7 @@ public:
}
Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool*
eof) override {
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
*bytes_read = 0;
while (*bytes_read < data_size) {
std::unique_lock<std::mutex> l(_lock);
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c5a4566c73..421c59811b 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -134,10 +134,6 @@ public:
void attach_task(const TaskType& type, const std::string& task_id,
const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
- DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) &&
_task_id == "")
- << ",new tracker label: " << mem_tracker->label() << ",old
tracker label: "
- << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
- DCHECK(type != TaskType::UNKNOWN);
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 9c8ee3f7f5..74426c311c 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -160,13 +160,15 @@ protected:
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
ptrdiff_t end_diff = c_end - c_start;
+ ptrdiff_t peak_diff = c_end_peak - c_start;
c_start = reinterpret_cast<char*>(TAllocator::realloc(
c_start - pad_left, allocated_bytes(), bytes,
std::forward<TAllocatorParams>(allocator_params)...)) +
pad_left;
- c_end = c_end_peak = c_start + end_diff;
+ c_end = c_start + end_diff;
+ c_end_peak = c_start + peak_diff;
c_end_of_storage = c_start + bytes - pad_right - pad_left;
}
@@ -503,9 +505,11 @@ public:
auto swap_stack_heap = [this](PODArray& arr1, PODArray& arr2) {
size_t stack_size = arr1.size();
size_t stack_allocated = arr1.allocated_bytes();
+ size_t stack_peak_used = arr1.c_end_peak - arr1.c_start;
size_t heap_size = arr2.size();
size_t heap_allocated = arr2.allocated_bytes();
+ size_t heap_peak_used = arr2.c_end_peak - arr2.c_start;
/// Keep track of the stack content we have to copy.
char* stack_c_start = arr1.c_start;
@@ -514,13 +518,19 @@ public:
arr1.c_start = arr2.c_start;
arr1.c_end_of_storage = arr1.c_start + heap_allocated -
arr1.pad_right;
arr1.c_end = arr1.c_start + this->byte_size(heap_size);
- arr1.c_end_peak = arr2.c_end_peak;
+ arr1.c_end_peak = arr1.c_end;
+ THREAD_MEM_TRACKER_TRANSFER_FROM(arr1.c_end_peak - arr1.c_start -
heap_peak_used,
+
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
+
/// Allocate stack space for arr2.
arr2.alloc(stack_allocated);
/// Copy the stack content.
memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size));
arr2.c_end = arr2.c_end_peak = arr2.c_start +
this->byte_size(stack_size);
+ THREAD_MEM_TRACKER_TRANSFER_FROM(arr2.c_end_peak - arr2.c_start -
stack_peak_used,
+
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
+
};
auto do_move = [this](PODArray& src, PODArray& dest) {
@@ -529,6 +539,11 @@ public:
dest.alloc(src.allocated_bytes());
memcpy(dest.c_start, src.c_start, this->byte_size(src.size()));
dest.c_end = dest.c_end_peak = dest.c_start + (src.c_end -
src.c_start);
+ THREAD_MEM_TRACKER_TRANSFER_FROM(dest.c_end_peak -
dest.c_start,
+
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
+
+ THREAD_MEM_TRACKER_TRANSFER_FROM(src.c_end_of_storage -
src.c_end_peak,
+
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
src.c_start = Base::null;
src.c_end = Base::null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]