This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new cbdb8e25beb branch-3.1: [fix](inverted index) fix memory leak in
inverted index procedure #53235 (#53425)
cbdb8e25beb is described below
commit cbdb8e25beb6966fd6b74d2a19da88b945004158
Author: airborne12 <[email protected]>
AuthorDate: Fri Jul 18 10:45:27 2025 +0800
branch-3.1: [fix](inverted index) fix memory leak in inverted index
procedure #53235 (#53425)
Picked and refactored from #53235
---
be/src/clucene | 2 +-
.../segment_v2/inverted_index_compaction.cpp | 16 ++--
.../segment_v2/inverted_index_compound_reader.cpp | 98 +++++++++++-----------
.../segment_v2/inverted_index_compound_reader.h | 22 ++---
.../segment_v2/inverted_index_file_reader.cpp | 42 +++++-----
.../rowset/segment_v2/inverted_index_file_reader.h | 7 +-
.../segment_v2/inverted_index_file_writer.cpp | 12 ++-
.../segment_v2/inverted_index_fs_directory.cpp | 39 +++++++--
.../segment_v2/inverted_index_fs_directory.h | 2 +-
.../rowset/segment_v2/inverted_index_searcher.cpp | 23 +++--
.../rowset/segment_v2/inverted_index_writer.cpp | 20 +++--
...inverted_index_exception_fault_injection.groovy | 9 +-
12 files changed, 144 insertions(+), 148 deletions(-)
diff --git a/be/src/clucene b/be/src/clucene
index 4f5449c9037..d6eda26f4dd 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 4f5449c903778fae32884586c728587c24a58806
+Subproject commit d6eda26f4dd87aec21ca5d925727743ad12d0281
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index dcbdca921ab..5f3679aaeae 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -17,6 +17,7 @@
#include "inverted_index_compaction.h"
+#include "inverted_index_common.h"
#include "inverted_index_file_writer.h"
#include "inverted_index_fs_directory.h"
#include "io/fs/local_file_system.h"
@@ -41,15 +42,17 @@ Status compact_column(int64_t index_id,
"debug point: index compaction error");
}
})
+
bool can_use_ram_dir = true;
- lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory(
- io::global_local_filesystem(), tmp_path.data(), can_use_ram_dir);
+ std::unique_ptr<lucene::store::Directory, DirectoryDeleter> dir(
+
DorisFSDirectoryFactory::getDirectory(io::global_local_filesystem(),
tmp_path.data(),
+ can_use_ram_dir));
DBUG_EXECUTE_IF("compact_column_getDirectory_error", {
_CLTHROWA(CL_ERR_IO, "debug point: compact_column_getDirectory_error
in index compaction");
})
lucene::analysis::SimpleAnalyzer<char> analyzer;
- auto* index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer,
true /* create */,
- true /*
closeDirOnShutdown */);
+ std::unique_ptr<lucene::index::IndexWriter> index_writer(_CLNEW
lucene::index::IndexWriter(
+ dir.get(), &analyzer, true /* create */, true /*
closeDirOnShutdown */));
DBUG_EXECUTE_IF("compact_column_create_index_writer_error", {
_CLTHROWA(CL_ERR_IO,
"debug point: compact_column_create_index_writer_error in
index compaction");
@@ -71,11 +74,6 @@ Status compact_column(int64_t index_id,
_CLTHROWA(CL_ERR_IO,
"debug point: compact_column_index_writer_close_error in
index compaction");
})
- _CLDELETE(index_writer);
- // NOTE: need to ref_cnt-- for dir,
- // when index_writer is destroyed, if closeDir is set, dir will be close
- // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir
will be destroyed.
- _CLDECDELETE(dir)
// delete temporary segment_path, only when inverted_index_ram_dir_enable
is false
if (!config::inverted_index_ram_dir_enable) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 86efe86ca43..37878689cfd 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -22,10 +22,10 @@
#include <CLucene/debug/mem.h>
#include <CLucene/store/RAMDirectory.h>
#include <CLucene/util/Misc.h>
-#include <stdio.h>
-#include <string.h>
-#include <wchar.h>
+#include <cstdio>
+#include <cstring>
+#include <cwchar>
#include <memory>
#include <utility>
@@ -34,22 +34,13 @@
#include "olap/tablet_schema.h"
#include "util/debug_points.h"
-namespace doris {
-namespace io {
+namespace doris::io {
class FileWriter;
-} // namespace io
-} // namespace doris
+} // namespace doris::io
#define BUFFER_LENGTH 16384
#define CL_MAX_PATH 4096
-
-#define STRDUP_WtoA(x) CL_NS(util)::Misc::_wideToChar(x)
-#define STRDUP_TtoA STRDUP_WtoA
-
-using FileWriterPtr = std::unique_ptr<doris::io::FileWriter>;
-
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
/** Implementation of an IndexInput that reads from a portion of the
* compound file.
@@ -94,7 +85,7 @@ CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base,
const std::string& fi
void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
std::lock_guard wlock(((DorisFSDirectory::FSIndexInput*)base)->_this_lock);
- int64_t start = getFilePointer();
+ auto start = getFilePointer();
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
@@ -153,56 +144,63 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
}
DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
- EntriesType* entries_clone, int32_t
read_buffer_size,
+ const EntriesType& entries_clone,
int32_t read_buffer_size,
const io::IOContext* io_ctx)
: _stream(stream),
- _entries(_CLNEW EntriesType(true, true)),
+ _entries(std::make_unique<EntriesType>()),
_read_buffer_size(read_buffer_size) {
// After stream clone, the io_ctx needs to be reconfigured.
initialize(io_ctx);
- for (auto& e : *entries_clone) {
- auto* origin_entry = e.second;
- auto* entry = _CLNEW ReaderFileEntry();
- char* aid = strdup(e.first);
+ for (const auto& e : entries_clone) {
+ const auto& origin_entry = e.second;
+ auto entry = std::make_unique<ReaderFileEntry>();
entry->file_name = origin_entry->file_name;
entry->offset = origin_entry->offset;
entry->length = origin_entry->length;
- _entries->put(aid, entry);
+ (*_entries)[e.first] = std::move(entry);
}
};
DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
int32_t read_buffer_size,
const io::IOContext* io_ctx)
- : _ram_dir(new lucene::store::RAMDirectory()),
+ : _ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
_stream(stream),
- _entries(_CLNEW EntriesType(true, true)),
+ _entries(std::make_unique<EntriesType>()),
_read_buffer_size(read_buffer_size) {
// After stream clone, the io_ctx needs to be reconfigured.
initialize(io_ctx);
try {
int32_t count = _stream->readVInt();
- ReaderFileEntry* entry = nullptr;
- TCHAR tid[CL_MAX_PATH];
uint8_t buffer[BUFFER_LENGTH];
for (int32_t i = 0; i < count; i++) {
- entry = _CLNEW ReaderFileEntry();
- stream->readString(tid, CL_MAX_PATH);
- char* aid = STRDUP_TtoA(tid);
- entry->file_name = aid;
+ auto entry = std::make_unique<ReaderFileEntry>();
+ // Read the string length first
+ int32_t string_length = stream->readVInt();
+ // Allocate appropriate buffer for the string
+ std::wstring tid;
+ tid.resize(string_length);
+ // Read the string characters directly
+ stream->readChars(tid.data(), 0, string_length);
+ std::string file_name_str(tid.begin(), tid.end());
+ entry->file_name = file_name_str;
entry->offset = stream->readLong();
entry->length = stream->readLong();
+ VLOG_DEBUG << "string_length:" << string_length << " file_name:"
<< entry->file_name
+ << " offset:" << entry->offset << " length:" <<
entry->length;
DBUG_EXECUTE_IF("construct_DorisCompoundReader_failed", {
CLuceneError err;
err.set(CL_ERR_IO, "construct_DorisCompoundReader_failed");
throw err;
})
- _entries->put(aid, entry);
// read header file data
if (entry->offset < 0) {
- copyFile(entry->file_name.c_str(), entry->length, buffer,
BUFFER_LENGTH);
+ //if offset is -1, it means it's size is lower than
DorisFSDirectory::MAX_HEADER_DATA_SIZE, which is 128k.
+ copyFile(entry->file_name.c_str(),
static_cast<int64_t>(entry->length), buffer,
+ BUFFER_LENGTH);
}
+ _entries->emplace(std::move(file_name_str), std::move(entry));
}
} catch (...) {
try {
@@ -212,11 +210,9 @@
DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32
}
if (_entries != nullptr) {
_entries->clear();
- _CLDELETE(_entries);
}
if (_ram_dir) {
_ram_dir->close();
- _CLDELETE(_ram_dir)
}
} catch (CLuceneError& err) {
if (err.number() != CL_ERR_IO) {
@@ -231,11 +227,12 @@ void DorisCompoundReader::copyFile(const char* file,
int64_t file_length, uint8_
int64_t buffer_length) {
std::unique_ptr<lucene::store::IndexOutput>
output(_ram_dir->createOutput(file));
int64_t start_ptr = output->getFilePointer();
- int64_t remainder = file_length;
- int64_t chunk = buffer_length;
+ auto remainder = file_length;
+ auto chunk = buffer_length;
+ auto batch_len = file_length < chunk ? file_length : chunk;
while (remainder > 0) {
- int64_t len = std::min(std::min(chunk, file_length), remainder);
+ auto len = remainder < batch_len ? remainder : batch_len;
_stream->readBytes(buffer, len);
output->writeBytes(buffer, len);
remainder -= len;
@@ -270,7 +267,6 @@ DorisCompoundReader::~DorisCompoundReader() {
LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
}
}
- _CLDELETE(_entries)
}
const char* DorisCompoundReader::getClassName() {
@@ -284,8 +280,8 @@ bool DorisCompoundReader::list(std::vector<std::string>*
names) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
- for (EntriesType::const_iterator i = _entries->begin(); i !=
_entries->end(); i++) {
- names->push_back(i->first);
+ for (const auto& entry : *_entries) {
+ names->push_back(entry.first);
}
return true;
}
@@ -294,7 +290,7 @@ bool DorisCompoundReader::fileExists(const char* name)
const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
- return _entries->exists((char*)name);
+ return _entries->find(std::string(name)) != _entries->end();
}
int64_t DorisCompoundReader::fileModified(const char* name) const {
@@ -305,15 +301,15 @@ int64_t DorisCompoundReader::fileLength(const char* name)
const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
- ReaderFileEntry* e = _entries->get((char*)name);
- if (e == nullptr) {
+ auto it = _entries->find(std::string(name));
+ if (it == _entries->end()) {
char buf[CL_MAX_PATH + 30];
strcpy(buf, "File ");
strncat(buf, name, CL_MAX_PATH);
strcat(buf, " does not exist");
_CLTHROWA(CL_ERR_IO, buf);
}
- return e->length;
+ return it->second->length;
}
bool DorisCompoundReader::openInput(const char* name,
@@ -338,14 +334,16 @@ bool DorisCompoundReader::openInput(const char* name,
lucene::store::IndexInput*
return false;
}
- const ReaderFileEntry* entry = _entries->get((char*)name);
- if (entry == nullptr) {
+ auto it = _entries->find(std::string(name));
+ if (it == _entries->end()) {
char buf[CL_MAX_PATH + 26];
snprintf(buf, CL_MAX_PATH + 26, "No sub-file with id %s found", name);
error.set(CL_ERR_IO, buf);
return false;
}
+ const auto& entry = it->second;
+
// If file is in RAM, just return.
if (_ram_dir && _ram_dir->fileExists(name)) {
return _ram_dir->openInput(name, ret, error, bufferSize);
@@ -374,7 +372,6 @@ void DorisCompoundReader::close() {
}
if (_ram_dir) {
_ram_dir->close();
- _CLDELETE(_ram_dir)
}
_closed = true;
}
@@ -400,7 +397,7 @@ lucene::store::IndexOutput*
DorisCompoundReader::createOutput(const char* /*name
}
std::string DorisCompoundReader::toString() const {
- return std::string("DorisCompoundReader@");
+ return "DorisCompoundReader@";
}
CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
@@ -412,5 +409,4 @@ void DorisCompoundReader::initialize(const io::IOContext*
io_ctx) {
_stream->setIdxFileCache(true);
}
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 4a687e4ed3e..09e6faaeb91 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -24,12 +24,13 @@
#include <CLucene/store/IndexOutput.h>
#include <CLucene/util/Equators.h>
#include <CLucene/util/VoidMap.h>
-#include <stdint.h>
+#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
+#include <unordered_map>
#include <utility>
#include <vector>
@@ -43,9 +44,7 @@ namespace lucene::store {
class RAMDirectory;
} // namespace lucene::store
-namespace doris {
-class TabletIndex;
-namespace segment_v2 {
+namespace doris::segment_v2 {
class ReaderFileEntry : LUCENE_BASE {
public:
@@ -60,16 +59,14 @@ public:
~ReaderFileEntry() override = default;
};
-using EntriesType =
- lucene::util::CLHashMap<char*, ReaderFileEntry*,
lucene::util::Compare::Char,
- lucene::util::Equals::Char,
lucene::util::Deletor::acArray,
-
lucene::util::Deletor::Object<ReaderFileEntry>>;
+using EntriesType = std::unordered_map<std::string,
std::unique_ptr<ReaderFileEntry>>;
+
class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
private:
- lucene::store::RAMDirectory* _ram_dir = nullptr;
+ std::unique_ptr<lucene::store::RAMDirectory> _ram_dir;
CL_NS(store)::IndexInput* _stream = nullptr;
// The life cycle of _entries should be consistent with that of the
DorisCompoundReader.
- EntriesType* _entries = nullptr;
+ std::unique_ptr<EntriesType> _entries;
std::mutex _this_lock;
bool _closed = false;
int32_t _read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE;
@@ -79,7 +76,7 @@ protected:
bool doDeleteFile(const char* name) override;
public:
- DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType*
entries_clone,
+ DorisCompoundReader(CL_NS(store)::IndexInput* stream, const EntriesType&
entries_clone,
int32_t read_buffer_size =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
const io::IOContext* io_ctx = nullptr);
DorisCompoundReader(CL_NS(store)::IndexInput* stream,
@@ -109,5 +106,4 @@ private:
void initialize(const io::IOContext* io_ctx);
};
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index 8dbac071290..9a0fcd43e99 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -84,7 +84,6 @@ Status InvertedIndexFileReader::_init_from(int32_t
read_buffer_size, const io::I
if (version >= InvertedIndexStorageFormatPB::V2) {
DCHECK(version == _storage_format);
int32_t numIndices = _stream->readInt(); // Read number of indices
- ReaderFileEntry* entry = nullptr;
for (int32_t i = 0; i < numIndices; ++i) {
int64_t indexId = _stream->readLong(); // Read index ID
@@ -95,23 +94,19 @@ Status InvertedIndexFileReader::_init_from(int32_t
read_buffer_size, const io::I
int32_t numFiles = _stream->readInt(); // Read number of files
in the index
- // true, true means it will deconstruct key and value
- auto fileEntries = std::make_unique<EntriesType>(true, true);
+ auto fileEntries = std::make_unique<EntriesType>();
+ fileEntries->reserve(numFiles);
for (int32_t j = 0; j < numFiles; ++j) {
- entry = _CLNEW ReaderFileEntry();
-
int32_t file_name_length = _stream->readInt();
- // aid will destruct in EntriesType map.
- char* aid = (char*)malloc(file_name_length + 1);
- _stream->readBytes(reinterpret_cast<uint8_t*>(aid),
file_name_length);
- aid[file_name_length] = '\0';
- //stream->readString(tid, CL_MAX_PATH);
- entry->file_name = std::string(aid, file_name_length);
+ std::string file_name(file_name_length, '\0');
+
_stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()),
+ file_name_length);
+ auto entry = std::make_unique<ReaderFileEntry>();
+ entry->file_name = std::move(file_name);
entry->offset = _stream->readLong();
entry->length = _stream->readLong();
-
- fileEntries->put(aid, entry);
+ fileEntries->emplace(entry->file_name, std::move(entry));
}
_indices_entries.emplace(std::make_pair(indexId,
std::move(suffix_str)),
@@ -223,8 +218,8 @@ Result<std::unique_ptr<DorisCompoundReader>>
InvertedIndexFileReader::_open(
errMsg.str()));
}
// Need to clone resource here, because index searcher cache need it.
- compound_reader = std::make_unique<DorisCompoundReader>(
- _stream->clone(), index_it->second.get(), _read_buffer_size,
io_ctx);
+ compound_reader =
std::make_unique<DorisCompoundReader>(_stream->clone(), *index_it->second,
+
_read_buffer_size, io_ctx);
}
return compound_reader;
}
@@ -291,13 +286,14 @@ Status InvertedIndexFileReader::has_null(const
TabletIndex* index_meta, bool* re
if (index_it == _indices_entries.end()) {
*res = false;
} else {
- auto* entries = index_it->second.get();
- ReaderFileEntry* e =
-
entries->get((char*)InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
- if (e == nullptr) {
+ const auto& entries = index_it->second;
+ auto entry_it =
+
entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
+ if (entry_it == entries->end()) {
*res = false;
return Status::OK();
}
+ const auto& e = entry_it->second;
// roaring bitmap cookie header size is 5
if (e->length <= 5) {
*res = false;
@@ -310,11 +306,11 @@ Status InvertedIndexFileReader::has_null(const
TabletIndex* index_meta, bool* re
void InvertedIndexFileReader::debug_file_entries() {
std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
- for (auto& index : _indices_entries) {
+ for (const auto& index : _indices_entries) {
LOG(INFO) << "index_id:" << index.first.first;
- auto* index_entries = index.second.get();
- for (auto& entry : (*index_entries)) {
- ReaderFileEntry* file_entry = entry.second;
+ const auto& index_entries = index.second;
+ for (const auto& entry : *index_entries) {
+ const auto& file_entry = entry.second;
LOG(INFO) << "file entry name:" << file_entry->file_name
<< " length:" << file_entry->length << " offset:" <<
file_entry->offset;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
index 63dd89cf975..49a7dd88400 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
@@ -42,11 +42,6 @@ class DorisCompoundReader;
class InvertedIndexFileReader {
public:
- using EntriesType =
- lucene::util::CLHashMap<char*, ReaderFileEntry*,
lucene::util::Compare::Char,
- lucene::util::Equals::Char,
lucene::util::Deletor::acArray,
-
lucene::util::Deletor::Object<ReaderFileEntry>>;
- // Map to hold the file entries for each index ID.
using IndicesEntriesMap =
std::map<std::pair<int64_t, std::string>,
std::unique_ptr<EntriesType>>;
@@ -56,7 +51,7 @@ public:
: _fs(std::move(fs)),
_index_path_prefix(std::move(index_path_prefix)),
_storage_format(storage_format),
- _idx_file_info(idx_file_info) {}
+ _idx_file_info(std::move(idx_file_info)) {}
Status init(int32_t read_buffer_size =
config::inverted_index_read_buffer_size,
const io::IOContext* io_ctx = nullptr);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index e31fb407ec6..e4be9bf64cb 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -262,6 +262,7 @@ void InvertedIndexFileWriter::copyFile(const char*
fileName, lucene::store::Dire
lucene::store::IndexInput* tmp = nullptr;
CLuceneError err;
auto open = dir->openInput(fileName, tmp, err);
+ std::unique_ptr<lucene::store::IndexInput> input(tmp);
DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_openInput_error", {
open = false;
err.set(CL_ERR_IO, "debug point: copyFile_openInput_error");
@@ -274,7 +275,6 @@ void InvertedIndexFileWriter::copyFile(const char*
fileName, lucene::store::Dire
throw err;
}
- std::unique_ptr<lucene::store::IndexInput> input(tmp);
int64_t start_ptr = output->getFilePointer();
int64_t length = input->length();
int64_t remainder = length;
@@ -488,15 +488,14 @@ InvertedIndexFileWriter::create_output_stream_v1(int64_t
index_id,
out_dir->set_file_writer_opts(_opts);
std::unique_ptr<lucene::store::Directory, DirectoryDeleter>
out_dir_ptr(out_dir);
- auto* out = out_dir->createOutput(idx_name.c_str());
+ std::unique_ptr<lucene::store::IndexOutput>
output(out_dir->createOutput(idx_name.c_str()));
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
- { out = nullptr; });
- if (out == nullptr) {
+ { output = nullptr; });
+ if (output == nullptr) {
LOG(WARNING) << "InvertedIndexFileWriter::create_output_stream_v1
error: CompoundDirectory "
"output is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
}
- std::unique_ptr<lucene::store::IndexOutput> output(out);
return {std::move(out_dir_ptr), std::move(output)};
}
@@ -546,8 +545,7 @@ InvertedIndexFileWriter::create_output_stream() {
std::unique_ptr<lucene::store::Directory, DirectoryDeleter>
out_dir_ptr(out_dir);
DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is
nullptr";
- auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
- out_dir->createOutputV2(_idx_v2_writer.get()));
+ auto compound_file_output = out_dir->createOutputV2(_idx_v2_writer.get());
return {std::move(out_dir_ptr), std::move(compound_file_output)};
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index c633d29a7fc..7bdca1941fa 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -23,6 +23,7 @@
#include "inverted_index_desc.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/tablet_schema.h"
#include "util/debug_points.h"
#include "util/slice.h"
@@ -119,6 +120,7 @@ public:
bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const
char* path,
IndexInput*& ret, CLuceneError&
error,
int32_t buffer_size, int64_t
file_size) {
+ // no throw error
CND_PRECONDITION(path != nullptr, "path is NULL");
if (buffer_size == -1) {
@@ -704,21 +706,42 @@ lucene::store::IndexOutput*
DorisFSDirectory::createOutput(const char* name) {
assert(!exists);
}
auto* ret = _CLNEW FSIndexOutput();
+ ErrorContext error_context;
ret->set_file_writer_opts(_opts);
try {
ret->init(_fs, fl);
} catch (CLuceneError& err) {
- ret->close();
- _CLDELETE(ret)
- LOG(WARNING) << "FSIndexOutput init error: " << err.what();
- _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
- }
+ error_context.eptr = std::current_exception();
+ error_context.err_msg.append("FSIndexOutput init error: ");
+ error_context.err_msg.append(err.what());
+ LOG(ERROR) << error_context.err_msg;
+ }
+ FINALLY_EXCEPTION({
+ if (error_context.eptr) {
+ FINALLY_CLOSE(ret);
+ _CLDELETE(ret);
+ }
+ })
return ret;
}
-lucene::store::IndexOutput* DorisFSDirectory::createOutputV2(io::FileWriter*
file_writer) {
- auto* ret = _CLNEW FSIndexOutputV2();
- ret->init(file_writer);
+std::unique_ptr<lucene::store::IndexOutput> DorisFSDirectory::createOutputV2(
+ io::FileWriter* file_writer) {
+ auto ret = std::make_unique<FSIndexOutputV2>();
+ ErrorContext error_context;
+ try {
+ ret->init(file_writer);
+ } catch (CLuceneError& err) {
+ error_context.eptr = std::current_exception();
+ error_context.err_msg.append("FSIndexOutputV2 init error: ");
+ error_context.err_msg.append(err.what());
+ LOG(ERROR) << error_context.err_msg;
+ }
+ FINALLY_EXCEPTION({
+ if (error_context.eptr) {
+ FINALLY_CLOSE(ret);
+ }
+ })
return ret;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
index 0bba5b49756..60e3a132aa4 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
@@ -79,7 +79,7 @@ public:
void renameFile(const char* from, const char* to) override;
void touchFile(const char* name) override;
lucene::store::IndexOutput* createOutput(const char* name) override;
- lucene::store::IndexOutput* createOutputV2(io::FileWriter* file_writer);
+ std::unique_ptr<lucene::store::IndexOutput> createOutputV2(io::FileWriter*
file_writer);
void close() override;
std::string toString() const override;
static const char* getClassName();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
index 8d56b913b31..a8538dabab8 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp
@@ -21,6 +21,7 @@
#include <CLucene/util/bkd/bkd_reader.h>
#include "common/config.h"
+#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
@@ -29,10 +30,10 @@ namespace doris::segment_v2 {
Status FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory,
OptionalIndexSearcherPtr&
output_searcher) {
auto close_directory = true;
- lucene::index::IndexReader* reader = nullptr;
+ std::unique_ptr<lucene::index::IndexReader> reader;
try {
- reader = lucene::index::IndexReader::open(
- directory, config::inverted_index_read_buffer_size,
close_directory);
+ reader =
std::unique_ptr<lucene::index::IndexReader>(lucene::index::IndexReader::open(
+ directory, config::inverted_index_read_buffer_size,
close_directory));
} catch (const CLuceneError& e) {
std::vector<std::string> file_names;
directory->list(&file_names);
@@ -44,16 +45,15 @@ Status
FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory,
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(msg);
}
bool close_reader = true;
- auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(reader, close_reader);
+ reader_size = reader->getTermInfosRAMUsed();
+ auto index_searcher =
+ std::make_shared<lucene::search::IndexSearcher>(reader.release(),
close_reader);
if (!index_searcher) {
output_searcher = std::nullopt;
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"FulltextIndexSearcherBuilder build index_searcher error.");
}
- reader_size = reader->getTermInfosRAMUsed();
- // NOTE: need to cl_refcount-- here, so that directory will be deleted when
- // index_searcher is destroyed
- _CLDECDELETE(directory)
+ // NOTE: IndexSearcher takes ownership of the reader, and directory
cleanup is handled by caller
output_searcher = index_searcher;
return Status::OK();
}
@@ -69,7 +69,6 @@ Status
BKDIndexSearcherBuilder::build(lucene::store::Directory* directory,
}
reader_size = bkd_reader->ram_bytes_used();
output_searcher = bkd_reader;
- _CLDECDELETE(directory)
return Status::OK();
} catch (const CLuceneError& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
@@ -104,13 +103,13 @@ Result<std::unique_ptr<IndexSearcherBuilder>>
IndexSearcherBuilder::create_index
Result<IndexSearcherPtr> IndexSearcherBuilder::get_index_searcher(
lucene::store::Directory* directory) {
OptionalIndexSearcherPtr result;
- auto st = build(directory, result);
+ std::unique_ptr<lucene::store::Directory, DirectoryDeleter>
directory_ptr(directory);
+
+ auto st = build(directory_ptr.get(), result);
if (!st.ok()) {
- _CLDECDELETE(directory)
return ResultError(st);
}
if (!result.has_value()) {
- _CLDECDELETE(directory)
return
ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"InvertedIndexSearcherCache build error."));
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 6059d82ef5d..bd704577867 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -425,8 +425,6 @@ public:
Status add_array_values(size_t field_size, const void* value_ptr,
const uint8_t* nested_null_map, const uint8_t*
offsets_ptr,
size_t count) override {
-
DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
- { count = 0; })
if (count == 0) {
// no values to add inverted index
return Status::OK();
@@ -446,7 +444,7 @@ public:
// every single array row element size to go through the
nullmap & value ptr-array, and also can go through the every row in array to
keep with _rid++
auto array_elem_size = offsets[i + 1] - offsets[i];
// TODO(Amory).later we use object pool to avoid field creation
- lucene::document::Field* new_field = nullptr;
+ std::unique_ptr<lucene::document::Field> new_field;
CL_NS(analysis)::TokenStream* ts = nullptr;
for (auto j = start_off; j < start_off + array_elem_size; ++j)
{
if (nested_null_map && nested_null_map[j] == 1) {
@@ -460,7 +458,9 @@ public:
continue;
} else {
// now we temp create field . later make a pool
- Status st = create_field(&new_field);
+ lucene::document::Field* tmp_field = nullptr;
+ Status st = create_field(&tmp_field);
+ new_field.reset(tmp_field);
DBUG_EXECUTE_IF(
"InvertedIndexColumnWriterImpl::add_array_values_create_field_"
"error",
@@ -488,9 +488,9 @@ public:
char_string_reader.release());
new_field->setValue(ts, own_token_stream);
} else {
- new_field_char_value(v->get_data(), v->get_size(),
new_field);
+ new_field_char_value(v->get_data(), v->get_size(),
new_field.get());
}
- _doc->add(*new_field);
+ _doc->add(*new_field.release());
}
}
start_off += array_elem_size;
@@ -519,7 +519,9 @@ public:
// avoid to add doc which without any field which may make
threadState init skip
// init fieldDataArray, then will make error with next doc
with fields in
// resetCurrentFieldData
- Status st = create_field(&new_field);
+ lucene::document::Field* tmp_field = nullptr;
+ Status st = create_field(&tmp_field);
+ new_field.reset(tmp_field);
DBUG_EXECUTE_IF(
"InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2",
{
@@ -532,7 +534,7 @@ public:
<< " error:" << st;
return st;
}
- _doc->add(*new_field);
+ _doc->add(*new_field.release());
RETURN_IF_ERROR(add_null_document());
_doc->clear();
}
@@ -793,7 +795,7 @@ Status InvertedIndexColumnWriter::create(const Field* field,
}
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index",
- { type = FieldType::OLAP_FIELD_TYPE_FLOAT; })
+ { type = FieldType::OLAP_FIELD_TYPE_HLL; })
switch (type) {
#define M(TYPE) \
case TYPE: \
diff --git
a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
index 9fbb245c243..52ce8f87a74 100644
---
a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
@@ -219,7 +219,6 @@
suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") {
"InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_error",
"InvertedIndexColumnWriterImpl::add_values_field_is_nullptr",
"InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr",
- "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
"InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr",
"InvertedIndexColumnWriterImpl::add_array_values_create_field_error",
"InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2",
@@ -262,13 +261,7 @@
suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") {
GetDebugPoint().enableDebugPointForAllBEs(debug_point)
run_insert("${tableName}")
check_count("${tableName}", 6)
- // if debug_point equals
InvertedIndexColumnWriterImpl::add_array_values_count_is_zero,
run_select(false(abnormal))
- // else run_select(true(normal))
- if (debug_point ==
"InvertedIndexColumnWriterImpl::add_array_values_count_is_zero") {
- run_select("${tableName}", false)
- } else {
- run_select("${tableName}", true)
- }
+ run_select("${tableName}", true)
sql "TRUNCATE TABLE ${tableName}"
} catch (Exception e) {
log.error("Caught exception: ${e}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]