This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0a447273f07 [Bug](pipelineX) fix memleak in stream agg (#32862)
0a447273f07 is described below

commit 0a447273f07993a645620969b983d92d416e0327
Author: HappenLee <[email protected]>
AuthorDate: Wed Mar 27 09:20:29 2024 +0800

    [Bug](pipelineX) fix memleak in stream agg (#32862)
    
    * [Bug](pipelineX) fix memleak in stream agg
    
    * change by review
---
 .../exec/streaming_aggregation_operator.cpp         |  9 +++++++++
 .../pipeline/exec/streaming_aggregation_operator.h  | 21 +++++++++++++++++++++
 be/src/pipeline/pipeline_x/dependency.h             |  1 +
 .../aggregate_functions/aggregate_function_uniq.h   |  1 +
 4 files changed, 32 insertions(+)

diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 31ae9ba423a..1ec283bdc1f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -176,6 +176,7 @@ Status StreamingAggLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
                                (!p._have_conjuncts) && // no having conjunct
                                p._needs_finalize;      // agg's finalize step
     }
+    _init = true;
     return Status::OK();
 }
 
@@ -1076,6 +1077,13 @@ Status 
StreamingAggLocalState::_get_without_key_result(RuntimeState* state,
     return Status::OK();
 }
 
+void StreamingAggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr 
data) {
+    for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+        _aggregate_evaluators[i]->function()->destroy(
+                data + 
_parent->cast<StreamingAggOperatorX>()._offsets_of_aggregate_states[i]);
+    }
+}
+
 void 
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* 
places,
                                                       
vectorized::ColumnRawPtrs& key_columns,
                                                       const size_t num_rows) {
@@ -1260,6 +1268,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) 
{
                 },
                 _agg_data->method_variant);
     }
+    _close_with_serialized_key();
     return Base::close(state);
 }
 
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 9125437f4ab..1ccb7e31d0f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -182,6 +182,27 @@ private:
     bool _child_eos = false;
     std::unique_ptr<vectorized::Block> _pre_aggregated_block = nullptr;
     std::vector<vectorized::AggregateDataPtr> _values;
+    bool _init = false;
+
+    void _destroy_agg_status(vectorized::AggregateDataPtr data);
+
+    void _close_with_serialized_key() {
+        std::visit(
+                [&](auto&& agg_method) -> void {
+                    auto& data = *agg_method.hash_table;
+                    data.for_each_mapped([&](auto& mapped) {
+                        if (mapped) {
+                            _destroy_agg_status(mapped);
+                            mapped = nullptr;
+                        }
+                    });
+                    if (data.has_null_key_data()) {
+                        _destroy_agg_status(
+                                data.template 
get_null_key_data<vectorized::AggregateDataPtr>());
+                    }
+                },
+                _agg_data->method_variant);
+    }
 };
 
 class StreamingAggOperatorX final : public 
StatefulOperatorX<StreamingAggLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 7815d4a9ce0..fe95c1c4470 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -399,6 +399,7 @@ private:
                 },
                 agg_data->method_variant);
     }
+
     void _close_without_key() {
         //because prepare maybe failed, and couldn't create agg data.
         //but finally call close to destory agg data, if agg data has 
bitmapValue
diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h 
b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
index 727a145c45a..72be9e01833 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -68,6 +68,7 @@ struct AggregateFunctionUniqExactData {
 
     using Set = flat_hash_set<Key, Hash>;
 
+    // TODO: replace SipHash with xxhash to speed up
     static UInt128 ALWAYS_INLINE get_key(const StringRef& value) {
         UInt128 key;
         SipHash hash;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to