This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 52108afadb [chore](profile) Fix 'BlocksProduced' in 
plan_fragment_executor (#22637)
52108afadb is described below

commit 52108afadb314fd6bb837ecdd1c6711226de9f77
Author: Jerry Hu <[email protected]>
AuthorDate: Sun Aug 6 12:42:39 2023 +0800

    [chore](profile) Fix 'BlocksProduced' in plan_fragment_executor (#22637)
---
 be/src/runtime/plan_fragment_executor.cpp | 2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp | 6 ++++++
 be/src/vec/runtime/vdata_stream_recvr.h   | 5 +++++
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 110b82d189..aba0893046 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -361,7 +361,7 @@ Status 
PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
         if (block->rows() > 0) {
             COUNTER_UPDATE(_rows_produced_counter, block->rows());
             // Not very sure, if should contain empty block
-            COUNTER_UPDATE(_blocks_produced_counter, block->rows());
+            COUNTER_UPDATE(_blocks_produced_counter, 1);
             break;
         }
     }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 45b910cf82..68532bdbfa 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -156,6 +156,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
     COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
     COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
     COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
+    COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+    COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
     _block_queue.emplace_back(std::move(block), block_byte_size);
     // if done is nullptr, this function can't delay this response
@@ -200,6 +202,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         return;
     }
     COUNTER_UPDATE(_recvr->_local_bytes_received_counter, 
block_bytes_received);
+    COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+    COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
     _block_queue.emplace_back(std::move(nblock), block_mem_size);
     _data_arrival_cv.notify_one();
@@ -331,6 +335,8 @@ VDataStreamRecvr::VDataStreamRecvr(
     _first_batch_wait_total_timer = ADD_TIMER(_profile, 
"FirstBatchArrivalWaitTime");
     _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
     _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
+    _rows_produced_counter = ADD_COUNTER(_profile, "RowsProduced", 
TUnit::UNIT);
+    _blocks_produced_counter = ADD_COUNTER(_profile, "BlocksProduced", 
TUnit::UNIT);
 }
 
 VDataStreamRecvr::~VDataStreamRecvr() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 3e85a649c5..ce9e36b093 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -155,6 +155,11 @@ private:
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage;
 
+    // Number of rows received
+    RuntimeProfile::Counter* _rows_produced_counter;
+    // Number of blocks received
+    RuntimeProfile::Counter* _blocks_produced_counter;
+
     std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 
     bool _enable_pipeline;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to