This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 23294426217 fix agg revocable mem size
23294426217 is described below
commit 232944262176ac446f909efd67b35e6c5467ab69
Author: Hu Shenggang <[email protected]>
AuthorDate: Sun Mar 1 19:54:46 2026 +0800
fix agg revocable mem size
---
.../partitioned_aggregation_source_operator.cpp | 30 +++++++++++++++++++---
1 file changed, 26 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 8de7d8a4411..c5863072fa2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -52,6 +52,9 @@ Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& info)
// Counters for partition spill metrics
_max_partition_level = ADD_COUNTER(custom_profile(), "SpillMaxPartitionLevel", TUnit::UNIT);
_total_partition_spills = ADD_COUNTER(custom_profile(), "SpillTotalPartitions", TUnit::UNIT);
+
+ init_spill_write_counters();
+
// Nothing else to init for repartitioner here; fanout is configured when
// repartitioner is initialized with key columns during actual repartition.
return Status::OK();
@@ -165,12 +168,31 @@ bool PartitionedAggSourceOperatorX::is_shuffled_operator() const {
size_t PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
- if (!local_state._shared_state->_is_spilled) {
+ if (!local_state._shared_state->_is_spilled || !local_state._current_partition.has_data()) {
return 0;
}
- return local_state._estimate_memory_usage < state->spill_min_revocable_mem()
- ? 0
- : local_state._estimate_memory_usage;
+
+ size_t bytes = 0;
+ for (const auto& block : local_state._blocks) {
+ bytes += block.allocated_bytes();
+ }
+ if (local_state._shared_state->_in_mem_shared_state != nullptr &&
+ local_state._shared_state->_in_mem_shared_state->agg_data != nullptr) {
+ auto* agg_data = local_state._shared_state->_in_mem_shared_state->agg_data.get();
+ bytes += std::visit(
+ vectorized::Overload {[&](std::monostate& arg) -> size_t { return 0; },
+ [&](auto& agg_method) -> size_t {
+ return agg_method.hash_table->get_buffer_size_in_bytes();
+ }},
+ agg_data->method_variant);
+
+ if (auto& aggregate_data_container =
+ local_state._shared_state->_in_mem_shared_state->aggregate_data_container;
+ aggregate_data_container) {
+ bytes += aggregate_data_container->memory_usage();
+ }
+ }
+ return bytes;
}
Status PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]