This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 146060d  [Bug]Fix result_writer may coredump (#6482)
146060d is described below

commit 146060dfc09bb299b2a214463716acbabd09f37e
Author: Zhengguo Yang <[email protected]>
AuthorDate: Sun Aug 22 22:04:00 2021 +0800

    [Bug]Fix result_writer may coredump (#6482)
    
    Fix a possible coredump in result_writer by letting BufferControlBlock own the memory.
---
 be/src/runtime/buffer_control_block.cpp | 80 +++++++++++++--------------------
 be/src/runtime/buffer_control_block.h   |  6 +--
 be/src/runtime/file_result_writer.cpp   |  2 +-
 be/src/runtime/mysql_result_writer.cpp  | 18 ++------
 4 files changed, 40 insertions(+), 66 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 5041aa6..bcbe562 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -44,7 +44,8 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, 
QueryStatistics* statistics
     delete this;
 }
 
-void GetResultBatchCtx::on_data(TFetchDataResult* t_result, int64_t 
packet_seq, bool eos) {
+void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& 
t_result,
+                                int64_t packet_seq, bool eos) {
     Status st = Status::OK();
     if (t_result != nullptr) {
         uint8_t* buf = nullptr;
@@ -83,18 +84,13 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, 
int buffer_size)
 
 BufferControlBlock::~BufferControlBlock() {
     cancel();
-
-    for (ResultQueue::iterator iter = _batch_queue.begin(); _batch_queue.end() 
!= iter; ++iter) {
-        delete *iter;
-        *iter = NULL;
-    }
 }
 
 Status BufferControlBlock::init() {
     return Status::OK();
 }
 
-Status BufferControlBlock::add_batch(TFetchDataResult* result) {
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result) {
     std::unique_lock<std::mutex> l(_lock);
 
     if (_is_cancelled) {
@@ -113,61 +109,53 @@ Status BufferControlBlock::add_batch(TFetchDataResult* 
result) {
 
     if (_waiting_rpc.empty()) {
         _buffer_rows += num_rows;
-        _batch_queue.push_back(result);
+        _batch_queue.push_back(std::move(result));
         _data_arrival.notify_one();
     } else {
         auto ctx = _waiting_rpc.front();
         _waiting_rpc.pop_front();
         ctx->on_data(result, _packet_num);
-        delete result;
         _packet_num++;
     }
     return Status::OK();
 }
 
 Status BufferControlBlock::get_batch(TFetchDataResult* result) {
-    TFetchDataResult* item = NULL;
-    {
-        std::unique_lock<std::mutex> l(_lock);
+    std::unique_lock<std::mutex> l(_lock);
 
-        while (_batch_queue.empty() && !_is_close && !_is_cancelled) {
-            _data_arrival.wait(l);
-        }
+    while (_batch_queue.empty() && !_is_close && !_is_cancelled) {
+        _data_arrival.wait(l);
+    }
 
-        // if Status has been set, return fail;
-        RETURN_IF_ERROR(_status);
+    // if Status has been set, return fail;
+    RETURN_IF_ERROR(_status);
 
-        // cancelled
-        if (_is_cancelled) {
-            return Status::Cancelled("Cancelled");
-        }
+    // cancelled
+    if (_is_cancelled) {
+        return Status::Cancelled("Cancelled");
+    }
 
-        if (_batch_queue.empty()) {
-            if (_is_close) {
-                // no result, normal end
-                result->eos = true;
-                result->__set_packet_num(_packet_num);
-                _packet_num++;
-                return Status::OK();
-            } else {
-                // can not get here
-                return Status::InternalError("Internal error, can not Get 
here!");
-            }
+    if (_batch_queue.empty()) {
+        if (_is_close) {
+            // no result, normal end
+            result->eos = true;
+            result->__set_packet_num(_packet_num);
+            _packet_num++;
+            return Status::OK();
+        } else {
+            // can not get here
+            return Status::InternalError("Internal error, can not Get here!");
         }
-
-        // get result
-        item = _batch_queue.front();
-        _batch_queue.pop_front();
-        _buffer_rows -= item->result_batch.rows.size();
-        _data_removal.notify_one();
     }
-    *result = *item;
+
+    // get result
+    std::unique_ptr<TFetchDataResult> item = std::move(_batch_queue.front());
+    _batch_queue.pop_front();
+    _buffer_rows -= item->result_batch.rows.size();
+    _data_removal.notify_one();
+    *result = *(item.get());
     result->__set_packet_num(_packet_num);
     _packet_num++;
-    // destruct item new from Result writer
-    delete item;
-    item = NULL;
-
     return Status::OK();
 }
 
@@ -183,17 +171,13 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* 
ctx) {
     }
     if (!_batch_queue.empty()) {
         // get result
-        TFetchDataResult* result = _batch_queue.front();
+        std::unique_ptr<TFetchDataResult> result = 
std::move(_batch_queue.front());
         _batch_queue.pop_front();
         _buffer_rows -= result->result_batch.rows.size();
         _data_removal.notify_one();
 
         ctx->on_data(result, _packet_num);
         _packet_num++;
-
-        delete result;
-        result = nullptr;
-
         return;
     }
     if (_is_close) {
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 9f497c8..aad8928 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -59,7 +59,7 @@ struct GetResultBatchCtx {
 
     void on_failure(const Status& status);
     void on_close(int64_t packet_seq, QueryStatistics* statistics = nullptr);
-    void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = 
false);
+    void on_data(const std::unique_ptr<TFetchDataResult>& t_result, int64_t 
packet_seq, bool eos = false);
 };
 
 // buffer used for result customer and producer
@@ -69,7 +69,7 @@ public:
     ~BufferControlBlock();
 
     Status init();
-    Status add_batch(TFetchDataResult* result);
+    Status add_batch(std::unique_ptr<TFetchDataResult>& result);
 
     // get result from batch, use timeout?
     Status get_batch(TFetchDataResult* result);
@@ -98,7 +98,7 @@ public:
     }
 
 private:
-    typedef std::list<TFetchDataResult*> ResultQueue;
+    typedef std::list<std::unique_ptr<TFetchDataResult>> ResultQueue;
 
     // result's query id
     TUniqueId _fragment_id;
diff --git a/be/src/runtime/file_result_writer.cpp 
b/be/src/runtime/file_result_writer.cpp
index 80bbc81..8d382af 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -384,7 +384,7 @@ Status FileResultWriter::_send_result() {
     std::unique_ptr<TFetchDataResult> result = 
std::make_unique<TFetchDataResult>();
     result->result_batch.rows.resize(1);
     result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
-    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result.get()), "failed 
to send outfile result");
+    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send 
outfile result");
     return Status::OK();
 }
 
diff --git a/be/src/runtime/mysql_result_writer.cpp 
b/be/src/runtime/mysql_result_writer.cpp
index 74010cf..7896477 100644
--- a/be/src/runtime/mysql_result_writer.cpp
+++ b/be/src/runtime/mysql_result_writer.cpp
@@ -233,7 +233,7 @@ Status MysqlResultWriter::append_row_batch(const RowBatch* 
batch) {
 
     Status status;
     // convert one batch
-    TFetchDataResult* result = new (std::nothrow) TFetchDataResult();
+    std::unique_ptr<TFetchDataResult> result = 
std::make_unique<TFetchDataResult>();
     int num_rows = batch->num_rows();
     result->result_batch.rows.resize(num_rows);
 
@@ -255,20 +255,10 @@ Status MysqlResultWriter::append_row_batch(const 
RowBatch* batch) {
     if (status.ok()) {
         SCOPED_TIMER(_result_send_timer);
         // push this batch to back
-        status = _sinker->add_batch(result);
-
-        if (status.ok()) {
-            result = NULL;
-            _written_rows += num_rows;
-        } else {
-            LOG(WARNING) << "append result batch to sink failed.";
-        }
+        RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "fappend 
result batch to sink failed.");
+        _written_rows += num_rows;
     }
-
-    delete result;
-    result = NULL;
-
-    return status;
+    return Status::OK();
 }
 
 Status MysqlResultWriter::close() {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to