This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1cda120c8c885acf6abda0468091fb762dc6363a
Author: morningman <[email protected]>
AuthorDate: Sat Apr 30 00:55:28 2022 +0800

    [Bug] fix memory leak in VDataStreamRecvr::SenderQueue (#8643)
---
 be/src/runtime/data_stream_recvr.cc       | 2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/data_stream_recvr.cc 
b/be/src/runtime/data_stream_recvr.cc
index 962395ad26..f38049dc6f 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -197,7 +197,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** 
next_batch) {
 void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int 
be_number,
                                              int64_t packet_seq,
                                              ::google::protobuf::Closure** 
done) {
-    unique_lock<mutex> l(_lock);
+    lock_guard<mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index e3eb3d4e8f..24244312cc 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -90,7 +90,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** 
next_block) {
 void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int 
be_number,
                                               int64_t packet_seq,
                                               ::google::protobuf::Closure** 
done) {
-    std::unique_lock<std::mutex> l(_lock);
+    std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
@@ -140,6 +140,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
 }
 
 void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
+    std::unique_lock<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
@@ -158,8 +159,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
     }
     materialize_block_inplace(*nblock);
 
-
-    std::unique_lock<std::mutex> l(_lock);
     size_t block_size = nblock->bytes();
     _block_queue.emplace_back(block_size, nblock);
     _recvr->_mem_tracker->Consume(nblock->bytes());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to