This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 721f5965cf8 [fix](inverted index) fix memory leaks in inverted index (#52747)
721f5965cf8 is described below

commit 721f5965cf85430a3d6ccf8fda8d84a6b60446f8
Author: zzzxl <[email protected]>
AuthorDate: Fri Jul 4 10:55:47 2025 +0800

    [fix](inverted index) fix memory leaks in inverted index (#52747)
---
 .../olap/rowset/segment_v2/inverted_index_common.h | 103 +++++++++++++++++++++
 .../segment_v2/inverted_index_compound_reader.cpp  |   6 +-
 .../segment_v2/inverted_index_compound_reader.h    |   4 +-
 .../segment_v2/inverted_index_file_writer.cpp      |  99 +++++++++++---------
 .../segment_v2/inverted_index_fs_directory.cpp     |  18 +++-
 5 files changed, 176 insertions(+), 54 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/inverted_index_common.h b/be/src/olap/rowset/segment_v2/inverted_index_common.h
new file mode 100644
index 00000000000..1fdb7df2931
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/inverted_index_common.h
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <CLucene.h> // IWYU pragma: keep
+
+#include <exception>
+#include <memory>
+#include <string>
+
+#include "common/logging.h"
+
+namespace lucene::store {
+class Directory;
+} // namespace lucene::store
+
+namespace doris::segment_v2 {
+
+// unique_ptr deleter that releases one reference with _CLDECDELETE instead of deleting
+// outright, because the same directory instance may also be held elsewhere (e.g. the
+// index cache may get the same directory from DIRECTORIES).
+struct DirectoryDeleter {
+    void operator()(lucene::store::Directory* ptr) const { _CLDECDELETE(ptr); }
+};
+
+// Error state shared between a try block and the FINALLY* macros below: `eptr` holds the
+// first exception captured and `err_msg` describes every captured failure.
+struct ErrorContext {
+    std::string err_msg;
+    std::exception_ptr eptr;
+};
+
+// Satisfied by pointer-like types (raw or smart pointers) whose pointee provides close().
+template <typename T>
+concept HasClose = requires(T t) {
+    { t->close() };
+};
+
+// Calls resource->close() if `resource` is non-null, never letting an exception escape.
+// A failure is logged and appended to error_context.err_msg ("; "-separated). An
+// exception already stored in error_context.eptr is kept, so an error raised during
+// cleanup cannot replace the error that triggered the cleanup.
+template <typename PtrType>
+    requires HasClose<PtrType>
+void finally_close(PtrType& resource, ErrorContext& error_context) {
+    if (!resource) {
+        return;
+    }
+    std::string msg;
+    try {
+        resource->close();
+        return;
+    } catch (CLuceneError& err) {
+        msg = std::string("Error occurred while closing resource: ") + err.what();
+        if (!error_context.eptr) {
+            error_context.eptr = std::current_exception();
+        }
+    } catch (...) {
+        msg = "Error occurred while closing resource";
+        if (!error_context.eptr) {
+            error_context.eptr = std::current_exception();
+        }
+    }
+    LOG(ERROR) << msg;
+    if (!error_context.err_msg.empty()) {
+        error_context.err_msg.append("; ");
+    }
+    error_context.err_msg.append(msg);
+}
+
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunused-macros"
+#endif
+
+// Closes `resource` via finally_close(), recording failures in a local `error_context`.
+// The call is fully qualified so the macro also works outside doris::segment_v2.
+#define FINALLY_CLOSE(resource)                                                     \
+    {                                                                               \
+        static_assert(sizeof(error_context) > 0,                                    \
+                      "error_context must be defined before using FINALLY macro!"); \
+        ::doris::segment_v2::finally_close(resource, error_context);                \
+    }
+
+// Runs the finally block, then returns an INVERTED_INDEX_CLUCENE_ERROR Status built from
+// error_context.err_msg if any error was recorded. Variadic so that a block containing
+// top-level commas (e.g. `int a = 0, b = 0;`) is forwarded intact.
+#define FINALLY(...)                                                                              \
+    {                                                                                             \
+        static_assert(sizeof(error_context) > 0,                                                  \
+                      "error_context must be defined before using FINALLY macro!");               \
+        __VA_ARGS__;                                                                              \
+        if (error_context.eptr) {                                                                 \
+            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(error_context.err_msg); \
+        }                                                                                         \
+    }
+
+// Runs the finally block, then rethrows the exception stored in error_context, if any.
+// Variadic so that a block containing top-level commas is forwarded intact.
+#define FINALLY_EXCEPTION(...)                                                      \
+    {                                                                               \
+        static_assert(sizeof(error_context) > 0,                                    \
+                      "error_context must be defined before using FINALLY macro!"); \
+        __VA_ARGS__;                                                                \
+        if (error_context.eptr) {                                                   \
+            std::rethrow_exception(error_context.eptr);                             \
+        }                                                                           \
+    }
+
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 08ab1b6cc88..4ea65240534 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -114,9 +114,9 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
                                          int32_t read_buffer_size, bool open_idx_file_cache)
         : readBufferSize(read_buffer_size),
           dir(d),
-          ram_dir(new lucene::store::RAMDirectory()),
+          ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
           file_name(name),
-          entries(_CLNEW EntriesType(true, true)) {
+          entries(std::make_unique<EntriesType>(true, true)) {
     bool success = false;
     try {
         if (dir->fileLength(name) == 0) {
@@ -203,7 +203,6 @@ DorisCompoundReader::~DorisCompoundReader() {
             LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
         }
     }
-    _CLDELETE(entries)
 }
 
 const char* DorisCompoundReader::getClassName() {
@@ -314,7 +313,6 @@ void DorisCompoundReader::close() {
     }
     if (ram_dir) {
         ram_dir->close();
-        _CLDELETE(ram_dir)
     }
     if (dir) {
         dir->close();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index bc5ae415052..81b8544a1ae 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -68,12 +68,12 @@ private:
     int32_t readBufferSize;
     // base info
     lucene::store::Directory* dir = nullptr;
-    lucene::store::RAMDirectory* ram_dir = nullptr;
+    std::unique_ptr<lucene::store::RAMDirectory> ram_dir;
     std::string directory;
     std::string file_name;
     CL_NS(store)::IndexInput* stream = nullptr;
     // The life cycle of entries should be consistent with that of the DorisCompoundReader.
-    EntriesType* entries = nullptr;
+    std::unique_ptr<EntriesType> entries;
     std::mutex _this_lock;
     bool _closed = false;
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index 7b66ee70cbe..87ace2580f2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -21,6 +21,7 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
 #include "olap/rowset/segment_v2/inverted_index_cache.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
 #include "olap/rowset/segment_v2/inverted_index_reader.h"
@@ -137,15 +138,14 @@ Status InvertedIndexFileWriter::close() {
         if (_storage_format == InvertedIndexStorageFormatPB::V1) {
             for (const auto& entry : _indices_dirs) {
                 const auto& dir = entry.second;
-                auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get());
+                DorisCompoundFileWriter cfsWriter(dir.get());
                 // write compound file
-                _file_size += cfsWriter->writeCompoundFile();
+                _file_size += cfsWriter.writeCompoundFile();
                 // delete index path, which contains separated inverted index files
                 if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) {
                     auto* compound_dir = static_cast<DorisFSDirectory*>(dir.get());
                     compound_dir->deleteDirectory();
                 }
-                _CLDELETE(cfsWriter)
             }
         } else {
             _file_size = write();
@@ -337,50 +337,63 @@ size_t DorisCompoundFileWriter::writeCompoundFile() {
     ram_dir.close();
 
     auto compound_fs = ((DorisFSDirectory*)directory)->getCompoundFileSystem();
-    auto* out_dir = DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str());
+    std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir;
+    std::unique_ptr<lucene::store::IndexOutput> output;
 
-    auto* out = out_dir->createOutput(idx_name.c_str());
-    if (out == nullptr) {
-        LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
-        _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
-    }
-    std::unique_ptr<lucene::store::IndexOutput> output(out);
-    size_t start = output->getFilePointer();
-    output->writeVInt(file_count);
-    // write file entries
-    int64_t data_offset = header_len;
-    uint8_t header_buffer[buffer_length];
-    for (int i = 0; i < sorted_files.size(); ++i) {
-        auto file = sorted_files[i];
-        output->writeString(file.filename); // FileName
-        // DataOffset
-        if (i < header_file_count) {
-            // file data write in header, so we set its offset to -1.
-            output->writeLong(-1);
-        } else {
-            output->writeLong(data_offset);
+    ErrorContext error_context;
+    size_t compound_file_size = 0;
+    try {
+        out_dir = std::unique_ptr<lucene::store::Directory, DirectoryDeleter>(
+                DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str()));
+        output = std::unique_ptr<lucene::store::IndexOutput>(
+                out_dir->createOutput(idx_name.c_str()));
+        if (output == nullptr) {
+            LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
+            _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
         }
-        output->writeLong(file.filesize); // FileLength
-        if (i < header_file_count) {
-            // append data
-            copyFile(file.filename.c_str(), directory, output.get(), header_buffer, buffer_length);
-        } else {
-            data_offset += file.filesize;
+
+        size_t start = output->getFilePointer();
+        output->writeVInt(file_count);
+        // write file entries
+        int64_t data_offset = header_len;
+        uint8_t header_buffer[buffer_length];
+        for (int i = 0; i < sorted_files.size(); ++i) {
+            auto file = sorted_files[i];
+            output->writeString(file.filename); // FileName
+            // DataOffset
+            if (i < header_file_count) {
+                // file data write in header, so we set its offset to -1.
+                output->writeLong(-1);
+            } else {
+                output->writeLong(data_offset);
+            }
+            output->writeLong(file.filesize); // FileLength
+            if (i < header_file_count) {
+                // append data
+                copyFile(file.filename.c_str(), directory, output.get(), header_buffer,
+                         buffer_length);
+            } else {
+                data_offset += file.filesize;
+            }
         }
+        // write rest files' data
+        uint8_t data_buffer[buffer_length];
+        for (int i = header_file_count; i < sorted_files.size(); ++i) {
+            auto file = sorted_files[i];
+            copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length);
+        }
+
+        compound_file_size = output->getFilePointer() - start;
+    } catch (CLuceneError& err) {
+        error_context.eptr = std::current_exception();
+        error_context.err_msg.append("writeCompoundFile exception, error msg: ");
+        error_context.err_msg.append(err.what());
+        LOG(ERROR) << error_context.err_msg;
     }
-    // write rest files' data
-    uint8_t data_buffer[buffer_length];
-    for (int i = header_file_count; i < sorted_files.size(); ++i) {
-        auto file = sorted_files[i];
-        copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length);
-    }
-    out_dir->close();
-    // NOTE: need to decrease ref count, but not to delete here,
-    // because index cache may get the same directory from DIRECTORIES
-    _CLDECDELETE(out_dir)
-    auto compound_file_size = output->getFilePointer() - start;
-    output->close();
-    //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size;
+    FINALLY_EXCEPTION({
+        FINALLY_CLOSE(out_dir);
+        FINALLY_CLOSE(output);
+    })
     return compound_file_size;
 }
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 9e2e253c404..542d3f97eca 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -23,6 +23,7 @@
 #include "inverted_index_desc.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
 #include "olap/tablet_schema.h"
 #include "util/debug_points.h"
 #include "util/slice.h"
@@ -516,14 +517,21 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) {
         assert(!exists);
     }
     auto* ret = _CLNEW FSIndexOutput();
+    ErrorContext error_context;
     try {
         ret->init(fs, fl);
     } catch (CLuceneError& err) {
-        ret->close();
-        _CLDELETE(ret)
-        LOG(WARNING) << "FSIndexOutput init error: " << err.what();
-        _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
-    }
+        error_context.eptr = std::current_exception();
+        error_context.err_msg.append("FSIndexOutput init error: ");
+        error_context.err_msg.append(err.what());
+        LOG(ERROR) << error_context.err_msg;
+    }
+    FINALLY_EXCEPTION({
+        if (error_context.eptr) {
+            FINALLY_CLOSE(ret);
+            _CLDELETE(ret);
+        }
+    })
     return ret;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to