This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 96a3a1a2362 [fix](spill) Memory leak in the arena used by the aggregation source operator (#60205)
96a3a1a2362 is described below

commit 96a3a1a236212b8c6fdbb115cad867e39dd397a1
Author: dh-cloud <[email protected]>
AuthorDate: Mon Jan 26 22:15:35 2026 +0800

    [fix](spill) Memory leak in the arena used by the aggregation source operator (#60205)
    
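    After the partitioned aggregation source operator switches to the next
    spill partition, the agg_arena_pool must be reset together with the hash
    table; otherwise the arena memory allocated for earlier partitions is
    retained, which is the leak this commit fixes.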
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/aggregation_source_operator.cpp             | 8 ++++++++
 be/src/pipeline/exec/aggregation_source_operator.h               | 2 ++
 be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp | 2 +-
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 989a7cd478f..b8ecd137016 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -543,6 +543,14 @@ size_t 
AggSourceOperatorX::get_estimated_memory_size_for_merging(RuntimeState* s
     return size;
 }
 
+Status AggSourceOperatorX::reset_hash_table(RuntimeState* state) {
+    auto& local_state = get_local_state(state);
+    auto& ss = *local_state.Base::_shared_state;
+    RETURN_IF_ERROR(ss.reset_hash_table());
+    ss.agg_arena_pool.clear(true);
+    return Status::OK();
+}
+
 void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* 
places,
                                              vectorized::ColumnRawPtrs& 
key_columns,
                                              uint32_t num_rows) {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index 1bf4edabf5d..9273be7caed 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -114,6 +114,8 @@ public:
 
     size_t get_estimated_memory_size_for_merging(RuntimeState* state, size_t 
rows) const;
 
+    Status reset_hash_table(RuntimeState* state);
+
 private:
     friend class AggLocalState;
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 9adff22d52f..728555507a3 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -179,7 +179,7 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
             if (!local_state._shared_state->spill_partitions.empty()) {
                 local_state._current_partition_eos = false;
                 local_state._need_to_merge_data_for_current_partition = true;
-                status = 
local_state._shared_state->in_mem_shared_state->reset_hash_table();
+                status = _agg_source_operator->reset_hash_table(runtime_state);
                 RETURN_IF_ERROR(status);
                 *eos = false;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to