This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a2bbaab89ff [fix](iterator) Fix mem leak when initial iterator failed 
(#28480) (#28538)
a2bbaab89ff is described below

commit a2bbaab89ff4ae6749a2911f502f2d297276d2b7
Author: walter <[email protected]>
AuthorDate: Mon Dec 18 15:01:04 2023 +0800

    [fix](iterator) Fix mem leak when initial iterator failed (#28480) (#28538)
---
 be/src/vec/olap/vertical_merge_iterator.cpp | 51 +++++++++++++++--------------
 be/src/vec/olap/vertical_merge_iterator.h   | 36 +++++++++-----------
 2 files changed, 42 insertions(+), 45 deletions(-)

diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp 
b/be/src/vec/olap/vertical_merge_iterator.cpp
index e3941966173..e7abdb95457 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -458,23 +458,22 @@ Status VerticalHeapMergeIterator::next_batch(Block* 
block) {
             _merge_heap.push(ctx);
         } else {
             // push next iterator in same rowset into heap
-            auto cur_order = ctx->order();
-            while (cur_order + 1 < _iterator_init_flags.size() &&
-                   !_iterator_init_flags[cur_order + 1]) {
-                auto next_ctx = _ori_iter_ctx[cur_order + 1];
+            size_t cur_order = ctx->order();
+            for (size_t next_order = cur_order + 1;
+                 next_order < _iterator_init_flags.size() && 
!_iterator_init_flags[next_order];
+                 ++next_order) {
+                auto& next_ctx = _ori_iter_ctx[next_order];
                 DCHECK(next_ctx);
                 RETURN_IF_ERROR(next_ctx->init(_opts));
-                if (!next_ctx->valid()) {
-                    // next_ctx is empty segment, move to next
-                    ++cur_order;
-                    delete next_ctx;
-                    continue;
+                if (next_ctx->valid()) {
+                    _merge_heap.push(next_ctx.get());
+                    break;
                 }
-                _merge_heap.push(next_ctx);
-                break;
+                // next_ctx is empty segment, move to next
+                next_ctx.reset();
             }
             // Release ctx earlier to reduce resource consumed
-            delete ctx;
+            _ori_iter_ctx[cur_order].reset();
         }
     }
     RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
@@ -495,7 +494,15 @@ Status VerticalHeapMergeIterator::init(const 
StorageReadOptions& opts) {
     }
     _schema = &(*_origin_iters.begin())->schema();
 
-    auto seg_order = 0;
+    size_t num_iters = _origin_iters.size();
+    for (size_t seg_order = 0; seg_order < num_iters; ++seg_order) {
+        auto& iter = _origin_iters[seg_order];
+        auto ctx = std::make_unique<VerticalMergeIteratorContext>(
+                std::move(iter), _rowset_ids[seg_order], _ori_return_cols, 
seg_order, _seq_col_idx);
+        _ori_iter_ctx.push_back(std::move(ctx));
+    }
+    _origin_iters.clear();
+
     // Init contxt depends on _iterator_init_flags
     // for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator 
needs
     // to be inited and [0-2] is in same rowset.
@@ -503,24 +510,18 @@ Status VerticalHeapMergeIterator::init(const 
StorageReadOptions& opts) {
     // will not be pushed into heap, we should init next one util we find a 
valid iter
     // so this rowset can work in heap
     bool pre_iter_invalid = false;
-    for (auto& iter : _origin_iters) {
-        VerticalMergeIteratorContext* ctx = new VerticalMergeIteratorContext(
-                std::move(iter), _rowset_ids[seg_order], _ori_return_cols, 
seg_order, _seq_col_idx);
-        _ori_iter_ctx.push_back(ctx);
-        if (_iterator_init_flags[seg_order] || pre_iter_invalid) {
+    for (size_t i = 0; i < num_iters; ++i) {
+        if (_iterator_init_flags[i] || pre_iter_invalid) {
+            auto& ctx = _ori_iter_ctx[i];
             RETURN_IF_ERROR(ctx->init(opts));
             if (!ctx->valid()) {
                 pre_iter_invalid = true;
-                ++seg_order;
-                delete ctx;
                 continue;
             }
-            _merge_heap.push(ctx);
+            _merge_heap.push(ctx.get());
             pre_iter_invalid = false;
         }
-        ++seg_order;
     }
-    _origin_iters.clear();
 
     _opts = opts;
     _block_row_max = opts.block_row_max;
@@ -622,7 +623,7 @@ Status VerticalFifoMergeIterator::init(const 
StorageReadOptions& opts) {
 
 //  ----------------  VerticalMaskMergeIterator  -------------  //
 Status VerticalMaskMergeIterator::check_all_iter_finished() {
-    for (auto iter : _origin_iter_ctx) {
+    for (auto& iter : _origin_iter_ctx) {
         if (iter->inited()) {
             if (iter->valid()) {
                 RETURN_IF_ERROR(iter->advance());
@@ -753,7 +754,7 @@ Status VerticalMaskMergeIterator::init(const 
StorageReadOptions& opts) {
     for (auto& iter : _origin_iters) {
         auto ctx = 
std::make_unique<VerticalMergeIteratorContext>(std::move(iter), rs_id,
                                                                   
_ori_return_cols, -1, -1);
-        _origin_iter_ctx.emplace_back(ctx.release());
+        _origin_iter_ctx.push_back(std::move(ctx));
     }
     _origin_iters.clear();
 
diff --git a/be/src/vec/olap/vertical_merge_iterator.h 
b/be/src/vec/olap/vertical_merge_iterator.h
index 70a452b2b6d..fee6afec89f 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -121,7 +121,7 @@ public:
 
     size_t same_source_count(uint16_t source, size_t limit);
 
-    // return continous agg_flag=true count from index
+    // return continuous agg_flag=true count from index
     size_t continuous_agg_count(uint64_t index);
 
 private:
@@ -207,7 +207,7 @@ private:
     size_t _ori_return_cols = 0;
 
     // segment order, used to compare key
-    uint32_t _order = 0;
+    const uint32_t _order = 0;
 
     int32_t _seq_col_idx = -1;
 
@@ -239,20 +239,16 @@ public:
                               KeysType keys_type, int32_t seq_col_idx,
                               RowSourcesBuffer* row_sources_buf)
             : _origin_iters(std::move(iters)),
-              _iterator_init_flags(iterator_init_flags),
-              _rowset_ids(rowset_ids),
+              _iterator_init_flags(std::move(iterator_init_flags)),
+              _rowset_ids(std::move(rowset_ids)),
               _ori_return_cols(ori_return_cols),
               _keys_type(keys_type),
               _seq_col_idx(seq_col_idx),
               _row_sources_buf(row_sources_buf) {}
 
-    ~VerticalHeapMergeIterator() override {
-        while (!_merge_heap.empty()) {
-            auto ctx = _merge_heap.top();
-            _merge_heap.pop();
-            delete ctx;
-        }
-    }
+    ~VerticalHeapMergeIterator() override = default;
+    VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
+    VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = 
delete;
 
     Status init(const StorageReadOptions& opts) override;
     Status next_batch(Block* block) override;
@@ -287,7 +283,7 @@ private:
                                            VerticalMergeContextComparator>;
 
     VMergeHeap _merge_heap;
-    std::vector<VerticalMergeIteratorContext*> _ori_iter_ctx;
+    std::vector<std::unique_ptr<VerticalMergeIteratorContext>> _ori_iter_ctx;
     int _block_row_max = 0;
     KeysType _keys_type;
     int32_t _seq_col_idx = -1;
@@ -308,14 +304,16 @@ public:
                               KeysType keys_type, int32_t seq_col_idx,
                               RowSourcesBuffer* row_sources_buf)
             : _origin_iters(std::move(iters)),
-              _iterator_init_flags(iterator_init_flags),
-              _rowset_ids(rowset_ids),
+              _iterator_init_flags(std::move(iterator_init_flags)),
+              _rowset_ids(std::move(rowset_ids)),
               _ori_return_cols(ori_return_cols),
               _keys_type(keys_type),
               _seq_col_idx(seq_col_idx),
               _row_sources_buf(row_sources_buf) {}
 
     ~VerticalFifoMergeIterator() override = default;
+    VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
+    VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = 
delete;
 
     Status init(const StorageReadOptions& opts) override;
     Status next_batch(Block* block) override;
@@ -359,11 +357,9 @@ public:
               _ori_return_cols(ori_return_cols),
               _row_sources_buf(row_sources_buf) {}
 
-    ~VerticalMaskMergeIterator() override {
-        for (auto iter : _origin_iter_ctx) {
-            delete iter;
-        }
-    }
+    ~VerticalMaskMergeIterator() override = default;
+    VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
+    VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = 
delete;
 
     Status init(const StorageReadOptions& opts) override;
 
@@ -386,7 +382,7 @@ private:
     std::vector<RowwiseIteratorUPtr> _origin_iters;
     size_t _ori_return_cols = 0;
 
-    std::vector<VerticalMergeIteratorContext*> _origin_iter_ctx;
+    std::vector<std::unique_ptr<VerticalMergeIteratorContext>> 
_origin_iter_ctx;
 
     const Schema* _schema = nullptr;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to