This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 539cc31ebe4 branch-4.0: [opt](Arena)Release Arena memory earlier in
pipeline operators. #59045 (#59632)
539cc31ebe4 is described below
commit 539cc31ebe422aad5c7613385ccc61c561733823
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jan 8 11:30:08 2026 +0800
branch-4.0: [opt](Arena)Release Arena memory earlier in pipeline operators.
#59045 (#59632)
Cherry-picked from #59045
Co-authored-by: Mryange <[email protected]>
---
be/src/pipeline/dependency.h | 6 +++
be/src/pipeline/exec/aggregation_sink_operator.cpp | 46 +++++++++++-----------
be/src/pipeline/exec/aggregation_sink_operator.h | 2 -
.../pipeline/exec/aggregation_source_operator.cpp | 13 +++---
be/src/pipeline/exec/aggregation_source_operator.h | 1 -
be/src/pipeline/exec/analytic_sink_operator.cpp | 9 +++--
be/src/pipeline/exec/analytic_sink_operator.h | 1 -
.../distinct_streaming_aggregation_operator.cpp | 4 +-
.../exec/distinct_streaming_aggregation_operator.h | 2 -
be/src/pipeline/exec/set_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 1 -
.../exec/streaming_aggregation_operator.cpp | 2 +-
.../pipeline/exec/streaming_aggregation_operator.h | 1 -
13 files changed, 46 insertions(+), 44 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 626e174045b..036ca3a90de 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -382,6 +382,9 @@ public:
// Refresh the top limit heap with a new row
void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs&
key_columns);
+ vectorized::Arena agg_arena_pool;
+ vectorized::Arena agg_profile_arena;
+
private:
vectorized::MutableColumns _get_keys_hash_table();
@@ -580,6 +583,7 @@ public:
std::mutex buffer_mutex;
bool sink_eos = false;
std::mutex sink_eos_lock;
+ vectorized::Arena agg_arena_pool;
};
struct JoinSharedState : public BasicSharedState {
@@ -702,6 +706,8 @@ public:
std::atomic<bool> ready_for_read = false;
+ vectorized::Arena arena;
+
/// called in setup_local_state
Status hash_table_init();
};
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index f666a4122c0..63648ab8146 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -102,8 +102,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
}
if (Base::_shared_state->probe_expr_ctxs.empty()) {
- _agg_data->without_key =
-
reinterpret_cast<vectorized::AggregateDataPtr>(_agg_profile_arena.aligned_alloc(
+ _agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
+ Base::_shared_state->agg_profile_arena.aligned_alloc(
p._total_size_of_aggregate_states,
p._align_aggregate_states));
if (p._is_merge) {
@@ -187,7 +187,7 @@ Status
AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
block,
_agg_data->without_key + Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _agg_arena_pool));
+ Base::_shared_state->agg_arena_pool));
}
return Status::OK();
}
@@ -207,7 +207,7 @@ size_t AggSinkLocalState::_memory_usage() const {
return 0;
}
size_t usage = 0;
- usage += _agg_arena_pool.size();
+ usage += Base::_shared_state->agg_arena_pool.size();
if (Base::_shared_state->aggregate_data_container) {
usage += Base::_shared_state->aggregate_data_container->memory_usage();
@@ -240,7 +240,7 @@ void
AggSinkLocalState::_update_memusage_with_serialized_key() {
},
[&](auto& agg_method) -> void {
auto& data = *agg_method.hash_table;
- int64_t memory_usage_arena = _agg_arena_pool.size();
+ int64_t memory_usage_arena =
Base::_shared_state->agg_arena_pool.size();
int64_t memory_usage_container =
_shared_state->aggregate_data_container->memory_usage();
int64_t hash_table_memory_usage =
data.get_buffer_size_in_bytes();
@@ -321,8 +321,8 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _deserialize_buffer.data(), column.get(),
_agg_arena_pool,
- rows);
+ _deserialize_buffer.data(), column.get(),
+ Base::_shared_state->agg_arena_pool, rows);
}
} else {
RETURN_IF_ERROR(
@@ -330,7 +330,7 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
block,
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(),
Base::_shared_state->agg_arena_pool));
}
}
} else {
@@ -375,15 +375,15 @@ Status
AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
_places.data(),
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _deserialize_buffer.data(),
column.get(), _agg_arena_pool,
- rows);
+ _deserialize_buffer.data(),
column.get(),
+ Base::_shared_state->agg_arena_pool,
rows);
}
} else {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add(
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(),
Base::_shared_state->agg_arena_pool));
}
}
}
@@ -423,20 +423,20 @@ Status
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
_agg_data->without_key +
Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- *column, _agg_arena_pool);
+ *column, Base::_shared_state->agg_arena_pool);
} else {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
block,
_agg_data->without_key + Base::_parent->template
cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _agg_arena_pool));
+ Base::_shared_state->agg_arena_pool));
}
}
return Status::OK();
}
void AggSinkLocalState::_update_memusage_without_key() {
- int64_t arena_memory_usage = _agg_arena_pool.size();
+ int64_t arena_memory_usage = Base::_shared_state->agg_arena_pool.size();
COUNTER_SET(_memory_used_counter, arena_memory_usage);
COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage);
}
@@ -487,7 +487,7 @@ Status
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(),
Base::_shared_state->agg_arena_pool));
}
} else {
auto do_aggregate_evaluators = [&] {
@@ -496,7 +496,7 @@ Status
AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
block,
Base::_parent->template cast<AggSinkOperatorX>()
._offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool));
+ _places.data(), Base::_shared_state->agg_arena_pool));
}
return Status::OK();
};
@@ -550,8 +550,8 @@ void
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
agg_method.init_serialized_keys(key_columns,
num_rows);
auto creator = [this](const auto& ctor, auto& key,
auto& origin) {
- HashMethodType::try_presis_key_and_origin(key,
origin,
-
_agg_arena_pool);
+ HashMethodType::try_presis_key_and_origin(
+ key, origin,
Base::_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
@@ -563,7 +563,7 @@ void
AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* p
};
auto creator_for_null_key = [&](auto& mapped) {
- mapped = _agg_arena_pool.aligned_alloc(
+ mapped =
Base::_shared_state->agg_arena_pool.aligned_alloc(
Base::_parent->template
cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
Base::_parent->template
cast<AggSinkOperatorX>()
@@ -627,8 +627,8 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
auto creator = [&](const auto& ctor, auto& key,
auto& origin) {
try {
-
HashMethodType::try_presis_key_and_origin(key, origin,
-
_agg_arena_pool);
+ HashMethodType::try_presis_key_and_origin(
+ key, origin,
Base::_shared_state->agg_arena_pool);
_shared_state->refresh_top_limit(i,
key_columns);
auto mapped =
_shared_state->aggregate_data_container->append_data(
@@ -647,7 +647,7 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
};
auto creator_for_null_key = [&](auto& mapped) {
- mapped = _agg_arena_pool.aligned_alloc(
+ mapped =
Base::_shared_state->agg_arena_pool.aligned_alloc(
Base::_parent->template
cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
Base::_parent->template
cast<AggSinkOperatorX>()
@@ -909,7 +909,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState*
state) {
auto& ss = *local_state.Base::_shared_state;
RETURN_IF_ERROR(ss.reset_hash_table());
local_state._serialize_key_arena_memory_usage->set((int64_t)0);
- local_state._agg_arena_pool.clear(true);
+ local_state.Base::_shared_state->agg_arena_pool.clear(true);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index e0dcb3884dc..a1454c114c6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -122,8 +122,6 @@ protected:
vectorized::Block _preagg_block = vectorized::Block();
AggregatedDataVariants* _agg_data = nullptr;
- vectorized::Arena _agg_arena_pool;
- vectorized::Arena _agg_profile_arena;
std::unique_ptr<ExecutorBase> _executor = nullptr;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 24e13d5c0ec..989a7cd478f 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -514,7 +514,8 @@ Status
AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
SCOPED_TIMER(_deserialize_data_timer);
Base::_shared_state->aggregate_evaluators[i]->function()->deserialize_and_merge_vec(
_places.data(),
_shared_state->offsets_of_aggregate_states[i],
- _deserialize_buffer.data(), column.get(), _agg_arena_pool,
rows);
+ _deserialize_buffer.data(), column.get(),
Base::_shared_state->agg_arena_pool,
+ rows);
}
}
@@ -558,7 +559,8 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
agg_method.init_serialized_keys(key_columns, num_rows);
auto creator = [this](const auto& ctor, auto& key,
auto& origin) {
- HashMethodType::try_presis_key_and_origin(key,
origin, _agg_arena_pool);
+ HashMethodType::try_presis_key_and_origin(
+ key, origin,
Base::_shared_state->agg_arena_pool);
auto mapped =
Base::_shared_state->aggregate_data_container->append_data(
origin);
@@ -570,7 +572,7 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
};
auto creator_for_null_key = [&](auto& mapped) {
- mapped = _agg_arena_pool.aligned_alloc(
+ mapped =
Base::_shared_state->agg_arena_pool.aligned_alloc(
_shared_state->total_size_of_aggregate_states,
_shared_state->align_aggregate_states);
auto st = _create_agg_status(mapped);
@@ -595,8 +597,9 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
_memory_usage_container,
static_cast<int64_t>(
_shared_state->aggregate_data_container->memory_usage()));
- COUNTER_SET(_memory_usage_arena,
-
static_cast<int64_t>(_agg_arena_pool.size()));
+ COUNTER_SET(
+ _memory_usage_arena,
+
static_cast<int64_t>(Base::_shared_state->agg_arena_pool.size()));
}},
_shared_state->agg_data->method_variant);
}
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index d2cff32246a..1bf4edabf5d 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -68,7 +68,6 @@ protected:
vectorized::ColumnRawPtrs& key_columns,
uint32_t num_rows);
vectorized::PODArray<vectorized::AggregateDataPtr> _places;
- vectorized::Arena _agg_arena_pool;
std::vector<char> _deserialize_buffer;
RuntimeProfile::Counter* _get_results_timer = nullptr;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 37a6cd5fe51..5e777b4dfe5 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -168,8 +168,8 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
}
- _fn_place_ptr =
_agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
- p._align_aggregate_states);
+ _fn_place_ptr =
_shared_state->agg_arena_pool.aligned_alloc(p._total_size_of_aggregate_states,
+
p._align_aggregate_states);
_create_agg_status();
return Status::OK();
}
@@ -388,13 +388,14 @@ void
AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int6
_agg_functions[i]->function()->execute_function_with_incremental(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
- _agg_arena_pool, false, false, false, &_use_null_result[i],
+ _shared_state->agg_arena_pool, false, false, false,
&_use_null_result[i],
&_could_use_previous_result[i]);
} else {
_agg_functions[i]->function()->add_range_single_place(
partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
- _agg_arena_pool, &(_use_null_result[i]),
&_could_use_previous_result[i]);
+ _shared_state->agg_arena_pool, &(_use_null_result[i]),
+ &_could_use_previous_result[i]);
}
}
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 4e7d82679f3..6b00386ac3d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -133,7 +133,6 @@ private:
size_t _agg_functions_size = 0;
bool _agg_functions_created = false;
vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
- vectorized::Arena _agg_arena_pool;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
std::vector<bool> _result_column_nullable_flags;
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 1331a9e38ae..6363355db73 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -60,9 +60,7 @@
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
OperatorXBase*
parent)
: PipelineXLocalState<FakeSharedState>(state, parent),
batch_size(state->batch_size()),
- _agg_arena_pool(std::make_unique<vectorized::Arena>()),
_agg_data(std::make_unique<DistinctDataVariants>()),
- _agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_aggregated_block(vectorized::Block::create_unique()) {}
@@ -459,6 +457,8 @@ Status DistinctStreamingAggLocalState::close(RuntimeState*
state) {
}
}
_cache_block.clear();
+
+ _arena.clear();
return Base::close(state);
}
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 3a2ecf5ac4f..50c43e956cb 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -72,11 +72,9 @@ private:
bool _should_expand_hash_table = true;
bool _stop_emplace_flag = false;
const int batch_size;
- std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
- std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
std::unique_ptr<vectorized::Block> _child_block = nullptr;
bool _child_eos = false;
bool _reach_limit = false;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index d1580b4f3ac..ae2024f831d 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -128,7 +128,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(&local_state,
uint32_t(rows), raw_ptrs, state);
- st = hash_table_build_process(arg, local_state._arena);
+ st = hash_table_build_process(arg,
local_state._shared_state->arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index b076564202f..2df817f6ab4 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -57,7 +57,6 @@ private:
vectorized::MutableBlock _mutable_block;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
- vectorized::Arena _arena;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index df943e64224..9ac8507cc56 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -80,7 +80,6 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
_agg_data(std::make_unique<AggregatedDataVariants>()),
- _agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_pre_aggregated_block(vectorized::Block::create_unique()) {}
@@ -760,6 +759,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) {
_agg_data->method_variant);
}
_close_with_serialized_key();
+ _agg_arena_pool.clear(true);
return Base::close(state);
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 663431d26b0..001ac300970 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -94,7 +94,6 @@ private:
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
- std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
std::unique_ptr<AggregateDataContainer> _aggregate_data_container =
nullptr;
bool _should_limit_output = false;
bool _reach_limit = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org