This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a1653be226 [Exec](profile) add max row count per backend to 
materialization profile (#59728)
7a1653be226 is described below

commit 7a1653be226cf36b85af6acf0bc72580c51c817b
Author: HappenLee <[email protected]>
AuthorDate: Mon Jan 12 14:58:56 2026 +0800

    [Exec](profile) add max row count per backend to materialization profile 
(#59728)
    
    - Add `_backend_rows_count` to `MaterializationSharedState` to count the
    number of rows assigned to each backend during block distribution.
    - Introduce `_max_rows_per_backend` to record the maximum row count
    handled by any single backend in the current batch.
    - Update `create_muiltget_result()` to compute `_max_rows_per_backend`
    after row assignment.
    - Expose this metric via a new runtime profile counter
    `MaxRowsPerBackend` in `MaterializationLocalState`.
    - This helps monitor data skew across backends and aids in performance
    debugging for materialization-heavy queries.
---
 be/src/pipeline/exec/materialization_opertor.cpp | 13 +++++++++++++
 be/src/pipeline/exec/materialization_opertor.h   |  8 ++++++++
 2 files changed, 21 insertions(+)

diff --git a/be/src/pipeline/exec/materialization_opertor.cpp 
b/be/src/pipeline/exec/materialization_opertor.cpp
index 51f07d65002..20b1994f662 100644
--- a/be/src/pipeline/exec/materialization_opertor.cpp
+++ b/be/src/pipeline/exec/materialization_opertor.cpp
@@ -56,6 +56,7 @@ void MaterializationSharedState::get_block(vectorized::Block* 
block) {
 
 Status MaterializationSharedState::merge_multi_response() {
     std::unordered_map<int64_t, std::pair<vectorized::Block, int>> block_maps;
+
     for (int i = 0; i < block_order_results.size(); ++i) {
         for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
             vectorized::Block partial_block;
@@ -175,12 +176,22 @@ Status 
MaterializationSharedState::create_muiltget_result(const vectorized::Colu
                 
rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id(
                         row_location.file_id);
                 block_order[j] = row_location.backend_id;
+
+                // Count rows per backend
+                _backend_rows_count[row_location.backend_id]++;
             } else {
                 block_order[j] = 0;
             }
         }
     }
 
+    // Update max rows per backend
+    for (const auto& [_, row_count] : _backend_rows_count) {
+        if (row_count > _max_rows_per_backend) {
+            _max_rows_per_backend = row_count;
+        }
+    }
+
     eos = child_eos;
     if (eos && gc_id_map) {
         for (auto& [_, rpc_struct] : rpc_struct_map) {
@@ -361,6 +372,8 @@ Status MaterializationOperator::push(RuntimeState* state, 
vectorized::Block* in_
         if (local_state._materialization_state.need_merge_block) {
             SCOPED_TIMER(local_state._merge_response_timer);
             
RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
+            local_state._max_rows_per_backend_counter->set(
+                    
(int64_t)local_state._materialization_state._max_rows_per_backend);
         }
     }
 
diff --git a/be/src/pipeline/exec/materialization_opertor.h 
b/be/src/pipeline/exec/materialization_opertor.h
index cdb197d101d..a456374d09f 100644
--- a/be/src/pipeline/exec/materialization_opertor.h
+++ b/be/src/pipeline/exec/materialization_opertor.h
@@ -66,6 +66,11 @@ public:
     std::vector<std::vector<int64_t>> block_order_results;
     // backend id => <rpc profile info string key, rpc profile info string 
value>.
     std::map<int64_t, std::map<std::string, fmt::memory_buffer>> 
backend_profile_info_string;
+
+    // Store the maximum number of rows processed by a single backend in the 
current batch
+    uint32_t _max_rows_per_backend = 0;
+    // Store the number of rows processed by each backend
+    std::unordered_map<int64_t, uint32_t> _backend_rows_count; // backend_id 
=> rows_count
 };
 
 class MaterializationLocalState final : public 
PipelineXLocalState<FakeSharedState> {
@@ -80,6 +85,8 @@ public:
         RETURN_IF_ERROR(Base::init(state, info));
         _max_rpc_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MaxRpcTime", 
2);
         _merge_response_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), 
"MergeResponseTime", 2);
+        _max_rows_per_backend_counter =
+                ADD_COUNTER_WITH_LEVEL(custom_profile(), "MaxRowsPerBackend", 
TUnit::UNIT, 2);
         return Status::OK();
     }
 
@@ -93,6 +100,7 @@ private:
     MaterializationSharedState _materialization_state;
     RuntimeProfile::Counter* _max_rpc_timer = nullptr;
     RuntimeProfile::Counter* _merge_response_timer = nullptr;
+    RuntimeProfile::Counter* _max_rows_per_backend_counter = nullptr;
 };
 
 class MaterializationOperator final : public 
StatefulOperatorX<MaterializationLocalState> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to