This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch branch-2.0-var
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0-var by this push:
     new 64b56c29083 [optimize](invert index) "optimize inverted index metadata 
into file cache (#28418)
64b56c29083 is described below

commit 64b56c290830712ccdae4b10e55e50f68cd28788
Author: zzzxl <[email protected]>
AuthorDate: Fri Dec 15 20:04:32 2023 +0800

    [optimize](invert index) "optimize inverted index metadata into file cache 
(#28418)
---
 be/src/clucene                                     |  2 +-
 be/src/common/config.cpp                           |  1 +
 be/src/common/config.h                             |  2 +
 be/src/olap/rowset/beta_rowset.cpp                 | 38 +++++++++++++++-
 be/src/olap/rowset/beta_rowset.h                   |  3 +-
 be/src/olap/rowset/rowset.h                        |  3 +-
 .../rowset/segment_v2/inverted_index_cache.cpp     | 19 +++++---
 .../inverted_index_compound_directory.cpp          | 50 ++++++++++++++--------
 .../segment_v2/inverted_index_compound_directory.h | 16 ++++++-
 .../segment_v2/inverted_index_compound_reader.cpp  |  8 +++-
 .../segment_v2/inverted_index_compound_reader.h    |  4 +-
 be/src/olap/tablet.cpp                             |  2 +-
 build.sh                                           |  2 +-
 13 files changed, 119 insertions(+), 31 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index 6f8a21ffe15..d20200ed36d 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 6f8a21ffe15bd78a1cd3e685067ee5c9ed071827
+Subproject commit d20200ed36dda4087489d49457a4da0c44ad4d09
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c44249da760..7b38fcdf5d2 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -991,6 +991,7 @@ DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, 
"600");
 DEFINE_String(inverted_index_searcher_cache_limit, "10%");
 // set `true` to enable insert searcher into cache when write inverted index 
data
 DEFINE_Bool(enable_write_index_searcher_cache, "true");
+DEFINE_Bool(enable_inverted_index_cache_on_cooldown, "false");
 DEFINE_Bool(enable_inverted_index_cache_check_timestamp, "true");
 DEFINE_Int32(inverted_index_fd_number_limit_percent, "40"); // 40%
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 52a2eeee2e3..3fb4adf65e2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1027,6 +1027,8 @@ DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec);
 DECLARE_String(inverted_index_searcher_cache_limit);
 // set `true` to enable insert searcher into cache when write inverted index 
data
 DECLARE_Bool(enable_write_index_searcher_cache);
+// Pre-load the inverted index reader into the file cache during cooldown.
+DECLARE_Bool(enable_inverted_index_cache_on_cooldown);
 DECLARE_Bool(enable_inverted_index_cache_check_timestamp);
 DECLARE_Int32(inverted_index_fd_number_limit_percent); // 50%
 
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 176ff0d21b5..69e5ac888ce 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -346,7 +346,8 @@ Status BetaRowset::copy_files_to(const std::string& dir, 
const RowsetId& new_row
     return Status::OK();
 }
 
-Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& 
new_rowset_id) {
+Status BetaRowset::upload_to(const io::RemoteFileSystemSPtr& dest_fs,
+                             const RowsetId& new_rowset_id) {
     DCHECK(is_local());
     if (num_segments() < 1) {
         return Status::OK();
@@ -355,6 +356,8 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* dest_fs, 
const RowsetId& new_
     local_paths.reserve(num_segments());
     std::vector<io::Path> dest_paths;
     dest_paths.reserve(num_segments());
+    std::vector<io::Path> idx_remote_paths;
+    idx_remote_paths.reserve(num_segments());
     for (int i = 0; i < num_segments(); ++i) {
         // Note: Here we use relative path for remote.
         auto remote_seg_path = remote_segment_path(_rowset_meta->tablet_id(), 
new_rowset_id, i);
@@ -373,11 +376,44 @@ Status BetaRowset::upload_to(io::RemoteFileSystem* 
dest_fs, const RowsetId& new_
                                                                      
index_meta->index_id());
                 dest_paths.push_back(remote_inverted_index_file);
                 local_paths.push_back(local_inverted_index_file);
+                idx_remote_paths.push_back(remote_inverted_index_file);
             }
         }
     }
     auto st = dest_fs->batch_upload(local_paths, dest_paths);
     if (st.ok()) {
+        // Pre-write the metadata of the inverted index into the file cache
+        if (config::enable_inverted_index_cache_on_cooldown) {
+            if (dest_fs->type() == io::FileSystemType::S3 && 
config::enable_file_cache) {
+                auto start = std::chrono::steady_clock::now();
+                for (auto& path : idx_remote_paths) {
+                    std::shared_ptr<io::FileReader> file_reader = nullptr;
+                    if (!dest_fs->open_file(path, &file_reader).ok()) {
+                        continue;
+                    }
+
+                    auto& url = file_reader->path().native();
+                    size_t pos = url.rfind('/');
+                    if (pos != std::string::npos) {
+                        auto idx_path = url.substr(0, pos);
+                        auto idx_name = url.substr(pos + 1);
+
+                        try {
+                            
InvertedIndexSearcherCache::build_index_searcher(dest_fs, idx_path,
+                                                                             
idx_name);
+                        } catch (CLuceneError& err) {
+                            LOG(WARNING) << "opening index reader clucene err: 
" << err.what();
+                        } catch (...) {
+                            LOG(WARNING) << "opening index reader other err";
+                        }
+                    }
+                }
+                auto duration =
+                        
std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
+                LOG(INFO) << "cooldown upload open invert index duration: " << 
duration.count();
+            }
+        }
+
         DorisMetrics::instance()->upload_rowset_count->increment(1);
         
DorisMetrics::instance()->upload_total_byte->increment(data_disk_size());
     } else {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 064b8fcb6ef..6efb0066ff9 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -76,7 +76,8 @@ public:
 
     Status copy_files_to(const std::string& dir, const RowsetId& 
new_rowset_id) override;
 
-    Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& 
new_rowset_id) override;
+    Status upload_to(const io::RemoteFileSystemSPtr& dest_fs,
+                     const RowsetId& new_rowset_id) override;
 
     // only applicable to alpha rowset, no op here
     Status remove_old_files(std::vector<std::string>* files_to_remove) 
override {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 7ac31e608e4..407731d8dd2 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -211,7 +211,8 @@ public:
     // copy all files to `dir`
     virtual Status copy_files_to(const std::string& dir, const RowsetId& 
new_rowset_id) = 0;
 
-    virtual Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& 
new_rowset_id) {
+    virtual Status upload_to(const io::RemoteFileSystemSPtr& dest_fs,
+                             const RowsetId& new_rowset_id) {
         return Status::OK();
     }
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f3c68984ebb..dc4ad06b360 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -43,12 +43,21 @@ InvertedIndexSearcherCache* 
InvertedIndexSearcherCache::_s_instance = nullptr;
 IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const 
io::FileSystemSPtr& fs,
                                                                   const 
std::string& index_dir,
                                                                   const 
std::string& file_name) {
-    DorisCompoundReader* directory =
-            new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, 
index_dir.c_str()),
-                                    file_name.c_str(), 
config::inverted_index_read_buffer_size);
+    bool open_idx_file_cache = true;
+    DorisCompoundReader* directory = new DorisCompoundReader(
+            DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), 
file_name.c_str(),
+            config::inverted_index_read_buffer_size, open_idx_file_cache);
+
     auto closeDirectory = true;
-    auto index_searcher =
-            std::make_shared<lucene::search::IndexSearcher>(directory, 
closeDirectory);
+    auto reader = lucene::index::IndexReader::open(
+            directory, config::inverted_index_read_buffer_size, 
closeDirectory);
+
+    bool close_reader = true;
+    auto index_searcher = 
std::make_shared<lucene::search::IndexSearcher>(reader, close_reader);
+
+    // stop storing subsequently read data in the idx file cache
+    directory->getDorisIndexInput()->setIdxFileCache(false);
+
     // NOTE: need to cl_refcount-- here, so that directory will be deleted when
     // index_searcher is destroyed
     _CLDECDELETE(directory)
diff --git 
a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index de57eed6f85..4b9ff7d8dae 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -97,6 +97,22 @@ CL_NS(store)::Directory* 
DorisCompoundFileWriter::getDirectory() {
     return directory;
 }
 
+void DorisCompoundFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
+    auto file_priority = [](const std::string& filename) {
+        if (filename.find("segments") != std::string::npos) return 1;
+        if (filename.find("fnm") != std::string::npos) return 2;
+        if (filename.find("tii") != std::string::npos) return 3;
+        return 4; // Other files
+    };
+
+    std::sort(file_infos.begin(), file_infos.end(), [&](const FileInfo& a, 
const FileInfo& b) {
+        int32_t priority_a = file_priority(a.filename);
+        int32_t priority_b = file_priority(b.filename);
+        if (priority_a != priority_b) return priority_a < priority_b;
+        return a.filesize < b.filesize;
+    });
+}
+
 void DorisCompoundFileWriter::writeCompoundFile() {
     // list files in current dir
     std::vector<std::string> files;
@@ -106,15 +122,15 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     if (it != files.end()) {
         files.erase(it);
     }
-    // sort file list by file length
-    std::vector<std::pair<std::string, int64_t>> sorted_files;
+
+    std::vector<FileInfo> sorted_files;
     for (auto file : files) {
-        sorted_files.push_back(std::make_pair(
-                file, 
((DorisCompoundDirectory*)directory)->fileLength(file.c_str())));
+        FileInfo file_info;
+        file_info.filename = file;
+        file_info.filesize = 
((DorisCompoundDirectory*)directory)->fileLength(file.c_str());
+        sorted_files.emplace_back(std::move(file_info));
     }
-    std::sort(sorted_files.begin(), sorted_files.end(),
-              [](const std::pair<std::string, int64_t>& a,
-                 const std::pair<std::string, int64_t>& b) { return (a.second 
< b.second); });
+    sort_files(sorted_files);
 
     int32_t file_count = sorted_files.size();
 
@@ -138,12 +154,12 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     const int64_t buffer_length = 16384;
     uint8_t ram_buffer[buffer_length];
     for (auto file : sorted_files) {
-        ram_output->writeString(file.first); // file name
-        ram_output->writeLong(0);            // data offset
-        ram_output->writeLong(file.second);  // file length
-        header_file_length += file.second;
+        ram_output->writeString(file.filename); // file name
+        ram_output->writeLong(0);               // data offset
+        ram_output->writeLong(file.filesize);   // file length
+        header_file_length += file.filesize;
         if (header_file_length <= MAX_HEADER_DATA_SIZE) {
-            copyFile(file.first.c_str(), ram_output.get(), ram_buffer, 
buffer_length);
+            copyFile(file.filename.c_str(), ram_output.get(), ram_buffer, 
buffer_length);
             header_file_count++;
         }
     }
@@ -167,7 +183,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     uint8_t header_buffer[buffer_length];
     for (int i = 0; i < sorted_files.size(); ++i) {
         auto file = sorted_files[i];
-        output->writeString(file.first); // FileName
+        output->writeString(file.filename); // FileName
         // DataOffset
         if (i < header_file_count) {
             // file data write in header, so we set its offset to -1.
@@ -175,19 +191,19 @@ void DorisCompoundFileWriter::writeCompoundFile() {
         } else {
             output->writeLong(data_offset);
         }
-        output->writeLong(file.second); // FileLength
+        output->writeLong(file.filesize); // FileLength
         if (i < header_file_count) {
             // append data
-            copyFile(file.first.c_str(), output.get(), header_buffer, 
buffer_length);
+            copyFile(file.filename.c_str(), output.get(), header_buffer, 
buffer_length);
         } else {
-            data_offset += file.second;
+            data_offset += file.filesize;
         }
     }
     // write rest files' data
     uint8_t data_buffer[buffer_length];
     for (int i = header_file_count; i < sorted_files.size(); ++i) {
         auto file = sorted_files[i];
-        copyFile(file.first.c_str(), output.get(), data_buffer, buffer_length);
+        copyFile(file.filename.c_str(), output.get(), data_buffer, 
buffer_length);
     }
     out_dir->close();
     // NOTE: need to decrease ref count, but not to delete here,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index b89c6383539..78bb64e0170 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -55,7 +55,15 @@ public:
                   int64_t bufferLength);
 
 private:
-    CL_NS(store)::Directory* directory;
+    class FileInfo {
+    public:
+        std::string filename;
+        int32_t filesize;
+    };
+
+    void sort_files(std::vector<FileInfo>& file_infos);
+
+    CL_NS(store)::Directory* directory = nullptr;
 };
 
 class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
@@ -143,6 +151,7 @@ class DorisCompoundDirectory::FSIndexInput : public 
lucene::store::BufferedIndex
         this->_pos = 0;
         this->_handle = handle;
         this->_io_ctx.reader_type = ReaderType::READER_QUERY;
+        this->_io_ctx.read_segment_index = false;
     }
 
 protected:
@@ -161,6 +170,11 @@ public:
     const char* getObjectName() const override { return getClassName(); }
     static const char* getClassName() { return "FSIndexInput"; }
 
+    // Enables or disables the index file cache for subsequent reads.
+    // @param index true: data read afterwards is stored in the index file cache;
+    //              false: data read afterwards is not stored in the index file cache.
+    void setIdxFileCache(bool index) override { _io_ctx.read_segment_index = 
index; }
+
     doris::Mutex _this_lock;
 
 protected:
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index fce1009be51..697758c1086 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -124,7 +124,7 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : 
BufferedIndexInput(clone
 void CSIndexInput::close() {}
 
 DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const 
char* name,
-                                         int32_t read_buffer_size)
+                                         int32_t read_buffer_size, bool 
open_idx_file_cache)
         : readBufferSize(read_buffer_size),
           dir(d),
           ram_dir(new lucene::store::RAMDirectory()),
@@ -140,6 +140,8 @@ 
DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
                               .c_str());
         }
         stream = dir->openInput(name, readBufferSize);
+        // choose whether data read from this stream is stored in the idx file cache
+        stream->setIdxFileCache(open_idx_file_cache);
 
         int32_t count = stream->readVInt();
         ReaderFileEntry* entry = nullptr;
@@ -327,5 +329,9 @@ std::string DorisCompoundReader::toString() const {
            std::string(file_name);
 }
 
+CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
+    return stream;
+}
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 75bf1ab633e..995a8a5c463 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -73,7 +73,8 @@ protected:
 
 public:
     DorisCompoundReader(lucene::store::Directory* dir, const char* name,
-                        int32_t _readBufferSize = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
+                        int32_t _readBufferSize = 
CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
+                        bool open_idx_file_cache = false);
     ~DorisCompoundReader() override;
     void copyFile(const char* file, int64_t file_length, uint8_t* buffer, 
int64_t buffer_length);
     bool list(std::vector<std::string>* names) const override;
@@ -93,6 +94,7 @@ public:
     std::string getFileName() { return file_name; }
     static const char* getClassName();
     const char* getObjectName() const override;
+    CL_NS(store)::IndexInput* getDorisIndexInput();
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 2cbd5463397..67995c2f54a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2151,7 +2151,7 @@ Status Tablet::_cooldown_data() {
         }
     }};
     auto start = std::chrono::steady_clock::now();
-    if (st = old_rowset->upload_to(dest_fs.get(), new_rowset_id); !st.ok()) {
+    if (st = old_rowset->upload_to(dest_fs, new_rowset_id); !st.ok()) {
         return st;
     }
 
diff --git a/build.sh b/build.sh
index a7e31fa9b3b..15f22b03a72 100755
--- a/build.sh
+++ b/build.sh
@@ -302,7 +302,7 @@ update_submodule() {
 }
 
 update_submodule "be/src/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz";
-update_submodule "be/src/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene.tar.gz";
+update_submodule "be/src/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene-2.0.tar.gz";
 
 if [[ "${CLEAN}" -eq 1 && "${BUILD_BE}" -eq 0 && "${BUILD_FE}" -eq 0 && 
"${BUILD_SPARK_DPP}" -eq 0 ]]; then
     clean_gensrc


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to