This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a14daca7bac [feature](inverted index)write separated index files in
RAM directory to reduce IO(#28810)
a14daca7bac is described below
commit a14daca7bacd02122d73078ada4ff2677d6d0351
Author: qiye <[email protected]>
AuthorDate: Thu Dec 28 17:18:59 2023 +0800
[feature](inverted index)write separated index files in RAM directory to
reduce IO(#28810)
Normally we write the separate index files to disk before merging them
into a single compound `.idx` file.
In high-frequency load scenarios, disk IO can become a bottleneck.
To reduce the pressure on the disk, we first write the separate index
files to a RAM directory, and only write to disk when merging them into
the compound file.
Add config `inverted_index_ram_dir_enable`, default is `false`.
---
be/src/clucene | 2 +-
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/index-tools/index_tool.cpp | 8 +-
be/src/olap/compaction.cpp | 3 +-
.../rowset/segment_v2/inverted_index_cache.cpp | 10 +-
.../segment_v2/inverted_index_compaction.cpp | 18 +-
.../inverted_index_compound_directory.cpp | 315 +++++++++++++++------
.../segment_v2/inverted_index_compound_directory.h | 123 +++++---
.../rowset/segment_v2/inverted_index_reader.cpp | 2 +-
.../rowset/segment_v2/inverted_index_writer.cpp | 12 +-
11 files changed, 358 insertions(+), 139 deletions(-)
diff --git a/be/src/clucene b/be/src/clucene
index d95d6be91ec..df3ab39ca63 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit d95d6be91ecd4e471306caa57b580ba548605962
+Subproject commit df3ab39ca636e58ed1ee640921444f7ef6d6438d
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 42a95302122..b5a16ef0166 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1013,6 +1013,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_Bool(inverted_index_compaction_enable, "false");
+// index by RAM directory
+DEFINE_mBool(inverted_index_ram_dir_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
DEFINE_Int32(num_broadcast_buffer, "32");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d38e059c416..53dc0a2a0d8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1050,6 +1050,8 @@ DECLARE_Int32(inverted_index_read_buffer_size);
DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_Bool(inverted_index_compaction_enable);
+// index by RAM directory
+DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
DECLARE_Int32(num_broadcast_buffer);
diff --git a/be/src/index-tools/index_tool.cpp
b/be/src/index-tools/index_tool.cpp
index c3564491759..cb84e2b9846 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -33,7 +33,7 @@
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
using doris::segment_v2::DorisCompoundReader;
-using doris::segment_v2::DorisCompoundDirectory;
+using doris::segment_v2::DorisCompoundDirectoryFactory;
using doris::io::FileInfo;
using namespace lucene::analysis;
using namespace lucene::index;
@@ -186,7 +186,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+ DorisCompoundDirectoryFactory::getDirectory(fs,
dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
std::vector<std::string> files;
std::cout << "Nested files for " << file_str << std::endl;
@@ -209,7 +209,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+ DorisCompoundDirectoryFactory::getDirectory(fs,
dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
std::cout << "Term statistics for " << file_str << std::endl;
std::cout << "==================================" << std::endl;
@@ -226,7 +226,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs,
FLAGS_directory.c_str());
+ DorisCompoundDirectoryFactory::getDirectory(fs,
FLAGS_directory.c_str());
if (FLAGS_idx_file_name == "") {
//try to search from directory's all files
std::vector<FileInfo> files;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a6ad5f6ee52..4ad78367d28 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -603,7 +603,8 @@ Status
Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
std::string dir_str = p.parent_path().string();
std::string file_str = p.filename().string();
lucene::store::Directory* dir =
-
DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+
DorisCompoundDirectoryFactory::getDirectory(
+ fs, dir_str.c_str());
DorisCompoundReader reader(dir,
file_str.c_str());
std::vector<std::string> files;
reader.list(&files);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index 0440f6865a4..e88573c1be8 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -178,8 +178,8 @@ Status InvertedIndexSearcherCache::get_index_searcher(
// During the process of opening the index, write the file information
read to the idx file cache.
bool open_idx_file_cache = true;
auto* directory = new DorisCompoundReader(
- DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(),
- config::inverted_index_read_buffer_size, open_idx_file_cache);
+ DorisCompoundDirectoryFactory::getDirectory(fs,
index_dir.c_str()),
+ file_name.c_str(), config::inverted_index_read_buffer_size,
open_idx_file_cache);
auto null_bitmap_file_name =
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name();
if (!directory->fileExists(null_bitmap_file_name.c_str())) {
has_null = false;
@@ -261,9 +261,9 @@ Status InvertedIndexSearcherCache::insert(const
io::FileSystemSPtr& fs,
return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
"InvertedIndexSearcherCache do not support reader type.");
}
- auto* directory =
- new
DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
- file_name.c_str(),
config::inverted_index_read_buffer_size);
+ auto* directory = new DorisCompoundReader(
+ DorisCompoundDirectoryFactory::getDirectory(fs,
index_dir.c_str()),
+ file_name.c_str(), config::inverted_index_read_buffer_size);
OptionalIndexSearcherPtr result;
RETURN_IF_ERROR(builder->build(directory, result));
if (!result.has_value()) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index ca9d1c8f9a4..677d359f1a4 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -22,8 +22,7 @@
#include "inverted_index_compound_directory.h"
#include "inverted_index_compound_reader.h"
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
Status compact_column(int32_t index_id, int src_segment_num, int
dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, const
io::FileSystemSPtr& fs,
@@ -31,7 +30,7 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
std::vector<std::vector<std::pair<uint32_t, uint32_t>>>
trans_vec,
std::vector<uint32_t> dest_segment_num_rows) {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs,
index_writer_path.c_str(), false);
+ DorisCompoundDirectoryFactory::getDirectory(fs,
index_writer_path.c_str());
lucene::analysis::SimpleAnalyzer<char> analyzer;
auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true
/* create */,
true /*
closeDirOnShutdown */);
@@ -42,8 +41,8 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
// format: rowsetId_segmentId_indexId.idx
std::string src_idx_full_name =
src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
- DorisCompoundReader* reader = new DorisCompoundReader(
- DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()),
+ auto* reader = new DorisCompoundReader(
+ DorisCompoundDirectoryFactory::getDirectory(fs,
tablet_path.c_str()),
src_idx_full_name.c_str());
src_index_dirs[i] = reader;
}
@@ -53,7 +52,7 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto path = tablet_path + "/" + dest_index_files[i] + "_" +
std::to_string(index_id);
- dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs,
path.c_str(), true);
+ dest_index_dirs[i] = DorisCompoundDirectoryFactory::getDirectory(fs,
path.c_str(), true);
}
index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec,
@@ -65,13 +64,13 @@ Status compact_column(int32_t index_id, int
src_segment_num, int dest_segment_nu
// when index_writer is destroyed, if closeDir is set, dir will be close
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir
will be destroyed.
_CLDECDELETE(dir)
- for (auto d : src_index_dirs) {
+ for (auto* d : src_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}
- for (auto d : dest_index_dirs) {
+ for (auto* d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close dest dir here, because it will be closed
when dest index writer finalize.
//d->close();
@@ -83,5 +82,4 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
static_cast<void>(fs->delete_directory(index_writer_path.c_str()));
return Status::OK();
}
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 6b1cacee454..d43afd4fc5e 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -18,10 +18,10 @@
#include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
#include "CLucene/SharedHeader.h"
+#include "CLucene/_SharedHeader.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
#include "util/debug_points.h"
#include "util/slice.h"
@@ -47,17 +47,14 @@
#include <CLucene/store/RAMDirectory.h>
#include <CLucene/util/Misc.h>
#include <assert.h>
-// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <glog/logging.h>
#include <stdio.h>
#include <string.h>
#include <wchar.h>
-#include <algorithm>
#include <filesystem>
#include <iostream>
-#include <memory>
#include <mutex>
#include <utility>
@@ -76,15 +73,12 @@
LOG(WARNING) << err; \
_CLTHROWA(CL_ERR_IO, err.c_str()); \
}
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
const char* WRITE_LOCK_FILE = "write.lock";
const char* COMPOUND_FILE_EXTENSION = ".idx";
const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
-bool DorisCompoundDirectory::disableLocks = false;
-
DorisCompoundFileWriter::DorisCompoundFileWriter(CL_NS(store)::Directory* dir)
{
if (dir == nullptr) {
_CLTHROWA(CL_ERR_NullPointer, "directory cannot be null");
@@ -127,7 +121,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
for (auto file : files) {
FileInfo file_info;
file_info.filename = file;
- file_info.filesize =
((DorisCompoundDirectory*)directory)->fileLength(file.c_str());
+ file_info.filesize = directory->fileLength(file.c_str());
sorted_files.emplace_back(std::move(file_info));
}
sort_files(sorted_files);
@@ -139,7 +133,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
std::string idx_name = std::string(cfs_path.stem().c_str()) +
COMPOUND_FILE_EXTENSION;
// write file entries to ram directory to get header length
lucene::store::RAMDirectory ram_dir;
- auto out_idx = ram_dir.createOutput(idx_name.c_str());
+ auto* out_idx = ram_dir.createOutput(idx_name.c_str());
if (out_idx == nullptr) {
LOG(WARNING) << "Write compound file error: RAMDirectory output is
nullptr.";
_CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
@@ -169,9 +163,9 @@ void DorisCompoundFileWriter::writeCompoundFile() {
ram_dir.close();
auto compound_fs =
((DorisCompoundDirectory*)directory)->getCompoundFileSystem();
- auto out_dir = DorisCompoundDirectory::getDirectory(compound_fs,
idx_path.c_str(), false);
+ auto* out_dir = DorisCompoundDirectoryFactory::getDirectory(compound_fs,
idx_path.c_str());
- auto out = out_dir->createOutput(idx_name.c_str());
+ auto* out = out_dir->createOutput(idx_name.c_str());
if (out == nullptr) {
LOG(WARNING) << "Write compound file error: CompoundDirectory output
is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
@@ -277,7 +271,7 @@ bool DorisCompoundDirectory::FSIndexInput::open(const
io::FileSystemSPtr& fs, co
if (buffer_size == -1) {
buffer_size = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE;
}
- SharedHandle* h = _CLNEW SharedHandle(path);
+ auto* h = _CLNEW SharedHandle(path);
io::FileReaderOptions reader_options;
reader_options.cache_type = config::enable_file_cache ?
io::FileCachePolicy::FILE_BLOCK_CACHE
@@ -519,10 +513,12 @@ DorisCompoundDirectory::DorisCompoundDirectory() {
}
void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char*
_path,
+ bool use_compound_file_writer,
lucene::store::LockFactory* lock_factory,
const io::FileSystemSPtr& cfs, const char*
cfs_path) {
fs = _fs;
directory = _path;
+ useCompoundFileWriter = use_compound_file_writer;
if (cfs == nullptr) {
compound_fs = fs;
@@ -534,17 +530,12 @@ void DorisCompoundDirectory::init(const
io::FileSystemSPtr& _fs, const char* _pa
} else {
cfs_directory = _path;
}
- bool doClearLockID = false;
if (lock_factory == nullptr) {
lock_factory = _CLNEW lucene::store::NoLockFactory();
}
- setLockFactory(lock_factory);
-
- if (doClearLockID) {
- lockFactory->setLockPrefix(nullptr);
- }
+ lucene::store::Directory::setLockFactory(lock_factory);
// It's fail checking directory existence in S3.
if (fs->type() == io::FileSystemType::S3) {
@@ -559,24 +550,6 @@ void DorisCompoundDirectory::init(const
io::FileSystemSPtr& _fs, const char* _pa
}
}
-void DorisCompoundDirectory::create() {
- std::lock_guard<std::mutex> wlock(_this_lock);
-
- //clear old files
- std::vector<std::string> files;
- lucene::util::Misc::listFiles(directory.c_str(), files, false);
- std::vector<std::string>::iterator itr = files.begin();
- while (itr != files.end()) {
- if (CL_NS(index)::IndexReader::isLuceneFile(itr->c_str())) {
- if (unlink((directory + PATH_DELIMITERA + *itr).c_str()) == -1) {
- _CLTHROWA(CL_ERR_IO, "Couldn't delete file ");
- }
- }
- itr++;
- }
- lockFactory->clearLock(CL_NS(index)::IndexWriter::WRITE_LOCK_NAME);
-}
-
void DorisCompoundDirectory::priv_getFN(char* buffer, const char* name) const {
buffer[0] = 0;
strcpy(buffer, directory.c_str());
@@ -619,45 +592,6 @@ const char* DorisCompoundDirectory::getCfsDirName() const {
return cfs_directory.c_str();
}
-DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(const
io::FileSystemSPtr& fs,
- const char* file,
- bool
use_compound_file_writer,
- const
io::FileSystemSPtr& cfs_fs,
- const char*
cfs_file) {
- DorisCompoundDirectory* dir =
- getDirectory(fs, file, (lucene::store::LockFactory*)nullptr,
cfs_fs, cfs_file);
- dir->useCompoundFileWriter = use_compound_file_writer;
- return dir;
-}
-
-//static
-DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
- const io::FileSystemSPtr& _fs, const char* _file,
lucene::store::LockFactory* lock_factory,
- const io::FileSystemSPtr& _cfs, const char* _cfs_file) {
- const char* cfs_file = _cfs_file;
- if (cfs_file == nullptr) {
- cfs_file = _file;
- }
- DorisCompoundDirectory* dir = nullptr;
- if (!_file || !*_file) {
- _CLTHROWA(CL_ERR_IO, "Invalid directory");
- }
-
- const char* file = _file;
-
- bool exists = false;
- LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists
IO error")
- if (!exists) {
- LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
- "Get directory create directory IO error")
- }
-
- dir = _CLNEW DorisCompoundDirectory();
- dir->init(_fs, file, lock_factory, _cfs, cfs_file);
-
- return dir;
-}
-
int64_t DorisCompoundDirectory::fileModified(const char* name) const {
CND_PRECONDITION(directory[0] != 0, "directory is not open");
struct stat buf;
@@ -698,7 +632,7 @@ bool DorisCompoundDirectory::openInput(const char* name,
lucene::store::IndexInp
void DorisCompoundDirectory::close() {
if (useCompoundFileWriter) {
- DorisCompoundFileWriter* cfsWriter = _CLNEW
DorisCompoundFileWriter(this);
+ auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this);
// write compound file
cfsWriter->writeCompoundFile();
// delete index path, which contains separated inverted index files
@@ -754,7 +688,7 @@ lucene::store::IndexOutput*
DorisCompoundDirectory::createOutput(const char* nam
LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "Create output file
exists IO error")
assert(!exists);
}
- auto ret = _CLNEW FSIndexOutput();
+ auto* ret = _CLNEW FSIndexOutput();
try {
ret->init(fs, fl);
} catch (CLuceneError& err) {
@@ -768,5 +702,226 @@ std::string DorisCompoundDirectory::toString() const {
return std::string("DorisCompoundDirectory@") + this->directory;
}
-} // namespace segment_v2
-} // namespace doris
+/**
+ * DorisRAMCompoundDirectory
+ */
+DorisRAMCompoundDirectory::DorisRAMCompoundDirectory() {
+ filesMap = _CLNEW FileMap(true, true);
+ this->sizeInBytes = 0;
+}
+
+DorisRAMCompoundDirectory::~DorisRAMCompoundDirectory() {
+ _CLDELETE(lockFactory);
+ _CLDELETE(filesMap);
+}
+
+void DorisRAMCompoundDirectory::init(const io::FileSystemSPtr& _fs, const
char* _path,
+ bool use_compound_file_writer,
+ lucene::store::LockFactory* lock_factory,
+ const io::FileSystemSPtr& cfs, const
char* cfs_path) {
+ fs = _fs;
+ directory = _path;
+ useCompoundFileWriter = use_compound_file_writer;
+
+ if (cfs == nullptr) {
+ compound_fs = fs;
+ } else {
+ compound_fs = cfs;
+ }
+ if (cfs_path != nullptr) {
+ cfs_directory = cfs_path;
+ } else {
+ cfs_directory = _path;
+ }
+
+ lucene::store::Directory::setLockFactory(_CLNEW
lucene::store::SingleInstanceLockFactory());
+}
+
+bool DorisRAMCompoundDirectory::list(std::vector<std::string>* names) const {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ auto itr = filesMap->begin();
+ while (itr != filesMap->end()) {
+ names->emplace_back(itr->first);
+ ++itr;
+ }
+ return true;
+}
+
+bool DorisRAMCompoundDirectory::fileExists(const char* name) const {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ return filesMap->exists((char*)name);
+}
+
+int64_t DorisRAMCompoundDirectory::fileModified(const char* name) const {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ auto* f = filesMap->get((char*)name);
+ return f->getLastModified();
+}
+
+void DorisRAMCompoundDirectory::touchFile(const char* name) {
+ lucene::store::RAMFile* file = nullptr;
+ {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ file = filesMap->get((char*)name);
+ }
+ const uint64_t ts1 = file->getLastModified();
+ uint64_t ts2 = lucene::util::Misc::currentTimeMillis();
+
+ //make sure that the time has actually changed
+ while (ts1 == ts2) {
+ _LUCENE_SLEEP(1);
+ ts2 = lucene::util::Misc::currentTimeMillis();
+ };
+
+ file->setLastModified(ts2);
+}
+
+int64_t DorisRAMCompoundDirectory::fileLength(const char* name) const {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ auto* f = filesMap->get((char*)name);
+ return f->getLength();
+}
+
+bool DorisRAMCompoundDirectory::openInput(const char* name,
lucene::store::IndexInput*& ret,
+ CLuceneError& error, int32_t
bufferSize) {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ auto* file = filesMap->get((char*)name);
+ if (file == nullptr) {
+ error.set(CL_ERR_IO,
+ "[DorisRAMCompoundDirectory::open] The requested file does
not exist.");
+ return false;
+ }
+ ret = _CLNEW lucene::store::RAMInputStream(file);
+ return true;
+}
+
+void DorisRAMCompoundDirectory::close() {
+ // write compound file
+ DorisCompoundDirectory::close();
+
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ filesMap->clear();
+ _CLDELETE(filesMap);
+}
+
+bool DorisRAMCompoundDirectory::doDeleteFile(const char* name) {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ auto itr = filesMap->find((char*)name);
+ if (itr != filesMap->end()) {
+ SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+ sizeInBytes -= itr->second->sizeInBytes;
+ filesMap->removeitr(itr);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool DorisRAMCompoundDirectory::deleteDirectory() {
+ // do nothing, RAM dir do not have actual files
+ return true;
+}
+
+void DorisRAMCompoundDirectory::renameFile(const char* from, const char* to) {
+ std::lock_guard<std::mutex> wlock(_this_lock);
+ auto itr = filesMap->find((char*)from);
+
+ /* DSR:CL_BUG_LEAK:
+ ** If a file named $to already existed, its old value was leaked.
+ ** My inclination would be to prevent this implicit deletion with an
+ ** exception, but it happens routinely in CLucene's internals (e.g., during
+ ** IndexWriter.addIndexes with the file named 'segments'). */
+ if (filesMap->exists((char*)to)) {
+ auto itr1 = filesMap->find((char*)to);
+ SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+ sizeInBytes -= itr1->second->sizeInBytes;
+ filesMap->removeitr(itr1);
+ }
+ if (itr == filesMap->end()) {
+ char tmp[1024];
+ snprintf(tmp, 1024, "cannot rename %s, file does not exist", from);
+ _CLTHROWT(CL_ERR_IO, tmp);
+ }
+ DCHECK(itr != filesMap->end());
+ auto* file = itr->second;
+ filesMap->removeitr(itr, false, true);
+ filesMap->put(strdup(to), file);
+}
+
+lucene::store::IndexOutput* DorisRAMCompoundDirectory::createOutput(const
char* name) {
+ /* Check the $filesMap VoidMap to see if there was a previous file named
+ ** $name. If so, delete the old RAMFile object, but reuse the existing
+ ** char buffer ($n) that holds the filename. If not, duplicate the
+ ** supplied filename buffer ($name) and pass ownership of that memory ($n)
+ ** to $files. */
+ std::lock_guard<std::mutex> wlock(_this_lock);
+
+ // get the actual pointer to the output name
+ char* n = nullptr;
+ auto itr = filesMap->find(const_cast<char*>(name));
+ if (itr != filesMap->end()) {
+ n = itr->first;
+ lucene::store::RAMFile* rf = itr->second;
+ SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+ sizeInBytes -= rf->sizeInBytes;
+ _CLDELETE(rf);
+ } else {
+ n = STRDUP_AtoA(name);
+ }
+
+ auto* file = _CLNEW lucene::store::RAMFile();
+ (*filesMap)[n] = file;
+
+ return _CLNEW lucene::store::RAMOutputStream(file);
+}
+
+std::string DorisRAMCompoundDirectory::toString() const {
+ return std::string("DorisRAMCompoundDirectory@") + this->directory;
+}
+
+const char* DorisRAMCompoundDirectory::getClassName() {
+ return "DorisRAMCompoundDirectory";
+}
+
+const char* DorisRAMCompoundDirectory::getObjectName() const {
+ return getClassName();
+}
+
+/**
+ * DorisCompoundDirectoryFactory
+ */
+DorisCompoundDirectory* DorisCompoundDirectoryFactory::getDirectory(
+ const io::FileSystemSPtr& _fs, const char* _file, bool
use_compound_file_writer,
+ bool can_use_ram_dir, lucene::store::LockFactory* lock_factory,
+ const io::FileSystemSPtr& _cfs, const char* _cfs_file) {
+ const char* cfs_file = _cfs_file;
+ if (cfs_file == nullptr) {
+ cfs_file = _file;
+ }
+ DorisCompoundDirectory* dir = nullptr;
+ if (!_file || !*_file) {
+ _CLTHROWA(CL_ERR_IO, "Invalid directory");
+ }
+
+ const char* file = _file;
+
+ // Write by RAM directory
+ // 1. only write separated index files, which is can_use_ram_dir = true.
+ // 2. config::inverted_index_ram_dir_enable = true
+ if (config::inverted_index_ram_dir_enable && can_use_ram_dir) {
+ dir = _CLNEW DorisRAMCompoundDirectory();
+ } else {
+ bool exists = false;
+ LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory
exists IO error")
+ if (!exists) {
+ LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
+ "Get directory create directory IO error")
+ }
+ dir = _CLNEW DorisCompoundDirectory();
+ }
+ dir->init(_fs, file, use_compound_file_writer, lock_factory, _cfs,
cfs_file);
+
+ return dir;
+}
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index 9b88f46645c..a6def0fd2f0 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -21,6 +21,7 @@
#include <CLucene/store/Directory.h>
#include <CLucene/store/IndexInput.h>
#include <CLucene/store/IndexOutput.h>
+#include <CLucene/store/_RAMDirectory.h>
#include <stdint.h>
#include <string>
@@ -33,15 +34,11 @@
class CLuceneError;
-namespace lucene {
-namespace store {
+namespace lucene::store {
class LockFactory;
-} // namespace store
-} // namespace lucene
+} // namespace lucene::store
-namespace doris {
-
-namespace segment_v2 {
+namespace doris::segment_v2 {
class DorisCompoundFileWriter : LUCENE_BASE {
public:
@@ -69,26 +66,15 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public
lucene::store::Directory {
private:
int filemode;
- std::mutex _this_lock;
-
protected:
- DorisCompoundDirectory();
- virtual void init(const io::FileSystemSPtr& fs, const char* path,
- lucene::store::LockFactory* lock_factory = nullptr,
- const io::FileSystemSPtr& compound_fs = nullptr,
- const char* cfs_path = nullptr);
- void priv_getFN(char* buffer, const char* name) const;
-
-private:
+ mutable std::mutex _this_lock;
io::FileSystemSPtr fs;
io::FileSystemSPtr compound_fs;
std::string directory;
std::string cfs_directory;
- void create();
- static bool disableLocks;
bool useCompoundFileWriter {false};
-protected:
+ void priv_getFN(char* buffer, const char* name) const;
/// Removes an existing file in the directory.
bool doDeleteFile(const char* name) override;
@@ -106,16 +92,6 @@ public:
bool list(std::vector<std::string>* names) const override;
bool fileExists(const char* name) const override;
const char* getCfsDirName() const;
- static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs,
const char* file,
- lucene::store::LockFactory*
lock_factory = nullptr,
- const io::FileSystemSPtr&
cfs_fs = nullptr,
- const char* cfs_file =
nullptr);
-
- static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs,
const char* file,
- bool use_compound_file_writer,
- const io::FileSystemSPtr&
cfs_fs = nullptr,
- const char* cfs_file =
nullptr);
-
int64_t fileModified(const char* name) const override;
int64_t fileLength(const char* name) const override;
bool openInput(const char* name, lucene::store::IndexInput*& ret,
CLuceneError& err,
@@ -127,7 +103,77 @@ public:
std::string toString() const override;
static const char* getClassName();
const char* getObjectName() const override;
- bool deleteDirectory();
+ virtual bool deleteDirectory();
+
+ DorisCompoundDirectory();
+
+ virtual void init(const io::FileSystemSPtr& fs, const char* path, bool
use_compound_file_writer,
+ lucene::store::LockFactory* lock_factory = nullptr,
+ const io::FileSystemSPtr& compound_fs = nullptr,
+ const char* cfs_path = nullptr);
+};
+
+class CLUCENE_EXPORT DorisRAMCompoundDirectory : public DorisCompoundDirectory
{
+protected:
+ using FileMap =
+ lucene::util::CLHashMap<char*, lucene::store::RAMFile*,
lucene::util::Compare::Char,
+ lucene::util::Equals::Char,
lucene::util::Deletor::acArray,
+
lucene::util::Deletor::Object<lucene::store::RAMFile>>;
+
+ // unlike the java Hashtable, FileMap is not synchronized, and all access
must be protected by a lock
+ FileMap* filesMap;
+ void init(const io::FileSystemSPtr& fs, const char* path, bool
use_compound_file_writer,
+ lucene::store::LockFactory* lock_factory = nullptr,
+ const io::FileSystemSPtr& compound_fs = nullptr,
+ const char* cfs_path = nullptr) override;
+
+public:
+ int64_t sizeInBytes;
+
+ /// Returns a null terminated array of strings, one for each file in the
directory.
+ bool list(std::vector<std::string>* names) const override;
+
+ /** Constructs an empty {@link Directory}. */
+ DorisRAMCompoundDirectory();
+
+ ///Destructor - only call this if you are sure the directory
+ ///is not being used anymore. Otherwise use the ref-counting
+ ///facilities of dir->close
+ ~DorisRAMCompoundDirectory() override;
+
+ bool doDeleteFile(const char* name) override;
+
+ bool deleteDirectory() override;
+
+ /// Returns true iff the named file exists in this directory.
+ bool fileExists(const char* name) const override;
+
+ /// Returns the time the named file was last modified.
+ int64_t fileModified(const char* name) const override;
+
+ /// Returns the length in bytes of a file in the directory.
+ int64_t fileLength(const char* name) const override;
+
+ /// Removes an existing file in the directory.
+ void renameFile(const char* from, const char* to) override;
+
+ /** Set the modified time of an existing file to now. */
+ void touchFile(const char* name) override;
+
+ /// Creates a new, empty file in the directory with the given name.
+ /// Returns a stream writing this file.
+ lucene::store::IndexOutput* createOutput(const char* name) override;
+
+ /// Returns a stream reading an existing file.
+ bool openInput(const char* name, lucene::store::IndexInput*& ret,
CLuceneError& error,
+ int32_t bufferSize = -1) override;
+
+ void close() override;
+
+ std::string toString() const override;
+
+ static const char* getClassName();
+ const char* getObjectName() const override;
};
class DorisCompoundDirectory::FSIndexInput : public
lucene::store::BufferedIndexInput {
@@ -180,5 +226,16 @@ protected:
void readInternal(uint8_t* b, const int32_t len) override;
};
-} // namespace segment_v2
-} // namespace doris
+/**
+ * Factory function to create DorisCompoundDirectory
+ */
+class DorisCompoundDirectoryFactory {
+public:
+ static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs,
const char* file,
+ bool use_compound_file_writer
= false,
+ bool can_use_ram_dir = false,
+ lucene::store::LockFactory*
lock_factory = nullptr,
+ const io::FileSystemSPtr&
cfs_fs = nullptr,
+ const char* cfs_file =
nullptr);
+};
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 6928a976fa8..b8406156f82 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -187,7 +187,7 @@ Status
InvertedIndexReader::read_null_bitmap(InvertedIndexQueryCacheHandle* cach
if (!dir) {
dir = new DorisCompoundReader(
- DorisCompoundDirectory::getDirectory(_fs,
index_dir.c_str()),
+ DorisCompoundDirectoryFactory::getDirectory(_fs,
index_dir.c_str()),
index_file_name.c_str(),
config::inverted_index_read_buffer_size);
owned_dir = true;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 13a31e24768..a43049aaa2b 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -169,7 +169,8 @@ public:
}
Status create_index_directory(std::unique_ptr<DorisCompoundDirectory>&
dir) {
- bool create = true;
+ bool use_compound_file_writer = true;
+ bool can_use_ram_dir = true;
auto index_path = InvertedIndexDescriptor::get_temporary_index_path(
_directory + "/" + _segment_file_name, _index_meta->index_id(),
_index_meta->get_index_suffix());
@@ -185,8 +186,8 @@ public:
return Status::InternalError("init_fulltext_index directory
already exists");
}
- dir = std::unique_ptr<DorisCompoundDirectory>(
- DorisCompoundDirectory::getDirectory(_fs, index_path.c_str(),
create));
+ dir =
std::unique_ptr<DorisCompoundDirectory>(DorisCompoundDirectoryFactory::getDirectory(
+ _fs, index_path.c_str(), use_compound_file_writer,
can_use_ram_dir));
return Status::OK();
}
@@ -536,7 +537,10 @@ public:
auto index_path =
InvertedIndexDescriptor::get_temporary_index_path(
_directory + "/" + _segment_file_name,
_index_meta->index_id(),
_index_meta->get_index_suffix());
- dir = DorisCompoundDirectory::getDirectory(_fs,
index_path.c_str(), true);
+ bool use_compound_file_writer = true;
+ bool can_use_ram_dir = true;
+ dir = DorisCompoundDirectoryFactory::getDirectory(
+ _fs, index_path.c_str(), use_compound_file_writer,
can_use_ram_dir);
write_null_bitmap(null_bitmap_out, dir);
_bkd_writer->max_doc_ = _rid;
_bkd_writer->docs_seen_ = _row_ids_seen_for_bkd;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]