This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5e95141bb87 branch-3.0: [enhancement](cloud) file cache evict in
advance #47473 (#47614)
5e95141bb87 is described below
commit 5e95141bb87b297ea6e5969730a5e9ae653949f9
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Feb 8 18:57:56 2025 +0800
branch-3.0: [enhancement](cloud) file cache evict in advance #47473 (#47614)
Cherry-picked from #47473
Co-authored-by: zhengyu <[email protected]>
---
be/src/common/config.cpp | 6 +
be/src/common/config.h | 5 +
be/src/io/cache/block_file_cache.cpp | 146 +++++++++++++++--
be/src/io/cache/block_file_cache.h | 37 ++++-
be/test/io/cache/block_file_cache_test.cpp | 243 ++++++++++++++++++++++++++++-
5 files changed, 416 insertions(+), 21 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 089766214cf..a6981ca7315 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1071,6 +1071,12 @@ DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
+DEFINE_mBool(enable_evict_file_cache_in_advance, "true");
+DEFINE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent, "78");
+DEFINE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent, "75");
+DEFINE_mInt32(file_cache_evict_in_advance_interval_ms, "1000");
+DEFINE_mInt64(file_cache_evict_in_advance_batch_bytes, "31457280"); // 30MB
+
DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
// If true, evict the ttl cache using LRU when full.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e3900c0dd86..40cf328d662 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1107,6 +1107,11 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
DECLARE_Int32(file_cache_enter_disk_resource_limit_mode_percent);
DECLARE_Int32(file_cache_exit_disk_resource_limit_mode_percent);
+DECLARE_mBool(enable_evict_file_cache_in_advance);
+DECLARE_mInt32(file_cache_enter_need_evict_cache_in_advance_percent);
+DECLARE_mInt32(file_cache_exit_need_evict_cache_in_advance_percent);
+DECLARE_mInt32(file_cache_evict_in_advance_interval_ms);
+DECLARE_mInt64(file_cache_evict_in_advance_batch_bytes);
DECLARE_mBool(enable_read_cache_file_directly);
DECLARE_Bool(file_cache_enable_evict_from_other_queue_by_size);
// If true, evict the ttl cache using LRU when full.
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index 3068d7ed439..7c49a61382c 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -201,6 +201,8 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
"file_cache_hit_ratio_1h", 0.0);
_disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
_cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
+ _need_evict_cache_in_advance_metrics =
std::make_shared<bvar::Status<size_t>>(
+ _cache_base_path.c_str(),
"file_cache_need_evict_cache_in_advance", 0);
_cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
@@ -212,6 +214,11 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
_cache_base_path.c_str(),
"file_cache_storage_retry_sync_remove_latency_us");
_storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(),
"file_cache_storage_async_remove_latency_us");
+ _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
+ _cache_base_path.c_str(),
"file_cache_evict_in_advance_latency_us");
+
+ _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
+ _cache_base_path.c_str(), "file_cache_recycle_keys_length");
_disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
cache_settings.disposable_queue_elements, 60
* 60);
@@ -339,6 +346,8 @@ Status
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
_cache_background_monitor_thread =
std::thread(&BlockFileCache::run_background_monitor, this);
_cache_background_ttl_gc_thread =
std::thread(&BlockFileCache::run_background_ttl_gc, this);
_cache_background_gc_thread =
std::thread(&BlockFileCache::run_background_gc, this);
+ _cache_background_evict_in_advance_thread =
+ std::thread(&BlockFileCache::run_background_evict_in_advance,
this);
return Status::OK();
}
@@ -1021,6 +1030,16 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper&
hash, const CacheContext&
return true;
}
+void BlockFileCache::try_evict_in_advance(size_t size,
std::lock_guard<std::mutex>& cache_lock) {
+ UInt128Wrapper hash = UInt128Wrapper();
+ size_t offset = 0;
+ CacheContext context;
+ context.cache_type = FileCacheType::NORMAL;
+ try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock,
false);
+ context.cache_type = FileCacheType::TTL;
+ try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock,
false);
+}
+
bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key,
bool remove_directly,
std::lock_guard<std::mutex>&
cache_lock, bool sync) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
@@ -1178,7 +1197,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper&
hash, size_t offset, size
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types,
size_t size,
- int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
+ int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool
sync_removal) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
@@ -1211,7 +1230,7 @@ bool
BlockFileCache::try_reserve_from_other_queue_by_time_interval(
}
*(_evict_by_time_metrics_matrix[cache_type][cur_type]) <<
remove_size_per_type;
}
- remove_file_blocks(to_evict, cache_lock, true);
+ remove_file_blocks(to_evict, cache_lock, sync_removal);
return !is_overflow(removed_size, size, cur_cache_size);
}
@@ -1229,7 +1248,7 @@ bool BlockFileCache::is_overflow(size_t removed_size,
size_t need_size,
bool BlockFileCache::try_reserve_from_other_queue_by_size(
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types,
size_t size,
- std::lock_guard<std::mutex>& cache_lock) {
+ std::lock_guard<std::mutex>& cache_lock, bool sync_removal) {
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
std::vector<FileBlockCell*> to_evict;
@@ -1249,17 +1268,18 @@ bool
BlockFileCache::try_reserve_from_other_queue_by_size(
cur_removed_size);
*(_evict_by_size_metrics_matrix[cache_type][cur_type]) <<
cur_removed_size;
}
- remove_file_blocks(to_evict, cache_lock, true);
+ remove_file_blocks(to_evict, cache_lock, sync_removal);
return !is_overflow(removed_size, size, cur_cache_size);
}
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType
cur_cache_type, size_t size,
int64_t cur_time,
- std::lock_guard<std::mutex>&
cache_lock) {
+ std::lock_guard<std::mutex>&
cache_lock,
+ bool sync_removal) {
// currently, TTL cache is not considered as a candidate
auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
bool reserve_success = try_reserve_from_other_queue_by_time_interval(
- cur_cache_type, other_cache_types, size, cur_time, cache_lock);
+ cur_cache_type, other_cache_types, size, cur_time, cache_lock,
sync_removal);
if (reserve_success ||
!config::file_cache_enable_evict_from_other_queue_by_size) {
return reserve_success;
}
@@ -1272,14 +1292,15 @@ bool
BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
if (_cur_cache_size + size > _capacity && cur_queue_size + size >
cur_queue_max_size) {
return false;
}
- return try_reserve_from_other_queue_by_size(cur_cache_type,
other_cache_types, size,
- cache_lock);
+ return try_reserve_from_other_queue_by_size(cur_cache_type,
other_cache_types, size, cache_lock,
+ sync_removal);
}
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
QueryFileCacheContextPtr
query_context,
const CacheContext& context, size_t
offset, size_t size,
- std::lock_guard<std::mutex>&
cache_lock) {
+ std::lock_guard<std::mutex>&
cache_lock,
+ bool sync_removal) {
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
@@ -1292,7 +1313,7 @@ bool BlockFileCache::try_reserve_for_lru(const
UInt128Wrapper& hash,
size_t cur_removed_size = 0;
find_evict_candidates(queue, size, cur_cache_size, removed_size,
to_evict, cache_lock,
cur_removed_size);
- remove_file_blocks(to_evict, cache_lock, true);
+ remove_file_blocks(to_evict, cache_lock, sync_removal);
*(_evict_by_self_lru_metrics_matrix[context.cache_type]) <<
cur_removed_size;
if (is_overflow(removed_size, size, cur_cache_size)) {
@@ -1345,7 +1366,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T&
cache_lock, U& block_lo
// so there will be a window that the file is not in the cache but
still in the storage
// but it's ok, because the rowset is stale already
bool ret = _recycle_keys.enqueue(key);
- if (!ret) {
+ if (ret) [[likely]] {
+ *_recycle_keys_length_recorder << _recycle_keys.size_approx();
+ } else {
LOG_WARNING("Failed to push recycle key to queue, do it
synchronously");
int64_t duration_ns = 0;
Status st;
@@ -1551,6 +1574,10 @@ int disk_used_percentage(const std::string& path,
std::pair<int, int>* percent)
int inode_percentage = int(inode_free * 1.0 / inode_total * 100);
percent->first = capacity_percentage;
percent->second = 100 - inode_percentage;
+
+ // Add sync point for testing
+ TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1",
percent);
+
return 0;
}
@@ -1643,7 +1670,7 @@ void BlockFileCache::check_disk_resource_limit() {
LOG_WARNING("config error, set to default value")
.tag("enter",
config::file_cache_enter_disk_resource_limit_mode_percent)
.tag("exit",
config::file_cache_exit_disk_resource_limit_mode_percent);
- config::file_cache_enter_disk_resource_limit_mode_percent = 90;
+ config::file_cache_enter_disk_resource_limit_mode_percent = 88;
config::file_cache_exit_disk_resource_limit_mode_percent = 80;
}
if (is_insufficient(space_percentage) ||
is_insufficient(inode_percentage)) {
@@ -1664,11 +1691,69 @@ void BlockFileCache::check_disk_resource_limit() {
}
}
+void BlockFileCache::check_need_evict_cache_in_advance() {
+ if (_storage->get_type() != FileCacheStorageType::DISK) {
+ return;
+ }
+
+ std::pair<int, int> percent;
+ int ret = disk_used_percentage(_cache_base_path, &percent);
+ if (ret != 0) {
+ LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error",
strerror(errno));
+ return;
+ }
+ auto [space_percentage, inode_percentage] = percent;
+ size_t size_percentage = static_cast<size_t>(
+ (static_cast<double>(_cur_cache_size) /
static_cast<double>(_capacity)) * 100);
+ auto is_insufficient = [](const int& percentage) {
+ return percentage >=
config::file_cache_enter_need_evict_cache_in_advance_percent;
+ };
+ DCHECK_GE(space_percentage, 0);
+ DCHECK_LE(space_percentage, 100);
+ DCHECK_GE(inode_percentage, 0);
+ DCHECK_LE(inode_percentage, 100);
+ // ATTN: due to that can be changed dynamically, set it to default value
if it's invalid
+ // FIXME: reject with config validator
+ if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
+ config::file_cache_exit_need_evict_cache_in_advance_percent) {
+ LOG_WARNING("config error, set to default value")
+ .tag("enter",
config::file_cache_enter_need_evict_cache_in_advance_percent)
+ .tag("exit",
config::file_cache_exit_need_evict_cache_in_advance_percent);
+ config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
+ config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
+ }
+ if (is_insufficient(space_percentage) || is_insufficient(inode_percentage)
||
+ is_insufficient(size_percentage)) {
+ _need_evict_cache_in_advance = true;
+ _need_evict_cache_in_advance_metrics->set_value(1);
+ } else if (_need_evict_cache_in_advance &&
+ (space_percentage <
config::file_cache_exit_need_evict_cache_in_advance_percent) &&
+ (inode_percentage <
config::file_cache_exit_need_evict_cache_in_advance_percent) &&
+ (size_percentage <
config::file_cache_exit_need_evict_cache_in_advance_percent)) {
+ _need_evict_cache_in_advance = false;
+ _need_evict_cache_in_advance_metrics->set_value(0);
+ }
+ if (_need_evict_cache_in_advance) {
+ LOG(WARNING) << "file_cache=" << get_base_path() << " space_percent="
<< space_percentage
+ << " inode_percent=" << inode_percentage << "
size_percent=" << size_percentage
+ << " is_space_insufficient=" <<
is_insufficient(space_percentage)
+ << " is_inode_insufficient=" <<
is_insufficient(inode_percentage)
+ << " is_size_insufficient=" <<
is_insufficient(size_percentage)
+ << " need evict cache in advance";
+ }
+}
+
void BlockFileCache::run_background_monitor() {
int64_t interval_time_seconds = 20;
while (!_close) {
TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time",
&interval_time_seconds);
check_disk_resource_limit();
+ if (config::enable_evict_file_cache_in_advance) {
+ check_need_evict_cache_in_advance();
+ } else {
+ _need_evict_cache_in_advance = false;
+ }
+
{
std::unique_lock close_lock(_close_mtx);
_close_cv.wait_for(close_lock,
std::chrono::seconds(interval_time_seconds));
@@ -1753,11 +1838,8 @@ void BlockFileCache::run_background_gc() {
break;
}
}
- while (_recycle_keys.try_dequeue(key)) {
- if (batch_count >= batch_limit) {
- break;
- }
+ while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
int64_t duration_ns = 0;
Status st;
{
@@ -1771,10 +1853,42 @@ void BlockFileCache::run_background_gc() {
}
batch_count++;
}
+ *_recycle_keys_length_recorder << _recycle_keys.size_approx();
batch_count = 0;
}
}
+void BlockFileCache::run_background_evict_in_advance() {
+ LOG(INFO) << "Starting background evict in advance thread";
+ int64_t batch = 0;
+ while (!_close) {
+ {
+ std::unique_lock close_lock(_close_mtx);
+ _close_cv.wait_for(
+ close_lock,
+
std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
+ if (_close) {
+ LOG(INFO) << "Background evict in advance thread exiting due
to cache closing";
+ break;
+ }
+ }
+ batch = config::file_cache_evict_in_advance_batch_bytes;
+
+ // Skip if eviction not needed or too many pending recycles
+ if (!_need_evict_cache_in_advance || _recycle_keys.size_approx() >=
(batch * 10)) {
+ continue;
+ }
+
+ int64_t duration_ns = 0;
+ {
+ SCOPED_CACHE_LOCK(_mutex, this);
+ SCOPED_RAW_TIMER(&duration_ns);
+ try_evict_in_advance(batch, cache_lock);
+ }
+ *_evict_in_advance_latency_us << (duration_ns / 1000);
+ }
+}
+
void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
SCOPED_CACHE_LOCK(_mutex, this);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index ce8d13d4a14..5b998708241 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -109,6 +109,9 @@ public:
if (_cache_background_gc_thread.joinable()) {
_cache_background_gc_thread.join();
}
+ if (_cache_background_evict_in_advance_thread.joinable()) {
+ _cache_background_evict_in_advance_thread.join();
+ }
}
/// Restore cache from local filesystem.
@@ -190,6 +193,22 @@ public:
bool try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
size_t offset,
size_t size, std::lock_guard<std::mutex>& cache_lock);
+ /**
+ * Proactively evict cache blocks to free up space before cache is full.
+ *
+ * This function attempts to evict blocks from both NORMAL and TTL queues
to maintain
+ * cache size below high watermark. Unlike try_reserve() which blocks
until space is freed,
+ * this function initiates asynchronous eviction in background.
+ *
+ * @param size Number of bytes to try to evict
+ * @param cache_lock Lock that must be held while accessing cache data
structures
+ *
+ * @pre Caller must hold cache_lock
+ * @pre _need_evict_cache_in_advance must be true
+ * @pre _recycle_keys queue must have capacity for evicted blocks
+ */
+ void try_evict_in_advance(size_t size, std::lock_guard<std::mutex>&
cache_lock);
+
void update_ttl_atime(const UInt128Wrapper& hash);
std::map<std::string, double> get_stats();
@@ -395,7 +414,7 @@ private:
bool try_reserve_for_lru(const UInt128Wrapper& hash,
QueryFileCacheContextPtr query_context,
const CacheContext& context, size_t offset,
size_t size,
- std::lock_guard<std::mutex>& cache_lock);
+ std::lock_guard<std::mutex>& cache_lock, bool
sync_removal = true);
bool try_reserve_during_async_load(size_t size,
std::lock_guard<std::mutex>& cache_lock);
@@ -403,7 +422,8 @@ private:
std::vector<FileCacheType> get_other_cache_type_without_ttl(FileCacheType
cur_cache_type);
bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t
offset, int64_t cur_time,
- std::lock_guard<std::mutex>& cache_lock);
+ std::lock_guard<std::mutex>& cache_lock,
+ bool sync_removal = true);
size_t get_available_cache_size(FileCacheType cache_type) const;
@@ -426,6 +446,7 @@ private:
std::lock_guard<std::mutex>&
cache_lock) const;
void check_disk_resource_limit();
+ void check_need_evict_cache_in_advance();
size_t get_available_cache_size_unlocked(FileCacheType type,
std::lock_guard<std::mutex>&
cache_lock) const;
@@ -441,15 +462,18 @@ private:
void run_background_monitor();
void run_background_ttl_gc();
void run_background_gc();
+ void run_background_evict_in_advance();
bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
std::vector<FileCacheType> other_cache_types,
size_t size, int64_t
cur_time,
-
std::lock_guard<std::mutex>& cache_lock);
+
std::lock_guard<std::mutex>& cache_lock,
+ bool sync_removal);
bool try_reserve_from_other_queue_by_size(FileCacheType cur_type,
std::vector<FileCacheType>
other_cache_types,
- size_t size,
std::lock_guard<std::mutex>& cache_lock);
+ size_t size,
std::lock_guard<std::mutex>& cache_lock,
+ bool sync_removal);
bool is_overflow(size_t removed_size, size_t need_size, size_t
cur_cache_size) const;
@@ -476,9 +500,11 @@ private:
std::thread _cache_background_monitor_thread;
std::thread _cache_background_ttl_gc_thread;
std::thread _cache_background_gc_thread;
+ std::thread _cache_background_evict_in_advance_thread;
std::atomic_bool _async_open_done {false};
// disk space or inode is less than the specified value
bool _disk_resource_limit_mode {false};
+ bool _need_evict_cache_in_advance {false};
bool _is_initialized {false};
// strategy
@@ -536,12 +562,15 @@ private:
std::shared_ptr<bvar::Status<double>> _hit_ratio_5m;
std::shared_ptr<bvar::Status<double>> _hit_ratio_1h;
std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics;
+ std::shared_ptr<bvar::Status<size_t>> _need_evict_cache_in_advance_metrics;
std::shared_ptr<bvar::LatencyRecorder> _cache_lock_wait_time_us;
std::shared_ptr<bvar::LatencyRecorder> _get_or_set_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _storage_sync_remove_latency_us;
std::shared_ptr<bvar::LatencyRecorder>
_storage_retry_sync_remove_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us;
+ std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
+ std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
};
} // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 117f01d63e3..e42b6516d58 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -139,6 +139,10 @@ class BlockFileCacheTest : public testing::Test {
public:
static void SetUpTestSuite() {
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
+ config::enable_evict_file_cache_in_advance = false; // disable evict in
+ // advance for most
+ // cases for simple
+ // verification
bool exists {false};
ASSERT_TRUE(global_local_filesystem()->exists(caches_dir,
&exists).ok());
if (!exists) {
@@ -4402,7 +4406,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_1) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
- EXPECT_EQ(config::file_cache_enter_disk_resource_limit_mode_percent, 90);
+ EXPECT_EQ(config::file_cache_enter_disk_resource_limit_mode_percent, 88);
EXPECT_EQ(config::file_cache_exit_disk_resource_limit_mode_percent, 80);
config::file_cache_enter_disk_resource_limit_mode_percent = 99;
if (fs::exists(cache_base_path)) {
@@ -6742,4 +6746,241 @@ TEST_F(BlockFileCacheTest,
evict_privilege_order_for_ttl) {
}
}
+TEST_F(BlockFileCacheTest, evict_in_advance) {
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ auto sp = SyncPoint::get_instance();
+ SyncPoint::CallbackGuard guard1;
+ sp->set_call_back(
+ "BlockFileCache::set_sleep_time",
+ [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; },
&guard1);
+ sp->enable_processing();
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+
+ settings.ttl_queue_size = 5000000;
+ settings.ttl_queue_elements = 50000;
+ settings.query_queue_size = 3000000;
+ settings.query_queue_elements = 30000;
+ settings.index_queue_size = 1000000;
+ settings.index_queue_elements = 10000;
+ settings.disposable_queue_size = 1000000;
+ settings.disposable_queue_elements = 10000;
+ settings.capacity = 10000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+
+ size_t limit = 1000000;
+ size_t cache_max = 10000000;
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.query_id = query_id;
+ // int64_t cur_time = UnixSeconds();
+ // context.expiration_time = cur_time + 120;
+ auto key1 = io::BlockFileCache::hash("key1");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ // fill the cache to its limit
+ for (; offset < limit; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(2, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ // grab more exceed the limit to max cache capacity
+ for (; offset < cache_max; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(3, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(4, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+
ASSERT_EQ(cache._evict_by_self_lru_metrics_matrix[FileCacheType::INDEX]->get_value(),
0);
+
+ // grab more exceed the cache capacity
+ size_t exceed = 2000000;
+ for (; offset < (cache_max + exceed); offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+
+ assert_range(5, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download(blocks[0]);
+ assert_range(6, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+
+ blocks.clear();
+ }
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max);
+
+ config::file_cache_evict_in_advance_batch_bytes = 200000; // evict 2
100000 blocks
+ config::enable_evict_file_cache_in_advance = true; // enable
evict in advance
+ std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // wait for
clear
+ ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
+ ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], cache_max -
200000);
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+TEST_F(BlockFileCacheTest, test_check_need_evict_cache_in_advance) {
+ std::string cache_base_path = "./ut_file_cache_dir";
+ fs::create_directories(cache_base_path);
+
+ io::FileCacheSettings settings;
+ settings.capacity = 100_mb;
+ settings.storage = "disk";
+ settings.query_queue_size = 50_mb;
+ settings.index_queue_size = 20_mb;
+ settings.disposable_queue_size = 20_mb;
+ settings.ttl_queue_size = 10_mb;
+
+ // this one for memory storage
+ {
+ settings.storage = "memory";
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_FALSE(cache._need_evict_cache_in_advance);
+ cache.check_need_evict_cache_in_advance();
+ ASSERT_FALSE(cache._need_evict_cache_in_advance);
+ }
+
+ // the rest for disk
+ settings.storage = "disk";
+
+ // bad disk path
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_FALSE(cache._need_evict_cache_in_advance);
+
+ cache._cache_base_path = "/non/existent/path/OOXXOO";
+ cache.check_need_evict_cache_in_advance();
+ ASSERT_FALSE(cache._need_evict_cache_in_advance);
+ }
+
+ // conditions for enter need evict cache in advance
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_FALSE(cache._need_evict_cache_in_advance);
+
+ // condition1 space usage rate exceed threshold
+ config::file_cache_enter_need_evict_cache_in_advance_percent = 70;
+ config::file_cache_exit_need_evict_cache_in_advance_percent = 65;
+
+ SyncPoint::get_instance()->set_call_back(
+ "BlockFileCache::disk_used_percentage:1",
[](std::vector<std::any>&& values) {
+ auto* percent = try_any_cast<std::pair<int,
int>*>(values.back());
+ percent->first = 75; // set high
+ percent->second = 60;
+ });
+
+ SyncPoint::get_instance()->enable_processing();
+ cache.check_need_evict_cache_in_advance();
+ ASSERT_TRUE(cache._need_evict_cache_in_advance);
+ SyncPoint::get_instance()->disable_processing();
+ SyncPoint::get_instance()->clear_all_call_backs();
+
+ // condition2 inode usage rate exceed threshold
+ cache._need_evict_cache_in_advance = false;
+
+ SyncPoint::get_instance()->set_call_back(
+ "BlockFileCache::disk_used_percentage:1",
[](std::vector<std::any>&& values) {
+ auto* percent = try_any_cast<std::pair<int,
int>*>(values.back());
+ percent->first = 60;
+ percent->second = 75; // set high
+ });
+
+ SyncPoint::get_instance()->enable_processing();
+ cache.check_need_evict_cache_in_advance();
+ ASSERT_TRUE(cache._need_evict_cache_in_advance);
+ SyncPoint::get_instance()->disable_processing();
+ SyncPoint::get_instance()->clear_all_call_backs();
+
+ // condition3 cache size usage rate exceed threshold
+ cache._need_evict_cache_in_advance = false;
+ cache._cur_cache_size = 80_mb; // set high
+ cache.check_need_evict_cache_in_advance();
+ ASSERT_TRUE(cache._need_evict_cache_in_advance);
+ }
+
+ // conditions for exit need evict cache in advance
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+ cache._need_evict_cache_in_advance = true;
+ cache._cur_cache_size = 50_mb; // set low
+
+ SyncPoint::get_instance()->set_call_back(
+ "BlockFileCache::disk_used_percentage:1",
[](std::vector<std::any>&& values) {
+ auto* percent = try_any_cast<std::pair<int,
int>*>(values.back());
+ percent->first = 50; // set low
+ percent->second = 50; // set low
+ });
+
+ SyncPoint::get_instance()->enable_processing();
+ cache.check_need_evict_cache_in_advance();
+ ASSERT_FALSE(cache._need_evict_cache_in_advance);
+ SyncPoint::get_instance()->disable_processing();
+ SyncPoint::get_instance()->clear_all_call_backs();
+ }
+
+ // config parameter validation
+ {
+ io::BlockFileCache cache(cache_base_path, settings);
+
+ // set wrong config value
+ config::file_cache_enter_need_evict_cache_in_advance_percent = 70;
+ config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
+
+ cache.check_need_evict_cache_in_advance();
+
+ // reset to default value
+
ASSERT_EQ(config::file_cache_enter_need_evict_cache_in_advance_percent, 78);
+ ASSERT_EQ(config::file_cache_exit_need_evict_cache_in_advance_percent,
75);
+ }
+
+ fs::remove_all(cache_base_path);
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]