This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab3fc1df5e [chore](profile) Fix 'BlocksProduced' in plan_fragment_executor (#22637)
ab3fc1df5e is described below
commit ab3fc1df5e6f8782b3c3a998c9dfbec4d30240fb
Author: Jerry Hu <[email protected]>
AuthorDate: Sun Aug 6 12:42:39 2023 +0800
[chore](profile) Fix 'BlocksProduced' in plan_fragment_executor (#22637)
---
be/src/runtime/plan_fragment_executor.cpp | 2 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 6 ++++++
be/src/vec/runtime/vdata_stream_recvr.h | 5 +++++
3 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 110b82d189..aba0893046 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -361,7 +361,7 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
if (block->rows() > 0) {
COUNTER_UPDATE(_rows_produced_counter, block->rows());
// Not very sure, if should contain empty block
- COUNTER_UPDATE(_blocks_produced_counter, block->rows());
+ COUNTER_UPDATE(_blocks_produced_counter, 1);
break;
}
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index e577fb341f..2e0ffaf9a3 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -156,6 +156,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
+ COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+ COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
_block_queue.emplace_back(std::move(block), block_byte_size);
// if done is nullptr, this function can't delay this response
@@ -200,6 +202,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
return;
}
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received);
+ COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+ COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_data_arrival_cv.notify_one();
@@ -333,6 +337,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_first_batch_wait_total_timer = ADD_TIMER(_profile, "FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
+ _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", TUnit::UNIT);
+ _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", TUnit::UNIT);
}
VDataStreamRecvr::~VDataStreamRecvr() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 229d87d638..03bf6f9db2 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -156,6 +156,11 @@ private:
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
RuntimeProfile::Counter* _peak_memory_usage_counter;
+ // Number of rows received
+ RuntimeProfile::Counter* _rows_produced_counter;
+ // Number of blocks received
+ RuntimeProfile::Counter* _blocks_produced_counter;
+
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
bool _enable_pipeline;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]