This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0680c8d3149 [improve](cache) File cache async init (#39036)
0680c8d3149 is described below
commit 0680c8d31493cfc8f305e853e1d03b513cdd2e02
Author: 苏小刚 <[email protected]>
AuthorDate: Thu Aug 15 16:27:51 2024 +0800
[improve](cache) File cache async init (#39036)
## Proposed changes
Run `load_cache_info_into_memory()` asynchronously in a background thread
started by `LRUFileCache::initialize()`.
Until the cache is ready, `LRUFileCache::get_or_set()` returns a FileBlock
whose state is SKIP_CACHE.
---
be/src/common/config.cpp | 3 ++
be/src/common/config.h | 2 +
be/src/io/cache/block/block_lru_file_cache.cpp | 57 ++++++++++++++++++++------
be/src/io/cache/block/block_lru_file_cache.h | 5 +++
4 files changed, 54 insertions(+), 13 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0289fadb716..fe03cc9158f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -996,6 +996,9 @@ DEFINE_Bool(enable_file_cache, "false");
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
DEFINE_String(file_cache_path, "");
+// async cache init sleeps async_file_cache_init_sleep_interval_ms after every async_file_cache_init_file_num_interval scanned entries to limit IO
+DEFINE_Int64(async_file_cache_init_file_num_interval, "1000");
+DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20");
DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB
// 4KB <= file_cache_max_file_segment_size <= 256MB
DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) ->
bool {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d226623f0e5..afcdc62cb78 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1047,6 +1047,8 @@ DECLARE_Bool(enable_file_cache);
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85,
"disposable_percent":10, "index_percent":5}]
DECLARE_String(file_cache_path);
+DECLARE_Int64(async_file_cache_init_file_num_interval);
+DECLARE_Int64(async_file_cache_init_sleep_interval_ms);
DECLARE_Int64(file_cache_min_file_segment_size);
DECLARE_Int64(file_cache_max_file_segment_size);
DECLARE_Bool(clear_file_cache);
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp
b/be/src/io/cache/block/block_lru_file_cache.cpp
index 3406e827980..349c626fb7d 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -39,6 +39,7 @@
#include <system_error>
#include <utility>
+#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache.h"
#include "io/cache/block/block_file_cache_fwd.h"
@@ -119,10 +120,33 @@ LRUFileCache::LRUFileCache(const std::string&
cache_base_path,
Status LRUFileCache::initialize() {
MonotonicStopWatch watch;
watch.start();
- std::lock_guard cache_lock(_mutex);
if (!_is_initialized) {
if (fs::exists(_cache_base_path)) {
- RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock));
+ // the cache dir already exists: load cache info asynchronously; get_or_set() bypasses the cache until the load succeeds
+ _lazy_open_done = false;
+ _cache_background_load_thread = std::thread([this]() {
+ MonotonicStopWatch watch;
+ watch.start();
+ std::lock_guard<std::mutex> cache_lock(_mutex);
+ Status s = load_cache_info_into_memory(cache_lock);
+ if (s.ok()) {
+ _lazy_open_done = true;
+ } else {
+ LOG(WARNING) << fmt::format("Failed to load cache info
from {}: {}",
+ _cache_base_path,
s.to_string());
+ }
+ int64_t cost = watch.elapsed_time() / 1000 / 1000;
+ LOG(INFO) << fmt::format(
+ "FileCache lazy load done path={}, disposable queue
size={} elements={}, "
+ "index queue size={} elements={}, query queue size={}
elements={}, init "
+ "cost(ms)={}",
+ _cache_base_path,
_disposable_queue.get_total_cache_size(cache_lock),
+ _disposable_queue.get_elements_num(cache_lock),
+ _index_queue.get_total_cache_size(cache_lock),
+ _index_queue.get_elements_num(cache_lock),
+ _normal_queue.get_total_cache_size(cache_lock),
+ _normal_queue.get_elements_num(cache_lock), cost);
+ });
} else {
std::error_code ec;
fs::create_directories(_cache_base_path, ec);
@@ -136,17 +160,8 @@ Status LRUFileCache::initialize() {
_is_initialized = true;
_cache_background_thread =
std::thread(&LRUFileCache::run_background_operation, this);
int64_t cost = watch.elapsed_time() / 1000 / 1000;
- LOG(INFO) << fmt::format(
- "After initialize file cache path={}, disposable queue size={}
elements={}, index "
- "queue size={} "
- "elements={}, query queue "
- "size={} elements={}, init cost(ms)={}",
- _cache_base_path,
_disposable_queue.get_total_cache_size(cache_lock),
- _disposable_queue.get_elements_num(cache_lock),
- _index_queue.get_total_cache_size(cache_lock),
- _index_queue.get_elements_num(cache_lock),
- _normal_queue.get_total_cache_size(cache_lock),
- _normal_queue.get_elements_num(cache_lock), cost);
+ LOG(INFO) << fmt::format("After initialize file cache path={}, init
cost(ms)={}",
+ _cache_base_path, cost);
return Status::OK();
}
@@ -376,6 +391,16 @@ void
LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co
FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset,
size_t size,
const CacheContext& context) {
+ if (!_lazy_open_done) {
+ // Cache is not ready yet
+ VLOG_NOTICE << fmt::format(
+ "Cache is not ready yet, skip cache for key: {}, offset: {},
size: {}.",
+ key.to_string(), offset, size);
+ FileBlocks file_blocks = {std::make_shared<FileBlock>(
+ offset, size, key, this, FileBlock::State::SKIP_CACHE,
context.cache_type)};
+ return FileBlocksHolder(std::move(file_blocks));
+ }
+
FileBlock::Range range(offset, offset + size - 1);
std::lock_guard cache_lock(_mutex);
@@ -827,6 +852,7 @@ Status
LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
std::vector<std::pair<Key, size_t>> queue_entries;
std::vector<std::string> need_to_check_if_empty_dir;
Status st = Status::OK();
+ size_t scan_file_num = 0;
auto scan_file_cache = [&](fs::directory_iterator& key_it) {
for (; key_it != fs::directory_iterator(); ++key_it) {
key = Key(
@@ -888,6 +914,11 @@ Status
LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
}
need_to_check_if_empty_dir.push_back(key_it->path());
}
+ scan_file_num += 1;
+ if (config::async_file_cache_init_file_num_interval > 0 &&
scan_file_num % config::async_file_cache_init_file_num_interval == 0) { // interval <= 0 disables throttling and avoids modulo by zero
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ config::async_file_cache_init_sleep_interval_ms));
+ }
}
}
};
diff --git a/be/src/io/cache/block/block_lru_file_cache.h
b/be/src/io/cache/block/block_lru_file_cache.h
index c7644ecbd8a..bcf00d938a7 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -53,6 +53,9 @@ public:
LRUFileCache(const std::string& cache_base_path, const FileCacheSettings&
cache_settings);
~LRUFileCache() override {
_close = true;
+ if (_cache_background_load_thread.joinable()) { // a joinable std::thread must be joined before destruction, otherwise std::terminate is called
+ _cache_background_load_thread.join();
+ }
if (_cache_background_thread.joinable()) {
_cache_background_thread.join();
}
@@ -201,6 +204,8 @@ public:
private:
std::atomic_bool _close {false};
std::thread _cache_background_thread;
+ std::atomic_bool _lazy_open_done {true};
+ std::thread _cache_background_load_thread;
size_t _num_read_segments = 0;
size_t _num_hit_segments = 0;
size_t _num_removed_segments = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]