This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 1cda120c8c885acf6abda0468091fb762dc6363a
Author: morningman <[email protected]>
AuthorDate: Sat Apr 30 00:55:28 2022 +0800

    [Bug] fix memory leak in VDataStreamRecvr::SenderQueue (#8643)
---
 be/src/runtime/data_stream_recvr.cc       | 2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index 962395ad26..f38049dc6f 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -197,7 +197,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) {
 void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_number,
                                              int64_t packet_seq,
                                              ::google::protobuf::Closure** done) {
-    unique_lock<mutex> l(_lock);
+    lock_guard<mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index e3eb3d4e8f..24244312cc 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -90,7 +90,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
 void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
                                               int64_t packet_seq,
                                               ::google::protobuf::Closure** done) {
-    std::unique_lock<std::mutex> l(_lock);
+    std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
@@ -140,6 +140,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
 }
 
 void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
+    std::unique_lock<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
@@ -158,8 +159,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
     }
 
     materialize_block_inplace(*nblock);
-
-    std::unique_lock<std::mutex> l(_lock);
     size_t block_size = nblock->bytes();
     _block_queue.emplace_back(block_size, nblock);
     _recvr->_mem_tracker->Consume(nblock->bytes());

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
