This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 059a0219e9 [fix](insert) fix memory leak for insert transaction (#17512)
059a0219e9 is described below
commit 059a0219e988f8c7d1471320fd06910c7ee5e36f
Author: Xin Liao <[email protected]>
AuthorDate: Tue Mar 7 18:55:12 2023 +0800
[fix](insert) fix memory leak for insert transaction (#17512)
// Pre-fix excerpt of CsvReader::_split_line, quoted to illustrate the first leak.
// For FORMAT_PROTO input, `line.data` holds a PDataRow* (not text); each column
// value is copied into a freshly allocated buffer and wrapped in a Slice.
void CsvReader::_split_line(const Slice& line) {
_split_values.clear();
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
// The line payload is a pointer-to-PDataRow written by the pipe.
PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
PDataRow* row = *ptr;
for (const PDataColumn& col : (row)->col()) {
int len = col.value().size();
// LEAK: this buffer is never freed. Clearing _split_values at the top of the
// next call drops the Slices but does not release the memory they point to.
uint8_t* buf = new uint8_t[len];
memcpy(buf, col.value().c_str(), len);
_split_values.emplace_back(buf, len);
}
// The row and the buffer holding the row pointer are released here, on the read side.
delete row;
delete[] ptr;
}
...
}
// Pre-fix excerpt of PInternalServiceImpl::send_data, quoted to illustrate the second leak.
// NOTE(review): the `void` return type (visible in the diff's hunk header) is omitted here.
// Looks up the stream-load pipe for the request's fragment instance and pushes each
// PDataRow into it. Status code 1 is returned if no pipe exists, otherwise 0.
PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
const PSendDataRequest* request,
PSendDataResult* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
fragment_instance_id.hi = request->fragment_instance_id().hi();
fragment_instance_id.lo = request->fragment_instance_id().lo();
auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
if (pipe == nullptr) {
response->mutable_status()->set_status_code(1);
response->mutable_status()->add_error_msgs("pipe is null");
} else {
for (int i = 0; i < request->data_size(); ++i) {
// LEAK: `row` is a raw owning pointer. Only its address is copied into the pipe,
// so if the load fails before a reader consumes it, nothing deletes the row.
PDataRow* row = new PDataRow();
row->CopyFrom(request->data(i));
// The pipe stores the pointer value itself (sizeof(row) bytes); the third argument
// accounts for the proto payload size. The returned Status is ignored, so append
// failures are neither reported in the response nor cleaned up.
pipe->append_and_flush(reinterpret_cast<char*>(&row),
sizeof(row),
sizeof(row) + row->ByteSizeLong());
}
response->mutable_status()->set_status_code(0);
}
}
There are two memory leaks when using begin, insert into, and commit operations (insert transactions):
1. In the _split_line function, the buffer allocated by `uint8_t* buf = new uint8_t[len]` was never released when _split_values was cleared.
2. The PDataRow memory may leak when the load fails: the row allocated by `PDataRow* row = new PDataRow()` in the send_data function is never released if an error occurs.
---
be/src/exec/plain_binary_line_reader.cpp | 7 ++++---
be/src/exec/plain_binary_line_reader.h | 4 ++++
be/src/runtime/stream_load/stream_load_pipe.h | 18 ++++++++++++++++--
be/src/service/internal_service.cpp | 10 +++++++---
be/src/vec/exec/format/csv/csv_reader.cpp | 13 ++++---------
5 files changed, 35 insertions(+), 17 deletions(-)
diff --git a/be/src/exec/plain_binary_line_reader.cpp b/be/src/exec/plain_binary_line_reader.cpp
index f63671c622..138ab79f99 100644
--- a/be/src/exec/plain_binary_line_reader.cpp
+++ b/be/src/exec/plain_binary_line_reader.cpp
@@ -31,13 +31,14 @@ PlainBinaryLineReader::~PlainBinaryLineReader() {
void PlainBinaryLineReader::close() {}
Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size,
bool* eof) {
- std::unique_ptr<uint8_t[]> file_buf;
int64_t read_size = 0;
- RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
- *ptr = file_buf.release();
+ RETURN_IF_ERROR(_file_reader->read_one_message(&_file_buf, &read_size));
+ *ptr = _file_buf.get();
*size = read_size;
if (read_size == 0) {
*eof = true;
+ } else {
+ _cur_row.reset(*reinterpret_cast<PDataRow**>(_file_buf.get()));
}
return Status::OK();
}
diff --git a/be/src/exec/plain_binary_line_reader.h b/be/src/exec/plain_binary_line_reader.h
index 9e1143b60c..b3dd9215c3 100644
--- a/be/src/exec/plain_binary_line_reader.h
+++ b/be/src/exec/plain_binary_line_reader.h
@@ -17,6 +17,8 @@
#pragma once
+#include <gen_cpp/internal_service.pb.h>
+
#include "exec/line_reader.h"
namespace doris {
@@ -35,6 +37,8 @@ public:
private:
FileReader* _file_reader;
+ std::unique_ptr<uint8_t[]> _file_buf;
+ std::unique_ptr<PDataRow> _cur_row;
};
} // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index fac16b81ff..abb0945f36 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -60,6 +60,16 @@ public:
return _append(buf, proto_byte_size);
}
+ Status append(std::unique_ptr<PDataRow>&& row) {
+ PDataRow* row_ptr = row.get();
+ {
+ std::unique_lock<std::mutex> l(_lock);
+ _data_row_ptrs.emplace_back(std::move(row));
+ }
+ return append_and_flush(reinterpret_cast<char*>(&row_ptr), sizeof(row_ptr),
+ sizeof(PDataRow*) + row_ptr->ByteSizeLong());
+ }
+
Status append(const char* data, size_t size) override {
size_t pos = 0;
if (_write_buf != nullptr) {
@@ -219,8 +229,11 @@ private:
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
if (_use_proto) {
- PDataRow** ptr = reinterpret_cast<PDataRow**>(data->get());
- _proto_buffered_bytes -= (sizeof(PDataRow*) + (*ptr)->GetCachedSize());
+ auto row_ptr = std::move(_data_row_ptrs.front());
+ _proto_buffered_bytes -= (sizeof(PDataRow*) + row_ptr->GetCachedSize());
+ _data_row_ptrs.pop_front();
+ // PlainBinaryLineReader will hold the PDataRow
+ row_ptr.release();
}
_put_cond.notify_one();
return Status::OK();
@@ -271,6 +284,7 @@ private:
int64_t _total_length = -1;
bool _use_proto = false;
std::deque<ByteBufferPtr> _buf_queue;
+ std::deque<std::unique_ptr<PDataRow>> _data_row_ptrs;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 24b2a098e4..1f658da09c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -619,10 +619,14 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
response->mutable_status()->add_error_msgs("pipe is null");
} else {
for (int i = 0; i < request->data_size(); ++i) {
- PDataRow* row = new PDataRow();
+ std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
- pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
- sizeof(row) + row->ByteSizeLong());
+ Status s = pipe->append(std::move(row));
+ if (!s.ok()) {
+ response->mutable_status()->set_status_code(1);
+ response->mutable_status()->add_error_msgs(s.to_string());
+ return;
+ }
}
response->mutable_status()->set_status_code(0);
}
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index ce537d93cf..29be96f4d1 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -402,16 +402,11 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
void CsvReader::_split_line(const Slice& line) {
_split_values.clear();
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
- PDataRow** ptr = reinterpret_cast<PDataRow**>(line.data);
- PDataRow* row = *ptr;
- for (const PDataColumn& col : (row)->col()) {
- int len = col.value().size();
- uint8_t* buf = new uint8_t[len];
- memcpy(buf, col.value().c_str(), len);
- _split_values.emplace_back(buf, len);
+ PDataRow** row_ptr = reinterpret_cast<PDataRow**>(line.data);
+ PDataRow* row = *row_ptr;
+ for (const PDataColumn& col : row->col()) {
+ _split_values.emplace_back(col.value());
}
- delete row;
- delete[] ptr;
} else {
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]