This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e4ea2eeafd0 [Bug](pipelineX) fix memleak in stream agg (#32862)
e4ea2eeafd0 is described below
commit e4ea2eeafd0c97d5b02961fd96cc7bc4f2c6efd2
Author: HappenLee <[email protected]>
AuthorDate: Wed Mar 27 09:20:29 2024 +0800
[Bug](pipelineX) fix memleak in stream agg (#32862)
* [Bug](pipelineX) fix memleak in stream agg
* change by review
---
.../exec/streaming_aggregation_operator.cpp | 9 +++++++++
.../pipeline/exec/streaming_aggregation_operator.h | 21 +++++++++++++++++++++
be/src/pipeline/pipeline_x/dependency.h | 1 +
.../aggregate_functions/aggregate_function_uniq.h | 1 +
4 files changed, 32 insertions(+)
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 31ae9ba423a..1ec283bdc1f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -176,6 +176,7 @@ Status StreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
(!p._have_conjuncts) && // no having conjunct
p._needs_finalize; // agg's finalize step
}
+ _init = true;
return Status::OK();
}
@@ -1076,6 +1077,13 @@ Status
StreamingAggLocalState::_get_without_key_result(RuntimeState* state,
return Status::OK();
}
+void StreamingAggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr
data) {
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->function()->destroy(
+ data +
_parent->cast<StreamingAggOperatorX>()._offsets_of_aggregate_states[i]);
+ }
+}
+
void
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr*
places,
vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
@@ -1260,6 +1268,7 @@ Status StreamingAggLocalState::close(RuntimeState* state)
{
},
_agg_data->method_variant);
}
+ _close_with_serialized_key();
return Base::close(state);
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 9125437f4ab..1ccb7e31d0f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -182,6 +182,27 @@ private:
bool _child_eos = false;
std::unique_ptr<vectorized::Block> _pre_aggregated_block = nullptr;
std::vector<vectorized::AggregateDataPtr> _values;
+ bool _init = false;
+
+ void _destroy_agg_status(vectorized::AggregateDataPtr data);
+
+ void _close_with_serialized_key() {
+ std::visit(
+ [&](auto&& agg_method) -> void {
+ auto& data = *agg_method.hash_table;
+ data.for_each_mapped([&](auto& mapped) {
+ if (mapped) {
+ _destroy_agg_status(mapped);
+ mapped = nullptr;
+ }
+ });
+ if (data.has_null_key_data()) {
+ _destroy_agg_status(
+ data.template
get_null_key_data<vectorized::AggregateDataPtr>());
+ }
+ },
+ _agg_data->method_variant);
+ }
};
class StreamingAggOperatorX final : public
StatefulOperatorX<StreamingAggLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 7815d4a9ce0..fe95c1c4470 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -399,6 +399,7 @@ private:
},
agg_data->method_variant);
}
+
void _close_without_key() {
//because prepare maybe failed, and couldn't create agg data.
//but finally call close to destory agg data, if agg data has
bitmapValue
diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
index 727a145c45a..72be9e01833 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -68,6 +68,7 @@ struct AggregateFunctionUniqExactData {
using Set = flat_hash_set<Key, Hash>;
+ // TODO: replace SipHash with xxhash to speed up
static UInt128 ALWAYS_INLINE get_key(const StringRef& value) {
UInt128 key;
SipHash hash;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]