This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 758f8c6143e branch-4.1: [fix](be) Move partitioned agg shared cleanup to shared state #63253 (#63287)
758f8c6143e is described below

commit 758f8c6143e87b42c588a4cc9e0372f27d78bd40
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 20 15:58:30 2026 +0800

    branch-4.1: [fix](be) Move partitioned agg shared cleanup to shared state #63253 (#63287)
    
    Cherry-picked from #63253
    
    Co-authored-by: Jerry Hu <[email protected]>
---
 be/src/exec/operator/partitioned_aggregation_source_operator.cpp | 7 -------
 be/src/exec/pipeline/dependency.cpp                              | 5 +++++
 be/src/exec/pipeline/dependency.h                                | 6 +++++-
 be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp      | 4 +++-
 4 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index ea57cc0091d..057915cac93 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -153,13 +153,6 @@ Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
 Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
     RETURN_IF_ERROR(OperatorXBase::close(state));
 
-    // Centralize shared_state cleanup here so resources are released when
-    // the pipeline task finishes, matching the Sort operator pattern.
-    auto& local_state = get_local_state(state);
-    if (local_state._shared_state) {
-        local_state._shared_state->close();
-    }
-
     return _agg_source_operator->close(state);
 }
 
diff --git a/be/src/exec/pipeline/dependency.cpp b/be/src/exec/pipeline/dependency.cpp
index 87d53c20991..014f3182183 100644
--- a/be/src/exec/pipeline/dependency.cpp
+++ b/be/src/exec/pipeline/dependency.cpp
@@ -312,6 +312,11 @@ Status AggSharedState::reset_hash_table() {
 }
 
 void PartitionedAggSharedState::close() {
+    bool false_close = false;
+    if (!is_closed.compare_exchange_strong(false_close, true)) {
+        return;
+    }
+
     for (auto& partition : _spill_partitions) {
         if (partition) {
             ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition);
diff --git a/be/src/exec/pipeline/dependency.h b/be/src/exec/pipeline/dependency.h
index 2c6e3b9aef2..dec1d3aaeea 100644
--- a/be/src/exec/pipeline/dependency.h
+++ b/be/src/exec/pipeline/dependency.h
@@ -428,7 +428,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
     ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
 
     PartitionedAggSharedState() = default;
-    ~PartitionedAggSharedState() override = default;
+    ~PartitionedAggSharedState() override { close(); }
 
     void close();
 
@@ -437,6 +437,10 @@ struct PartitionedAggSharedState : public BasicSharedState,
 
     // partition count is no longer stored in shared state; operators maintain their own
     std::atomic<bool> _is_spilled = false;
+    // This state is shared by the partitioned agg sink and source pipelines. Spill files left
+    // here are owned by the shared state until the source moves them into its local queue, so the
+    // cleanup must be tied to the shared state's lifetime and must be idempotent.
+    std::atomic_bool is_closed = false;
     std::deque<SpillFileSPtr> _spill_partitions;
 };
 
diff --git a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
index 4118d923e57..b07b5527637 100644
--- a/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
+++ b/be/test/exec/pipeline/partitioned_agg_shared_state_test.cpp
@@ -130,7 +130,9 @@ TEST_F(PartitionedAggSharedStateTest, CloseCalledMultipleTimes) {
     for (int round = 0; round < 5; ++round) {
         state._spill_partitions.emplace_back(nullptr);
         state.close();
-        ASSERT_TRUE(state._spill_partitions.empty());
+
+        // Repeatedly calling close should not cause issues, but it also should not do anything after the first call.
+        ASSERT_EQ(state._spill_partitions.empty(), round == 0) << "After round " << round;
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to