This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a2bbaab89ff [fix](iterator) Fix mem leak when initial iterator failed
(#28480) (#28538)
a2bbaab89ff is described below
commit a2bbaab89ff4ae6749a2911f502f2d297276d2b7
Author: walter <[email protected]>
AuthorDate: Mon Dec 18 15:01:04 2023 +0800
[fix](iterator) Fix mem leak when initial iterator failed (#28480) (#28538)
---
be/src/vec/olap/vertical_merge_iterator.cpp | 51 +++++++++++++++--------------
be/src/vec/olap/vertical_merge_iterator.h | 36 +++++++++-----------
2 files changed, 42 insertions(+), 45 deletions(-)
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp
b/be/src/vec/olap/vertical_merge_iterator.cpp
index e3941966173..e7abdb95457 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -458,23 +458,22 @@ Status VerticalHeapMergeIterator::next_batch(Block*
block) {
_merge_heap.push(ctx);
} else {
// push next iterator in same rowset into heap
- auto cur_order = ctx->order();
- while (cur_order + 1 < _iterator_init_flags.size() &&
- !_iterator_init_flags[cur_order + 1]) {
- auto next_ctx = _ori_iter_ctx[cur_order + 1];
+ size_t cur_order = ctx->order();
+ for (size_t next_order = cur_order + 1;
+ next_order < _iterator_init_flags.size() &&
!_iterator_init_flags[next_order];
+ ++next_order) {
+ auto& next_ctx = _ori_iter_ctx[next_order];
DCHECK(next_ctx);
RETURN_IF_ERROR(next_ctx->init(_opts));
- if (!next_ctx->valid()) {
- // next_ctx is empty segment, move to next
- ++cur_order;
- delete next_ctx;
- continue;
+ if (next_ctx->valid()) {
+ _merge_heap.push(next_ctx.get());
+ break;
}
- _merge_heap.push(next_ctx);
- break;
+ // next_ctx is empty segment, move to next
+ next_ctx.reset();
}
// Release ctx earlier to reduce resource consumed
- delete ctx;
+ _ori_iter_ctx[cur_order].reset();
}
}
RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
@@ -495,7 +494,15 @@ Status VerticalHeapMergeIterator::init(const
StorageReadOptions& opts) {
}
_schema = &(*_origin_iters.begin())->schema();
- auto seg_order = 0;
+ size_t num_iters = _origin_iters.size();
+ for (size_t seg_order = 0; seg_order < num_iters; ++seg_order) {
+ auto& iter = _origin_iters[seg_order];
+ auto ctx = std::make_unique<VerticalMergeIteratorContext>(
+ std::move(iter), _rowset_ids[seg_order], _ori_return_cols,
seg_order, _seq_col_idx);
+ _ori_iter_ctx.push_back(std::move(ctx));
+ }
+ _origin_iters.clear();
+
// Init context depends on _iterator_init_flags
// for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator
needs
// to be inited and [0-2] is in same rowset.
@@ -503,24 +510,18 @@ Status VerticalHeapMergeIterator::init(const
StorageReadOptions& opts) {
// will not be pushed into heap, we should init next one until we find a
valid iter
// so this rowset can work in heap
bool pre_iter_invalid = false;
- for (auto& iter : _origin_iters) {
- VerticalMergeIteratorContext* ctx = new VerticalMergeIteratorContext(
- std::move(iter), _rowset_ids[seg_order], _ori_return_cols,
seg_order, _seq_col_idx);
- _ori_iter_ctx.push_back(ctx);
- if (_iterator_init_flags[seg_order] || pre_iter_invalid) {
+ for (size_t i = 0; i < num_iters; ++i) {
+ if (_iterator_init_flags[i] || pre_iter_invalid) {
+ auto& ctx = _ori_iter_ctx[i];
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
pre_iter_invalid = true;
- ++seg_order;
- delete ctx;
continue;
}
- _merge_heap.push(ctx);
+ _merge_heap.push(ctx.get());
pre_iter_invalid = false;
}
- ++seg_order;
}
- _origin_iters.clear();
_opts = opts;
_block_row_max = opts.block_row_max;
@@ -622,7 +623,7 @@ Status VerticalFifoMergeIterator::init(const
StorageReadOptions& opts) {
// ---------------- VerticalMaskMergeIterator ------------- //
Status VerticalMaskMergeIterator::check_all_iter_finished() {
- for (auto iter : _origin_iter_ctx) {
+ for (auto& iter : _origin_iter_ctx) {
if (iter->inited()) {
if (iter->valid()) {
RETURN_IF_ERROR(iter->advance());
@@ -753,7 +754,7 @@ Status VerticalMaskMergeIterator::init(const
StorageReadOptions& opts) {
for (auto& iter : _origin_iters) {
auto ctx =
std::make_unique<VerticalMergeIteratorContext>(std::move(iter), rs_id,
_ori_return_cols, -1, -1);
- _origin_iter_ctx.emplace_back(ctx.release());
+ _origin_iter_ctx.push_back(std::move(ctx));
}
_origin_iters.clear();
diff --git a/be/src/vec/olap/vertical_merge_iterator.h
b/be/src/vec/olap/vertical_merge_iterator.h
index 70a452b2b6d..fee6afec89f 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -121,7 +121,7 @@ public:
size_t same_source_count(uint16_t source, size_t limit);
- // return continous agg_flag=true count from index
+ // return continuous agg_flag=true count from index
size_t continuous_agg_count(uint64_t index);
private:
@@ -207,7 +207,7 @@ private:
size_t _ori_return_cols = 0;
// segment order, used to compare key
- uint32_t _order = 0;
+ const uint32_t _order = 0;
int32_t _seq_col_idx = -1;
@@ -239,20 +239,16 @@ public:
KeysType keys_type, int32_t seq_col_idx,
RowSourcesBuffer* row_sources_buf)
: _origin_iters(std::move(iters)),
- _iterator_init_flags(iterator_init_flags),
- _rowset_ids(rowset_ids),
+ _iterator_init_flags(std::move(iterator_init_flags)),
+ _rowset_ids(std::move(rowset_ids)),
_ori_return_cols(ori_return_cols),
_keys_type(keys_type),
_seq_col_idx(seq_col_idx),
_row_sources_buf(row_sources_buf) {}
- ~VerticalHeapMergeIterator() override {
- while (!_merge_heap.empty()) {
- auto ctx = _merge_heap.top();
- _merge_heap.pop();
- delete ctx;
- }
- }
+ ~VerticalHeapMergeIterator() override = default;
+ VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
+ VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) =
delete;
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
@@ -287,7 +283,7 @@ private:
VerticalMergeContextComparator>;
VMergeHeap _merge_heap;
- std::vector<VerticalMergeIteratorContext*> _ori_iter_ctx;
+ std::vector<std::unique_ptr<VerticalMergeIteratorContext>> _ori_iter_ctx;
int _block_row_max = 0;
KeysType _keys_type;
int32_t _seq_col_idx = -1;
@@ -308,14 +304,16 @@ public:
KeysType keys_type, int32_t seq_col_idx,
RowSourcesBuffer* row_sources_buf)
: _origin_iters(std::move(iters)),
- _iterator_init_flags(iterator_init_flags),
- _rowset_ids(rowset_ids),
+ _iterator_init_flags(std::move(iterator_init_flags)),
+ _rowset_ids(std::move(rowset_ids)),
_ori_return_cols(ori_return_cols),
_keys_type(keys_type),
_seq_col_idx(seq_col_idx),
_row_sources_buf(row_sources_buf) {}
~VerticalFifoMergeIterator() override = default;
+ VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
+ VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) =
delete;
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
@@ -359,11 +357,9 @@ public:
_ori_return_cols(ori_return_cols),
_row_sources_buf(row_sources_buf) {}
- ~VerticalMaskMergeIterator() override {
- for (auto iter : _origin_iter_ctx) {
- delete iter;
- }
- }
+ ~VerticalMaskMergeIterator() override = default;
+ VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
+ VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) =
delete;
Status init(const StorageReadOptions& opts) override;
@@ -386,7 +382,7 @@ private:
std::vector<RowwiseIteratorUPtr> _origin_iters;
size_t _ori_return_cols = 0;
- std::vector<VerticalMergeIteratorContext*> _origin_iter_ctx;
+ std::vector<std::unique_ptr<VerticalMergeIteratorContext>>
_origin_iter_ctx;
const Schema* _schema = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]