This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 059a0219e9 [fix](insert) fix memory leak for insert transaction 
(#17512)
059a0219e9 is described below

commit 059a0219e988f8c7d1471320fd06910c7ee5e36f
Author: Xin Liao <[email protected]>
AuthorDate: Tue Mar 7 18:55:12 2023 +0800

    [fix](insert) fix memory leak for insert transaction (#17512)
    
    void CsvReader::_split_line(const Slice& line) {
        _split_values.clear();
        if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
            PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
            PDataRow* row = *ptr;
            for (const PDataColumn& col : (row)->col()) {
                int len = col.value().size();
                uint8_t* buf = new uint8_t[len];
                memcpy(buf, col.value().c_str(), len);
                _split_values.emplace_back(buf, len);
            }
            delete row;
            delete[] ptr;
        }
     ...
    }
    PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
                                         const PSendDataRequest* request, 
PSendDataResult* response,
                                         google::protobuf::Closure* done) {
        brpc::ClosureGuard closure_guard(done);
        TUniqueId fragment_instance_id;
        fragment_instance_id.hi = request->fragment_instance_id().hi();
        fragment_instance_id.lo = request->fragment_instance_id().lo();
        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
        if (pipe == nullptr) {
            response->mutable_status()->set_status_code(1);
            response->mutable_status()->add_error_msgs("pipe is null");
        } else {
            for (int i = 0; i < request->data_size(); ++i) {
                PDataRow* row = new PDataRow();
                row->CopyFrom(request->data(i));
                pipe->append_and_flush(reinterpret_cast<char*>(&row), 
sizeof(row),
                                       sizeof(row) + row->ByteSizeLong());
            }
            response->mutable_status()->set_status_code(0);
        }
    }
    There are two memory-leak problems when running an insert transaction (begin, insert into, commit).
    
    The memory of buf (uint8_t* buf = new uint8_t[len]) in the _split_line function 
was not released when _split_values was cleared.
    The memory of PDataRow may leak when the load fails: the row allocated in the 
send_data function (PDataRow* row = new PDataRow()) is never released if an 
error occurs.
---
 be/src/exec/plain_binary_line_reader.cpp      |  7 ++++---
 be/src/exec/plain_binary_line_reader.h        |  4 ++++
 be/src/runtime/stream_load/stream_load_pipe.h | 18 ++++++++++++++++--
 be/src/service/internal_service.cpp           | 10 +++++++---
 be/src/vec/exec/format/csv/csv_reader.cpp     | 13 ++++---------
 5 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/be/src/exec/plain_binary_line_reader.cpp 
b/be/src/exec/plain_binary_line_reader.cpp
index f63671c622..138ab79f99 100644
--- a/be/src/exec/plain_binary_line_reader.cpp
+++ b/be/src/exec/plain_binary_line_reader.cpp
@@ -31,13 +31,14 @@ PlainBinaryLineReader::~PlainBinaryLineReader() {
 void PlainBinaryLineReader::close() {}
 
 Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, 
bool* eof) {
-    std::unique_ptr<uint8_t[]> file_buf;
     int64_t read_size = 0;
-    RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
-    *ptr = file_buf.release();
+    RETURN_IF_ERROR(_file_reader->read_one_message(&_file_buf, &read_size));
+    *ptr = _file_buf.get();
     *size = read_size;
     if (read_size == 0) {
         *eof = true;
+    } else {
+        _cur_row.reset(*reinterpret_cast<PDataRow**>(_file_buf.get()));
     }
     return Status::OK();
 }
diff --git a/be/src/exec/plain_binary_line_reader.h 
b/be/src/exec/plain_binary_line_reader.h
index 9e1143b60c..b3dd9215c3 100644
--- a/be/src/exec/plain_binary_line_reader.h
+++ b/be/src/exec/plain_binary_line_reader.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
+
 #include "exec/line_reader.h"
 
 namespace doris {
@@ -35,6 +37,8 @@ public:
 
 private:
     FileReader* _file_reader;
+    std::unique_ptr<uint8_t[]> _file_buf;
+    std::unique_ptr<PDataRow> _cur_row;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h 
b/be/src/runtime/stream_load/stream_load_pipe.h
index fac16b81ff..abb0945f36 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -60,6 +60,16 @@ public:
         return _append(buf, proto_byte_size);
     }
 
+    Status append(std::unique_ptr<PDataRow>&& row) {
+        PDataRow* row_ptr = row.get();
+        {
+            std::unique_lock<std::mutex> l(_lock);
+            _data_row_ptrs.emplace_back(std::move(row));
+        }
+        return append_and_flush(reinterpret_cast<char*>(&row_ptr), 
sizeof(row_ptr),
+                                sizeof(PDataRow*) + row_ptr->ByteSizeLong());
+    }
+
     Status append(const char* data, size_t size) override {
         size_t pos = 0;
         if (_write_buf != nullptr) {
@@ -219,8 +229,11 @@ private:
         _buf_queue.pop_front();
         _buffered_bytes -= buf->limit;
         if (_use_proto) {
-            PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
-            _proto_buffered_bytes -= (sizeof(PDataRow*) + 
(*ptr)->GetCachedSize());
+            auto row_ptr = std::move(_data_row_ptrs.front());
+            _proto_buffered_bytes -= (sizeof(PDataRow*) + 
row_ptr->GetCachedSize());
+            _data_row_ptrs.pop_front();
+            // PlainBinaryLineReader will hold the PDataRow
+            row_ptr.release();
         }
         _put_cond.notify_one();
         return Status::OK();
@@ -271,6 +284,7 @@ private:
     int64_t _total_length = -1;
     bool _use_proto = false;
     std::deque<ByteBufferPtr> _buf_queue;
+    std::deque<std::unique_ptr<PDataRow>> _data_row_ptrs;
     std::condition_variable _put_cond;
     std::condition_variable _get_cond;
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 24b2a098e4..1f658da09c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -619,10 +619,14 @@ void 
PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
         response->mutable_status()->add_error_msgs("pipe is null");
     } else {
         for (int i = 0; i < request->data_size(); ++i) {
-            PDataRow* row = new PDataRow();
+            std::unique_ptr<PDataRow> row(new PDataRow());
             row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                   sizeof(row) + row->ByteSizeLong());
+            Status s = pipe->append(std::move(row));
+            if (!s.ok()) {
+                response->mutable_status()->set_status_code(1);
+                response->mutable_status()->add_error_msgs(s.to_string());
+                return;
+            }
         }
         response->mutable_status()->set_status_code(0);
     }
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index ce537d93cf..29be96f4d1 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -402,16 +402,11 @@ Status CsvReader::_line_split_to_values(const Slice& 
line, bool* success) {
 void CsvReader::_split_line(const Slice& line) {
     _split_values.clear();
     if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
-        PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
-        PDataRow* row = *ptr;
-        for (const PDataColumn& col : (row)->col()) {
-            int len = col.value().size();
-            uint8_t* buf = new uint8_t[len];
-            memcpy(buf, col.value().c_str(), len);
-            _split_values.emplace_back(buf, len);
+        PDataRow** row_ptr = reinterpret_cast<PDataRow**>(line.data);
+        PDataRow* row = *row_ptr;
+        for (const PDataColumn& col : row->col()) {
+            _split_values.emplace_back(col.value());
         }
-        delete row;
-        delete[] ptr;
     } else {
         const char* value = line.data;
         size_t start = 0;     // point to the start pos of next col value.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to