This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0680c8d3149 [improve](cache) File cache async init (#39036)
0680c8d3149 is described below

commit 0680c8d31493cfc8f305e853e1d03b513cdd2e02
Author: 苏小刚 <[email protected]>
AuthorDate: Thu Aug 15 16:27:51 2024 +0800

    [improve](cache) File cache async init (#39036)
    
    ## Proposed changes
    
    Do `load_cache_info_into_memory()` asynchronously in a background thread
    in `LRUFileCache::initialize()`.
    While the cache is not yet loaded, `LRUFileCache::get_or_set()` returns
    a FileBlock whose state is SKIP_CACHE.
---
 be/src/common/config.cpp                       |  3 ++
 be/src/common/config.h                         |  2 +
 be/src/io/cache/block/block_lru_file_cache.cpp | 57 ++++++++++++++++++++------
 be/src/io/cache/block/block_lru_file_cache.h   |  5 +++
 4 files changed, 54 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0289fadb716..fe03cc9158f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -996,6 +996,9 @@ DEFINE_Bool(enable_file_cache, "false");
 // format: 
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
 // format: 
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
 DEFINE_String(file_cache_path, "");
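+// to limit IO, the async cache loading thread sleeps async_file_cache_init_sleep_interval_ms after every async_file_cache_init_file_num_interval scanned files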
+DEFINE_Int64(async_file_cache_init_file_num_interval, "1000");
+DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20");
 DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB
 // 4KB <= file_cache_max_file_segment_size <= 256MB
 DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> 
bool {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d226623f0e5..afcdc62cb78 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1047,6 +1047,8 @@ DECLARE_Bool(enable_file_cache);
 // format: 
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
 // format: 
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85,
 "disposable_percent":10, "index_percent":5}]
 DECLARE_String(file_cache_path);
+DECLARE_Int64(async_file_cache_init_file_num_interval);
+DECLARE_Int64(async_file_cache_init_sleep_interval_ms);
 DECLARE_Int64(file_cache_min_file_segment_size);
 DECLARE_Int64(file_cache_max_file_segment_size);
 DECLARE_Bool(clear_file_cache);
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp 
b/be/src/io/cache/block/block_lru_file_cache.cpp
index 3406e827980..349c626fb7d 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -39,6 +39,7 @@
 #include <system_error>
 #include <utility>
 
+#include "common/logging.h"
 #include "common/status.h"
 #include "io/cache/block/block_file_cache.h"
 #include "io/cache/block/block_file_cache_fwd.h"
@@ -119,10 +120,33 @@ LRUFileCache::LRUFileCache(const std::string& 
cache_base_path,
 Status LRUFileCache::initialize() {
     MonotonicStopWatch watch;
     watch.start();
-    std::lock_guard cache_lock(_mutex);
     if (!_is_initialized) {
         if (fs::exists(_cache_base_path)) {
-            RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock));
+            // the cache already exists, try to load cache info asynchronously
+            _lazy_open_done = false;
+            _cache_background_load_thread = std::thread([this]() {
+                MonotonicStopWatch watch;
+                watch.start();
+                std::lock_guard<std::mutex> cache_lock(_mutex);
+                Status s = load_cache_info_into_memory(cache_lock);
+                if (s.ok()) {
+                    _lazy_open_done = true;
+                } else {
+                    LOG(WARNING) << fmt::format("Failed to load cache info 
from {}: {}",
+                                                _cache_base_path, 
s.to_string());
+                }
+                int64_t cost = watch.elapsed_time() / 1000 / 1000;
+                LOG(INFO) << fmt::format(
+                        "FileCache lazy load done path={}, disposable queue 
size={} elements={}, "
+                        "index queue size={} elements={}, query queue size={} 
elements={}, init "
+                        "cost(ms)={}",
+                        _cache_base_path, 
_disposable_queue.get_total_cache_size(cache_lock),
+                        _disposable_queue.get_elements_num(cache_lock),
+                        _index_queue.get_total_cache_size(cache_lock),
+                        _index_queue.get_elements_num(cache_lock),
+                        _normal_queue.get_total_cache_size(cache_lock),
+                        _normal_queue.get_elements_num(cache_lock), cost);
+            });
         } else {
             std::error_code ec;
             fs::create_directories(_cache_base_path, ec);
@@ -136,17 +160,8 @@ Status LRUFileCache::initialize() {
     _is_initialized = true;
     _cache_background_thread = 
std::thread(&LRUFileCache::run_background_operation, this);
     int64_t cost = watch.elapsed_time() / 1000 / 1000;
-    LOG(INFO) << fmt::format(
-            "After initialize file cache path={}, disposable queue size={} 
elements={}, index "
-            "queue size={} "
-            "elements={}, query queue "
-            "size={} elements={}, init cost(ms)={}",
-            _cache_base_path, 
_disposable_queue.get_total_cache_size(cache_lock),
-            _disposable_queue.get_elements_num(cache_lock),
-            _index_queue.get_total_cache_size(cache_lock),
-            _index_queue.get_elements_num(cache_lock),
-            _normal_queue.get_total_cache_size(cache_lock),
-            _normal_queue.get_elements_num(cache_lock), cost);
+    LOG(INFO) << fmt::format("After initialize file cache path={}, init 
cost(ms)={}",
+                             _cache_base_path, cost);
     return Status::OK();
 }
 
@@ -376,6 +391,16 @@ void 
LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co
 
 FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, 
size_t size,
                                           const CacheContext& context) {
+    if (!_lazy_open_done) {
+        // Cache is not ready yet
+        VLOG_NOTICE << fmt::format(
+                "Cache is not ready yet, skip cache for key: {}, offset: {}, 
size: {}.",
+                key.to_string(), offset, size);
+        FileBlocks file_blocks = {std::make_shared<FileBlock>(
+                offset, size, key, this, FileBlock::State::SKIP_CACHE, 
context.cache_type)};
+        return FileBlocksHolder(std::move(file_blocks));
+    }
+
     FileBlock::Range range(offset, offset + size - 1);
 
     std::lock_guard cache_lock(_mutex);
@@ -827,6 +852,7 @@ Status 
LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
     std::vector<std::pair<Key, size_t>> queue_entries;
     std::vector<std::string> need_to_check_if_empty_dir;
     Status st = Status::OK();
+    size_t scan_file_num = 0;
     auto scan_file_cache = [&](fs::directory_iterator& key_it) {
         for (; key_it != fs::directory_iterator(); ++key_it) {
             key = Key(
@@ -888,6 +914,11 @@ Status 
LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
                     }
                     need_to_check_if_empty_dir.push_back(key_it->path());
                 }
+                scan_file_num += 1;
+                if (scan_file_num % 
config::async_file_cache_init_file_num_interval == 0) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(
+                            config::async_file_cache_init_sleep_interval_ms));
+                }
             }
         }
     };
diff --git a/be/src/io/cache/block/block_lru_file_cache.h 
b/be/src/io/cache/block/block_lru_file_cache.h
index c7644ecbd8a..bcf00d938a7 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -53,6 +53,9 @@ public:
     LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& 
cache_settings);
     ~LRUFileCache() override {
         _close = true;
+        if (_cache_background_load_thread.joinable()) {
+            _cache_background_load_thread.join(); // join the load thread itself: destroying a still-joinable std::thread calls std::terminate
+        }
         if (_cache_background_thread.joinable()) {
             _cache_background_thread.join();
         }
@@ -201,6 +204,8 @@ public:
 private:
     std::atomic_bool _close {false};
     std::thread _cache_background_thread;
+    std::atomic_bool _lazy_open_done {true};
+    std::thread _cache_background_load_thread;
     size_t _num_read_segments = 0;
     size_t _num_hit_segments = 0;
     size_t _num_removed_segments = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to