This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 146060d [Bug]Fix result_writer may coredump (#6482)
146060d is described below
commit 146060dfc09bb299b2a214463716acbabd09f37e
Author: Zhengguo Yang <[email protected]>
AuthorDate: Sun Aug 22 22:04:00 2021 +0800
[Bug]Fix result_writer may coredump (#6482)
Fix a possible coredump in result_writer by letting BufferControlBlock own the result memory.
---
be/src/runtime/buffer_control_block.cpp | 80 +++++++++++++--------------------
be/src/runtime/buffer_control_block.h | 6 +--
be/src/runtime/file_result_writer.cpp | 2 +-
be/src/runtime/mysql_result_writer.cpp | 18 ++------
4 files changed, 40 insertions(+), 66 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 5041aa6..bcbe562 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -44,7 +44,8 @@ void GetResultBatchCtx::on_close(int64_t packet_seq,
QueryStatistics* statistics
delete this;
}
-void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t
packet_seq, bool eos) {
+void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>&
t_result,
+ int64_t packet_seq, bool eos) {
Status st = Status::OK();
if (t_result != nullptr) {
uint8_t* buf = nullptr;
@@ -83,18 +84,13 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id,
int buffer_size)
BufferControlBlock::~BufferControlBlock() {
cancel();
-
- for (ResultQueue::iterator iter = _batch_queue.begin(); _batch_queue.end()
!= iter; ++iter) {
- delete *iter;
- *iter = NULL;
- }
}
Status BufferControlBlock::init() {
return Status::OK();
}
-Status BufferControlBlock::add_batch(TFetchDataResult* result) {
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result) {
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
@@ -113,61 +109,53 @@ Status BufferControlBlock::add_batch(TFetchDataResult*
result) {
if (_waiting_rpc.empty()) {
_buffer_rows += num_rows;
- _batch_queue.push_back(result);
+ _batch_queue.push_back(std::move(result));
_data_arrival.notify_one();
} else {
auto ctx = _waiting_rpc.front();
_waiting_rpc.pop_front();
ctx->on_data(result, _packet_num);
- delete result;
_packet_num++;
}
return Status::OK();
}
Status BufferControlBlock::get_batch(TFetchDataResult* result) {
- TFetchDataResult* item = NULL;
- {
- std::unique_lock<std::mutex> l(_lock);
+ std::unique_lock<std::mutex> l(_lock);
- while (_batch_queue.empty() && !_is_close && !_is_cancelled) {
- _data_arrival.wait(l);
- }
+ while (_batch_queue.empty() && !_is_close && !_is_cancelled) {
+ _data_arrival.wait(l);
+ }
- // if Status has been set, return fail;
- RETURN_IF_ERROR(_status);
+ // if Status has been set, return fail;
+ RETURN_IF_ERROR(_status);
- // cancelled
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
+ // cancelled
+ if (_is_cancelled) {
+ return Status::Cancelled("Cancelled");
+ }
- if (_batch_queue.empty()) {
- if (_is_close) {
- // no result, normal end
- result->eos = true;
- result->__set_packet_num(_packet_num);
- _packet_num++;
- return Status::OK();
- } else {
- // can not get here
- return Status::InternalError("Internal error, can not Get
here!");
- }
+ if (_batch_queue.empty()) {
+ if (_is_close) {
+ // no result, normal end
+ result->eos = true;
+ result->__set_packet_num(_packet_num);
+ _packet_num++;
+ return Status::OK();
+ } else {
+ // can not get here
+ return Status::InternalError("Internal error, can not Get here!");
}
-
- // get result
- item = _batch_queue.front();
- _batch_queue.pop_front();
- _buffer_rows -= item->result_batch.rows.size();
- _data_removal.notify_one();
}
- *result = *item;
+
+ // get result
+ std::unique_ptr<TFetchDataResult> item = std::move(_batch_queue.front());
+ _batch_queue.pop_front();
+ _buffer_rows -= item->result_batch.rows.size();
+ _data_removal.notify_one();
+ *result = *(item.get());
result->__set_packet_num(_packet_num);
_packet_num++;
- // destruct item new from Result writer
- delete item;
- item = NULL;
-
return Status::OK();
}
@@ -183,17 +171,13 @@ void BufferControlBlock::get_batch(GetResultBatchCtx*
ctx) {
}
if (!_batch_queue.empty()) {
// get result
- TFetchDataResult* result = _batch_queue.front();
+ std::unique_ptr<TFetchDataResult> result =
std::move(_batch_queue.front());
_batch_queue.pop_front();
_buffer_rows -= result->result_batch.rows.size();
_data_removal.notify_one();
ctx->on_data(result, _packet_num);
_packet_num++;
-
- delete result;
- result = nullptr;
-
return;
}
if (_is_close) {
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 9f497c8..aad8928 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -59,7 +59,7 @@ struct GetResultBatchCtx {
void on_failure(const Status& status);
void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
- void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos =
false);
+ void on_data(const std::unique_ptr<TFetchDataResult>& t_result, int64_t
packet_seq, bool eos = false);
};
// buffer used for result customer and producer
@@ -69,7 +69,7 @@ public:
~BufferControlBlock();
Status init();
- Status add_batch(TFetchDataResult* result);
+ Status add_batch(std::unique_ptr<TFetchDataResult>& result);
// get result from batch, use timeout?
Status get_batch(TFetchDataResult* result);
@@ -98,7 +98,7 @@ public:
}
private:
- typedef std::list<TFetchDataResult*> ResultQueue;
+ typedef std::list<std::unique_ptr<TFetchDataResult>> ResultQueue;
// result's query id
TUniqueId _fragment_id;
diff --git a/be/src/runtime/file_result_writer.cpp
b/be/src/runtime/file_result_writer.cpp
index 80bbc81..8d382af 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -384,7 +384,7 @@ Status FileResultWriter::_send_result() {
std::unique_ptr<TFetchDataResult> result =
std::make_unique<TFetchDataResult>();
result->result_batch.rows.resize(1);
result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
- RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result.get()), "failed
to send outfile result");
+ RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send
outfile result");
return Status::OK();
}
diff --git a/be/src/runtime/mysql_result_writer.cpp
b/be/src/runtime/mysql_result_writer.cpp
index 74010cf..7896477 100644
--- a/be/src/runtime/mysql_result_writer.cpp
+++ b/be/src/runtime/mysql_result_writer.cpp
@@ -233,7 +233,7 @@ Status MysqlResultWriter::append_row_batch(const RowBatch*
batch) {
Status status;
// convert one batch
- TFetchDataResult* result = new (std::nothrow) TFetchDataResult();
+ std::unique_ptr<TFetchDataResult> result =
std::make_unique<TFetchDataResult>();
int num_rows = batch->num_rows();
result->result_batch.rows.resize(num_rows);
@@ -255,20 +255,10 @@ Status MysqlResultWriter::append_row_batch(const
RowBatch* batch) {
if (status.ok()) {
SCOPED_TIMER(_result_send_timer);
// push this batch to back
- status = _sinker->add_batch(result);
-
- if (status.ok()) {
- result = NULL;
- _written_rows += num_rows;
- } else {
- LOG(WARNING) << "append result batch to sink failed.";
- }
+ RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "fappend
result batch to sink failed.");
+ _written_rows += num_rows;
}
-
- delete result;
- result = NULL;
-
- return status;
+ return Status::OK();
}
Status MysqlResultWriter::close() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]