This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a1653be226 [Exec](profile) add max row count one backend in
materialization profile (#59728)
7a1653be226 is described below
commit 7a1653be226cf36b85af6acf0bc72580c51c817b
Author: HappenLee <[email protected]>
AuthorDate: Mon Jan 12 14:58:56 2026 +0800
[Exec](profile) add max row count one backend in materialization profile
(#59728)
- Add `_backend_rows_count` to `MaterializationSharedState` to count the
number of rows assigned to each backend during block distribution.
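- Introduce `_max_rows_per_backend` to record the maximum row count
assigned to any single backend. Note: this change never clears
`_backend_rows_count` between batches, so the value is a cumulative
running maximum rather than a per-batch figure.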
- Update `create_muiltget_result()` to compute `_max_rows_per_backend`
after row assignment; `push()` publishes it to the profile after
`merge_multi_response()` runs.
- Expose this metric via a new runtime profile counter
`MaxRowsPerBackend` in `MaterializationLocalState`.
- This helps monitor data skew across backends and aids in performance
debugging for materialization-heavy queries.
---
be/src/pipeline/exec/materialization_opertor.cpp | 13 +++++++++++++
be/src/pipeline/exec/materialization_opertor.h | 8 ++++++++
2 files changed, 21 insertions(+)
diff --git a/be/src/pipeline/exec/materialization_opertor.cpp
b/be/src/pipeline/exec/materialization_opertor.cpp
index 51f07d65002..20b1994f662 100644
--- a/be/src/pipeline/exec/materialization_opertor.cpp
+++ b/be/src/pipeline/exec/materialization_opertor.cpp
@@ -56,6 +56,7 @@ void MaterializationSharedState::get_block(vectorized::Block*
block) {
Status MaterializationSharedState::merge_multi_response() {
std::unordered_map<int64_t, std::pair<vectorized::Block, int>> block_maps;
+
for (int i = 0; i < block_order_results.size(); ++i) {
for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
vectorized::Block partial_block;
@@ -175,12 +176,22 @@ Status
MaterializationSharedState::create_muiltget_result(const vectorized::Colu
rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id(
row_location.file_id);
block_order[j] = row_location.backend_id;
+
+ // Count rows per backend
+ _backend_rows_count[row_location.backend_id]++;
} else {
block_order[j] = 0;
}
}
}
+ // Update max rows per backend
+ for (const auto& [_, row_count] : _backend_rows_count) {
+ if (row_count > _max_rows_per_backend) {
+ _max_rows_per_backend = row_count;
+ }
+ }
+
eos = child_eos;
if (eos && gc_id_map) {
for (auto& [_, rpc_struct] : rpc_struct_map) {
@@ -361,6 +372,8 @@ Status MaterializationOperator::push(RuntimeState* state,
vectorized::Block* in_
if (local_state._materialization_state.need_merge_block) {
SCOPED_TIMER(local_state._merge_response_timer);
RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
+ local_state._max_rows_per_backend_counter->set(
+
(int64_t)local_state._materialization_state._max_rows_per_backend);
}
}
diff --git a/be/src/pipeline/exec/materialization_opertor.h
b/be/src/pipeline/exec/materialization_opertor.h
index cdb197d101d..a456374d09f 100644
--- a/be/src/pipeline/exec/materialization_opertor.h
+++ b/be/src/pipeline/exec/materialization_opertor.h
@@ -66,6 +66,11 @@ public:
std::vector<std::vector<int64_t>> block_order_results;
// backend id => <rpc profile info string key, rpc profile info string
value>.
std::map<int64_t, std::map<std::string, fmt::memory_buffer>>
backend_profile_info_string;
+
+ // Store the maximum number of rows processed by a single backend in the
current batch
+ uint32_t _max_rows_per_backend = 0;
+ // Store the number of rows processed by each backend
+ std::unordered_map<int64_t, uint32_t> _backend_rows_count; // backend_id
=> rows_count
};
class MaterializationLocalState final : public
PipelineXLocalState<FakeSharedState> {
@@ -80,6 +85,8 @@ public:
RETURN_IF_ERROR(Base::init(state, info));
_max_rpc_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MaxRpcTime",
2);
_merge_response_timer = ADD_TIMER_WITH_LEVEL(custom_profile(),
"MergeResponseTime", 2);
+ _max_rows_per_backend_counter =
+ ADD_COUNTER_WITH_LEVEL(custom_profile(), "MaxRowsPerBackend",
TUnit::UNIT, 2);
return Status::OK();
}
@@ -93,6 +100,7 @@ private:
MaterializationSharedState _materialization_state;
RuntimeProfile::Counter* _max_rpc_timer = nullptr;
RuntimeProfile::Counter* _merge_response_timer = nullptr;
+ RuntimeProfile::Counter* _max_rows_per_backend_counter = nullptr;
};
class MaterializationOperator final : public
StatefulOperatorX<MaterializationLocalState> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]