This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch branch-2.0-var
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0-var by this push:
new 64b56c29083 [optimize](invert index) "optimize inverted index metadata
into file cache (#28418)
64b56c29083 is described below
commit 64b56c290830712ccdae4b10e55e50f68cd28788
Author: zzzxl <[email protected]>
AuthorDate: Fri Dec 15 20:04:32 2023 +0800
[optimize](invert index) "optimize inverted index metadata into file cache
(#28418)
---
be/src/clucene | 2 +-
be/src/common/config.cpp | 1 +
be/src/common/config.h | 2 +
be/src/olap/rowset/beta_rowset.cpp | 38 +++++++++++++++-
be/src/olap/rowset/beta_rowset.h | 3 +-
be/src/olap/rowset/rowset.h | 3 +-
.../rowset/segment_v2/inverted_index_cache.cpp | 19 +++++---
.../inverted_index_compound_directory.cpp | 50 ++++++++++++++--------
.../segment_v2/inverted_index_compound_directory.h | 16 ++++++-
.../segment_v2/inverted_index_compound_reader.cpp | 8 +++-
.../segment_v2/inverted_index_compound_reader.h | 4 +-
be/src/olap/tablet.cpp | 2 +-
build.sh | 2 +-
13 files changed, 119 insertions(+), 31 deletions(-)
diff --git a/be/src/clucene b/be/src/clucene
index 6f8a21ffe15..d20200ed36d 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 6f8a21ffe15bd78a1cd3e685067ee5c9ed071827
+Subproject commit d20200ed36dda4087489d49457a4da0c44ad4d09
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c44249da760..7b38fcdf5d2 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -991,6 +991,7 @@ DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec,
"600");
DEFINE_String(inverted_index_searcher_cache_limit, "10%");
// set `true` to enable insert searcher into cache when write inverted index
data
DEFINE_Bool(enable_write_index_searcher_cache, "true");
+DEFINE_Bool(enable_inverted_index_cache_on_cooldown, "false"); // opt-in; read by BetaRowset::upload_to
DEFINE_Bool(enable_inverted_index_cache_check_timestamp, "true");
DEFINE_Int32(inverted_index_fd_number_limit_percent, "40"); // 40%
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 52a2eeee2e3..3fb4adf65e2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1027,6 +1027,8 @@ DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec);
DECLARE_String(inverted_index_searcher_cache_limit);
// set `true` to enable insert searcher into cache when write inverted index
data
DECLARE_Bool(enable_write_index_searcher_cache);
+// Warm the file cache with uploaded inverted index metadata on cooldown (S3 + enable_file_cache).
+DECLARE_Bool(enable_inverted_index_cache_on_cooldown);
DECLARE_Bool(enable_inverted_index_cache_check_timestamp);
DECLARE_Int32(inverted_index_fd_number_limit_percent); // 50%
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 176ff0d21b5..69e5ac888ce 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -346,7 +346,8 @@ Status BetaRowset::copy_files_to(const std::string& dir,
const RowsetId& new_row
return Status::OK();
}
-Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId&
new_rowset_id) {
+Status BetaRowset::upload_to(const io::RemoteFileSystemSPtr& dest_fs,
+ const RowsetId& new_rowset_id) {
DCHECK(is_local());
if (num_segments() < 1) {
return Status::OK();
@@ -355,6 +356,8 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs,
const RowsetId& new_
local_paths.reserve(num_segments());
std::vector<io::Path> dest_paths;
dest_paths.reserve(num_segments());
+ std::vector<io::Path> idx_remote_paths;
+ idx_remote_paths.reserve(num_segments());
for (int i = 0; i < num_segments(); ++i) {
// Note: Here we use relative path for remote.
auto remote_seg_path = remote_segment_path(_rowset_meta->tablet_id(),
new_rowset_id, i);
@@ -373,11 +376,44 @@ Status BetaRowset::upload_to(io::RemoteFileSystem*
dest_fs, const RowsetId& new_
index_meta->index_id());
dest_paths.push_back(remote_inverted_index_file);
local_paths.push_back(local_inverted_index_file);
+ idx_remote_paths.push_back(remote_inverted_index_file);
}
}
}
auto st = dest_fs->batch_upload(local_paths, dest_paths);
if (st.ok()) {
+ // Pre-write the metadata of the inverted index into the file cache
+ if (config::enable_inverted_index_cache_on_cooldown) {
+ if (dest_fs->type() == io::FileSystemType::S3 &&
config::enable_file_cache) {
+ auto start = std::chrono::steady_clock::now();
+ for (auto& path : idx_remote_paths) {
+ std::shared_ptr<io::FileReader> file_reader = nullptr;
+ if (!dest_fs->open_file(path, &file_reader).ok()) {
+ continue;
+ }
+
+ auto& url = file_reader->path().native();
+ size_t pos = url.rfind('/');
+ if (pos != std::string::npos) {
+ auto idx_path = url.substr(0, pos);
+ auto idx_name = url.substr(pos + 1);
+
+ try {
+
InvertedIndexSearcherCache::build_index_searcher(dest_fs, idx_path,
+
idx_name);
+ } catch (CLuceneError& err) {
+ LOG(WARNING) << "opening index reader clucene err:
" << err.what();
+ } catch (...) {
+ LOG(WARNING) << "opening index reader other err";
+ }
+ }
+ }
+ auto duration =
+
std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
+ LOG(INFO) << "cooldown upload open invert index duration: " <<
duration.count();
+ }
+ }
+
DorisMetrics::instance()->upload_rowset_count->increment(1);
DorisMetrics::instance()->upload_total_byte->increment(data_disk_size());
} else {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 064b8fcb6ef..6efb0066ff9 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -76,7 +76,8 @@ public:
Status copy_files_to(const std::string& dir, const RowsetId&
new_rowset_id) override;
- Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId&
new_rowset_id) override;
+ Status upload_to(const io::RemoteFileSystemSPtr& dest_fs,
+ const RowsetId& new_rowset_id) override;
// only applicable to alpha rowset, no op here
Status remove_old_files(std::vector<std::string>* files_to_remove)
override {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 7ac31e608e4..407731d8dd2 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -211,7 +211,8 @@ public:
// copy all files to `dir`
virtual Status copy_files_to(const std::string& dir, const RowsetId&
new_rowset_id) = 0;
- virtual Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId&
new_rowset_id) {
+ virtual Status upload_to(const io::RemoteFileSystemSPtr& dest_fs,
+ const RowsetId& new_rowset_id) {
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f3c68984ebb..dc4ad06b360 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -43,12 +43,21 @@ InvertedIndexSearcherCache*
InvertedIndexSearcherCache::_s_instance = nullptr;
IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const
io::FileSystemSPtr& fs,
const
std::string& index_dir,
const
std::string& file_name) {
- DorisCompoundReader* directory =
- new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs,
index_dir.c_str()),
- file_name.c_str(),
config::inverted_index_read_buffer_size);
+ bool open_idx_file_cache = true;
+ DorisCompoundReader* directory = new DorisCompoundReader(
+ DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
file_name.c_str(),
+ config::inverted_index_read_buffer_size, open_idx_file_cache);
+
auto closeDirectory = true;
- auto index_searcher =
- std::make_shared<lucene::search::IndexSearcher>(directory,
closeDirectory);
+ auto reader = lucene::index::IndexReader::open(
+ directory, config::inverted_index_read_buffer_size,
closeDirectory);
+
+ bool close_reader = true;
+ auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(reader, close_reader);
+
+ // close read data into idx file cache
+ directory->getDorisIndexInput()->setIdxFileCache(false);
+
// NOTE: need to cl_refcount-- here, so that directory will be deleted when
// index_searcher is destroyed
_CLDECDELETE(directory)
diff --git
a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index de57eed6f85..4b9ff7d8dae 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -97,6 +97,22 @@ CL_NS(store)::Directory*
DorisCompoundFileWriter::getDirectory() {
return directory;
}
+void DorisCompoundFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
+    // Order by name rank (segments < fnm < tii < other), then by ascending file size.
+    const auto rank_of = [](const std::string& name) -> int32_t {
+        constexpr const char* kRankedTokens[] = {"segments", "fnm", "tii"};
+        for (int32_t i = 0; i < 3; ++i) {
+            if (name.find(kRankedTokens[i]) != std::string::npos) return i + 1;
+        }
+        return 4;
+    };
+    std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& lhs, const FileInfo& rhs) {
+        const int32_t lhs_rank = rank_of(lhs.filename);
+        const int32_t rhs_rank = rank_of(rhs.filename);
+        return lhs_rank == rhs_rank ? lhs.filesize < rhs.filesize : lhs_rank < rhs_rank;
+    });
+}
+
void DorisCompoundFileWriter::writeCompoundFile() {
// list files in current dir
std::vector<std::string> files;
@@ -106,15 +122,15 @@ void DorisCompoundFileWriter::writeCompoundFile() {
if (it != files.end()) {
files.erase(it);
}
- // sort file list by file length
- std::vector<std::pair<std::string, int64_t>> sorted_files;
+
+ std::vector<FileInfo> sorted_files;
for (auto file : files) {
- sorted_files.push_back(std::make_pair(
- file,
((DorisCompoundDirectory*)directory)->fileLength(file.c_str())));
+ FileInfo file_info;
+ file_info.filename = file;
+ file_info.filesize =
((DorisCompoundDirectory*)directory)->fileLength(file.c_str());
+ sorted_files.emplace_back(std::move(file_info));
}
- std::sort(sorted_files.begin(), sorted_files.end(),
- [](const std::pair<std::string, int64_t>& a,
- const std::pair<std::string, int64_t>& b) { return (a.second
< b.second); });
+ sort_files(sorted_files);
int32_t file_count = sorted_files.size();
@@ -138,12 +154,12 @@ void DorisCompoundFileWriter::writeCompoundFile() {
const int64_t buffer_length = 16384;
uint8_t ram_buffer[buffer_length];
for (auto file : sorted_files) {
- ram_output->writeString(file.first); // file name
- ram_output->writeLong(0); // data offset
- ram_output->writeLong(file.second); // file length
- header_file_length += file.second;
+ ram_output->writeString(file.filename); // file name
+ ram_output->writeLong(0); // data offset
+ ram_output->writeLong(file.filesize); // file length
+ header_file_length += file.filesize;
if (header_file_length <= MAX_HEADER_DATA_SIZE) {
- copyFile(file.first.c_str(), ram_output.get(), ram_buffer,
buffer_length);
+ copyFile(file.filename.c_str(), ram_output.get(), ram_buffer,
buffer_length);
header_file_count++;
}
}
@@ -167,7 +183,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
uint8_t header_buffer[buffer_length];
for (int i = 0; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
- output->writeString(file.first); // FileName
+ output->writeString(file.filename); // FileName
// DataOffset
if (i < header_file_count) {
// file data write in header, so we set its offset to -1.
@@ -175,19 +191,19 @@ void DorisCompoundFileWriter::writeCompoundFile() {
} else {
output->writeLong(data_offset);
}
- output->writeLong(file.second); // FileLength
+ output->writeLong(file.filesize); // FileLength
if (i < header_file_count) {
// append data
- copyFile(file.first.c_str(), output.get(), header_buffer,
buffer_length);
+ copyFile(file.filename.c_str(), output.get(), header_buffer,
buffer_length);
} else {
- data_offset += file.second;
+ data_offset += file.filesize;
}
}
// write rest files' data
uint8_t data_buffer[buffer_length];
for (int i = header_file_count; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
- copyFile(file.first.c_str(), output.get(), data_buffer, buffer_length);
+ copyFile(file.filename.c_str(), output.get(), data_buffer,
buffer_length);
}
out_dir->close();
// NOTE: need to decrease ref count, but not to delete here,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index b89c6383539..78bb64e0170 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -55,7 +55,15 @@ public:
int64_t bufferLength);
private:
- CL_NS(store)::Directory* directory;
+    class FileInfo { // one entry of the compound file: member file name and its length
+    public:
+        std::string filename;
+        int64_t filesize = 0; // 64-bit like fileLength(); a 32-bit field truncates files >= 2 GiB
+    };
+
+ void sort_files(std::vector<FileInfo>& file_infos);
+
+ CL_NS(store)::Directory* directory = nullptr;
};
class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
@@ -143,6 +151,7 @@ class DorisCompoundDirectory::FSIndexInput : public
lucene::store::BufferedIndex
this->_pos = 0;
this->_handle = handle;
this->_io_ctx.reader_type = ReaderType::READER_QUERY;
+ this->_io_ctx.read_segment_index = false;
}
protected:
@@ -161,6 +170,11 @@ public:
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "FSIndexInput"; }
+    // Controls `_io_ctx.read_segment_index` for reads made through this input after the call.
+    // true: tag subsequent reads so that their data is kept in the index file cache.
+    // false: stop tagging, which is the default the constructor sets.
+    void setIdxFileCache(bool index) override { this->_io_ctx.read_segment_index = index; }
+
doris::Mutex _this_lock;
protected:
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index fce1009be51..697758c1086 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -124,7 +124,7 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) :
BufferedIndexInput(clone
void CSIndexInput::close() {}
DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const
char* name,
- int32_t read_buffer_size)
+ int32_t read_buffer_size, bool
open_idx_file_cache)
: readBufferSize(read_buffer_size),
dir(d),
ram_dir(new lucene::store::RAMDirectory()),
@@ -140,6 +140,8 @@
DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
.c_str());
}
stream = dir->openInput(name, readBufferSize);
+ // open read data into idx file cache
+ stream->setIdxFileCache(open_idx_file_cache);
int32_t count = stream->readVInt();
ReaderFileEntry* entry = nullptr;
@@ -327,5 +329,9 @@ std::string DorisCompoundReader::toString() const {
std::string(file_name);
}
+CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
+    return this->stream; // input opened in the ctor; presumably still owned by this reader
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 75bf1ab633e..995a8a5c463 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -73,7 +73,8 @@ protected:
public:
DorisCompoundReader(lucene::store::Directory* dir, const char* name,
- int32_t _readBufferSize =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
+ int32_t _readBufferSize =
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
+ bool open_idx_file_cache = false);
~DorisCompoundReader() override;
void copyFile(const char* file, int64_t file_length, uint8_t* buffer,
int64_t buffer_length);
bool list(std::vector<std::string>* names) const override;
@@ -93,6 +94,7 @@ public:
std::string getFileName() { return file_name; }
static const char* getClassName();
const char* getObjectName() const override;
+ CL_NS(store)::IndexInput* getDorisIndexInput();
};
} // namespace segment_v2
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 2cbd5463397..67995c2f54a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2151,7 +2151,7 @@ Status Tablet::_cooldown_data() {
}
}};
auto start = std::chrono::steady_clock::now();
- if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) {
+ if (st = old_rowset->upload_to(dest_fs, new_rowset_id); !st.ok()) {
return st;
}
diff --git a/build.sh b/build.sh
index a7e31fa9b3b..15f22b03a72 100755
--- a/build.sh
+++ b/build.sh
@@ -302,7 +302,7 @@ update_submodule() {
}
update_submodule "be/src/apache-orc" "apache-orc"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz"
-update_submodule "be/src/clucene" "clucene"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz"
+update_submodule "be/src/clucene" "clucene"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene-2.0.tar.gz"
if [[ "${CLEAN}" -eq 1 && "${BUILD_BE}" -eq 0 && "${BUILD_FE}" -eq 0 &&
"${BUILD_SPARK_DPP}" -eq 0 ]]; then
clean_gensrc
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]