This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 0186f7313b2 [feature](inverted index)write separated index files in
RAM directory to reduce IO #28810 (#29305)
0186f7313b2 is described below
commit 0186f7313b249a842f3df7ac685ce0bbd16b2db1
Author: qiye <[email protected]>
AuthorDate: Sat Dec 30 09:08:22 2023 +0800
[feature](inverted index)write separated index files in RAM directory to
reduce IO #28810 (#29305)
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/index-tools/index_tool.cpp | 8 +-
be/src/olap/compaction.cpp | 3 +-
.../rowset/segment_v2/inverted_index_cache.cpp | 8 +-
.../segment_v2/inverted_index_compaction.cpp | 18 +-
.../inverted_index_compound_directory.cpp | 325 +++++++++++++++------
.../segment_v2/inverted_index_compound_directory.h | 131 ++++++---
.../rowset/segment_v2/inverted_index_reader.cpp | 6 +-
.../rowset/segment_v2/inverted_index_writer.cpp | 15 +-
10 files changed, 369 insertions(+), 149 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index af08d2deaab..a762d0de392 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1007,6 +1007,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
+// index by RAM directory
+DEFINE_mBool(inverted_index_ram_dir_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
DEFINE_Int32(num_broadcast_buffer, "32");
// semi-structure configs
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4fc3bc8dbfa..f71aad99ab3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1042,6 +1042,8 @@ DECLARE_mInt32(inverted_index_max_buffered_docs);
DECLARE_Int32(max_depth_in_bkd_tree);
// index compaction
DECLARE_mBool(inverted_index_compaction_enable);
+// index by RAM directory
+DECLARE_mBool(inverted_index_ram_dir_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
DECLARE_Int32(num_broadcast_buffer);
// semi-structure configs
diff --git a/be/src/index-tools/index_tool.cpp
b/be/src/index-tools/index_tool.cpp
index 9cd72795ef9..a5ecf9b996f 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -30,7 +30,7 @@
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
using doris::segment_v2::DorisCompoundReader;
-using doris::segment_v2::DorisCompoundDirectory;
+using doris::segment_v2::DorisCompoundDirectoryFactory;
using doris::io::FileInfo;
using namespace lucene::analysis;
using namespace lucene::index;
@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+ DorisCompoundDirectoryFactory::getDirectory(fs,
dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
std::vector<std::string> files;
std::cout << "Nested files for " << file_str << std::endl;
@@ -173,7 +173,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+ DorisCompoundDirectoryFactory::getDirectory(fs,
dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
std::cout << "Term statistics for " << file_str << std::endl;
std::cout << "==================================" << std::endl;
@@ -190,7 +190,7 @@ int main(int argc, char** argv) {
auto fs = doris::io::global_local_filesystem();
try {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs,
FLAGS_directory.c_str());
+ DorisCompoundDirectoryFactory::getDirectory(fs,
FLAGS_directory.c_str());
if (FLAGS_idx_file_name == "") {
//try to search from directory's all files
std::vector<FileInfo> files;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f6c8b3bb5bf..2cbd354b6de 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -633,7 +633,8 @@ Status
Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
std::string dir_str = p.parent_path().string();
std::string file_str = p.filename().string();
lucene::store::Directory* dir =
-
DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+
DorisCompoundDirectoryFactory::getDirectory(
+ fs, dir_str.c_str());
DorisCompoundReader reader(dir,
file_str.c_str());
std::vector<std::string> files;
reader.list(&files);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f3c68984ebb..0804fcca2b2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -43,9 +43,9 @@ InvertedIndexSearcherCache*
InvertedIndexSearcherCache::_s_instance = nullptr;
IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const
io::FileSystemSPtr& fs,
const
std::string& index_dir,
const
std::string& file_name) {
- DorisCompoundReader* directory =
- new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs,
index_dir.c_str()),
- file_name.c_str(),
config::inverted_index_read_buffer_size);
+ DorisCompoundReader* directory = new DorisCompoundReader(
+ DorisCompoundDirectoryFactory::getDirectory(fs,
index_dir.c_str()), file_name.c_str(),
+ config::inverted_index_read_buffer_size);
auto closeDirectory = true;
auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(directory,
closeDirectory);
@@ -190,7 +190,7 @@ int64_t InvertedIndexSearcherCache::mem_consumption() {
bool InvertedIndexSearcherCache::_lookup(const
InvertedIndexSearcherCache::CacheKey& key,
InvertedIndexCacheHandle* handle) {
- auto lru_handle = _cache->lookup(key.index_file_path);
+ auto* lru_handle = _cache->lookup(key.index_file_path);
if (lru_handle == nullptr) {
return false;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index b3a28c6ebfc..8458de9e7e3 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -22,8 +22,7 @@
#include "inverted_index_compound_directory.h"
#include "inverted_index_compound_reader.h"
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
Status compact_column(int32_t index_id, int src_segment_num, int
dest_segment_num,
std::vector<std::string> src_index_files,
std::vector<std::string> dest_index_files, const
io::FileSystemSPtr& fs,
@@ -31,7 +30,7 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
std::vector<std::vector<std::pair<uint32_t, uint32_t>>>
trans_vec,
std::vector<uint32_t> dest_segment_num_rows) {
lucene::store::Directory* dir =
- DorisCompoundDirectory::getDirectory(fs,
index_writer_path.c_str(), false);
+ DorisCompoundDirectoryFactory::getDirectory(fs,
index_writer_path.c_str());
lucene::analysis::SimpleAnalyzer<char> analyzer;
auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true
/* create */,
true /*
closeDirOnShutdown */);
@@ -42,8 +41,8 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
// format: rowsetId_segmentId_indexId.idx
std::string src_idx_full_name =
src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
- DorisCompoundReader* reader = new DorisCompoundReader(
- DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()),
+ auto* reader = new DorisCompoundReader(
+ DorisCompoundDirectoryFactory::getDirectory(fs,
tablet_path.c_str()),
src_idx_full_name.c_str());
src_index_dirs[i] = reader;
}
@@ -53,7 +52,7 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
for (int i = 0; i < dest_segment_num; ++i) {
// format: rowsetId_segmentId_columnId
auto path = tablet_path + "/" + dest_index_files[i] + "_" +
std::to_string(index_id);
- dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs,
path.c_str(), true);
+ dest_index_dirs[i] = DorisCompoundDirectoryFactory::getDirectory(fs,
path.c_str(), true);
}
DCHECK_EQ(src_index_dirs.size(), trans_vec.size());
@@ -66,13 +65,13 @@ Status compact_column(int32_t index_id, int
src_segment_num, int dest_segment_nu
// when index_writer is destroyed, if closeDir is set, dir will be close
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir
will be destroyed.
_CLDECDELETE(dir)
- for (auto d : src_index_dirs) {
+ for (auto* d : src_index_dirs) {
if (d != nullptr) {
d->close();
_CLDELETE(d);
}
}
- for (auto d : dest_index_dirs) {
+ for (auto* d : dest_index_dirs) {
if (d != nullptr) {
// NOTE: DO NOT close dest dir here, because it will be closed
when dest index writer finalize.
//d->close();
@@ -84,5 +83,4 @@ Status compact_column(int32_t index_id, int src_segment_num,
int dest_segment_nu
fs->delete_directory(index_writer_path.c_str());
return Status::OK();
}
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index cba340a26f4..0796102e908 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -18,10 +18,10 @@
#include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
#include "CLucene/SharedHeader.h"
+#include "CLucene/_SharedHeader.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
#include "util/debug_points.h"
#include "util/slice.h"
@@ -47,17 +47,14 @@
#include <CLucene/store/RAMDirectory.h>
#include <CLucene/util/Misc.h>
#include <assert.h>
-// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <glog/logging.h>
#include <stdio.h>
#include <string.h>
#include <wchar.h>
-#include <algorithm>
#include <filesystem>
#include <iostream>
-#include <memory>
#include <mutex>
#include <utility>
@@ -76,15 +73,12 @@
LOG(WARNING) << err; \
_CLTHROWA(CL_ERR_IO, err.c_str()); \
}
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
const char* WRITE_LOCK_FILE = "write.lock";
const char* COMPOUND_FILE_EXTENSION = ".idx";
const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
-bool DorisCompoundDirectory::disableLocks = false;
-
DorisCompoundFileWriter::DorisCompoundFileWriter(CL_NS(store)::Directory* dir)
{
if (dir == nullptr) {
_CLTHROWA(CL_ERR_NullPointer, "directory cannot be null");
@@ -123,7 +117,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
std::string idx_name = std::string(cfs_path.stem().c_str()) +
COMPOUND_FILE_EXTENSION;
// write file entries to ram directory to get header length
lucene::store::RAMDirectory ram_dir;
- auto out_idx = ram_dir.createOutput(idx_name.c_str());
+ auto* out_idx = ram_dir.createOutput(idx_name.c_str());
if (out_idx == nullptr) {
LOG(WARNING) << "Write compound file error: RAMDirectory output is
nullptr.";
_CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
@@ -153,9 +147,9 @@ void DorisCompoundFileWriter::writeCompoundFile() {
ram_dir.close();
auto compound_fs =
((DorisCompoundDirectory*)directory)->getCompoundFileSystem();
- auto out_dir = DorisCompoundDirectory::getDirectory(compound_fs,
idx_path.c_str(), false);
+ auto* out_dir = DorisCompoundDirectoryFactory::getDirectory(compound_fs,
idx_path.c_str());
- auto out = out_dir->createOutput(idx_name.c_str());
+ auto* out = out_dir->createOutput(idx_name.c_str());
if (out == nullptr) {
LOG(WARNING) << "Write compound file error: CompoundDirectory output
is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
@@ -261,7 +255,7 @@ bool DorisCompoundDirectory::FSIndexInput::open(const
io::FileSystemSPtr& fs, co
if (buffer_size == -1) {
buffer_size = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE;
}
- SharedHandle* h = _CLNEW SharedHandle(path);
+ auto* h = _CLNEW SharedHandle(path);
if (!fs->open_file(path, &h->_reader).ok()) {
error.set(CL_ERR_IO, "open file error");
@@ -298,7 +292,7 @@ DorisCompoundDirectory::FSIndexInput::FSIndexInput(const
FSIndexInput& other)
_CLTHROWA(CL_ERR_NullPointer, "other handle is null");
}
- std::lock_guard<doris::Mutex> wlock(*other._handle->_shared_lock);
+ std::lock_guard<std::mutex> wlock(*other._handle->_shared_lock);
_handle = _CL_POINTER(other._handle);
_pos = other._handle->_fpos; //note where we are currently...
}
@@ -307,7 +301,7 @@
DorisCompoundDirectory::FSIndexInput::SharedHandle::SharedHandle(const char* pat
_length = 0;
_fpos = 0;
strcpy(this->path, path);
- _shared_lock = new doris::Mutex();
+ _shared_lock = new std::mutex();
}
DorisCompoundDirectory::FSIndexInput::SharedHandle::~SharedHandle() {
@@ -328,10 +322,10 @@ lucene::store::IndexInput*
DorisCompoundDirectory::FSIndexInput::clone() const {
void DorisCompoundDirectory::FSIndexInput::close() {
BufferedIndexInput::close();
if (_handle != nullptr) {
- doris::Mutex* lock = _handle->_shared_lock;
+ std::mutex* lock = _handle->_shared_lock;
bool ref = false;
{
- std::lock_guard<doris::Mutex> wlock(*lock);
+ std::lock_guard<std::mutex> wlock(*lock);
//determine if we are about to delete the handle...
ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
//decdelete (deletes if refcount is down to 0
@@ -354,7 +348,7 @@ void
DorisCompoundDirectory::FSIndexInput::seekInternal(const int64_t position)
void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const
int32_t len) {
CND_PRECONDITION(_handle != nullptr, "shared file handle has closed");
CND_PRECONDITION(_handle->_reader != nullptr, "file is not open");
- std::lock_guard<doris::Mutex> wlock(*_handle->_shared_lock);
+ std::lock_guard<std::mutex> wlock(*_handle->_shared_lock);
int64_t position = getFilePointer();
if (_pos != position) {
@@ -498,10 +492,12 @@ DorisCompoundDirectory::DorisCompoundDirectory() {
}
void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char*
_path,
+ bool use_compound_file_writer,
lucene::store::LockFactory* lock_factory,
const io::FileSystemSPtr& cfs, const char*
cfs_path) {
fs = _fs;
directory = _path;
+ useCompoundFileWriter = use_compound_file_writer;
if (cfs == nullptr) {
compound_fs = fs;
@@ -513,17 +509,12 @@ void DorisCompoundDirectory::init(const
io::FileSystemSPtr& _fs, const char* _pa
} else {
cfs_directory = _path;
}
- bool doClearLockID = false;
if (lock_factory == nullptr) {
lock_factory = _CLNEW lucene::store::NoLockFactory();
}
- setLockFactory(lock_factory);
-
- if (doClearLockID) {
- lockFactory->setLockPrefix(nullptr);
- }
+ lucene::store::Directory::setLockFactory(lock_factory);
// It's fail checking directory existence in S3.
if (fs->type() == io::FileSystemType::S3) {
@@ -538,24 +529,6 @@ void DorisCompoundDirectory::init(const
io::FileSystemSPtr& _fs, const char* _pa
}
}
-void DorisCompoundDirectory::create() {
- std::lock_guard<doris::Mutex> wlock(_this_lock);
-
- //clear old files
- std::vector<std::string> files;
- lucene::util::Misc::listFiles(directory.c_str(), files, false);
- std::vector<std::string>::iterator itr = files.begin();
- while (itr != files.end()) {
- if (CL_NS(index)::IndexReader::isLuceneFile(itr->c_str())) {
- if (unlink((directory + PATH_DELIMITERA + *itr).c_str()) == -1) {
- _CLTHROWA(CL_ERR_IO, "Couldn't delete file ");
- }
- }
- itr++;
- }
- lockFactory->clearLock(CL_NS(index)::IndexWriter::WRITE_LOCK_NAME);
-}
-
void DorisCompoundDirectory::priv_getFN(char* buffer, const char* name) const {
buffer[0] = 0;
strcpy(buffer, directory.c_str());
@@ -598,45 +571,6 @@ const char* DorisCompoundDirectory::getCfsDirName() const {
return cfs_directory.c_str();
}
-DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(const
io::FileSystemSPtr& fs,
- const char* file,
- bool
use_compound_file_writer,
- const
io::FileSystemSPtr& cfs_fs,
- const char*
cfs_file) {
- DorisCompoundDirectory* dir =
- getDirectory(fs, file, (lucene::store::LockFactory*)nullptr,
cfs_fs, cfs_file);
- dir->useCompoundFileWriter = use_compound_file_writer;
- return dir;
-}
-
-//static
-DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
- const io::FileSystemSPtr& _fs, const char* _file,
lucene::store::LockFactory* lock_factory,
- const io::FileSystemSPtr& _cfs, const char* _cfs_file) {
- const char* cfs_file = _cfs_file;
- if (cfs_file == nullptr) {
- cfs_file = _file;
- }
- DorisCompoundDirectory* dir = nullptr;
- if (!_file || !*_file) {
- _CLTHROWA(CL_ERR_IO, "Invalid directory");
- }
-
- const char* file = _file;
-
- bool exists = false;
- LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists
IO error")
- if (!exists) {
- LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
- "Get directory create directory IO error")
- }
-
- dir = _CLNEW DorisCompoundDirectory();
- dir->init(_fs, file, lock_factory, _cfs, cfs_file);
-
- return dir;
-}
-
int64_t DorisCompoundDirectory::fileModified(const char* name) const {
CND_PRECONDITION(directory[0] != 0, "directory is not open");
struct stat buf;
@@ -677,7 +611,7 @@ bool DorisCompoundDirectory::openInput(const char* name,
lucene::store::IndexInp
void DorisCompoundDirectory::close() {
if (useCompoundFileWriter) {
- DorisCompoundFileWriter* cfsWriter = _CLNEW
DorisCompoundFileWriter(this);
+ auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this);
// write compound file
cfsWriter->writeCompoundFile();
// delete index path, which contains separated inverted index files
@@ -705,7 +639,7 @@ bool DorisCompoundDirectory::deleteDirectory() {
void DorisCompoundDirectory::renameFile(const char* from, const char* to) {
CND_PRECONDITION(directory[0] != 0, "directory is not open");
- std::lock_guard<doris::Mutex> wlock(_this_lock);
+ std::lock_guard<std::mutex> wlock(_this_lock);
char old[CL_MAX_DIR];
priv_getFN(old, from);
@@ -733,7 +667,7 @@ lucene::store::IndexOutput*
DorisCompoundDirectory::createOutput(const char* nam
LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "Create output file
exists IO error")
assert(!exists);
}
- auto ret = _CLNEW FSIndexOutput();
+ auto* ret = _CLNEW FSIndexOutput();
try {
ret->init(fs, fl);
} catch (CLuceneError& err) {
@@ -747,5 +681,226 @@ std::string DorisCompoundDirectory::toString() const {
return std::string("DorisCompoundDirectory@") + this->directory;
}
-} // namespace segment_v2
-} // namespace doris
+/**
+ * DorisRAMCompoundDirectory
+ *
+ * Directory that keeps written index files in memory, in a map from file name
+ * to lucene::store::RAMFile, instead of writing them to the filesystem.
+ */
+DorisRAMCompoundDirectory::DorisRAMCompoundDirectory()
+        : filesMap(_CLNEW FileMap(true, true)), sizeInBytes(0) {}
+
+// Releases the lock factory installed by init() and the in-memory file map.
+DorisRAMCompoundDirectory::~DorisRAMCompoundDirectory() {
+    _CLDELETE(lockFactory);
+    _CLDELETE(filesMap);
+}
+
+/**
+ * Initializes the RAM directory.
+ *
+ * No directory is created on `_fs`: all files live in `filesMap`. `_fs`/`_path`
+ * and the compound location (`cfs`/`cfs_path`, defaulting to `_fs`/`_path`)
+ * are remembered for the compound-file writing done by the base-class close().
+ *
+ * @param lock_factory optional lock factory; it is installed on the directory
+ *        (and released by the destructor). When null, a
+ *        SingleInstanceLockFactory is used.
+ */
+void DorisRAMCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _path,
+                                     bool use_compound_file_writer,
+                                     lucene::store::LockFactory* lock_factory,
+                                     const io::FileSystemSPtr& cfs, const char* cfs_path) {
+    fs = _fs;
+    directory = _path;
+    useCompoundFileWriter = use_compound_file_writer;
+
+    // The compound file location defaults to the index location itself.
+    compound_fs = (cfs != nullptr) ? cfs : fs;
+    cfs_directory = (cfs_path != nullptr) ? cfs_path : _path;
+
+    // Honour a caller-supplied lock factory (same contract as
+    // DorisCompoundDirectory::init) instead of silently leaking it. RAM files
+    // are private to this process, so a SingleInstanceLockFactory suffices as
+    // the default.
+    if (lock_factory == nullptr) {
+        lock_factory = _CLNEW lucene::store::SingleInstanceLockFactory();
+    }
+    lucene::store::Directory::setLockFactory(lock_factory);
+}
+
+// Appends the name of every in-memory file to `names`; always reports success.
+bool DorisRAMCompoundDirectory::list(std::vector<std::string>* names) const {
+    std::lock_guard<std::mutex> guard(_this_lock);
+    for (const auto& entry : *filesMap) {
+        names->emplace_back(entry.first);
+    }
+    return true;
+}
+
+// Returns true iff a file called `name` is currently held in memory.
+bool DorisRAMCompoundDirectory::fileExists(const char* name) const {
+    const std::lock_guard<std::mutex> guard(_this_lock);
+    const bool found = filesMap->exists(const_cast<char*>(name));
+    return found;
+}
+
+// Returns the last-modified timestamp of the in-memory file `name`.
+// Throws CLuceneError(CL_ERR_IO) when the file does not exist, instead of
+// dereferencing the null pointer returned by filesMap->get().
+int64_t DorisRAMCompoundDirectory::fileModified(const char* name) const {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto* f = filesMap->get((char*)name);
+    if (f == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "[DorisRAMCompoundDirectory::fileModified] file does not exist");
+    }
+    return f->getLastModified();
+}
+
+// Sets the modification time of the in-memory file `name` to "now", waiting
+// until the millisecond clock has moved past the previous timestamp.
+// Throws CLuceneError(CL_ERR_IO) when the file does not exist.
+void DorisRAMCompoundDirectory::touchFile(const char* name) {
+    lucene::store::RAMFile* file = nullptr;
+    {
+        std::lock_guard<std::mutex> wlock(_this_lock);
+        file = filesMap->get((char*)name);
+    }
+    if (file == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "[DorisRAMCompoundDirectory::touchFile] file does not exist");
+    }
+    // NOTE(review): `file` is used after _this_lock is released; a concurrent
+    // delete of the same name would free it. Callers presumably serialize this.
+    const uint64_t ts1 = file->getLastModified();
+    uint64_t ts2 = lucene::util::Misc::currentTimeMillis();
+
+    // make sure that the time has actually changed
+    while (ts1 == ts2) {
+        _LUCENE_SLEEP(1);
+        ts2 = lucene::util::Misc::currentTimeMillis();
+    }
+
+    file->setLastModified(ts2);
+}
+
+// Returns the length in bytes of the in-memory file `name`.
+// Throws CLuceneError(CL_ERR_IO) when the file does not exist, instead of
+// dereferencing the null pointer returned by filesMap->get().
+int64_t DorisRAMCompoundDirectory::fileLength(const char* name) const {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto* f = filesMap->get((char*)name);
+    if (f == nullptr) {
+        _CLTHROWA(CL_ERR_IO, "[DorisRAMCompoundDirectory::fileLength] file does not exist");
+    }
+    return f->getLength();
+}
+
+// Opens a RAMInputStream over the in-memory file `name` and stores it in `ret`.
+// `bufferSize` is not used. Returns false and fills `error` when the file is missing.
+bool DorisRAMCompoundDirectory::openInput(const char* name, lucene::store::IndexInput*& ret,
+                                          CLuceneError& error, int32_t bufferSize) {
+    std::lock_guard<std::mutex> guard(_this_lock);
+    lucene::store::RAMFile* const file = filesMap->get(const_cast<char*>(name));
+    if (file != nullptr) {
+        ret = _CLNEW lucene::store::RAMInputStream(file);
+        return true;
+    }
+    error.set(CL_ERR_IO, "[DorisRAMCompoundDirectory::open] The requested file does not exist.");
+    return false;
+}
+
+// Closes the directory. First runs DorisCompoundDirectory::close(), which writes
+// the compound file when useCompoundFileWriter is set, reading the files back
+// through this directory. Then it releases every in-memory file. A repeated
+// close() is a no-op instead of dereferencing the already-freed file map.
+void DorisRAMCompoundDirectory::close() {
+    {
+        std::lock_guard<std::mutex> wlock(_this_lock);
+        if (filesMap == nullptr) {
+            return; // already closed
+        }
+    }
+
+    // write compound file; _this_lock must NOT be held here because the
+    // compound-file writer calls back into list()/openInput(), which take it.
+    DorisCompoundDirectory::close();
+
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    filesMap->clear();
+    _CLDELETE(filesMap);
+    SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+    sizeInBytes = 0;
+}
+
+// Removes `name` from memory and subtracts its size from sizeInBytes.
+// Returns false when no such file exists.
+bool DorisRAMCompoundDirectory::doDeleteFile(const char* name) {
+    std::lock_guard<std::mutex> guard(_this_lock);
+    auto it = filesMap->find(const_cast<char*>(name));
+    if (it == filesMap->end()) {
+        return false;
+    }
+    SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+    sizeInBytes -= it->second->sizeInBytes;
+    filesMap->removeitr(it);
+    return true;
+}
+
+// A RAM directory never creates a physical directory, so there is nothing to
+// remove; always reports success.
+bool DorisRAMCompoundDirectory::deleteDirectory() {
+    return true;
+}
+
+// Renames the in-memory file `from` to `to`, replacing any file already named
+// `to` (CLucene relies on this implicit overwrite, e.g. during
+// IndexWriter.addIndexes with the file named 'segments').
+// Throws CLuceneError(CL_ERR_IO) if `from` does not exist; in that case `to`
+// is left untouched.
+void DorisRAMCompoundDirectory::renameFile(const char* from, const char* to) {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto itr = filesMap->find((char*)from);
+
+    // Validate the source before touching the destination.
+    if (itr == filesMap->end()) {
+        char tmp[1024];
+        snprintf(tmp, 1024, "cannot rename %s, file does not exist", from);
+        _CLTHROWA(CL_ERR_IO, tmp);
+    }
+
+    // Renaming a file onto itself is a no-op. Without this check the removal
+    // of the existing `to` entry below would erase `itr`, which is used afterwards.
+    if (strcmp(from, to) == 0) {
+        return;
+    }
+
+    // Drop (and account for) a file that already carries the target name.
+    auto existing = filesMap->find((char*)to);
+    if (existing != filesMap->end()) {
+        SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+        sizeInBytes -= existing->second->sizeInBytes;
+        filesMap->removeitr(existing);
+    }
+
+    auto* file = itr->second;
+    // Remove the old entry and free its key, but keep the RAMFile value
+    // (dontDeleteValue = true); it is re-inserted under the new name.
+    filesMap->removeitr(itr, false, true);
+    // Keys are released by the map's Deletor::acArray (array delete), so the
+    // new key must be allocated with STRDUP_AtoA, as createOutput() does, and
+    // not with malloc-based strdup().
+    filesMap->put(STRDUP_AtoA(to), file);
+}
+
+// Creates (or truncates) the in-memory file `name` and returns an output stream
+// writing into it. If a file of that name already exists, its RAMFile is
+// replaced and freed (its size is subtracted from sizeInBytes) while the
+// existing key buffer is reused. Otherwise `name` is duplicated and ownership
+// of the copy passes to filesMap.
+lucene::store::IndexOutput* DorisRAMCompoundDirectory::createOutput(const char* name) {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+
+    // Allocate the replacement first so that a failed allocation can never
+    // leave a dangling (already deleted) RAMFile pointer inside filesMap.
+    auto* file = _CLNEW lucene::store::RAMFile();
+
+    auto itr = filesMap->find(const_cast<char*>(name));
+    if (itr != filesMap->end()) {
+        // Swap the new file in under the existing key, then free the old one.
+        lucene::store::RAMFile* old_file = itr->second;
+        itr->second = file;
+        SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+        sizeInBytes -= old_file->sizeInBytes;
+        _CLDELETE(old_file);
+    } else {
+        // The map owns the duplicated key and releases it with its key deletor.
+        (*filesMap)[STRDUP_AtoA(name)] = file;
+    }
+
+    return _CLNEW lucene::store::RAMOutputStream(file);
+}
+
+// Human-readable description: the class name followed by the index path.
+std::string DorisRAMCompoundDirectory::toString() const {
+    std::string description("DorisRAMCompoundDirectory@");
+    description += this->directory;
+    return description;
+}
+
+// Class name reported through CLucene's getClassName()/getObjectName().
+const char* DorisRAMCompoundDirectory::getClassName() {
+    return "DorisRAMCompoundDirectory";
+}
+
+const char* DorisRAMCompoundDirectory::getObjectName() const {
+    return DorisRAMCompoundDirectory::getClassName();
+}
+
+/**
+ * DorisCompoundDirectoryFactory
+ *
+ * Creates the directory used to read or write inverted index files:
+ * - a DorisRAMCompoundDirectory when both `can_use_ram_dir` and
+ *   config::inverted_index_ram_dir_enable are true (nothing is created on disk);
+ * - otherwise an on-disk DorisCompoundDirectory, creating `_file` on `_fs`
+ *   if it does not exist yet.
+ * `_cfs_file` defaults to `_file`. Throws CLuceneError(CL_ERR_IO) when `_file`
+ * is null/empty or a filesystem call fails. The returned directory is newly
+ * allocated and owned by the caller.
+ */
+DorisCompoundDirectory* DorisCompoundDirectoryFactory::getDirectory(
+        const io::FileSystemSPtr& _fs, const char* _file, bool use_compound_file_writer,
+        bool can_use_ram_dir, lucene::store::LockFactory* lock_factory,
+        const io::FileSystemSPtr& _cfs, const char* _cfs_file) {
+    if (_file == nullptr || *_file == '\0') {
+        _CLTHROWA(CL_ERR_IO, "Invalid directory");
+    }
+    const char* file = _file;
+    const char* cfs_file = (_cfs_file != nullptr) ? _cfs_file : _file;
+
+    DorisCompoundDirectory* dir = nullptr;
+    // Write by RAM directory only when
+    // 1. the caller writes separated index files (can_use_ram_dir = true), and
+    // 2. config::inverted_index_ram_dir_enable = true.
+    if (config::inverted_index_ram_dir_enable && can_use_ram_dir) {
+        dir = _CLNEW DorisRAMCompoundDirectory();
+    } else {
+        bool exists = false;
+        LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error")
+        if (!exists) {
+            LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
+                                   "Get directory create directory IO error")
+        }
+        dir = _CLNEW DorisCompoundDirectory();
+    }
+
+    // Don't leak the freshly allocated directory if init() throws.
+    try {
+        dir->init(_fs, file, use_compound_file_writer, lock_factory, _cfs, cfs_file);
+    } catch (...) {
+        _CLDELETE(dir);
+        throw;
+    }
+    return dir;
+}
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index b06f980f6bd..7ae0e618a45 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -21,6 +21,7 @@
#include <CLucene/store/Directory.h>
#include <CLucene/store/IndexInput.h>
#include <CLucene/store/IndexOutput.h>
+#include <CLucene/store/_RAMDirectory.h>
#include <stdint.h>
#include <string>
@@ -29,19 +30,15 @@
#include "CLucene/SharedHeader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/file_system.h"
-#include "util/lock.h"
+#include "io/io_common.h"
class CLuceneError;
-namespace lucene {
-namespace store {
+namespace lucene::store {
class LockFactory;
-} // namespace store
-} // namespace lucene
+} // namespace lucene::store
-namespace doris {
-
-namespace segment_v2 {
+namespace doris::segment_v2 {
class DorisCompoundFileWriter : LUCENE_BASE {
public:
@@ -61,26 +58,15 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public
lucene::store::Directory {
private:
int filemode;
- doris::Mutex _this_lock;
-
protected:
- DorisCompoundDirectory();
- virtual void init(const io::FileSystemSPtr& fs, const char* path,
- lucene::store::LockFactory* lock_factory = nullptr,
- const io::FileSystemSPtr& compound_fs = nullptr,
- const char* cfs_path = nullptr);
- void priv_getFN(char* buffer, const char* name) const;
-
-private:
+ mutable std::mutex _this_lock;
io::FileSystemSPtr fs;
io::FileSystemSPtr compound_fs;
std::string directory;
std::string cfs_directory;
- void create();
- static bool disableLocks;
bool useCompoundFileWriter {false};
-protected:
+ void priv_getFN(char* buffer, const char* name) const;
/// Removes an existing file in the directory.
bool doDeleteFile(const char* name) override;
@@ -98,16 +84,6 @@ public:
bool list(std::vector<std::string>* names) const override;
bool fileExists(const char* name) const override;
const char* getCfsDirName() const;
- static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs,
const char* file,
- lucene::store::LockFactory*
lock_factory = nullptr,
- const io::FileSystemSPtr&
cfs_fs = nullptr,
- const char* cfs_file =
nullptr);
-
- static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs,
const char* file,
- bool use_compound_file_writer,
- const io::FileSystemSPtr&
cfs_fs = nullptr,
- const char* cfs_file =
nullptr);
-
int64_t fileModified(const char* name) const override;
int64_t fileLength(const char* name) const override;
bool openInput(const char* name, lucene::store::IndexInput*& ret,
CLuceneError& err,
@@ -119,7 +95,77 @@ public:
std::string toString() const override;
static const char* getClassName();
const char* getObjectName() const override;
- bool deleteDirectory();
+ virtual bool deleteDirectory();
+
+ DorisCompoundDirectory();
+
+ virtual void init(const io::FileSystemSPtr& fs, const char* path, bool
use_compound_file_writer,
+ lucene::store::LockFactory* lock_factory = nullptr,
+ const io::FileSystemSPtr& compound_fs = nullptr,
+ const char* cfs_path = nullptr);
+};
+
+/**
+ * DorisRAMCompoundDirectory: a DorisCompoundDirectory whose index files are kept
+ * in memory (lucene::store::RAMFile objects keyed by file name) instead of being
+ * written to the filesystem. Obtain instances via
+ * DorisCompoundDirectoryFactory::getDirectory().
+ */
+class CLUCENE_EXPORT DorisRAMCompoundDirectory : public DorisCompoundDirectory {
+protected:
+    using FileMap =
+            lucene::util::CLHashMap<char*, lucene::store::RAMFile*, lucene::util::Compare::Char,
+                                    lucene::util::Equals::Char, lucene::util::Deletor::acArray,
+                                    lucene::util::Deletor::Object<lucene::store::RAMFile>>;
+
+    // unlike the java Hashtable, FileMap is not synchronized, and all access must be
+    // protected by a lock (_this_lock). Allocated by the constructor and released by
+    // close() or the destructor.
+    FileMap* filesMap = nullptr;
+
+    void init(const io::FileSystemSPtr& fs, const char* path, bool use_compound_file_writer,
+              lucene::store::LockFactory* lock_factory = nullptr,
+              const io::FileSystemSPtr& compound_fs = nullptr,
+              const char* cfs_path = nullptr) override;
+
+public:
+    /// Byte accounting for the files in this directory; reduced (under THIS_LOCK)
+    /// when files are deleted, overwritten, or replaced by a rename.
+    int64_t sizeInBytes = 0;
+
+    /// Appends the name of every file in the directory to `names`.
+    bool list(std::vector<std::string>* names) const override;
+
+    /** Constructs an empty {@link Directory}. */
+    DorisRAMCompoundDirectory();
+
+    ///Destructor - only call this if you are sure the directory
+    ///is not being used anymore. Otherwise use the ref-counting
+    ///facilities of dir->close
+    ~DorisRAMCompoundDirectory() override;
+
+    /// Removes an existing file in the directory.
+    bool doDeleteFile(const char* name) override;
+
+    /// Always returns true: a RAM directory has no on-disk directory to remove.
+    bool deleteDirectory() override;
+
+    /// Returns true iff the named file exists in this directory.
+    bool fileExists(const char* name) const override;
+
+    /// Returns the time the named file was last modified.
+    int64_t fileModified(const char* name) const override;
+
+    /// Returns the length in bytes of a file in the directory.
+    int64_t fileLength(const char* name) const override;
+
+    /// Renames an existing file, replacing any file already named `to`.
+    void renameFile(const char* from, const char* to) override;
+
+    /** Set the modified time of an existing file to now. */
+    void touchFile(const char* name) override;
+
+    /// Creates a new, empty file in the directory with the given name.
+    /// Returns a stream writing this file.
+    lucene::store::IndexOutput* createOutput(const char* name) override;
+
+    /// Returns a stream reading an existing file.
+    bool openInput(const char* name, lucene::store::IndexInput*& ret, CLuceneError& error,
+                   int32_t bufferSize = -1) override;
+
+    /// Runs DorisCompoundDirectory::close() (which writes the compound file when
+    /// useCompoundFileWriter is set), then releases the in-memory files.
+    void close() override;
+
+    std::string toString() const override;
+
+    static const char* getClassName();
+    const char* getObjectName() const override;
};
class DorisCompoundDirectory::FSIndexInput : public
lucene::store::BufferedIndexInput {
@@ -128,13 +174,13 @@ class DorisCompoundDirectory::FSIndexInput : public
lucene::store::BufferedIndex
io::FileReaderSPtr _reader;
uint64_t _length;
int64_t _fpos;
- doris::Mutex* _shared_lock;
+ std::mutex* _shared_lock;
char path[4096];
SharedHandle(const char* path);
~SharedHandle() override;
};
- SharedHandle* _handle;
+ SharedHandle* _handle = nullptr;
int64_t _pos;
FSIndexInput(SharedHandle* handle, int32_t buffer_size) :
BufferedIndexInput(buffer_size) {
@@ -158,7 +204,7 @@ public:
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "FSIndexInput"; }
- doris::Mutex _this_lock;
+ std::mutex _this_lock;
protected:
// Random-access methods
@@ -167,5 +213,16 @@ protected:
void readInternal(uint8_t* b, const int32_t len) override;
};
-} // namespace segment_v2
-} // namespace doris
+/**
+ * Factory for the directories used by inverted index readers and writers.
+ *
+ * @param fs        filesystem holding the index directory `file`
+ * @param file      index directory path; a null or empty path throws CL_ERR_IO
+ * @param use_compound_file_writer  when true, close() packs the separated index
+ *                  files into a single compound (.idx) file
+ * @param can_use_ram_dir  allows an in-memory DorisRAMCompoundDirectory; it is
+ *                  only chosen when config::inverted_index_ram_dir_enable is true
+ * @param lock_factory  optional lock factory; a default is installed when null
+ * @param cfs_fs, cfs_file  location of the compound file; default to fs / file
+ * @return a newly allocated directory, owned by the caller
+ */
+class DorisCompoundDirectoryFactory {
+public:
+ static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs,
const char* file,
+ bool use_compound_file_writer
= false,
+ bool can_use_ram_dir = false,
+ lucene::store::LockFactory*
lock_factory = nullptr,
+ const io::FileSystemSPtr&
cfs_fs = nullptr,
+ const char* cfs_file =
nullptr);
+};
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 6e1ebb53a9d..7934c63286d 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -197,7 +197,7 @@ Status
InvertedIndexReader::read_null_bitmap(InvertedIndexQueryCacheHandle* cach
if (!dir) {
dir = new DorisCompoundReader(
- DorisCompoundDirectory::getDirectory(_fs,
index_dir.c_str()),
+ DorisCompoundDirectoryFactory::getDirectory(_fs,
index_dir.c_str()),
index_file_name.c_str(),
config::inverted_index_read_buffer_size);
owned_dir = true;
}
@@ -734,8 +734,8 @@ BkdIndexReader::BkdIndexReader(io::FileSystemSPtr fs, const
std::string& path,
}
_file_full_path = index_file;
_compoundReader = std::make_unique<DorisCompoundReader>(
- DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
index_file_name.c_str(),
- config::inverted_index_read_buffer_size);
+ DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()),
+ index_file_name.c_str(), config::inverted_index_read_buffer_size);
}
Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState*
runtime_state,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 744710d9082..d82f6f6b556 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -64,7 +64,6 @@ const int32_t MERGE_FACTOR = 100000000;
const int32_t MAX_LEAF_COUNT = 1024;
const float MAXMBSortInHeap = 512.0 * 8;
const int DIMS = 1;
-const std::string empty_value;
template <FieldType field_type>
class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
@@ -162,7 +161,10 @@ public:
}
_doc = std::make_unique<lucene::document::Document>();
- _dir.reset(DorisCompoundDirectory::getDirectory(_fs,
index_path.c_str(), true));
+ bool use_compound_file_writer = true;
+ bool can_use_ram_dir = true;
+ _dir.reset(DorisCompoundDirectoryFactory::getDirectory(
+ _fs, index_path.c_str(), use_compound_file_writer,
can_use_ram_dir));
if (_parser_type == InvertedIndexParserType::PARSER_STANDARD ||
_parser_type == InvertedIndexParserType::PARSER_UNICODE) {
@@ -296,7 +298,7 @@ public:
get_parser_ignore_above_value_from_properties(_index_meta->properties());
auto ignore_above = std::stoi(ignore_above_value);
for (int i = 0; i < count; ++i) {
- // only ignore_above UNTOKENIZED strings
+ // skip UNTOKENIZED strings longer than ignore_above, and skip empty strings for tokenized fields
if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
v->get_size() > ignore_above) ||
(_parser_type != InvertedIndexParserType::PARSER_NONE &&
v->empty())) {
@@ -346,7 +348,7 @@ public:
}
auto value = join(strings, " ");
- // only ignore_above UNTOKENIZED strings
+ // skip UNTOKENIZED strings longer than ignore_above, and skip empty strings for tokenized fields
if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
value.length() > ignore_above) ||
(_parser_type != InvertedIndexParserType::PARSER_NONE &&
value.empty())) {
@@ -493,7 +495,10 @@ public:
if constexpr (field_is_numeric_type(field_type)) {
auto index_path =
InvertedIndexDescriptor::get_temporary_index_path(
_directory + "/" + _segment_file_name,
_index_meta->index_id());
- dir = DorisCompoundDirectory::getDirectory(_fs,
index_path.c_str(), true);
+ bool use_compound_file_writer = true;
+ bool can_use_ram_dir = true;
+ dir = DorisCompoundDirectoryFactory::getDirectory(
+ _fs, index_path.c_str(), use_compound_file_writer,
can_use_ram_dir);
write_null_bitmap(null_bitmap_out, dir);
_bkd_writer->max_doc_ = _rid;
_bkd_writer->docs_seen_ = _row_ids_seen_for_bkd;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]