This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9154b5471a2 [fix](merge-iterator) Fix mem leak when get next batch failed (#33627)
9154b5471a2 is described below
commit 9154b5471a206725b704a019a71bce880bf61164
Author: Xin Liao <[email protected]>
AuthorDate: Sun Apr 14 16:37:23 2024 +0800
[fix](merge-iterator) Fix mem leak when get next batch failed (#33627)
---
be/src/vec/olap/vgeneric_iterators.cpp | 4 ++--
be/src/vec/olap/vgeneric_iterators.h | 24 ++++++++----------------
2 files changed, 10 insertions(+), 18 deletions(-)
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 26fe590dd60..4e3df66cd0f 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -334,14 +334,14 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
_record_rowids = opts.record_rowids;
for (auto& iter : _origin_iters) {
- auto ctx = std::make_unique<VMergeIteratorContext>(std::move(iter), _sequence_id_idx,
+ auto ctx = std::make_shared<VMergeIteratorContext>(std::move(iter), _sequence_id_idx,
_is_unique,
_is_reverse,
opts.read_orderby_key_columns);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
}
- _merge_heap.push(ctx.release());
+ _merge_heap.push(ctx);
}
_origin_iters.clear();
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index d67bb68fefa..89eb130348f 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -194,13 +194,7 @@ public:
_is_reverse(is_reverse),
_merged_rows(merged_rows) {}
- ~VMergeIterator() override {
- while (!_merge_heap.empty()) {
- auto ctx = _merge_heap.top();
- _merge_heap.pop();
- delete ctx;
- }
- }
+ ~VMergeIterator() override = default;
Status init(const StorageReadOptions& opts) override;
@@ -232,7 +226,7 @@ private:
_block_row_locations.resize(_block_row_max);
}
size_t row_idx = 0;
- VMergeIteratorContext* pre_ctx = nullptr;
+ std::shared_ptr<VMergeIteratorContext> pre_ctx;
while (_get_size(block) < _block_row_max) {
if (_merge_heap.empty()) {
break;
@@ -249,7 +243,7 @@ private:
}
pre_ctx = ctx;
}
- pre_ctx->set_pre_ctx_same(ctx);
+ pre_ctx->set_pre_ctx_same(ctx.get());
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
@@ -272,9 +266,6 @@ private:
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
_merge_heap.push(ctx);
- } else {
- // Release ctx earlier to reduce resource consumed
- delete ctx;
}
}
if (!_merge_heap.empty()) {
@@ -295,14 +286,15 @@ private:
const Schema* _schema = nullptr;
struct VMergeContextComparator {
- bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const {
+ bool operator()(const std::shared_ptr<VMergeIteratorContext>& lhs,
+ const std::shared_ptr<VMergeIteratorContext>& rhs) const {
return lhs->compare(*rhs);
}
};
- using VMergeHeap =
- std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>,
- VMergeContextComparator>;
+ using VMergeHeap = std::priority_queue<std::shared_ptr<VMergeIteratorContext>,
+ std::vector<std::shared_ptr<VMergeIteratorContext>>,
+ VMergeContextComparator>;
VMergeHeap _merge_heap;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]