This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new a7c0d5f7b2 [fix](streamload&sink) release and allocate memory in the 
same tracker (#12929)
a7c0d5f7b2 is described below

commit a7c0d5f7b2b652285b3cfe86c3ed338dfaf1e27a
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Sep 24 00:03:09 2022 +0800

    [fix](streamload&sink) release and allocate memory in the same tracker 
(#12929)
    
    1. HttpServer threads allocate byte buffers and put them into the stream
    load pipe, but the scanner thread releases them under the query tracker.
    
    2. We can assume brpc allocates memory in a Doris thread.
    
    The above problems lead to wrong memtracker results.
---
 be/src/exec/tablet_sink.cpp                   | 17 +++++++++++++----
 be/src/exec/tablet_sink.h                     |  3 +++
 be/src/runtime/stream_load/stream_load_pipe.h |  8 +++++++-
 be/src/runtime/thread_context.h               |  4 ----
 be/src/vec/common/pod_array.h                 | 19 +++++++++++++++++--
 5 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 021540cecb..b33bb8b1f8 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -184,6 +184,7 @@ Status NodeChannel::open_wait() {
     // add batch closure
     _add_batch_closure = 
ReusableClosure<PTabletWriterAddBatchResult>::create();
     _add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
+        SCOPED_ATTACH_TASK(_state);
         std::lock_guard<std::mutex> l(this->_closed_lock);
         if (this->_is_closed) {
             // if the node channel is closed, no need to call `mark_as_failed`,
@@ -206,6 +207,7 @@ Status NodeChannel::open_wait() {
 
     _add_batch_closure->addSuccessHandler([this](const 
PTabletWriterAddBatchResult& result,
                                                  bool is_last_rpc) {
+        SCOPED_ATTACH_TASK(_state);
         std::lock_guard<std::mutex> l(this->_closed_lock);
         if (this->_is_closed) {
             // if the node channel is closed, no need to call the following 
logic,
@@ -567,12 +569,19 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
                 brpc_url + 
"/PInternalServiceImpl/tablet_writer_add_batch_by_http";
         
_add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
         
_add_batch_closure->cntl.http_request().set_content_type("application/json");
-        _brpc_http_stub->tablet_writer_add_batch_by_http(
-                &_add_batch_closure->cntl, NULL, &_add_batch_closure->result, 
_add_batch_closure);
+        {
+            SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+            
_brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl, 
NULL,
+                                                             
&_add_batch_closure->result,
+                                                             
_add_batch_closure);
+        }
     } else {
         _add_batch_closure->cntl.http_request().Clear();
-        _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
-                                       &_add_batch_closure->result, 
_add_batch_closure);
+        {
+            SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+            _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+                                           &_add_batch_closure->result, 
_add_batch_closure);
+        }
     }
     _next_packet_seq++;
 }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index ac1dfe2c80..48c73ababc 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -95,6 +95,8 @@ public:
     ~ReusableClosure() {
         // shouldn't delete when Run() is calling or going to be called, wait 
for current Run() done.
         join();
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+        cntl.Reset();
     }
 
     static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
@@ -121,6 +123,7 @@ public:
 
     // plz follow this order: reset() -> set_in_flight() -> send brpc batch
     void reset() {
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
         cntl.Reset();
         cid = cntl.call_id();
         watch.reset();
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h 
b/be/src/runtime/stream_load/stream_load_pipe.h
index c0e573b078..ed41a63b05 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -24,6 +24,7 @@
 #include "exec/file_reader.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "runtime/message_body_sink.h"
+#include "runtime/thread_context.h"
 #include "util/bit_util.h"
 #include "util/byte_buffer.h"
 
@@ -43,7 +44,11 @@ public:
               _min_chunk_size(min_chunk_size),
               _total_length(total_length),
               _use_proto(use_proto) {}
-    virtual ~StreamLoadPipe() {}
+
+    virtual ~StreamLoadPipe() {
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+        while (!_buf_queue.empty()) _buf_queue.pop_front();
+    }
 
     Status open() override { return Status::OK(); }
 
@@ -115,6 +120,7 @@ public:
     }
 
     Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* 
eof) override {
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
         *bytes_read = 0;
         while (*bytes_read < data_size) {
             std::unique_lock<std::mutex> l(_lock);
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c5a4566c73..421c59811b 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -134,10 +134,6 @@ public:
     void attach_task(const TaskType& type, const std::string& task_id,
                      const TUniqueId& fragment_instance_id,
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
-        DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && 
_task_id == "")
-                << ",new tracker label: " << mem_tracker->label() << ",old 
tracker label: "
-                << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
-        DCHECK(type != TaskType::UNKNOWN);
         _type = type;
         _task_id = task_id;
         _fragment_instance_id = fragment_instance_id;
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 9c8ee3f7f5..74426c311c 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -160,13 +160,15 @@ protected:
                                        
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
 
         ptrdiff_t end_diff = c_end - c_start;
+        ptrdiff_t peak_diff = c_end_peak - c_start;
 
         c_start = reinterpret_cast<char*>(TAllocator::realloc(
                           c_start - pad_left, allocated_bytes(), bytes,
                           
std::forward<TAllocatorParams>(allocator_params)...)) +
                   pad_left;
 
-        c_end = c_end_peak = c_start + end_diff;
+        c_end = c_start + end_diff;
+        c_end_peak = c_start + peak_diff;
         c_end_of_storage = c_start + bytes - pad_right - pad_left;
     }
 
@@ -503,9 +505,11 @@ public:
         auto swap_stack_heap = [this](PODArray& arr1, PODArray& arr2) {
             size_t stack_size = arr1.size();
             size_t stack_allocated = arr1.allocated_bytes();
+            size_t stack_peak_used = arr1.c_end_peak - arr1.c_start;
 
             size_t heap_size = arr2.size();
             size_t heap_allocated = arr2.allocated_bytes();
+            size_t heap_peak_used = arr2.c_end_peak - arr2.c_start;
 
             /// Keep track of the stack content we have to copy.
             char* stack_c_start = arr1.c_start;
@@ -514,13 +518,19 @@ public:
             arr1.c_start = arr2.c_start;
             arr1.c_end_of_storage = arr1.c_start + heap_allocated - 
arr1.pad_right;
             arr1.c_end = arr1.c_start + this->byte_size(heap_size);
-            arr1.c_end_peak = arr2.c_end_peak;
+            arr1.c_end_peak = arr1.c_end;
+            THREAD_MEM_TRACKER_TRANSFER_FROM(arr1.c_end_peak - arr1.c_start - 
heap_peak_used,
+                                             
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
+
 
             /// Allocate stack space for arr2.
             arr2.alloc(stack_allocated);
             /// Copy the stack content.
             memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size));
             arr2.c_end = arr2.c_end_peak = arr2.c_start + 
this->byte_size(stack_size);
+            THREAD_MEM_TRACKER_TRANSFER_FROM(arr2.c_end_peak - arr2.c_start - 
stack_peak_used,
+                                             
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
+
         };
 
         auto do_move = [this](PODArray& src, PODArray& dest) {
@@ -529,6 +539,11 @@ public:
                 dest.alloc(src.allocated_bytes());
                 memcpy(dest.c_start, src.c_start, this->byte_size(src.size()));
                 dest.c_end = dest.c_end_peak = dest.c_start + (src.c_end - 
src.c_start);
+                THREAD_MEM_TRACKER_TRANSFER_FROM(dest.c_end_peak - 
dest.c_start,
+                                                 
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
+ 
+                THREAD_MEM_TRACKER_TRANSFER_FROM(src.c_end_of_storage - 
src.c_end_peak,
+                                                 
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
 
                 src.c_start = Base::null;
                 src.c_end = Base::null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to