This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 52108afadb [chore](profile) Fix 'BlocksProduced' in plan_fragment_executor (#22637)
52108afadb is described below
commit 52108afadb314fd6bb837ecdd1c6711226de9f77
Author: Jerry Hu <[email protected]>
AuthorDate: Sun Aug 6 12:42:39 2023 +0800
[chore](profile) Fix 'BlocksProduced' in plan_fragment_executor (#22637)
---
be/src/runtime/plan_fragment_executor.cpp | 2 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 6 ++++++
be/src/vec/runtime/vdata_stream_recvr.h | 5 +++++
3 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 110b82d189..aba0893046 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -361,7 +361,7 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
if (block->rows() > 0) {
COUNTER_UPDATE(_rows_produced_counter, block->rows());
// Not very sure, if should contain empty block
- COUNTER_UPDATE(_blocks_produced_counter, block->rows());
+ COUNTER_UPDATE(_blocks_produced_counter, 1);
break;
}
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 45b910cf82..68532bdbfa 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -156,6 +156,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
+ COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+ COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
_block_queue.emplace_back(std::move(block), block_byte_size);
// if done is nullptr, this function can't delay this response
@@ -200,6 +202,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
return;
}
COUNTER_UPDATE(_recvr->_local_bytes_received_counter,
block_bytes_received);
+ COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+ COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_data_arrival_cv.notify_one();
@@ -331,6 +335,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_first_batch_wait_total_timer = ADD_TIMER(_profile,
"FirstBatchArrivalWaitTime");
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
_decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
+ _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced",
TUnit::UNIT);
+ _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced",
TUnit::UNIT);
}
VDataStreamRecvr::~VDataStreamRecvr() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 3e85a649c5..ce9e36b093 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -155,6 +155,11 @@ private:
RuntimeProfile::Counter* _memory_usage_counter;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
+ // Number of rows received
+ RuntimeProfile::Counter* _rows_produced_counter;
+ // Number of blocks received
+ RuntimeProfile::Counter* _blocks_produced_counter;
+
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
bool _enable_pipeline;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]