This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new cbdb8e25beb branch-3.1: [fix](inverted index) fix memory leak in inverted index procedure #53235 (#53425)
cbdb8e25beb is described below

commit cbdb8e25beb6966fd6b74d2a19da88b945004158
Author: airborne12 <[email protected]>
AuthorDate: Fri Jul 18 10:45:27 2025 +0800

    branch-3.1: [fix](inverted index) fix memory leak in inverted index procedure #53235 (#53425)
    
    pick and refactor from #53235
---
 be/src/clucene                                     |  2 +-
 .../segment_v2/inverted_index_compaction.cpp       | 16 ++--
 .../segment_v2/inverted_index_compound_reader.cpp  | 98 +++++++++++-----------
 .../segment_v2/inverted_index_compound_reader.h    | 22 ++---
 .../segment_v2/inverted_index_file_reader.cpp      | 42 +++++-----
 .../rowset/segment_v2/inverted_index_file_reader.h |  7 +-
 .../segment_v2/inverted_index_file_writer.cpp      | 12 ++-
 .../segment_v2/inverted_index_fs_directory.cpp     | 39 +++++++--
 .../segment_v2/inverted_index_fs_directory.h       |  2 +-
 .../rowset/segment_v2/inverted_index_searcher.cpp  | 23 +++--
 .../rowset/segment_v2/inverted_index_writer.cpp    | 20 +++--
 ...inverted_index_exception_fault_injection.groovy |  9 +-
 12 files changed, 144 insertions(+), 148 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index 4f5449c9037..d6eda26f4dd 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 4f5449c903778fae32884586c728587c24a58806
+Subproject commit d6eda26f4dd87aec21ca5d925727743ad12d0281
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index dcbdca921ab..5f3679aaeae 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -17,6 +17,7 @@
 
 #include "inverted_index_compaction.h"
 
+#include "inverted_index_common.h"
 #include "inverted_index_file_writer.h"
 #include "inverted_index_fs_directory.h"
 #include "io/fs/local_file_system.h"
@@ -41,15 +42,17 @@ Status compact_column(int64_t index_id,
                     "debug point: index compaction error");
         }
     })
+
     bool can_use_ram_dir = true;
-    lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory(
-            io::global_local_filesystem(), tmp_path.data(), can_use_ram_dir);
+    std::unique_ptr<lucene::store::Directory, DirectoryDeleter> dir(
+            
DorisFSDirectoryFactory::getDirectory(io::global_local_filesystem(), 
tmp_path.data(),
+                                                  can_use_ram_dir));
     DBUG_EXECUTE_IF("compact_column_getDirectory_error", {
         _CLTHROWA(CL_ERR_IO, "debug point: compact_column_getDirectory_error 
in index compaction");
     })
     lucene::analysis::SimpleAnalyzer<char> analyzer;
-    auto* index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, 
true /* create */,
-                                                           true /* 
closeDirOnShutdown */);
+    std::unique_ptr<lucene::index::IndexWriter> index_writer(_CLNEW 
lucene::index::IndexWriter(
+            dir.get(), &analyzer, true /* create */, true /* 
closeDirOnShutdown */));
     DBUG_EXECUTE_IF("compact_column_create_index_writer_error", {
         _CLTHROWA(CL_ERR_IO,
                   "debug point: compact_column_create_index_writer_error in 
index compaction");
@@ -71,11 +74,6 @@ Status compact_column(int64_t index_id,
         _CLTHROWA(CL_ERR_IO,
                   "debug point: compact_column_index_writer_close_error in 
index compaction");
     })
-    _CLDELETE(index_writer);
-    // NOTE: need to ref_cnt-- for dir,
-    // when index_writer is destroyed, if closeDir is set, dir will be close
-    // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir 
will be destroyed.
-    _CLDECDELETE(dir)
 
     // delete temporary segment_path, only when inverted_index_ram_dir_enable 
is false
     if (!config::inverted_index_ram_dir_enable) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 86efe86ca43..37878689cfd 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -22,10 +22,10 @@
 #include <CLucene/debug/mem.h>
 #include <CLucene/store/RAMDirectory.h>
 #include <CLucene/util/Misc.h>
-#include <stdio.h>
-#include <string.h>
-#include <wchar.h>
 
+#include <cstdio>
+#include <cstring>
+#include <cwchar>
 #include <memory>
 #include <utility>
 
@@ -34,22 +34,13 @@
 #include "olap/tablet_schema.h"
 #include "util/debug_points.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 class FileWriter;
-} // namespace io
-} // namespace doris
+} // namespace doris::io
 
 #define BUFFER_LENGTH 16384
 #define CL_MAX_PATH 4096
-
-#define STRDUP_WtoA(x) CL_NS(util)::Misc::_wideToChar(x)
-#define STRDUP_TtoA STRDUP_WtoA
-
-using FileWriterPtr = std::unique_ptr<doris::io::FileWriter>;
-
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
 
 /** Implementation of an IndexInput that reads from a portion of the
  *  compound file.
@@ -94,7 +85,7 @@ CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, 
const std::string& fi
 void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
     std::lock_guard wlock(((DorisFSDirectory::FSIndexInput*)base)->_this_lock);
 
-    int64_t start = getFilePointer();
+    auto start = getFilePointer();
     if (start + len > _length) {
         _CLTHROWA(CL_ERR_IO, "read past EOF");
     }
@@ -153,56 +144,63 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
 }
 
 DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
-                                         EntriesType* entries_clone, int32_t 
read_buffer_size,
+                                         const EntriesType& entries_clone, 
int32_t read_buffer_size,
                                          const io::IOContext* io_ctx)
         : _stream(stream),
-          _entries(_CLNEW EntriesType(true, true)),
+          _entries(std::make_unique<EntriesType>()),
           _read_buffer_size(read_buffer_size) {
     // After stream clone, the io_ctx needs to be reconfigured.
     initialize(io_ctx);
 
-    for (auto& e : *entries_clone) {
-        auto* origin_entry = e.second;
-        auto* entry = _CLNEW ReaderFileEntry();
-        char* aid = strdup(e.first);
+    for (const auto& e : entries_clone) {
+        const auto& origin_entry = e.second;
+        auto entry = std::make_unique<ReaderFileEntry>();
         entry->file_name = origin_entry->file_name;
         entry->offset = origin_entry->offset;
         entry->length = origin_entry->length;
-        _entries->put(aid, entry);
+        (*_entries)[e.first] = std::move(entry);
     }
 };
 
 DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, 
int32_t read_buffer_size,
                                          const io::IOContext* io_ctx)
-        : _ram_dir(new lucene::store::RAMDirectory()),
+        : _ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
           _stream(stream),
-          _entries(_CLNEW EntriesType(true, true)),
+          _entries(std::make_unique<EntriesType>()),
           _read_buffer_size(read_buffer_size) {
     // After stream clone, the io_ctx needs to be reconfigured.
     initialize(io_ctx);
 
     try {
         int32_t count = _stream->readVInt();
-        ReaderFileEntry* entry = nullptr;
-        TCHAR tid[CL_MAX_PATH];
         uint8_t buffer[BUFFER_LENGTH];
         for (int32_t i = 0; i < count; i++) {
-            entry = _CLNEW ReaderFileEntry();
-            stream->readString(tid, CL_MAX_PATH);
-            char* aid = STRDUP_TtoA(tid);
-            entry->file_name = aid;
+            auto entry = std::make_unique<ReaderFileEntry>();
+            // Read the string length first
+            int32_t string_length = stream->readVInt();
+            // Allocate appropriate buffer for the string
+            std::wstring tid;
+            tid.resize(string_length);
+            // Read the string characters directly
+            stream->readChars(tid.data(), 0, string_length);
+            std::string file_name_str(tid.begin(), tid.end());
+            entry->file_name = file_name_str;
             entry->offset = stream->readLong();
             entry->length = stream->readLong();
+            VLOG_DEBUG << "string_length:" << string_length << " file_name:" 
<< entry->file_name
+                       << " offset:" << entry->offset << " length:" << 
entry->length;
             DBUG_EXECUTE_IF("construct_DorisCompoundReader_failed", {
                 CLuceneError err;
                 err.set(CL_ERR_IO, "construct_DorisCompoundReader_failed");
                 throw err;
             })
-            _entries->put(aid, entry);
             // read header file data
             if (entry->offset < 0) {
-                copyFile(entry->file_name.c_str(), entry->length, buffer, 
BUFFER_LENGTH);
+                //if offset is -1, it means it's size is lower than 
DorisFSDirectory::MAX_HEADER_DATA_SIZE, which is 128k.
+                copyFile(entry->file_name.c_str(), 
static_cast<int64_t>(entry->length), buffer,
+                         BUFFER_LENGTH);
             }
+            _entries->emplace(std::move(file_name_str), std::move(entry));
         }
     } catch (...) {
         try {
@@ -212,11 +210,9 @@ 
DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32
             }
             if (_entries != nullptr) {
                 _entries->clear();
-                _CLDELETE(_entries);
             }
             if (_ram_dir) {
                 _ram_dir->close();
-                _CLDELETE(_ram_dir)
             }
         } catch (CLuceneError& err) {
             if (err.number() != CL_ERR_IO) {
@@ -231,11 +227,12 @@ void DorisCompoundReader::copyFile(const char* file, 
int64_t file_length, uint8_
                                    int64_t buffer_length) {
     std::unique_ptr<lucene::store::IndexOutput> 
output(_ram_dir->createOutput(file));
     int64_t start_ptr = output->getFilePointer();
-    int64_t remainder = file_length;
-    int64_t chunk = buffer_length;
+    auto remainder = file_length;
+    auto chunk = buffer_length;
+    auto batch_len = file_length < chunk ? file_length : chunk;
 
     while (remainder > 0) {
-        int64_t len = std::min(std::min(chunk, file_length), remainder);
+        auto len = remainder < batch_len ? remainder : batch_len;
         _stream->readBytes(buffer, len);
         output->writeBytes(buffer, len);
         remainder -= len;
@@ -270,7 +267,6 @@ DorisCompoundReader::~DorisCompoundReader() {
             LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
         }
     }
-    _CLDELETE(_entries)
 }
 
 const char* DorisCompoundReader::getClassName() {
@@ -284,8 +280,8 @@ bool DorisCompoundReader::list(std::vector<std::string>* 
names) const {
     if (_closed || _entries == nullptr) {
         _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
     }
-    for (EntriesType::const_iterator i = _entries->begin(); i != 
_entries->end(); i++) {
-        names->push_back(i->first);
+    for (const auto& entry : *_entries) {
+        names->push_back(entry.first);
     }
     return true;
 }
@@ -294,7 +290,7 @@ bool DorisCompoundReader::fileExists(const char* name) 
const {
     if (_closed || _entries == nullptr) {
         _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
     }
-    return _entries->exists((char*)name);
+    return _entries->find(std::string(name)) != _entries->end();
 }
 
 int64_t DorisCompoundReader::fileModified(const char* name) const {
@@ -305,15 +301,15 @@ int64_t DorisCompoundReader::fileLength(const char* name) 
const {
     if (_closed || _entries == nullptr) {
         _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
     }
-    ReaderFileEntry* e = _entries->get((char*)name);
-    if (e == nullptr) {
+    auto it = _entries->find(std::string(name));
+    if (it == _entries->end()) {
         char buf[CL_MAX_PATH + 30];
         strcpy(buf, "File ");
         strncat(buf, name, CL_MAX_PATH);
         strcat(buf, " does not exist");
         _CLTHROWA(CL_ERR_IO, buf);
     }
-    return e->length;
+    return it->second->length;
 }
 
 bool DorisCompoundReader::openInput(const char* name,
@@ -338,14 +334,16 @@ bool DorisCompoundReader::openInput(const char* name, 
lucene::store::IndexInput*
         return false;
     }
 
-    const ReaderFileEntry* entry = _entries->get((char*)name);
-    if (entry == nullptr) {
+    auto it = _entries->find(std::string(name));
+    if (it == _entries->end()) {
         char buf[CL_MAX_PATH + 26];
         snprintf(buf, CL_MAX_PATH + 26, "No sub-file with id %s found", name);
         error.set(CL_ERR_IO, buf);
         return false;
     }
 
+    const auto& entry = it->second;
+
     // If file is in RAM, just return.
     if (_ram_dir && _ram_dir->fileExists(name)) {
         return _ram_dir->openInput(name, ret, error, bufferSize);
@@ -374,7 +372,6 @@ void DorisCompoundReader::close() {
     }
     if (_ram_dir) {
         _ram_dir->close();
-        _CLDELETE(_ram_dir)
     }
     _closed = true;
 }
@@ -400,7 +397,7 @@ lucene::store::IndexOutput* 
DorisCompoundReader::createOutput(const char* /*name
 }
 
 std::string DorisCompoundReader::toString() const {
-    return std::string("DorisCompoundReader@");
+    return "DorisCompoundReader@";
 }
 
 CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
@@ -412,5 +409,4 @@ void DorisCompoundReader::initialize(const io::IOContext* 
io_ctx) {
     _stream->setIdxFileCache(true);
 }
 
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 4a687e4ed3e..09e6faaeb91 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -24,12 +24,13 @@
 #include <CLucene/store/IndexOutput.h>
 #include <CLucene/util/Equators.h>
 #include <CLucene/util/VoidMap.h>
-#include <stdint.h>
 
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -43,9 +44,7 @@ namespace lucene::store {
 class RAMDirectory;
 } // namespace lucene::store
 
-namespace doris {
-class TabletIndex;
-namespace segment_v2 {
+namespace doris::segment_v2 {
 
 class ReaderFileEntry : LUCENE_BASE {
 public:
@@ -60,16 +59,14 @@ public:
     ~ReaderFileEntry() override = default;
 };
 
-using EntriesType =
-        lucene::util::CLHashMap<char*, ReaderFileEntry*, 
lucene::util::Compare::Char,
-                                lucene::util::Equals::Char, 
lucene::util::Deletor::acArray,
-                                
lucene::util::Deletor::Object<ReaderFileEntry>>;
+using EntriesType = std::unordered_map<std::string, 
std::unique_ptr<ReaderFileEntry>>;
+
 class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
 private:
-    lucene::store::RAMDirectory* _ram_dir = nullptr;
+    std::unique_ptr<lucene::store::RAMDirectory> _ram_dir;
     CL_NS(store)::IndexInput* _stream = nullptr;
     // The life cycle of _entries should be consistent with that of the 
DorisCompoundReader.
-    EntriesType* _entries = nullptr;
+    std::unique_ptr<EntriesType> _entries;
     std::mutex _this_lock;
     bool _closed = false;
     int32_t _read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE;
@@ -79,7 +76,7 @@ protected:
     bool doDeleteFile(const char* name) override;
 
 public:
-    DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType* 
entries_clone,
+    DorisCompoundReader(CL_NS(store)::IndexInput* stream, const EntriesType& 
entries_clone,
                         int32_t read_buffer_size = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
                         const io::IOContext* io_ctx = nullptr);
     DorisCompoundReader(CL_NS(store)::IndexInput* stream,
@@ -109,5 +106,4 @@ private:
     void initialize(const io::IOContext* io_ctx);
 };
 
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index 8dbac071290..9a0fcd43e99 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -84,7 +84,6 @@ Status InvertedIndexFileReader::_init_from(int32_t 
read_buffer_size, const io::I
         if (version >= InvertedIndexStorageFormatPB::V2) {
             DCHECK(version == _storage_format);
             int32_t numIndices = _stream->readInt(); // Read number of indices
-            ReaderFileEntry* entry = nullptr;
 
             for (int32_t i = 0; i < numIndices; ++i) {
                 int64_t indexId = _stream->readLong();      // Read index ID
@@ -95,23 +94,19 @@ Status InvertedIndexFileReader::_init_from(int32_t 
read_buffer_size, const io::I
 
                 int32_t numFiles = _stream->readInt(); // Read number of files 
in the index
 
-                // true, true means it will deconstruct key and value
-                auto fileEntries = std::make_unique<EntriesType>(true, true);
+                auto fileEntries = std::make_unique<EntriesType>();
+                fileEntries->reserve(numFiles);
 
                 for (int32_t j = 0; j < numFiles; ++j) {
-                    entry = _CLNEW ReaderFileEntry();
-
                     int32_t file_name_length = _stream->readInt();
-                    // aid will destruct in EntriesType map.
-                    char* aid = (char*)malloc(file_name_length + 1);
-                    _stream->readBytes(reinterpret_cast<uint8_t*>(aid), 
file_name_length);
-                    aid[file_name_length] = '\0';
-                    //stream->readString(tid, CL_MAX_PATH);
-                    entry->file_name = std::string(aid, file_name_length);
+                    std::string file_name(file_name_length, '\0');
+                    
_stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()),
+                                       file_name_length);
+                    auto entry = std::make_unique<ReaderFileEntry>();
+                    entry->file_name = std::move(file_name);
                     entry->offset = _stream->readLong();
                     entry->length = _stream->readLong();
-
-                    fileEntries->put(aid, entry);
+                    fileEntries->emplace(entry->file_name, std::move(entry));
                 }
 
                 _indices_entries.emplace(std::make_pair(indexId, 
std::move(suffix_str)),
@@ -223,8 +218,8 @@ Result<std::unique_ptr<DorisCompoundReader>> 
InvertedIndexFileReader::_open(
                     errMsg.str()));
         }
         // Need to clone resource here, because index searcher cache need it.
-        compound_reader = std::make_unique<DorisCompoundReader>(
-                _stream->clone(), index_it->second.get(), _read_buffer_size, 
io_ctx);
+        compound_reader = 
std::make_unique<DorisCompoundReader>(_stream->clone(), *index_it->second,
+                                                                
_read_buffer_size, io_ctx);
     }
     return compound_reader;
 }
@@ -291,13 +286,14 @@ Status InvertedIndexFileReader::has_null(const 
TabletIndex* index_meta, bool* re
     if (index_it == _indices_entries.end()) {
         *res = false;
     } else {
-        auto* entries = index_it->second.get();
-        ReaderFileEntry* e =
-                
entries->get((char*)InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
-        if (e == nullptr) {
+        const auto& entries = index_it->second;
+        auto entry_it =
+                
entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
+        if (entry_it == entries->end()) {
             *res = false;
             return Status::OK();
         }
+        const auto& e = entry_it->second;
         // roaring bitmap cookie header size is 5
         if (e->length <= 5) {
             *res = false;
@@ -310,11 +306,11 @@ Status InvertedIndexFileReader::has_null(const 
TabletIndex* index_meta, bool* re
 
 void InvertedIndexFileReader::debug_file_entries() {
     std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
-    for (auto& index : _indices_entries) {
+    for (const auto& index : _indices_entries) {
         LOG(INFO) << "index_id:" << index.first.first;
-        auto* index_entries = index.second.get();
-        for (auto& entry : (*index_entries)) {
-            ReaderFileEntry* file_entry = entry.second;
+        const auto& index_entries = index.second;
+        for (const auto& entry : *index_entries) {
+            const auto& file_entry = entry.second;
             LOG(INFO) << "file entry name:" << file_entry->file_name
                       << " length:" << file_entry->length << " offset:" << 
file_entry->offset;
         }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h 
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
index 63dd89cf975..49a7dd88400 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
@@ -42,11 +42,6 @@ class DorisCompoundReader;
 
 class InvertedIndexFileReader {
 public:
-    using EntriesType =
-            lucene::util::CLHashMap<char*, ReaderFileEntry*, 
lucene::util::Compare::Char,
-                                    lucene::util::Equals::Char, 
lucene::util::Deletor::acArray,
-                                    
lucene::util::Deletor::Object<ReaderFileEntry>>;
-    // Map to hold the file entries for each index ID.
     using IndicesEntriesMap =
             std::map<std::pair<int64_t, std::string>, 
std::unique_ptr<EntriesType>>;
 
@@ -56,7 +51,7 @@ public:
             : _fs(std::move(fs)),
               _index_path_prefix(std::move(index_path_prefix)),
               _storage_format(storage_format),
-              _idx_file_info(idx_file_info) {}
+              _idx_file_info(std::move(idx_file_info)) {}
 
     Status init(int32_t read_buffer_size = 
config::inverted_index_read_buffer_size,
                 const io::IOContext* io_ctx = nullptr);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index e31fb407ec6..e4be9bf64cb 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -262,6 +262,7 @@ void InvertedIndexFileWriter::copyFile(const char* 
fileName, lucene::store::Dire
     lucene::store::IndexInput* tmp = nullptr;
     CLuceneError err;
     auto open = dir->openInput(fileName, tmp, err);
+    std::unique_ptr<lucene::store::IndexInput> input(tmp);
     DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_openInput_error", {
         open = false;
         err.set(CL_ERR_IO, "debug point: copyFile_openInput_error");
@@ -274,7 +275,6 @@ void InvertedIndexFileWriter::copyFile(const char* 
fileName, lucene::store::Dire
         throw err;
     }
 
-    std::unique_ptr<lucene::store::IndexInput> input(tmp);
     int64_t start_ptr = output->getFilePointer();
     int64_t length = input->length();
     int64_t remainder = length;
@@ -488,15 +488,14 @@ InvertedIndexFileWriter::create_output_stream_v1(int64_t 
index_id,
     out_dir->set_file_writer_opts(_opts);
     std::unique_ptr<lucene::store::Directory, DirectoryDeleter> 
out_dir_ptr(out_dir);
 
-    auto* out = out_dir->createOutput(idx_name.c_str());
+    std::unique_ptr<lucene::store::IndexOutput> 
output(out_dir->createOutput(idx_name.c_str()));
     
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
-                    { out = nullptr; });
-    if (out == nullptr) {
+                    { output = nullptr; });
+    if (output == nullptr) {
         LOG(WARNING) << "InvertedIndexFileWriter::create_output_stream_v1 
error: CompoundDirectory "
                         "output is nullptr.";
         _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
     }
-    std::unique_ptr<lucene::store::IndexOutput> output(out);
 
     return {std::move(out_dir_ptr), std::move(output)};
 }
@@ -546,8 +545,7 @@ InvertedIndexFileWriter::create_output_stream() {
     std::unique_ptr<lucene::store::Directory, DirectoryDeleter> 
out_dir_ptr(out_dir);
 
     DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is 
nullptr";
-    auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
-            out_dir->createOutputV2(_idx_v2_writer.get()));
+    auto compound_file_output = out_dir->createOutputV2(_idx_v2_writer.get());
 
     return {std::move(out_dir_ptr), std::move(compound_file_output)};
 }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index c633d29a7fc..7bdca1941fa 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -23,6 +23,7 @@
 #include "inverted_index_desc.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
 #include "olap/tablet_schema.h"
 #include "util/debug_points.h"
 #include "util/slice.h"
@@ -119,6 +120,7 @@ public:
 bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const 
char* path,
                                           IndexInput*& ret, CLuceneError& 
error,
                                           int32_t buffer_size, int64_t 
file_size) {
+    // no throw error
     CND_PRECONDITION(path != nullptr, "path is NULL");
 
     if (buffer_size == -1) {
@@ -704,21 +706,42 @@ lucene::store::IndexOutput* 
DorisFSDirectory::createOutput(const char* name) {
         assert(!exists);
     }
     auto* ret = _CLNEW FSIndexOutput();
+    ErrorContext error_context;
     ret->set_file_writer_opts(_opts);
     try {
         ret->init(_fs, fl);
     } catch (CLuceneError& err) {
-        ret->close();
-        _CLDELETE(ret)
-        LOG(WARNING) << "FSIndexOutput init error: " << err.what();
-        _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
-    }
+        error_context.eptr = std::current_exception();
+        error_context.err_msg.append("FSIndexOutput init error: ");
+        error_context.err_msg.append(err.what());
+        LOG(ERROR) << error_context.err_msg;
+    }
+    FINALLY_EXCEPTION({
+        if (error_context.eptr) {
+            FINALLY_CLOSE(ret);
+            _CLDELETE(ret);
+        }
+    })
     return ret;
 }
 
-lucene::store::IndexOutput* DorisFSDirectory::createOutputV2(io::FileWriter* 
file_writer) {
-    auto* ret = _CLNEW FSIndexOutputV2();
-    ret->init(file_writer);
+std::unique_ptr<lucene::store::IndexOutput> DorisFSDirectory::createOutputV2(
+        io::FileWriter* file_writer) {
+    auto ret = std::make_unique<FSIndexOutputV2>();
+    ErrorContext error_context;
+    try {
+        ret->init(file_writer);
+    } catch (CLuceneError& err) {
+        error_context.eptr = std::current_exception();
+        error_context.err_msg.append("FSIndexOutputV2 init error: ");
+        error_context.err_msg.append(err.what());
+        LOG(ERROR) << error_context.err_msg;
+    }
+    FINALLY_EXCEPTION({
+        if (error_context.eptr) {
+            FINALLY_CLOSE(ret);
+        }
+    })
     return ret;
 }
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h 
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index 0bba5b49756..60e3a132aa4 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -79,7 +79,7 @@ public:
     void renameFile(const char* from, const char* to) override;
     void touchFile(const char* name) override;
     lucene::store::IndexOutput* createOutput(const char* name) override;
-    lucene::store::IndexOutput* createOutputV2(io::FileWriter* file_writer);
+    std::unique_ptr<lucene::store::IndexOutput> createOutputV2(io::FileWriter* 
file_writer);
     void close() override;
     std::string toString() const override;
     static const char* getClassName();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
index 8d56b913b31..a8538dabab8 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
@@ -21,6 +21,7 @@
 #include <CLucene/util/bkd/bkd_reader.h>
 
 #include "common/config.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
 #include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
@@ -29,10 +30,10 @@ namespace doris::segment_v2 {
 Status FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory,
                                            OptionalIndexSearcherPtr& 
output_searcher) {
     auto close_directory = true;
-    lucene::index::IndexReader* reader = nullptr;
+    std::unique_ptr<lucene::index::IndexReader> reader;
     try {
-        reader = lucene::index::IndexReader::open(
-                directory, config::inverted_index_read_buffer_size, 
close_directory);
+        reader = 
std::unique_ptr<lucene::index::IndexReader>(lucene::index::IndexReader::open(
+                directory, config::inverted_index_read_buffer_size, 
close_directory));
     } catch (const CLuceneError& e) {
         std::vector<std::string> file_names;
         directory->list(&file_names);
@@ -44,16 +45,15 @@ Status 
FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory,
         return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(msg);
     }
     bool close_reader = true;
-    auto index_searcher = 
std::make_shared<lucene::search::IndexSearcher>(reader, close_reader);
+    reader_size = reader->getTermInfosRAMUsed();
+    auto index_searcher =
+            std::make_shared<lucene::search::IndexSearcher>(reader.release(), 
close_reader);
     if (!index_searcher) {
         output_searcher = std::nullopt;
         return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                 "FulltextIndexSearcherBuilder build index_searcher error.");
     }
-    reader_size = reader->getTermInfosRAMUsed();
-    // NOTE: need to cl_refcount-- here, so that directory will be deleted when
-    // index_searcher is destroyed
-    _CLDECDELETE(directory)
+    // NOTE: IndexSearcher takes ownership of the reader, and directory 
cleanup is handled by caller
     output_searcher = index_searcher;
     return Status::OK();
 }
@@ -69,7 +69,6 @@ Status 
BKDIndexSearcherBuilder::build(lucene::store::Directory* directory,
         }
         reader_size = bkd_reader->ram_bytes_used();
         output_searcher = bkd_reader;
-        _CLDECDELETE(directory)
         return Status::OK();
     } catch (const CLuceneError& e) {
         return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
@@ -104,13 +103,13 @@ Result<std::unique_ptr<IndexSearcherBuilder>> 
IndexSearcherBuilder::create_index
 Result<IndexSearcherPtr> IndexSearcherBuilder::get_index_searcher(
         lucene::store::Directory* directory) {
     OptionalIndexSearcherPtr result;
-    auto st = build(directory, result);
+    std::unique_ptr<lucene::store::Directory, DirectoryDeleter> 
directory_ptr(directory);
+
+    auto st = build(directory_ptr.get(), result);
     if (!st.ok()) {
-        _CLDECDELETE(directory)
         return ResultError(st);
     }
     if (!result.has_value()) {
-        _CLDECDELETE(directory)
         return 
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                 "InvertedIndexSearcherCache build error."));
     }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 6059d82ef5d..bd704577867 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -425,8 +425,6 @@ public:
     Status add_array_values(size_t field_size, const void* value_ptr,
                             const uint8_t* nested_null_map, const uint8_t* 
offsets_ptr,
                             size_t count) override {
-        
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
-                        { count = 0; })
         if (count == 0) {
             // no values to add inverted index
             return Status::OK();
@@ -446,7 +444,7 @@ public:
                 // every single array row element size to go through the 
nullmap & value ptr-array, and also can go through the every row in array to 
keep with _rid++
                 auto array_elem_size = offsets[i + 1] - offsets[i];
                 // TODO(Amory).later we use object pool to avoid field creation
-                lucene::document::Field* new_field = nullptr;
+                std::unique_ptr<lucene::document::Field> new_field;
                 CL_NS(analysis)::TokenStream* ts = nullptr;
                 for (auto j = start_off; j < start_off + array_elem_size; ++j) 
{
                     if (nested_null_map && nested_null_map[j] == 1) {
@@ -460,7 +458,9 @@ public:
                         continue;
                     } else {
                         // now we temp create field . later make a pool
-                        Status st = create_field(&new_field);
+                        lucene::document::Field* tmp_field = nullptr;
+                        Status st = create_field(&tmp_field);
+                        new_field.reset(tmp_field);
                         DBUG_EXECUTE_IF(
                                 
"InvertedIndexColumnWriterImpl::add_array_values_create_field_"
                                 "error",
@@ -488,9 +488,9 @@ public:
                                                         
char_string_reader.release());
                             new_field->setValue(ts, own_token_stream);
                         } else {
-                            new_field_char_value(v->get_data(), v->get_size(), 
new_field);
+                            new_field_char_value(v->get_data(), v->get_size(), 
new_field.get());
                         }
-                        _doc->add(*new_field);
+                        _doc->add(*new_field.release());
                     }
                 }
                 start_off += array_elem_size;
@@ -519,7 +519,9 @@ public:
                     // avoid to add doc which without any field which may make 
threadState init skip
                     // init fieldDataArray, then will make error with next doc 
with fields in
                     // resetCurrentFieldData
-                    Status st = create_field(&new_field);
+                    lucene::document::Field* tmp_field = nullptr;
+                    Status st = create_field(&tmp_field);
+                    new_field.reset(tmp_field);
                     DBUG_EXECUTE_IF(
                             
"InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2",
                             {
@@ -532,7 +534,7 @@ public:
                                 << " error:" << st;
                         return st;
                     }
-                    _doc->add(*new_field);
+                    _doc->add(*new_field.release());
                     RETURN_IF_ERROR(add_null_document());
                     _doc->clear();
                 }
@@ -793,7 +795,7 @@ Status InvertedIndexColumnWriter::create(const Field* field,
     }
 
     
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index",
-                    { type = FieldType::OLAP_FIELD_TYPE_FLOAT; })
+                    { type = FieldType::OLAP_FIELD_TYPE_HLL; })
     switch (type) {
 #define M(TYPE)                                                           \
     case TYPE:                                                            \
diff --git 
a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
index 9fbb245c243..52ce8f87a74 100644
--- 
a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
@@ -219,7 +219,6 @@ 
suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") {
         
"InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_error",
         "InvertedIndexColumnWriterImpl::add_values_field_is_nullptr",
         "InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr",
-        "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
         
"InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr",
         "InvertedIndexColumnWriterImpl::add_array_values_create_field_error",
         "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2",
@@ -262,13 +261,7 @@ 
suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") {
                     GetDebugPoint().enableDebugPointForAllBEs(debug_point)
                     run_insert("${tableName}")
                     check_count("${tableName}", 6)
-                    // if debug_point equals 
InvertedIndexColumnWriterImpl::add_array_values_count_is_zero, 
run_select(false(abnormal))
-                    // else run_select(true(normal))
-                    if (debug_point == 
"InvertedIndexColumnWriterImpl::add_array_values_count_is_zero") {
-                        run_select("${tableName}", false)
-                    } else {
-                        run_select("${tableName}", true)
-                    }
+                    run_select("${tableName}", true)
                     sql "TRUNCATE TABLE ${tableName}"
                 } catch (Exception e) {
                     log.error("Caught exception: ${e}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to