This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a34cc6ed23 [Refactor](exchange) Remove useless variable and change
block mem count way (#16668)
a34cc6ed23 is described below
commit a34cc6ed2326836551beeebb94bd1d0873c4c10c
Author: HappenLee <[email protected]>
AuthorDate: Mon Feb 13 19:14:01 2023 +0800
[Refactor](exchange) Remove useless variable and change block mem count way
(#16668)
---
be/src/vec/runtime/vdata_stream_recvr.cpp | 20 +++++++-------------
be/src/vec/runtime/vdata_stream_recvr.h | 14 +++++---------
2 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a1e0ea4c32..79cb7b5cfd 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -82,9 +82,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block*
block, bool* eos)
_received_first_batch = true;
DCHECK(!_block_queue.empty());
- BlockUPtr next_block = std::move(_block_queue.front());
- auto block_byte_size = block->allocated_bytes();
- _recvr->_num_buffered_bytes -= block_byte_size;
+ auto [next_block, block_byte_size] = std::move(_block_queue.front());
_recvr->_blocks_memory_usage->add(-block_byte_size);
_block_queue.pop_front();
_update_block_queue_empty();
@@ -156,9 +154,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
- _recvr->_blocks_memory_usage->add(block_byte_size);
- _block_queue.emplace_back(std::move(block));
+ _block_queue.emplace_back(std::move(block), block_byte_size);
_update_block_queue_empty();
// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
@@ -168,7 +165,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
_pending_closures.emplace_back(*done, monotonicStopWatch);
*done = nullptr;
}
- _recvr->_num_buffered_bytes += block_byte_size;
+ _recvr->_blocks_memory_usage->add(block_byte_size);
_data_arrival_cv.notify_one();
}
@@ -199,20 +196,18 @@ void VDataStreamRecvr::SenderQueue::add_block(Block*
block, bool use_move) {
}
materialize_block_inplace(*nblock);
- size_t block_size = nblock->bytes();
-
+ size_t block_mem_size = nblock->allocated_bytes();
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
COUNTER_UPDATE(_recvr->_local_bytes_received_counter,
block_bytes_received);
- _recvr->_blocks_memory_usage->add(nblock->allocated_bytes());
- _block_queue.emplace_back(std::move(nblock));
+ _block_queue.emplace_back(std::move(nblock), block_mem_size);
_update_block_queue_empty();
_data_arrival_cv.notify_one();
- if (_recvr->exceeds_limit(block_size)) {
+ if (_recvr->exceeds_limit(block_mem_size)) {
// yiguolei
// It is too tricky here, if the running thread is bthread then the
tid may be wrong.
std::thread::id tid = std::this_thread::get_id();
@@ -227,7 +222,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
iter->second->wait(l);
}
- _recvr->_num_buffered_bytes += block_size;
+ _recvr->_blocks_memory_usage->add(block_mem_size);
}
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
@@ -304,7 +299,6 @@ VDataStreamRecvr::VDataStreamRecvr(
_row_desc(row_desc),
_is_merging(is_merging),
_is_closed(false),
- _num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 523d364fdf..1fc635a7f7 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -89,7 +89,8 @@ public:
void close();
bool exceeds_limit(int batch_size) {
- return _num_buffered_bytes + batch_size >
config::exchg_node_buffer_size_bytes;
+ return _blocks_memory_usage->current_value() + batch_size >
+ config::exchg_node_buffer_size_bytes;
}
bool is_closed() const { return _is_closed; }
@@ -119,7 +120,6 @@ private:
bool _is_merging;
bool _is_closed;
- std::atomic<int> _num_buffered_bytes;
std::unique_ptr<MemTracker> _mem_tracker;
// Managed by object pool
std::vector<SenderQueue*> _sender_queues;
@@ -185,7 +185,7 @@ protected:
std::atomic_int _num_remaining_senders;
std::condition_variable _data_arrival_cv;
std::condition_variable _data_removal_cv;
- std::list<BlockUPtr> _block_queue;
+ std::list<std::pair<BlockUPtr, size_t>> _block_queue;
std::atomic_bool _block_queue_empty = true;
bool _received_first_batch;
@@ -238,19 +238,15 @@ public:
}
materialize_block_inplace(*nblock);
- size_t block_size = nblock->bytes();
auto block_mem_size = nblock->allocated_bytes();
{
std::unique_lock<std::mutex> l(_lock);
- _block_queue.emplace_back(std::move(nblock));
+ _block_queue.emplace_back(std::move(nblock), block_mem_size);
}
- COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size);
+ COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->_blocks_memory_usage->add(block_mem_size);
_update_block_queue_empty();
_data_arrival_cv.notify_one();
-
- _recvr->_num_buffered_bytes += block_size;
- COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size);
}
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]