This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 539cc31ebe4 branch-4.0: [opt](Arena)Release Arena memory earlier in pipeline operators. #59045 (#59632)
539cc31ebe4 is described below

commit 539cc31ebe422aad5c7613385ccc61c561733823
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 8 11:30:08 2026 +0800

    branch-4.0: [opt](Arena)Release Arena memory earlier in pipeline operators. #59045 (#59632)
    
    Cherry-picked from #59045
    
    Co-authored-by: Mryange <[email protected]>
---
 be/src/pipeline/dependency.h                       |  6 +++
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 +++++++++++-----------
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 -
 .../pipeline/exec/aggregation_source_operator.cpp  | 13 +++---
 be/src/pipeline/exec/aggregation_source_operator.h |  1 -
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  9 +++--
 be/src/pipeline/exec/analytic_sink_operator.h      |  1 -
 .../distinct_streaming_aggregation_operator.cpp    |  4 +-
 .../exec/distinct_streaming_aggregation_operator.h |  2 -
 be/src/pipeline/exec/set_sink_operator.cpp         |  2 +-
 be/src/pipeline/exec/set_sink_operator.h           |  1 -
 .../exec/streaming_aggregation_operator.cpp        |  2 +-
 .../pipeline/exec/streaming_aggregation_operator.h |  1 -
 13 files changed, 46 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 626e174045b..036ca3a90de 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -382,6 +382,9 @@ public:
     // Refresh the top limit heap with a new row
     void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& 
key_columns);
 
+    vectorized::Arena agg_arena_pool;
+    vectorized::Arena agg_profile_arena;
+
 private:
     vectorized::MutableColumns _get_keys_hash_table();
 
@@ -580,6 +583,7 @@ public:
     std::mutex buffer_mutex;
     bool sink_eos = false;
     std::mutex sink_eos_lock;
+    vectorized::Arena agg_arena_pool;
 };
 
 struct JoinSharedState : public BasicSharedState {
@@ -702,6 +706,8 @@ public:
 
     std::atomic<bool> ready_for_read = false;
 
+    vectorized::Arena arena;
+
     /// called in setup_local_state
     Status hash_table_init();
 };
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index f666a4122c0..63648ab8146 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -102,8 +102,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
     }
 
     if (Base::_shared_state->probe_expr_ctxs.empty()) {
-        _agg_data->without_key =
-                
reinterpret_cast<vectorized::AggregateDataPtr>(_agg_profile_arena.aligned_alloc(
+        _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
+                Base::_shared_state->agg_profile_arena.aligned_alloc(
                         p._total_size_of_aggregate_states, 
p._align_aggregate_states));
 
         if (p._is_merge) {
@@ -187,7 +187,7 @@ Status 
AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
                 block,
                 _agg_data->without_key + Base::_parent->template 
cast<AggSinkOperatorX>()
                                                  
._offsets_of_aggregate_states[i],
-                _agg_arena_pool));
+                Base::_shared_state->agg_arena_pool));
     }
     return Status::OK();
 }
@@ -207,7 +207,7 @@ size_t AggSinkLocalState::_memory_usage() const {
         return 0;
     }
     size_t usage = 0;
-    usage += _agg_arena_pool.size();
+    usage += Base::_shared_state->agg_arena_pool.size();
 
     if (Base::_shared_state->aggregate_data_container) {
         usage += Base::_shared_state->aggregate_data_container->memory_usage();
@@ -240,7 +240,7 @@ void 
AggSinkLocalState::_update_memusage_with_serialized_key() {
                        },
                        [&](auto& agg_method) -> void {
                            auto& data = *agg_method.hash_table;
-                           int64_t memory_usage_arena = _agg_arena_pool.size();
+                           int64_t memory_usage_arena = 
Base::_shared_state->agg_arena_pool.size();
                            int64_t memory_usage_container =
                                    
_shared_state->aggregate_data_container->memory_usage();
                            int64_t hash_table_memory_usage = 
data.get_buffer_size_in_bytes();
@@ -321,8 +321,8 @@ Status 
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
                                     _places.data(),
                                     Base::_parent->template 
cast<AggSinkOperatorX>()
                                             ._offsets_of_aggregate_states[i],
-                                    _deserialize_buffer.data(), column.get(), 
_agg_arena_pool,
-                                    rows);
+                                    _deserialize_buffer.data(), column.get(),
+                                    Base::_shared_state->agg_arena_pool, rows);
                 }
             } else {
                 RETURN_IF_ERROR(
@@ -330,7 +330,7 @@ Status 
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
                                 block,
                                 Base::_parent->template 
cast<AggSinkOperatorX>()
                                         ._offsets_of_aggregate_states[i],
-                                _places.data(), _agg_arena_pool));
+                                _places.data(), 
Base::_shared_state->agg_arena_pool));
             }
         }
     } else {
@@ -375,15 +375,15 @@ Status 
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
                                         _places.data(),
                                         Base::_parent->template 
cast<AggSinkOperatorX>()
                                                 
._offsets_of_aggregate_states[i],
-                                        _deserialize_buffer.data(), 
column.get(), _agg_arena_pool,
-                                        rows);
+                                        _deserialize_buffer.data(), 
column.get(),
+                                        Base::_shared_state->agg_arena_pool, 
rows);
                     }
                 } else {
                     
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add(
                             block,
                             Base::_parent->template cast<AggSinkOperatorX>()
                                     ._offsets_of_aggregate_states[i],
-                            _places.data(), _agg_arena_pool));
+                            _places.data(), 
Base::_shared_state->agg_arena_pool));
                 }
             }
         }
@@ -423,20 +423,20 @@ Status 
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
                             _agg_data->without_key +
                                     Base::_parent->template 
cast<AggSinkOperatorX>()
                                             ._offsets_of_aggregate_states[i],
-                            *column, _agg_arena_pool);
+                            *column, Base::_shared_state->agg_arena_pool);
         } else {
             
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
                     block,
                     _agg_data->without_key + Base::_parent->template 
cast<AggSinkOperatorX>()
                                                      
._offsets_of_aggregate_states[i],
-                    _agg_arena_pool));
+                    Base::_shared_state->agg_arena_pool));
         }
     }
     return Status::OK();
 }
 
 void AggSinkLocalState::_update_memusage_without_key() {
-    int64_t arena_memory_usage = _agg_arena_pool.size();
+    int64_t arena_memory_usage = Base::_shared_state->agg_arena_pool.size();
     COUNTER_SET(_memory_used_counter, arena_memory_usage);
     COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
 }
@@ -487,7 +487,7 @@ Status 
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
                             block,
                             Base::_parent->template cast<AggSinkOperatorX>()
                                     ._offsets_of_aggregate_states[i],
-                            _places.data(), _agg_arena_pool));
+                            _places.data(), 
Base::_shared_state->agg_arena_pool));
         }
     } else {
         auto do_aggregate_evaluators = [&] {
@@ -496,7 +496,7 @@ Status 
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
                         block,
                         Base::_parent->template cast<AggSinkOperatorX>()
                                 ._offsets_of_aggregate_states[i],
-                        _places.data(), _agg_arena_pool));
+                        _places.data(), Base::_shared_state->agg_arena_pool));
             }
             return Status::OK();
         };
@@ -550,8 +550,8 @@ void 
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
                            agg_method.init_serialized_keys(key_columns, 
num_rows);
 
                            auto creator = [this](const auto& ctor, auto& key, 
auto& origin) {
-                               HashMethodType::try_presis_key_and_origin(key, 
origin,
-                                                                         
_agg_arena_pool);
+                               HashMethodType::try_presis_key_and_origin(
+                                       key, origin, 
Base::_shared_state->agg_arena_pool);
                                auto mapped =
                                        
Base::_shared_state->aggregate_data_container->append_data(
                                                origin);
@@ -563,7 +563,7 @@ void 
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
                            };
 
                            auto creator_for_null_key = [&](auto& mapped) {
-                               mapped = _agg_arena_pool.aligned_alloc(
+                               mapped = 
Base::_shared_state->agg_arena_pool.aligned_alloc(
                                        Base::_parent->template 
cast<AggSinkOperatorX>()
                                                
._total_size_of_aggregate_states,
                                        Base::_parent->template 
cast<AggSinkOperatorX>()
@@ -627,8 +627,8 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
 
                             auto creator = [&](const auto& ctor, auto& key, 
auto& origin) {
                                 try {
-                                    
HashMethodType::try_presis_key_and_origin(key, origin,
-                                                                              
_agg_arena_pool);
+                                    HashMethodType::try_presis_key_and_origin(
+                                            key, origin, 
Base::_shared_state->agg_arena_pool);
                                     _shared_state->refresh_top_limit(i, 
key_columns);
                                     auto mapped =
                                             
_shared_state->aggregate_data_container->append_data(
@@ -647,7 +647,7 @@ bool 
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
                             };
 
                             auto creator_for_null_key = [&](auto& mapped) {
-                                mapped = _agg_arena_pool.aligned_alloc(
+                                mapped = 
Base::_shared_state->agg_arena_pool.aligned_alloc(
                                         Base::_parent->template 
cast<AggSinkOperatorX>()
                                                 
._total_size_of_aggregate_states,
                                         Base::_parent->template 
cast<AggSinkOperatorX>()
@@ -909,7 +909,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* 
state) {
     auto& ss = *local_state.Base::_shared_state;
     RETURN_IF_ERROR(ss.reset_hash_table());
     local_state._serialize_key_arena_memory_usage->set((int64_t)0);
-    local_state._agg_arena_pool.clear(true);
+    local_state.Base::_shared_state->agg_arena_pool.clear(true);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index e0dcb3884dc..a1454c114c6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -122,8 +122,6 @@ protected:
     vectorized::Block _preagg_block = vectorized::Block();
 
     AggregatedDataVariants* _agg_data = nullptr;
-    vectorized::Arena _agg_arena_pool;
-    vectorized::Arena _agg_profile_arena;
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
 
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 24e13d5c0ec..989a7cd478f 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -514,7 +514,8 @@ Status 
AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
             SCOPED_TIMER(_deserialize_data_timer);
             
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
                     _places.data(), 
_shared_state->offsets_of_aggregate_states[i],
-                    _deserialize_buffer.data(), column.get(), _agg_arena_pool, 
rows);
+                    _deserialize_buffer.data(), column.get(), 
Base::_shared_state->agg_arena_pool,
+                    rows);
         }
     }
 
@@ -558,7 +559,8 @@ void 
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
                         agg_method.init_serialized_keys(key_columns, num_rows);
 
                         auto creator = [this](const auto& ctor, auto& key, 
auto& origin) {
-                            HashMethodType::try_presis_key_and_origin(key, 
origin, _agg_arena_pool);
+                            HashMethodType::try_presis_key_and_origin(
+                                    key, origin, 
Base::_shared_state->agg_arena_pool);
                             auto mapped =
                                     
Base::_shared_state->aggregate_data_container->append_data(
                                             origin);
@@ -570,7 +572,7 @@ void 
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
                         };
 
                         auto creator_for_null_key = [&](auto& mapped) {
-                            mapped = _agg_arena_pool.aligned_alloc(
+                            mapped = 
Base::_shared_state->agg_arena_pool.aligned_alloc(
                                     
_shared_state->total_size_of_aggregate_states,
                                     _shared_state->align_aggregate_states);
                             auto st = _create_agg_status(mapped);
@@ -595,8 +597,9 @@ void 
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
                                 _memory_usage_container,
                                 static_cast<int64_t>(
                                         
_shared_state->aggregate_data_container->memory_usage()));
-                        COUNTER_SET(_memory_usage_arena,
-                                    
static_cast<int64_t>(_agg_arena_pool.size()));
+                        COUNTER_SET(
+                                _memory_usage_arena,
+                                
static_cast<int64_t>(Base::_shared_state->agg_arena_pool.size()));
                     }},
             _shared_state->agg_data->method_variant);
 }
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index d2cff32246a..1bf4edabf5d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -68,7 +68,6 @@ protected:
                                   vectorized::ColumnRawPtrs& key_columns, 
uint32_t num_rows);
 
     vectorized::PODArray<vectorized::AggregateDataPtr> _places;
-    vectorized::Arena _agg_arena_pool;
     std::vector<char> _deserialize_buffer;
 
     RuntimeProfile::Counter* _get_results_timer = nullptr;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 37a6cd5fe51..5e777b4dfe5 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -168,8 +168,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
                 
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
     }
 
-    _fn_place_ptr = 
_agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
-                                                  p._align_aggregate_states);
+    _fn_place_ptr = 
_shared_state->agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
+                                                                
p._align_aggregate_states);
     _create_agg_status();
     return Status::OK();
 }
@@ -388,13 +388,14 @@ void 
AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6
             _agg_functions[i]->function()->execute_function_with_incremental(
                     partition_start, partition_end, frame_start, frame_end,
                     _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
-                    _agg_arena_pool, false, false, false, &_use_null_result[i],
+                    _shared_state->agg_arena_pool, false, false, false, 
&_use_null_result[i],
                     &_could_use_previous_result[i]);
         } else {
             _agg_functions[i]->function()->add_range_single_place(
                     partition_start, partition_end, frame_start, frame_end,
                     _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
-                    _agg_arena_pool, &(_use_null_result[i]), 
&_could_use_previous_result[i]);
+                    _shared_state->agg_arena_pool, &(_use_null_result[i]),
+                    &_could_use_previous_result[i]);
         }
     }
 }
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 4e7d82679f3..6b00386ac3d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -133,7 +133,6 @@ private:
     size_t _agg_functions_size = 0;
     bool _agg_functions_created = false;
     vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
-    vectorized::Arena _agg_arena_pool;
     std::vector<vectorized::AggFnEvaluator*> _agg_functions;
     std::vector<size_t> _offsets_of_aggregate_states;
     std::vector<bool> _result_column_nullable_flags;
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 1331a9e38ae..6363355db73 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -60,9 +60,7 @@ 
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
                                                                OperatorXBase* 
parent)
         : PipelineXLocalState<FakeSharedState>(state, parent),
           batch_size(state->batch_size()),
-          _agg_arena_pool(std::make_unique<vectorized::Arena>()),
           _agg_data(std::make_unique<DistinctDataVariants>()),
-          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
           _child_block(vectorized::Block::create_unique()),
           _aggregated_block(vectorized::Block::create_unique()) {}
 
@@ -459,6 +457,8 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* 
state) {
         }
     }
     _cache_block.clear();
+
+    _arena.clear();
     return Base::close(state);
 }
 
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 3a2ecf5ac4f..50c43e956cb 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -72,11 +72,9 @@ private:
     bool _should_expand_hash_table = true;
     bool _stop_emplace_flag = false;
     const int batch_size;
-    std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
     std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
-    std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
     std::unique_ptr<vectorized::Block> _child_block = nullptr;
     bool _child_eos = false;
     bool _reach_limit = false;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index d1580b4f3ac..ae2024f831d 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -128,7 +128,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     vectorized::HashTableBuild<HashTableCtxType, is_intersect>
                             hash_table_build_process(&local_state, 
uint32_t(rows), raw_ptrs, state);
-                    st = hash_table_build_process(arg, local_state._arena);
+                    st = hash_table_build_process(arg, 
local_state._shared_state->arena);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
                     __builtin_unreachable();
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index b076564202f..2df817f6ab4 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -57,7 +57,6 @@ private:
     vectorized::MutableBlock _mutable_block;
     // every child has its result expr list
     vectorized::VExprContextSPtrs _child_exprs;
-    vectorized::Arena _arena;
 
     RuntimeProfile::Counter* _merge_block_timer = nullptr;
     RuntimeProfile::Counter* _build_timer = nullptr;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index df943e64224..9ac8507cc56 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -80,7 +80,6 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
 StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, 
OperatorXBase* parent)
         : Base(state, parent),
           _agg_data(std::make_unique<AggregatedDataVariants>()),
-          _agg_profile_arena(std::make_unique<vectorized::Arena>()),
           _child_block(vectorized::Block::create_unique()),
           _pre_aggregated_block(vectorized::Block::create_unique()) {}
 
@@ -760,6 +759,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) {
                    _agg_data->method_variant);
     }
     _close_with_serialized_key();
+    _agg_arena_pool.clear(true);
     return Base::close(state);
 }
 
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 663431d26b0..001ac300970 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -94,7 +94,6 @@ private:
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
-    std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
     std::unique_ptr<AggregateDataContainer> _aggregate_data_container = 
nullptr;
     bool _should_limit_output = false;
     bool _reach_limit = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to