This is an automated email from the ASF dual-hosted git repository.
lide pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new a49a6c7c644 [fix](merge-iterator) Fix mem leak when get next batch failed (#41671)
a49a6c7c644 is described below
commit a49a6c7c64403d523166355bef23d88a7094c856
Author: lw112 <[email protected]>
AuthorDate: Mon Oct 14 16:02:14 2024 +0800
[fix](merge-iterator) Fix mem leak when get next batch failed (#41671)
---
be/src/vec/olap/vgeneric_iterators.cpp | 5 +++--
be/src/vec/olap/vgeneric_iterators.h | 26 ++++++++++----------------
2 files changed, 13 insertions(+), 18 deletions(-)
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 75ce50b6c4d..5d9a79d5ef0 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -17,6 +17,7 @@
#include <vec/olap/vgeneric_iterators.h>
+#include <algorithm>
#include <memory>
#include <queue>
#include <utility>
@@ -321,13 +322,13 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
_record_rowids = opts.record_rowids;
for (auto iter : _origin_iters) {
- auto ctx = std::make_unique<VMergeIteratorContext>(
+ auto ctx = std::make_shared<VMergeIteratorContext>(
iter, _sequence_id_idx, _is_unique, _is_reverse,
opts.read_orderby_key_columns);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
}
- _merge_heap.push(ctx.release());
+ _merge_heap.push(ctx);
}
_origin_iters.clear();
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index e5bb36d0d34..bd2dc934d98 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
+
#include "olap/iterators.h"
#include "olap/row.h"
#include "olap/row_block2.h"
@@ -181,13 +183,7 @@ public:
_is_reverse(is_reverse),
_merged_rows(merged_rows) {}
- ~VMergeIterator() override {
- while (!_merge_heap.empty()) {
- auto ctx = _merge_heap.top();
- _merge_heap.pop();
- delete ctx;
- }
- }
+ ~VMergeIterator() override = default;
Status init(const StorageReadOptions& opts) override;
@@ -221,7 +217,7 @@ private:
_block_row_locations.resize(_block_row_max);
}
size_t row_idx = 0;
- VMergeIteratorContext* pre_ctx = nullptr;
+ std::shared_ptr<VMergeIteratorContext> pre_ctx;
while (_get_size(block) < _block_row_max) {
if (_merge_heap.empty()) {
break;
@@ -238,7 +234,7 @@ private:
}
pre_ctx = ctx;
}
- pre_ctx->set_pre_ctx_same(ctx);
+ pre_ctx->set_pre_ctx_same(ctx.get());
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
@@ -261,9 +257,6 @@ private:
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
_merge_heap.push(ctx);
- } else {
- // Release ctx earlier to reduce resource consumed
- delete ctx;
}
}
if (!_merge_heap.empty()) {
@@ -284,14 +277,15 @@ private:
const Schema* _schema = nullptr;
struct VMergeContextComparator {
- bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const {
+ bool operator()(const std::shared_ptr<VMergeIteratorContext>& lhs,
+ const std::shared_ptr<VMergeIteratorContext>& rhs) const {
return lhs->compare(*rhs);
}
};
- using VMergeHeap =
- std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>,
- VMergeContextComparator>;
+ using VMergeHeap = std::priority_queue<std::shared_ptr<VMergeIteratorContext>,
+ std::vector<std::shared_ptr<VMergeIteratorContext>>,
+ VMergeContextComparator>;
VMergeHeap _merge_heap;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]