This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 721f5965cf8 [fix](inverted index) fix memory leaks in inverted index (#52747)
721f5965cf8 is described below
commit 721f5965cf85430a3d6ccf8fda8d84a6b60446f8
Author: zzzxl <[email protected]>
AuthorDate: Fri Jul 4 10:55:47 2025 +0800
[fix](inverted index) fix memory leaks in inverted index (#52747)
---
.../olap/rowset/segment_v2/inverted_index_common.h | 103 +++++++++++++++++++++
.../segment_v2/inverted_index_compound_reader.cpp | 6 +-
.../segment_v2/inverted_index_compound_reader.h | 4 +-
.../segment_v2/inverted_index_file_writer.cpp | 99 +++++++++++---------
.../segment_v2/inverted_index_fs_directory.cpp | 18 +++-
5 files changed, 176 insertions(+), 54 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_common.h b/be/src/olap/rowset/segment_v2/inverted_index_common.h
new file mode 100644
index 00000000000..1fdb7df2931
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/inverted_index_common.h
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <CLucene.h> // IWYU pragma: keep
+
+#include <memory>
+
+#include "common/logging.h"
+
+namespace lucene::store {
+class Directory;
+} // namespace lucene::store
+
+namespace doris::segment_v2 {
+
+struct DirectoryDeleter { // Deleter functor for std::unique_ptr<lucene::store::Directory, DirectoryDeleter>.
+ void operator()(lucene::store::Directory* ptr) const { _CLDECDELETE(ptr); } // Releases through CLucene's _CLDECDELETE instead of plain delete. NOTE(review): presumably drops one reference and frees only at zero -- confirm against the CLucene macro.
+};
+
+struct ErrorContext { // Failure record handed from a catch block / finally_close() to the FINALLY* macros.
+ std::string err_msg; // Human-readable description; finally_close() appends to it and logs it.
+ std::exception_ptr eptr; // Captured exception; non-null is what FINALLY / FINALLY_EXCEPTION test for.
+}; // NOTE(review): <string> and <exception> are not included directly by this header -- presumably pulled in transitively; confirm.
+
+template <typename T> // T is a pointer-like type (anything usable with operator->).
+concept HasClose = requires(T t) { // Satisfied when `t->close()` is a well-formed expression.
+ { t->close() }; // No constraint is placed on the return type of close().
+};
+
+// Closes `resource` if it is non-null, never letting an exception escape.
+//
+// On failure the in-flight exception is stored in `error_context.eptr`, a
+// description is appended to `error_context.err_msg`, and the accumulated
+// message is logged at ERROR level. The caller decides afterwards whether to
+// turn that into a Status or rethrow it (see FINALLY / FINALLY_EXCEPTION).
+//
+// PtrType only has to be testable in a boolean context and support
+// `resource->close()`; the resource itself is not released or reset here.
+template <typename PtrType>
+    requires HasClose<PtrType>
+void finally_close(PtrType& resource, ErrorContext& error_context) {
+    if (resource) {
+        try {
+            resource->close();
+        } catch (CLuceneError& err) {
+            error_context.eptr = std::current_exception();
+            error_context.err_msg.append("Error occurred while closing resource: ");
+            error_context.err_msg.append(err.what());
+            LOG(ERROR) << error_context.err_msg;
+        } catch (...) {
+            // Unknown exception type: there is no message to extract, so only
+            // the generic text is recorded.
+            error_context.eptr = std::current_exception();
+            error_context.err_msg.append("Error occurred while closing resource");
+            LOG(ERROR) << error_context.err_msg;
+        }
+    }
+}
+
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunused-macros"
+#endif
+
+// Closes `resource` through finally_close(), recording any failure in the
+// `error_context` variable of the enclosing scope. A variable with exactly
+// that name must be visible at the expansion site; the static_assert refers
+// to it so that a missing declaration fails to compile at the point of use.
+#define FINALLY_CLOSE(resource)                                                                   \
+    {                                                                                            \
+        static_assert(sizeof(error_context) > 0,                                                 \
+                      "error_context must be defined before using FINALLY macro!");              \
+        finally_close(resource, error_context);                                                  \
+    }
+
+// Return ERROR after finally
+#define FINALLY(finally_block)
\
+ {
\
+ static_assert(sizeof(error_context) > 0,
\
+ "error_context must be defined before using FINALLY
macro!"); \
+ finally_block;
\
+ if (error_context.eptr) {
\
+ return
Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(error_context.err_msg); \
+ }
\
+ }
+
+// Re-throw the exception after finally.
+//
+// Runs `finally_block`, then, if `error_context.eptr` is set (before the block
+// ran or by a FINALLY_CLOSE inside it), rethrows that stored exception with
+// std::rethrow_exception. Otherwise execution continues after the macro.
+//
+// Requires a variable named `error_context` in scope. `finally_block` is a
+// single macro argument, so it must not contain commas outside parentheses.
+#define FINALLY_EXCEPTION(finally_block)                                                          \
+    {                                                                                             \
+        static_assert(sizeof(error_context) > 0,                                                  \
+                      "error_context must be defined before using FINALLY macro!");               \
+        finally_block;                                                                            \
+        if (error_context.eptr) {                                                                 \
+            std::rethrow_exception(error_context.eptr);                                           \
+        }                                                                                         \
+    }
+
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 08ab1b6cc88..4ea65240534 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -114,9 +114,9 @@
DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
int32_t read_buffer_size, bool
open_idx_file_cache)
: readBufferSize(read_buffer_size),
dir(d),
- ram_dir(new lucene::store::RAMDirectory()),
+ ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
file_name(name),
- entries(_CLNEW EntriesType(true, true)) {
+ entries(std::make_unique<EntriesType>(true, true)) {
bool success = false;
try {
if (dir->fileLength(name) == 0) {
@@ -203,7 +203,6 @@ DorisCompoundReader::~DorisCompoundReader() {
LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
}
}
- _CLDELETE(entries)
}
const char* DorisCompoundReader::getClassName() {
@@ -314,7 +313,6 @@ void DorisCompoundReader::close() {
}
if (ram_dir) {
ram_dir->close();
- _CLDELETE(ram_dir)
}
if (dir) {
dir->close();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index bc5ae415052..81b8544a1ae 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -68,12 +68,12 @@ private:
int32_t readBufferSize;
// base info
lucene::store::Directory* dir = nullptr;
- lucene::store::RAMDirectory* ram_dir = nullptr;
+ std::unique_ptr<lucene::store::RAMDirectory> ram_dir;
std::string directory;
std::string file_name;
CL_NS(store)::IndexInput* stream = nullptr;
// The life cycle of entries should be consistent with that of the
DorisCompoundReader.
- EntriesType* entries = nullptr;
+ std::unique_ptr<EntriesType> entries;
std::mutex _this_lock;
bool _closed = false;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index 7b66ee70cbe..87ace2580f2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -21,6 +21,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
@@ -137,15 +138,14 @@ Status InvertedIndexFileWriter::close() {
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
- auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get());
+ DorisCompoundFileWriter cfsWriter(dir.get());
// write compound file
- _file_size += cfsWriter->writeCompoundFile();
+ _file_size += cfsWriter.writeCompoundFile();
// delete index path, which contains separated inverted index
files
if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") ==
0) {
auto* compound_dir =
static_cast<DorisFSDirectory*>(dir.get());
compound_dir->deleteDirectory();
}
- _CLDELETE(cfsWriter)
}
} else {
_file_size = write();
@@ -337,50 +337,63 @@ size_t DorisCompoundFileWriter::writeCompoundFile() {
ram_dir.close();
auto compound_fs = ((DorisFSDirectory*)directory)->getCompoundFileSystem();
- auto* out_dir = DorisFSDirectoryFactory::getDirectory(compound_fs,
idx_path.c_str());
+ std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir;
+ std::unique_ptr<lucene::store::IndexOutput> output;
- auto* out = out_dir->createOutput(idx_name.c_str());
- if (out == nullptr) {
- LOG(WARNING) << "Write compound file error: CompoundDirectory output
is nullptr.";
- _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
- }
- std::unique_ptr<lucene::store::IndexOutput> output(out);
- size_t start = output->getFilePointer();
- output->writeVInt(file_count);
- // write file entries
- int64_t data_offset = header_len;
- uint8_t header_buffer[buffer_length];
- for (int i = 0; i < sorted_files.size(); ++i) {
- auto file = sorted_files[i];
- output->writeString(file.filename); // FileName
- // DataOffset
- if (i < header_file_count) {
- // file data write in header, so we set its offset to -1.
- output->writeLong(-1);
- } else {
- output->writeLong(data_offset);
+ ErrorContext error_context;
+ size_t compound_file_size = 0;
+ try {
+ out_dir = std::unique_ptr<lucene::store::Directory, DirectoryDeleter>(
+ DorisFSDirectoryFactory::getDirectory(compound_fs,
idx_path.c_str()));
+ output = std::unique_ptr<lucene::store::IndexOutput>(
+ out_dir->createOutput(idx_name.c_str()));
+ if (output == nullptr) {
+ LOG(WARNING) << "Write compound file error: CompoundDirectory
output is nullptr.";
+ _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
}
- output->writeLong(file.filesize); // FileLength
- if (i < header_file_count) {
- // append data
- copyFile(file.filename.c_str(), directory, output.get(),
header_buffer, buffer_length);
- } else {
- data_offset += file.filesize;
+
+ size_t start = output->getFilePointer();
+ output->writeVInt(file_count);
+ // write file entries
+ int64_t data_offset = header_len;
+ uint8_t header_buffer[buffer_length];
+ for (int i = 0; i < sorted_files.size(); ++i) {
+ auto file = sorted_files[i];
+ output->writeString(file.filename); // FileName
+ // DataOffset
+ if (i < header_file_count) {
+ // file data write in header, so we set its offset to -1.
+ output->writeLong(-1);
+ } else {
+ output->writeLong(data_offset);
+ }
+ output->writeLong(file.filesize); // FileLength
+ if (i < header_file_count) {
+ // append data
+ copyFile(file.filename.c_str(), directory, output.get(),
header_buffer,
+ buffer_length);
+ } else {
+ data_offset += file.filesize;
+ }
}
+ // write rest files' data
+ uint8_t data_buffer[buffer_length];
+ for (int i = header_file_count; i < sorted_files.size(); ++i) {
+ auto file = sorted_files[i];
+ copyFile(file.filename.c_str(), directory, output.get(),
data_buffer, buffer_length);
+ }
+
+ compound_file_size = output->getFilePointer() - start;
+ } catch (CLuceneError& err) {
+ error_context.eptr = std::current_exception();
+ error_context.err_msg.append("writeCompoundFile exception, error msg:
");
+ error_context.err_msg.append(err.what());
+ LOG(ERROR) << error_context.err_msg;
}
- // write rest files' data
- uint8_t data_buffer[buffer_length];
- for (int i = header_file_count; i < sorted_files.size(); ++i) {
- auto file = sorted_files[i];
- copyFile(file.filename.c_str(), directory, output.get(), data_buffer,
buffer_length);
- }
- out_dir->close();
- // NOTE: need to decrease ref count, but not to delete here,
- // because index cache may get the same directory from DIRECTORIES
- _CLDECDELETE(out_dir)
- auto compound_file_size = output->getFilePointer() - start;
- output->close();
- //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" <<
compound_file_size;
+ FINALLY_EXCEPTION({
+ FINALLY_CLOSE(out_dir);
+ FINALLY_CLOSE(output);
+ })
return compound_file_size;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 9e2e253c404..542d3f97eca 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -23,6 +23,7 @@
#include "inverted_index_desc.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/tablet_schema.h"
#include "util/debug_points.h"
#include "util/slice.h"
@@ -516,14 +517,21 @@ lucene::store::IndexOutput*
DorisFSDirectory::createOutput(const char* name) {
assert(!exists);
}
auto* ret = _CLNEW FSIndexOutput();
+ ErrorContext error_context;
try {
ret->init(fs, fl);
} catch (CLuceneError& err) {
- ret->close();
- _CLDELETE(ret)
- LOG(WARNING) << "FSIndexOutput init error: " << err.what();
- _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
- }
+ error_context.eptr = std::current_exception();
+ error_context.err_msg.append("FSIndexOutput init error: ");
+ error_context.err_msg.append(err.what());
+ LOG(ERROR) << error_context.err_msg;
+ }
+ FINALLY_EXCEPTION({
+ if (error_context.eptr) {
+ FINALLY_CLOSE(ret);
+ _CLDELETE(ret);
+ }
+ })
return ret;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]