This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e010d8ddbca [enhancement](filecache) add filesystem leak cleaner
(#59269)
e010d8ddbca is described below
commit e010d8ddbcab2f66512742151cd1d210eb56c009
Author: zhengyu <[email protected]>
AuthorDate: Fri Feb 6 11:47:42 2026 +0800
[enhancement](filecache) add filesystem leak cleaner (#59269)
The cache directory can somehow become inconsistent with the file cache
meta store, so extend FSFileCacheStorage with inode-based stats, a
leak-cleaner thread, and orphan cleanup helpers to clean up such leaks.
---
be/src/common/config.cpp | 5 +
be/src/common/config.h | 5 +
be/src/io/cache/cache_block_meta_store.cpp | 27 +
be/src/io/cache/cache_block_meta_store.h | 3 +
be/src/io/cache/fs_file_cache_storage.cpp | 771 +++++++++++++++++++--
be/src/io/cache/fs_file_cache_storage.h | 69 +-
.../io/cache/block_file_cache_test_meta_store.cpp | 178 +----
.../fs_file_cache_storage_leak_cleaner_test.cpp | 718 +++++++++++++++++++
8 files changed, 1556 insertions(+), 220 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1648c502f09..051e431c4f0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1201,6 +1201,11 @@
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
// if difference below this threshold, we consider cache's progressive
upgrading (2.0->3.0) successful
DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");
+DEFINE_mDouble(file_cache_leak_fs_to_meta_ratio_threshold, "1.3");
+DEFINE_mInt64(file_cache_leak_scan_interval_seconds, "86400");
+DEFINE_mInt32(file_cache_leak_scan_batch_files, "2048");
+DEFINE_mInt32(file_cache_leak_scan_pause_ms, "500");
+DEFINE_mInt64(file_cache_leak_grace_seconds, "3600");
DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4e1ce9c96fc..5dec9c3f765 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1230,6 +1230,11 @@ DECLARE_mBool(enable_file_cache_adaptive_write);
DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
+DECLARE_mDouble(file_cache_leak_fs_to_meta_ratio_threshold);
+DECLARE_mInt64(file_cache_leak_scan_interval_seconds);
+DECLARE_mInt32(file_cache_leak_scan_batch_files);
+DECLARE_mInt32(file_cache_leak_scan_pause_ms);
+DECLARE_mInt64(file_cache_leak_grace_seconds);
DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
diff --git a/be/src/io/cache/cache_block_meta_store.cpp
b/be/src/io/cache/cache_block_meta_store.cpp
index 472886152c7..c42dd3f8003 100644
--- a/be/src/io/cache/cache_block_meta_store.cpp
+++ b/be/src/io/cache/cache_block_meta_store.cpp
@@ -332,6 +332,33 @@ std::unique_ptr<BlockMetaIterator>
CacheBlockMetaStore::get_all() {
return std::unique_ptr<BlockMetaIterator>(new RocksDBIterator(iter));
}
+// Counts the entries stored in the file-cache-meta column family by walking it
+// with a RocksDB iterator (writes still pending in the async queue are not
+// included).
+//
+// Returns 0 when the database is not initialized, when the iterator cannot be
+// created, or when the iteration stops on an error. A truncated count is never
+// reported as valid, so a caller comparing this number against the files on
+// disk does not mistake a read failure for a shrunken metadata set.
+size_t CacheBlockMetaStore::approximate_entry_count() const {
+    if (!_db) {
+        LOG(WARNING) << "Database not initialized when counting entries";
+        return 0;
+    }
+
+    rocksdb::ReadOptions read_options;
+    std::unique_ptr<rocksdb::Iterator> iter(
+            _db->NewIterator(read_options, _file_cache_meta_cf_handle.get()));
+    if (!iter) {
+        LOG(WARNING) << "Failed to create iterator when counting entries";
+        return 0;
+    }
+
+    size_t count = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        ++count;
+    }
+
+    // Valid() also turns false on an I/O or corruption error, so the status
+    // must be checked to tell "end of data" apart from "iteration aborted".
+    if (!iter->status().ok()) {
+        LOG(WARNING) << "Iterator encountered error when counting entries: "
+                     << iter->status().ToString();
+        return 0;
+    }
+
+    return count;
+}
+
void CacheBlockMetaStore::delete_key(const BlockMetaKey& key) {
std::string key_str = serialize_key(key);
diff --git a/be/src/io/cache/cache_block_meta_store.h
b/be/src/io/cache/cache_block_meta_store.h
index bd3c6501c14..1a6659596e1 100644
--- a/be/src/io/cache/cache_block_meta_store.h
+++ b/be/src/io/cache/cache_block_meta_store.h
@@ -111,6 +111,9 @@ public:
// Get the approximate size of the write queue
size_t get_write_queue_size() const;
+ // Count entries stored in rocksdb (ignoring pending writes)
+ size_t approximate_entry_count() const;
+
private:
void async_write_worker();
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index 53e4f0c4dd9..fe06505df65 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -21,13 +21,27 @@
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
+#include <sys/stat.h>
#include <sys/statvfs.h>
-
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cctype>
+#include <chrono>
+#include <cmath>
+#include <condition_variable>
+#include <ctime>
#include <filesystem>
+#include <limits>
#include <mutex>
+#include <random>
#include <system_error>
+#include <thread>
+#include <unordered_set>
#include <vector>
+#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
@@ -46,6 +60,21 @@
namespace doris::io {
+#ifdef BE_TEST
+namespace {
+// Test-only hook table consulted by the inode-estimation helpers in this file.
+// nullptr means no overrides are installed.
+FSFileCacheStorage::InodeEstimationTestHooks* g_inode_estimation_hooks =
nullptr;
+
+// Returns the currently installed hook table, or nullptr if none was set.
+FSFileCacheStorage::InodeEstimationTestHooks* inode_test_hooks() {
+ return g_inode_estimation_hooks;
+}
+} // namespace
+
+// Installs the hook table used by tests; passing nullptr clears it.
+// Only the pointer is stored, the pointee is not copied.
+void FSFileCacheStorage::set_inode_estimation_test_hooks(
+ FSFileCacheStorage::InodeEstimationTestHooks* hooks) {
+ g_inode_estimation_hooks = hooks;
+}
+#endif
+
struct BatchLoadArgs {
UInt128Wrapper hash;
CacheContext ctx;
@@ -111,12 +140,16 @@ size_t FDCache::file_reader_cache_size() {
return _file_reader_list.size();
}
-Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+Status FSFileCacheStorage::init(BlockFileCache* mgr) {
+ const char* metrics_prefix = mgr->_cache_base_path.c_str();
_iterator_dir_retry_cnt = std::make_shared<bvar::LatencyRecorder>(
- _cache_base_path.c_str(),
"file_cache_fs_storage_iterator_dir_retry_cnt");
- _cache_base_path = _mgr->_cache_base_path;
+ metrics_prefix, "file_cache_fs_storage_iterator_dir_retry_cnt");
+ _leak_scan_removed_files = std::make_shared<bvar::Adder<size_t>>(
+ metrics_prefix, "file_cache_leak_removed_files_cnt");
+ _cache_base_path = mgr->_cache_base_path;
+ _mgr = mgr;
_meta_store = std::make_unique<CacheBlockMetaStore>(_cache_base_path +
"/meta", 10000);
- _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+ _cache_background_load_thread = std::thread([this, mgr]() {
try {
auto mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("FileCacheVersionReader"));
@@ -142,6 +175,7 @@ Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
load_cache_info_into_memory(mgr);
mgr->_async_open_done = true;
LOG_INFO("file cache {} lazy load done.", _cache_base_path);
+ start_leak_cleaner(mgr);
} catch (const std::exception& e) {
LOG(ERROR) << "Background cache loading thread failed with
exception: " << e.what();
} catch (...) {
@@ -608,12 +642,12 @@ bool FSFileCacheStorage::handle_already_loaded_block(
return true;
}
-void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache*
_mgr) const {
+void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache*
mgr) const {
int scan_length = 10000;
std::vector<BatchLoadArgs> batch_load_buffer;
batch_load_buffer.reserve(scan_length);
auto add_cell_batch_func = [&]() {
- SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
+ SCOPED_CACHE_LOCK(mgr->_mutex, mgr);
auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
@@ -623,8 +657,8 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
}
// if the file is tmp, it means it is the old file and it should
be removed
if (!args.is_tmp) {
- _mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
- FileBlock::State::DOWNLOADED, cache_lock);
+ mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
+ FileBlock::State::DOWNLOADED, cache_lock);
return;
}
std::error_code ec;
@@ -642,7 +676,9 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
auto key_with_suffix = key_it->path().filename().native();
auto delim_pos = key_with_suffix.find('_');
- DCHECK(delim_pos != std::string::npos);
+ if (delim_pos == std::string::npos || delim_pos !=
sizeof(uint128_t) * 2) {
+ continue;
+ }
std::string key_str = key_with_suffix.substr(0, delim_pos);
std::string expiration_time_str = key_with_suffix.substr(delim_pos
+ 1);
auto hash =
UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str()));
@@ -699,7 +735,7 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
// skip version file
continue;
}
- if (key_prefix_it->path().filename().native() == "meta") {
+ if (key_prefix_it->path().filename().native() == META_DIR_NAME) {
// skip rocksdb dir
continue;
}
@@ -756,7 +792,9 @@ Status
FSFileCacheStorage::get_file_cache_infos(std::vector<FileCacheInfo>& info
for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
auto key_with_suffix = key_it->path().filename().native();
auto delim_pos = key_with_suffix.find('_');
- DCHECK(delim_pos != std::string::npos);
+ if (delim_pos == std::string::npos || delim_pos !=
sizeof(uint128_t) * 2) {
+ continue;
+ }
std::string key_str = key_with_suffix.substr(0, delim_pos);
std::string expiration_time_str = key_with_suffix.substr(delim_pos
+ 1);
long expiration_time = std::stoul(expiration_time_str);
@@ -789,12 +827,13 @@ Status
FSFileCacheStorage::get_file_cache_infos(std::vector<FileCacheInfo>& info
return Status::OK();
}
-void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache*
_mgr) const {
+void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache*
mgr) const {
+ TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile1");
int scan_length = 10000;
std::vector<BatchLoadArgs> batch_load_buffer;
batch_load_buffer.reserve(scan_length);
auto add_cell_batch_func = [&]() {
- SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
+ SCOPED_CACHE_LOCK(mgr->_mutex, mgr);
auto f = [&](const BatchLoadArgs& args) {
// in async load mode, a cell may be added twice.
@@ -802,8 +841,8 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg
args.ctx.tablet_id, cache_lock)) {
return;
}
- _mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
- FileBlock::State::DOWNLOADED, cache_lock);
+ mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
FileBlock::State::DOWNLOADED,
+ cache_lock);
return;
};
std::for_each(batch_load_buffer.begin(), batch_load_buffer.end(), f);
@@ -868,9 +907,9 @@ void
FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg
TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2");
}
-void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr)
const {
+void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* mgr)
const {
// First load from database
- load_cache_info_into_memory_from_db(_mgr);
+ load_cache_info_into_memory_from_db(mgr);
std::string version;
auto st = read_file_cache_version(&version);
@@ -882,17 +921,45 @@ void
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
return;
}
+ // If cache directory is effectively empty (no cache data entries), write
version hint and
+ // return directly.
+ auto is_cache_base_path_empty = [&]() -> bool {
+ std::error_code ec;
+ std::filesystem::directory_iterator it {_cache_base_path, ec};
+ if (ec) {
+ LOG(WARNING) << "Failed to list cache directory: " <<
_cache_base_path
+ << ", error: " << ec.message();
+ return false;
+ }
+
+ for (; it != std::filesystem::directory_iterator(); ++it) {
+ auto name = it->path().filename().native();
+ if (name == META_DIR_NAME || name == "version") {
+ continue;
+ }
+ return false;
+ }
+ return true;
+ };
+
+ if (is_cache_base_path_empty()) {
+ if (st = write_file_cache_version(); !st.ok()) {
+ LOG(WARNING) << "Failed to write version hints for file cache,
err=" << st.to_string();
+ }
+ return;
+ }
+
// Count blocks loaded from database
size_t db_block_count = 0;
{
- std::lock_guard<std::mutex> lock(_mgr->_mutex);
- for (const auto& hash_entry : _mgr->_files) {
+ std::lock_guard<std::mutex> lock(mgr->_mutex);
+ for (const auto& hash_entry : mgr->_files) {
db_block_count += hash_entry.second.size();
}
}
// Estimate file count from filesystem using statfs
- size_t estimated_file_count = estimate_file_count_from_statfs();
+ size_t estimated_file_count = estimate_file_count_from_inode();
LOG(INFO) << "Cache loading statistics - DB blocks: " << db_block_count
<< ", Estimated FS files: " << estimated_file_count;
@@ -965,7 +1032,7 @@ Status FSFileCacheStorage::clear(std::string& msg) {
auto t0 = std::chrono::steady_clock::now();
for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
if (!key_it->is_directory()) continue; // all file cache data is in
sub-directories
- if (key_it->path().filename().native() == "meta") continue;
+ if (key_it->path().filename().native() == META_DIR_NAME) continue;
++total;
std::string cache_key = key_it->path().string();
auto st = global_local_filesystem()->delete_directory(cache_key);
@@ -996,82 +1063,632 @@ FSFileCacheStorage::~FSFileCacheStorage() {
if (_cache_background_load_thread.joinable()) {
_cache_background_load_thread.join();
}
+ stop_leak_cleaner();
}
-size_t FSFileCacheStorage::estimate_file_count_from_statfs() const {
- struct statvfs vfs;
- if (statvfs(_cache_base_path.c_str(), &vfs) != 0) {
- LOG(WARNING) << "Failed to get filesystem statistics for path: " <<
_cache_base_path
- << ", error: " << strerror(errno);
+// Estimates how many cache data files live under _cache_base_path using inode
+// accounting: (inodes in use on the filesystem) minus (estimated inodes used
+// outside the cache directory) minus (estimated cache directory inodes).
+// Returns 0 if statvfs/lstat fails, if the filesystem reports zero total
+// inodes, or if the subtraction would not be positive. The result and the
+// elapsed time are logged.
+size_t FSFileCacheStorage::estimate_file_count_from_inode() const {
+ int64_t duration_ns = 0;
+ size_t cache_files = 0;
+ {
+ SCOPED_RAW_TIMER(&duration_ns);
+ // do { ... } while (false) lets each failure path `break` out with
+ // cache_files still 0.
+ do {
+ struct statvfs vfs {};
+ int statvfs_res = 0;
+#ifdef BE_TEST
+ if (auto* hooks = inode_test_hooks(); hooks &&
hooks->statvfs_override) {
+ statvfs_res = hooks->statvfs_override(_cache_base_path, &vfs);
+ } else
+#endif
+ {
+ statvfs_res = statvfs(_cache_base_path.c_str(), &vfs);
+ }
+ if (statvfs_res != 0) {
+ LOG(WARNING) << "Failed to get filesystem statistics for path:
" << _cache_base_path
+ << ", error: " << strerror(errno);
+ break;
+ }
+
+ if (vfs.f_files == 0) {
+ LOG(WARNING) << "Filesystem returned zero total inodes for
path "
+ << _cache_base_path;
+ break;
+ }
+
+ // cache_stat is not read below; the lstat only verifies that the
+ // cache base path can be stat'ed.
+ struct stat cache_stat {};
+ int lstat_res = 0;
+#ifdef BE_TEST
+ if (auto* hooks = inode_test_hooks(); hooks &&
hooks->lstat_override) {
+ lstat_res = hooks->lstat_override(_cache_base_path,
&cache_stat);
+ } else
+#endif
+ {
+ lstat_res = lstat(_cache_base_path.c_str(), &cache_stat);
+ }
+ if (lstat_res != 0) {
+ LOG(WARNING) << "Failed to stat cache base path " <<
_cache_base_path << ": "
+ << strerror(errno);
+ break;
+ }
+
+ // Used inodes for the whole filesystem = total inodes - free inodes.
+ size_t total_inodes_used = vfs.f_files - vfs.f_ffree;
+ size_t non_cache_inodes = estimate_non_cache_inode_usage();
+ size_t directory_inodes = estimate_cache_directory_inode_usage();
+
+ // Guard the unsigned subtraction: only subtract when the result is positive.
+ if (total_inodes_used > non_cache_inodes + directory_inodes) {
+ cache_files = total_inodes_used - non_cache_inodes -
directory_inodes;
+ } else {
+ LOG(WARNING) << fmt::format(
+ "Inode subtraction underflow: total={} non_cache={}
directory={}",
+ total_inodes_used, non_cache_inodes, directory_inodes);
+ }
+
+ LOG(INFO) << fmt::format(
+ "Cache inode estimation: total_used={}, non_cache={},
directories≈{}, files≈{}",
+ total_inodes_used, non_cache_inodes, directory_inodes,
cache_files);
+ } while (false);
+ }
+ const double duration_ms = static_cast<double>(duration_ns) / 1'000'000.0;
+ LOG(INFO) << fmt::format("estimate_file_count_from_inode
duration_ms={:.3f}, files={}",
+ duration_ms, cache_files);
+ return cache_files;
+}
+
+size_t FSFileCacheStorage::count_inodes_for_path(
+ const std::filesystem::path& path, dev_t target_dev,
+ const std::filesystem::path& excluded_root,
+ std::unordered_set<InodeKey, InodeKeyHash>& visited) const {
+#ifdef BE_TEST
+ if (auto* hooks = inode_test_hooks(); hooks &&
hooks->count_inodes_override) {
+ return hooks->count_inodes_override(*this, path, target_dev,
excluded_root, visited);
+ }
+#endif
+ if (!excluded_root.empty()) {
+ std::error_code eq_ec;
+ bool is_excluded = std::filesystem::equivalent(path, excluded_root,
eq_ec);
+ if (eq_ec) {
+ LOG(WARNING) << "Failed to compare " << path << " with " <<
excluded_root << ": "
+ << eq_ec.message();
+ } else if (is_excluded) {
+ return 0;
+ }
+ }
+
+ struct stat st {};
+ if (lstat(path.c_str(), &st) != 0) {
+ LOG(WARNING) << "Failed to stat path " << path << ": " <<
strerror(errno);
+ return 0;
+ }
+ if (st.st_dev != target_dev) {
+ return 0;
+ }
+ InodeKey key {st.st_dev, st.st_ino};
+ if (!visited.insert(key).second) {
return 0;
}
- // Get total size of cache directory to estimate file count
+ size_t count = 1;
+ if (S_ISDIR(st.st_mode)) {
+ std::error_code ec;
+ for (std::filesystem::directory_iterator it {path, ec};
+ !ec && it != std::filesystem::directory_iterator(); ++it) {
+ count += count_inodes_for_path(it->path(), target_dev,
excluded_root, visited);
+ }
+ if (ec) {
+ LOG(WARNING) << "Failed to iterate directory " << path << ": " <<
ec.message();
+ }
+ }
+ return count;
+}
+
+// Returns true when `entry` looks like a first-level cache prefix directory:
+// a directory whose name is exactly KEY_PREFIX_LENGTH hexadecimal characters.
+// The meta directory (META_DIR_NAME) is never treated as a prefix directory.
+//
+// The entry type is queried with the non-throwing is_directory(ec) overload,
+// so an entry that cannot be stat'ed (for example, removed while the parent
+// directory is being listed) is reported as "not a prefix directory" instead
+// of raising std::filesystem::filesystem_error into the caller's scan.
+bool FSFileCacheStorage::is_cache_prefix_directory(
+        const std::filesystem::directory_entry& entry) const {
+    std::error_code ec;
+    if (!entry.is_directory(ec)) {
+        return false;
+    }
+    const auto name = entry.path().filename().native();
+    if (name.empty() || name == META_DIR_NAME || name.size() != KEY_PREFIX_LENGTH) {
+        return false;
+    }
+    // unsigned char parameter: std::isxdigit is undefined for negative char values.
+    return std::all_of(name.begin(), name.end(),
+                       [](unsigned char c) { return std::isxdigit(c) != 0; });
+}
+
+std::filesystem::path FSFileCacheStorage::find_mount_root(dev_t cache_dev)
const {
+#ifdef BE_TEST
+ if (auto* hooks = inode_test_hooks(); hooks &&
hooks->find_mount_root_override) {
+ return hooks->find_mount_root_override(*this, cache_dev);
+ }
+#endif
std::error_code ec;
- uintmax_t total_size = 0;
- std::vector<std::filesystem::path> pending_dirs
{std::filesystem::path(_cache_base_path)};
- while (!pending_dirs.empty()) {
- auto current_dir = pending_dirs.back();
- pending_dirs.pop_back();
+ std::filesystem::path current =
std::filesystem::absolute(_cache_base_path, ec);
+ if (ec) {
+ LOG(WARNING) << "Failed to resolve absolute cache base path " <<
_cache_base_path << ": "
+ << ec.message();
+ current = _cache_base_path;
+ }
- std::filesystem::directory_iterator it(current_dir, ec);
- if (ec) {
- LOG(WARNING) << "Failed to list directory while estimating file
count, dir="
- << current_dir << ", err=" << ec.message();
- ec.clear();
+ std::filesystem::path result = current;
+ while (result.has_parent_path()) {
+ auto parent = result.parent_path();
+ if (parent.empty() || parent == result) {
+ break;
+ }
+ struct stat st {};
+ if (lstat(parent.c_str(), &st) != 0) {
+ LOG(WARNING) << "Failed to stat parent path " << parent << ": " <<
strerror(errno);
+ break;
+ }
+ if (st.st_dev != cache_dev) {
+ break;
+ }
+ result = parent;
+ }
+ return result;
+}
+
+// Estimates the number of inodes used outside the cache directory: starts at
+// the path returned by find_mount_root() and counts via
+// count_inodes_for_path(), passing the cache path's st_dev as the target
+// device and the absolute cache base path as the excluded root.
+// Returns 0 if the cache base path cannot be stat'ed or no mount root is found.
+size_t FSFileCacheStorage::estimate_non_cache_inode_usage() const {
+#ifdef BE_TEST
+ if (auto* hooks = inode_test_hooks(); hooks && hooks->non_cache_override) {
+ return hooks->non_cache_override(*this);
+ }
+#endif
+ struct stat cache_stat {};
+ if (lstat(_cache_base_path.c_str(), &cache_stat) != 0) {
+ LOG(WARNING) << "Failed to stat cache base path " << _cache_base_path
<< ": "
+ << strerror(errno);
+ return 0;
+ }
+
+ auto mount_root = find_mount_root(cache_stat.st_dev);
+ if (mount_root.empty()) {
+ LOG(WARNING) << "Failed to determine mount root for cache path " <<
_cache_base_path;
+ return 0;
+ }
+
+ // `visited` is handed to count_inodes_for_path() together with the excluded path.
+ std::unordered_set<InodeKey, InodeKeyHash> visited;
+ std::error_code abs_ec;
+ std::filesystem::path excluded =
std::filesystem::absolute(_cache_base_path, abs_ec);
+ if (abs_ec) {
+ LOG(WARNING) << "Failed to get absolute cache base path " <<
_cache_base_path << ": "
+ << abs_ec.message();
+ // Fall back to the path as configured.
+ excluded = _cache_base_path;
+ }
+
+ return count_inodes_for_path(mount_root, cache_stat.st_dev, excluded,
visited);
+}
+
+size_t FSFileCacheStorage::estimate_cache_directory_inode_usage() const {
+#ifdef BE_TEST
+ if (auto* hooks = inode_test_hooks(); hooks && hooks->cache_dir_override) {
+ return hooks->cache_dir_override(*this);
+ }
+#endif
+ constexpr size_t kSampleLimit = 3;
+ size_t prefix_dirs = 0;
+ std::vector<std::filesystem::path> samples;
+
+ std::error_code ec;
+ std::filesystem::directory_iterator it {_cache_base_path, ec};
+ if (ec) {
+ LOG(WARNING) << "Failed to list cache base path for directory
estimation: " << ec.message();
+ return 0;
+ }
+
+ for (; it != std::filesystem::directory_iterator(); ++it) {
+ if (!is_cache_prefix_directory(*it)) {
continue;
}
+ ++prefix_dirs;
+ if (samples.size() < kSampleLimit) {
+ samples.emplace_back(it->path());
+ }
+ }
- for (; it != std::filesystem::directory_iterator(); ++it) {
- std::error_code status_ec;
- auto entry_status = it->symlink_status(status_ec);
- TEST_SYNC_POINT_CALLBACK(
-
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterEntryStatus",
- &status_ec);
- if (status_ec) {
- LOG(WARNING) << "Failed to stat entry while estimating file
count, path="
- << it->path() << ", err=" << status_ec.message();
- continue;
+ if (prefix_dirs == 0 || samples.empty()) {
+ return 0;
+ }
+
+ size_t sampled_second_level = 0;
+ for (const auto& prefix_path : samples) {
+ size_t local_count = 0;
+ std::error_code sample_ec;
+ for (std::filesystem::directory_iterator prefix_it {prefix_path,
sample_ec};
+ !sample_ec && prefix_it != std::filesystem::directory_iterator();
++prefix_it) {
+ if (prefix_it->is_directory()) {
+ ++local_count;
}
+ }
+ if (sample_ec) {
+ LOG(WARNING) << "Failed to enumerate prefix directory " <<
prefix_path << ": "
+ << sample_ec.message();
+ sample_ec.clear();
+ }
+ sampled_second_level += local_count;
+ }
- if (std::filesystem::is_directory(entry_status)) {
- auto next_dir = it->path();
- TEST_SYNC_POINT_CALLBACK(
-
"FSFileCacheStorage::estimate_file_count_from_statfs::OnDirectory",
- &next_dir);
- pending_dirs.emplace_back(next_dir);
- continue;
+ double average_second_level = static_cast<double>(sampled_second_level) /
samples.size();
+ size_t estimated_second_level =
+ static_cast<size_t>(std::llround(average_second_level *
prefix_dirs));
+ return prefix_dirs + estimated_second_level;
+}
+
+// Returns the number of block entries reported by the meta store
+// (approximate_entry_count()), or 0 when there is no meta store.
+// The mgr argument is currently unused. The count and the elapsed time are
+// logged.
+size_t FSFileCacheStorage::snapshot_metadata_block_count(BlockFileCache*
/*mgr*/) const {
+ // TODO(zhengyu): if the cache_lock problem is solved, we can then use _mgr
+ int64_t duration_ns = 0;
+ size_t block_count = 0;
+ {
+ SCOPED_RAW_TIMER(&duration_ns);
+ if (_meta_store) {
+ block_count = _meta_store->approximate_entry_count();
+ } else {
+ LOG(INFO) << "snapshot_metadata_block_count skipped because meta
store is null";
+ block_count = 0;
+ }
+ }
+ const double duration_ms = static_cast<double>(duration_ns) / 1'000'000.0;
+ LOG(INFO) << fmt::format("snapshot_metadata_block_count
duration_ms={:.3f}, blocks={}",
+ duration_ms, block_count);
+ return block_count;
+}
+
+// Returns the offsets recorded in mgr->_files for `hash`, copied while
+// holding mgr->_mutex. Returns an empty vector when the hash is not present.
+std::vector<size_t> FSFileCacheStorage::snapshot_metadata_for_hash_offsets(
+ BlockFileCache* mgr, const UInt128Wrapper& hash) const {
+ std::vector<size_t> offsets;
+ std::lock_guard<std::mutex> lock(mgr->_mutex);
+ auto it = mgr->_files.find(hash);
+ if (it == mgr->_files.end()) {
+ return offsets;
+ }
+ offsets.reserve(it->second.size());
+ // Only the map keys (offsets) are copied; the mapped values are ignored.
+ for (const auto& [offset, _] : it->second) {
+ offsets.push_back(offset);
+ }
+ return offsets;
+}
+
+// Starts the background leak-cleaner thread, which runs leak_cleaner_loop().
+// Does nothing (after logging a warning) when
+// file_cache_leak_scan_interval_seconds <= 0, when the cache version file
+// cannot be read, or when the version is not "3.0".
+// The mgr argument is not used in this function.
+void FSFileCacheStorage::start_leak_cleaner(BlockFileCache* mgr) {
+ if (config::file_cache_leak_scan_interval_seconds <= 0) {
+ LOG(WARNING) << "File cache leak cleaner disabled because interval <=
0";
+ return;
+ }
+
+ // if version file not 3.0 then just return, clean nothing
+ std::string version;
+ if (auto st = read_file_cache_version(&version); !st.ok()) {
+ LOG(WARNING) << "Failed to read file cache version: " <<
st.to_string();
+ return;
+ }
+ if (version != "3.0") {
+ LOG(WARNING) << "File cache leak cleaner skipped because version is
not 3.0";
+ return;
+ }
+
+ // Reset the stop flag before the thread starts polling it.
+ _stop_leak_cleaner.store(false, std::memory_order_relaxed);
+ _cache_leak_cleaner_thread = std::thread([this]() { leak_cleaner_loop();
});
+}
+
+// Asks the leak-cleaner thread to exit and waits for it to finish.
+// Safe to call when the thread was never started (join is skipped).
+void FSFileCacheStorage::stop_leak_cleaner() {
+    {
+        // Publish the stop flag while holding the mutex the cleaner waits on.
+        // Without the lock, the store + notify can land between the waiter's
+        // predicate check and its block on the condition variable; the wakeup
+        // is then lost and join() below stalls until the wait times out,
+        // which can be a full scan interval.
+        std::lock_guard<std::mutex> lock(_leak_cleaner_mutex);
+        _stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    }
+    _leak_cleaner_cv.notify_all();
+    if (_cache_leak_cleaner_thread.joinable()) {
+        _cache_leak_cleaner_thread.join();
+    }
+}
+
+// Body of the leak-cleaner thread. Sleeps for a random initial delay in
+// [0, interval] seconds, then repeatedly waits one interval and calls
+// run_leak_cleanup(_mgr) until _stop_leak_cleaner is set. Both waits are on
+// _leak_cleaner_cv so a stop request can end them early. Exceptions thrown by
+// a cleanup run are logged and the loop continues.
+void FSFileCacheStorage::leak_cleaner_loop() {
+ Thread::set_self_name("leak_cleaner_loop");
+
+ // Waiting a random time before starting the loop helps avoid a
+ // thundering-herd problem across all storages.
+ // The interval is read from config once here (clamped to at least 1 second)
+ // and reused for every iteration of the loop below.
+ const int64_t interval_seconds =
+ std::max<int64_t>(1,
config::file_cache_leak_scan_interval_seconds);
+ std::mt19937_64 rng(std::random_device {}());
+ std::uniform_int_distribution<int64_t> dist(0, interval_seconds);
+ int64_t initial_delay = dist(rng);
+
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::leak_cleaner_loop::initial_delay",
+ &initial_delay);
+ if (initial_delay > 0) {
+ std::unique_lock<std::mutex> lock(_leak_cleaner_mutex);
+ _leak_cleaner_cv.wait_for(lock, std::chrono::seconds(initial_delay),
[this]() {
+ return _stop_leak_cleaner.load(std::memory_order_relaxed);
+ });
+ lock.unlock();
+ if (_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+ return;
+ }
+ }
+
+ while (!_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+ int64_t interval_s = interval_seconds;
+
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::leak_cleaner_loop::interval",
&interval_s);
+ auto interval = std::chrono::seconds(interval_s);
+ // Wait a full interval before each run (including the first one).
+ std::unique_lock<std::mutex> lock(_leak_cleaner_mutex);
+ _leak_cleaner_cv.wait_for(lock, interval, [this]() {
+ return _stop_leak_cleaner.load(std::memory_order_relaxed);
+ });
+ // Release the mutex before the cleanup so it is not held during the scan.
+ lock.unlock();
+ if (_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+ break;
+ }
+ try {
+
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::leak_cleaner_loop::before_run");
+ run_leak_cleanup(_mgr);
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "File cache leak cleaner encountered exception: "
<< e.what();
+ } catch (...) {
+ LOG(WARNING) << "File cache leak cleaner encountered unknown
exception";
+ }
+ }
+}
+
+// Runs one leak check: compares the estimated number of cache files on disk
+// with the number of metadata blocks and, when fs_files / metadata_blocks is
+// above config::file_cache_leak_fs_to_meta_ratio_threshold, calls
+// cleanup_leaked_files(). Returns without scanning when the metadata block
+// count is zero.
+void FSFileCacheStorage::run_leak_cleanup(BlockFileCache* mgr) {
+ size_t metadata_blocks = snapshot_metadata_block_count(mgr);
+ // A zero count also keeps the ratio below from dividing by zero.
+ if (metadata_blocks == 0) {
+ LOG(INFO) << "file cache leak scan found zero metadata blocks, skip
cleanup";
+ return;
+ }
+
+ size_t fs_files = estimate_file_count_from_inode();
+ double ratio = static_cast<double>(fs_files) /
static_cast<double>(metadata_blocks);
+
+ LOG(INFO) << fmt::format(
+ "file cache leak scan stats: fs_files={}, metadata_blocks={},
ratio={:.4f}", fs_files,
+ metadata_blocks, ratio);
+
+ double threshold = config::file_cache_leak_fs_to_meta_ratio_threshold;
+ if (ratio <= threshold) {
+ LOG_INFO("file cache leak ratio {0:.4f} within threshold {1:.4f}, no
cleanup needed", ratio,
+ threshold);
+ return;
+ }
+
+ LOG(WARNING) << fmt::format(
+ "file cache leak ratio {0:.4f} exceeds threshold {1:.4f}, start
cleanup", ratio,
+ threshold);
+
+ cleanup_leaked_files(mgr, metadata_blocks);
+}
+
+void FSFileCacheStorage::cleanup_leaked_files(BlockFileCache* mgr, size_t
metadata_block_count) {
+ const size_t batch_size = std::max<int32_t>(1,
config::file_cache_leak_scan_batch_files);
+ const size_t pause_ms = std::max<int32_t>(0,
config::file_cache_leak_scan_pause_ms);
+
+ int64_t cleanup_wall_time_ns = 0;
+ int64_t metadata_hash_time_ns = 0;
+ int64_t metadata_index_time_ns = 0;
+ int64_t remove_candidates_time_ns = 0;
+ int64_t directory_loop_time_ns = 0;
+ size_t removed_files = 0;
+ size_t examined_files = 0;
+
+ std::vector<UInt128Wrapper> hash_keys;
+
+ {
+ SCOPED_RAW_TIMER(&cleanup_wall_time_ns);
+ {
+ SCOPED_RAW_TIMER(&metadata_hash_time_ns);
+ std::lock_guard<std::mutex> lock(mgr->_mutex);
+ hash_keys.reserve(mgr->_files.size());
+ for (const auto& [hash, _] : mgr->_files) {
+ hash_keys.push_back(hash);
}
+ }
- if (std::filesystem::is_regular_file(entry_status)) {
- std::error_code size_ec;
- auto file_size = it->file_size(size_ec);
- TEST_SYNC_POINT_CALLBACK(
-
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterFileSize",
- &size_ec);
- if (size_ec) {
- LOG(WARNING) << "Failed to get file size while estimating
file count, path="
- << it->path() << ", err=" <<
size_ec.message();
- continue;
+ std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash>
metadata_index;
+ if (metadata_block_count > 0) {
+ metadata_index.reserve(metadata_block_count * 2);
+ }
+
+ {
+ SCOPED_RAW_TIMER(&metadata_index_time_ns);
+ for (const auto& hash : hash_keys) {
+ auto offsets = snapshot_metadata_for_hash_offsets(mgr, hash);
+ for (const auto& offset : offsets) {
+ metadata_index.emplace(hash, offset);
}
- total_size += file_size;
}
}
- }
- if (total_size == 0) {
- return 0;
- }
+ struct OrphanCandidate {
+ std::string path;
+ UInt128Wrapper hash;
+ size_t offset;
+ std::string key_dir;
+ };
+
+ auto try_remove_empty_directory = [&](const std::string& dir) {
+ std::error_code ec;
+ std::filesystem::directory_iterator it(dir, ec);
+ if (ec || it != std::filesystem::directory_iterator()) {
+ return;
+ }
+ auto st = fs->delete_directory(dir);
+ if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+ LOG_WARNING("delete_directory {} failed", dir).error(st);
+ }
+ };
+
+ std::vector<OrphanCandidate> candidates;
+ candidates.reserve(batch_size);
+
+ auto remove_candidates = [&]() {
+ if (candidates.empty()) {
+ return;
+ }
+ int64_t remove_once_ns = 0;
+ {
+ SCOPED_RAW_TIMER(&remove_once_ns);
+ for (auto& candidate : candidates) {
+ auto st = fs->delete_file(candidate.path);
+ if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+ LOG_WARNING("delete orphan cache file {} failed",
candidate.path).error(st);
+ continue;
+ }
+ removed_files++;
+ try_remove_empty_directory(candidate.key_dir);
+ auto prefix_dir =
+
std::filesystem::path(candidate.key_dir).parent_path().string();
+ try_remove_empty_directory(prefix_dir);
+ }
+ candidates.clear();
+ }
+ remove_candidates_time_ns += remove_once_ns;
+ if (pause_ms > 0) {
+
std::this_thread::sleep_for(std::chrono::milliseconds(pause_ms));
+ }
+ };
+
+ std::error_code ec;
+ std::filesystem::directory_iterator prefix_it {_cache_base_path, ec};
+ if (ec) {
+ LOG(WARNING) << "Leak scan failed to list cache directory: " <<
_cache_base_path
+ << ", error: " << ec.message();
+ return;
+ }
+
+ for (; prefix_it != std::filesystem::directory_iterator();
++prefix_it) {
+ int64_t loop_once_ns = 0;
+ {
+ SCOPED_RAW_TIMER(&loop_once_ns);
+ std::string prefix_name =
prefix_it->path().filename().native();
+ if (!prefix_it->is_directory() || prefix_name == META_DIR_NAME
||
+ prefix_name.size() != KEY_PREFIX_LENGTH) {
+ continue;
+ }
+
+ std::filesystem::directory_iterator key_it {prefix_it->path(),
ec};
+ if (ec) {
+ LOG(WARNING) << "Leak scan failed to list prefix " <<
prefix_it->path().native()
+ << ", error: " << ec.message();
+ continue;
+ }
- // Estimate file count based on average file size
- // Assuming average file size of 1MB for cache blocks
- const uintmax_t average_file_size = 1024 * 1024; // 1MB
- size_t estimated_file_count = total_size / average_file_size;
+ for (; key_it != std::filesystem::directory_iterator();
++key_it) {
+ if (!key_it->is_directory()) {
+ continue;
+ }
+ auto key_with_suffix = key_it->path().filename().native();
+ auto delim_pos = key_with_suffix.find('_');
+ if (delim_pos == std::string::npos || delim_pos !=
sizeof(uint128_t) * 2) {
+ continue;
+ }
+
+ UInt128Wrapper hash;
+ try {
+ hash =
UInt128Wrapper(vectorized::unhex_uint<uint128_t>(
+ key_with_suffix.substr(0, delim_pos).c_str()));
+ } catch (...) {
+ LOG(WARNING) << "Leak scan failed to parse hash from "
<< key_with_suffix;
+ continue;
+ }
+
+ long expiration = 0;
+ try {
+ expiration =
std::stol(key_with_suffix.substr(delim_pos + 1));
+ } catch (...) {
+ LOG(WARNING)
+ << "Leak scan failed to parse expiration from
" << key_with_suffix;
+ continue;
+ }
+
+ std::filesystem::directory_iterator offset_it
{key_it->path(), ec};
+ if (ec) {
+ LOG(WARNING) << "Leak scan failed to list key
directory "
+ << key_it->path().native() << ", error: "
<< ec.message();
+ continue;
+ }
+
+ for (; offset_it != std::filesystem::directory_iterator();
++offset_it) {
+ if (!offset_it->is_regular_file()) {
+ continue;
+ }
+ const auto file_path = offset_it->path();
+ const std::string file_path_str = file_path.string();
+ size_t file_size = offset_it->file_size(ec);
+ if (ec) {
+ LOG(WARNING) << "Leak scan failed to fetch file
size of "
+ << file_path.native() << ": " <<
ec.message();
+ continue;
+ }
+
+ size_t offset = 0;
+ bool is_tmp = false;
+ FileCacheType cache_type = FileCacheType::NORMAL;
+ Status st = parse_filename_suffix_to_cache_type(
+ fs, offset_it->path().filename().native(),
expiration, file_size,
+ &offset, &is_tmp, &cache_type);
+ if (!st.ok()) {
+ continue;
+ }
+
+ AccessKeyAndOffset meta_key {hash, offset};
+
+ // If the file is present in metadata and not a tmp
file, skip it.
+ if (!is_tmp && metadata_index.find(meta_key) !=
metadata_index.end()) {
+ continue;
+ }
+
+ // For any file that is not referenced by metadata (or
tmp files),
+ // protect recently-created files from immediate
deletion. This avoids
+ // racing with writers. The grace window is configured
by
+ // file_cache_leak_grace_seconds and applies to all
orphan files.
+ const int64_t grace_seconds =
+ std::max<int64_t>(0,
config::file_cache_leak_grace_seconds);
+ if (grace_seconds > 0) {
+ struct stat st_buf {};
+ if (::stat(file_path.c_str(), &st_buf) != 0) {
+ LOG(WARNING) << "Leak scan failed to stat file
" << file_path_str
+ << ": " << strerror(errno);
+ } else {
+ const std::time_t now = std::time(nullptr);
+ if (now == static_cast<std::time_t>(-1)) {
+ LOG(WARNING)
+ << "Leak scan failed to get
current time when checking "
+ << file_path_str;
+ } else {
+ const int64_t age_seconds =
+ static_cast<int64_t>(now) -
+
static_cast<int64_t>(st_buf.st_mtime);
+ if (age_seconds < grace_seconds) {
+ VLOG_DEBUG << fmt::format(
+ "Leak scan skipping young
orphan file {} because "
+ "age={}s < grace={}s",
+ file_path_str, age_seconds,
grace_seconds);
+ continue;
+ }
+ }
+ }
+ }
+
+ candidates.emplace_back(file_path_str, hash, offset,
+ key_it->path().string());
+ examined_files++;
+ if (candidates.size() >= batch_size) {
+ remove_candidates();
+ }
+ }
+ }
+ }
+ directory_loop_time_ns += loop_once_ns;
+ }
- LOG(INFO) << "Estimated file count for cache path " << _cache_base_path
- << ": total_size=" << total_size << ", estimated_files=" <<
estimated_file_count;
+ remove_candidates();
+ }
- return estimated_file_count;
+ auto ns_to_ms = [](int64_t ns) { return static_cast<double>(ns) /
1'000'000.0; };
+
+ LOG(INFO) << fmt::format(
+ "file cache leak cleanup finished: examined_files={},
removed_orphans={}, "
+ "wall_time_ms={:.3f}, metadata_hash_time_ms={:.3f},
metadata_index_ms={:.3f}, "
+ "remove_candidates_ms={:.3f}, prefix_loop_ms={:.3f}",
+ examined_files, removed_files, ns_to_ms(cleanup_wall_time_ns),
+ ns_to_ms(metadata_hash_time_ns), ns_to_ms(metadata_index_time_ns),
+ ns_to_ms(remove_candidates_time_ns),
ns_to_ms(directory_loop_time_ns));
+ if (_leak_scan_removed_files) {
+ *_leak_scan_removed_files << removed_files;
+ }
}
} // namespace doris::io
diff --git a/be/src/io/cache/fs_file_cache_storage.h
b/be/src/io/cache/fs_file_cache_storage.h
index d486552d2b6..44b04805419 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -18,10 +18,21 @@
#pragma once
#include <bvar/bvar.h>
-
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <filesystem>
+#include <functional>
#include <memory>
+#include <mutex>
#include <shared_mutex>
#include <thread>
+#include <unordered_set>
+#include <vector>
#include "io/cache/cache_block_meta_store.h"
#include "io/cache/file_cache_common.h"
@@ -58,6 +69,7 @@ public:
/// version 1.0: cache_base_path / key / offset
/// version 2.0: cache_base_path / key_prefix / key / offset
static constexpr int KEY_PREFIX_LENGTH = 3;
+ static constexpr std::string META_DIR_NAME = "meta";
FSFileCacheStorage() = default;
~FSFileCacheStorage() override;
@@ -90,6 +102,36 @@ public:
// Get the meta store instance (only available for DISK storage type)
CacheBlockMetaStore* get_meta_store() { return _meta_store.get(); }
+    // Filesystem object identity: a device id paired with an inode number.
+    struct InodeKey {
+        dev_t device;
+        ino_t inode;
+        // Two keys are equal only when both the inode and the device agree.
+        bool operator==(const InodeKey& rhs) const {
+            return inode == rhs.inode && device == rhs.device;
+        }
+    };
+    // Hash functor that lets InodeKey be used in unordered containers.
+    struct InodeKeyHash {
+        size_t operator()(const InodeKey& key) const {
+            // The device id is shifted left by 32 bits and XOR-ed with the inode number;
+            // the combined 64-bit value is then fed to std::hash.
+            const uint64_t shifted_device = static_cast<uint64_t>(key.device) << 32;
+            const uint64_t inode_bits = static_cast<uint64_t>(key.inode);
+            return std::hash<uint64_t>()(shifted_device ^ inode_bits);
+        }
+    };
+
+#ifdef BE_TEST
+    // Bundle of optional callbacks that tests install through
+    // set_inode_estimation_test_hooks(). Only compiled when BE_TEST is defined.
+    // NOTE(review): each hook presumably replaces the matching real call or helper when it
+    // is non-empty -- confirm against the implementation file.
+    struct InodeEstimationTestHooks {
+        // Receives a path and a statvfs buffer to fill; returns an int status.
+        std::function<int(const std::string&, struct statvfs*)> statvfs_override;
+        // Receives a path and a stat buffer to fill; returns an int status.
+        std::function<int(const std::string&, struct stat*)> lstat_override;
+        // Produces a count for the storage (named after non-cache inode usage).
+        std::function<size_t(const FSFileCacheStorage&)> non_cache_override;
+        // Produces a count for the storage (named after cache-directory inode usage).
+        std::function<size_t(const FSFileCacheStorage&)> cache_dir_override;
+        // Maps a device id to a filesystem path (named after the mount-root lookup).
+        std::function<std::filesystem::path(const FSFileCacheStorage&, dev_t)>
+                find_mount_root_override;
+        // Same parameter list as count_inodes_for_path(), with the storage passed first.
+        std::function<size_t(const FSFileCacheStorage&, const std::filesystem::path&, dev_t,
+                             const std::filesystem::path&,
+                             std::unordered_set<InodeKey, InodeKeyHash>&)>
+                count_inodes_override;
+    };
+ static void set_inode_estimation_test_hooks(InodeEstimationTestHooks*
hooks);
+#endif
+
private:
void remove_old_version_directories();
@@ -116,8 +158,23 @@ private:
std::lock_guard<std::mutex>& cache_lock)
const;
private:
- // Helper function to count files in cache directory using statfs
- size_t estimate_file_count_from_statfs() const;
+ // Helper function to count files in cache directory using inode stats
+ size_t estimate_file_count_from_inode() const;
+ size_t estimate_non_cache_inode_usage() const;
+ size_t estimate_cache_directory_inode_usage() const;
+ size_t count_inodes_for_path(const std::filesystem::path& path, dev_t
target_dev,
+ const std::filesystem::path& excluded_root,
+ std::unordered_set<InodeKey, InodeKeyHash>&
visited) const;
+ std::filesystem::path find_mount_root(dev_t cache_dev) const;
+ bool is_cache_prefix_directory(const std::filesystem::directory_entry&
entry) const;
+ size_t snapshot_metadata_block_count(BlockFileCache* mgr) const;
+ std::vector<size_t> snapshot_metadata_for_hash_offsets(BlockFileCache* mgr,
+ const
UInt128Wrapper& hash) const;
+ void start_leak_cleaner(BlockFileCache* mgr);
+ void stop_leak_cleaner();
+ void leak_cleaner_loop();
+ void run_leak_cleanup(BlockFileCache* mgr);
+ void cleanup_leaked_files(BlockFileCache* mgr, size_t
metadata_block_count);
void load_cache_info_into_memory_from_fs(BlockFileCache* _mgr) const;
void load_cache_info_into_memory_from_db(BlockFileCache* _mgr) const;
@@ -125,12 +182,18 @@ private:
std::lock_guard<std::mutex>& cache_lock) const
override;
std::string _cache_base_path;
+ BlockFileCache* _mgr {nullptr};
std::thread _cache_background_load_thread;
+ std::thread _cache_leak_cleaner_thread;
+ std::atomic<bool> _stop_leak_cleaner {false};
+ std::condition_variable _leak_cleaner_cv;
+ std::mutex _leak_cleaner_mutex;
const std::shared_ptr<LocalFileSystem>& fs = global_local_filesystem();
// TODO(Lchangliang): use a more efficient data structure
std::mutex _mtx;
std::unordered_map<FileWriterMapKey, FileWriterPtr, FileWriterMapKeyHash>
_key_to_writer;
std::shared_ptr<bvar::LatencyRecorder> _iterator_dir_retry_cnt;
+ std::shared_ptr<bvar::Adder<size_t>> _leak_scan_removed_files;
std::unique_ptr<CacheBlockMetaStore> _meta_store;
};
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index 585c359dd73..261efbc6072 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -430,6 +430,44 @@ TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
}
}
+// Starting from an empty cache directory, initializing a disk-backed BlockFileCache must
+// leave a "version" file whose first three bytes read "3.0".
+TEST_F(BlockFileCacheTest, version3_write_version_when_cache_dir_empty) {
+    // Make sure the directory exists but holds nothing.
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    // Every queue is given the full capacity and room for 8 elements.
+    io::FileCacheSettings cache_settings;
+    cache_settings.storage = "disk";
+    cache_settings.capacity = 10_mb;
+    cache_settings.max_file_block_size = 1_mb;
+    cache_settings.max_query_cache_size = cache_settings.capacity;
+    cache_settings.disposable_queue_size = cache_settings.capacity;
+    cache_settings.disposable_queue_elements = 8;
+    cache_settings.index_queue_size = cache_settings.capacity;
+    cache_settings.index_queue_elements = 8;
+    cache_settings.query_queue_size = cache_settings.capacity;
+    cache_settings.query_queue_elements = 8;
+    cache_settings.ttl_queue_size = cache_settings.capacity;
+    cache_settings.ttl_queue_elements = 8;
+
+    io::BlockFileCache cache(cache_base_path, cache_settings);
+    ASSERT_TRUE(cache.initialize());
+
+    // Poll for the asynchronous open: at most 100 checks, 10 ms apart.
+    for (int attempt = 0; attempt < 100 && !cache.get_async_open_success(); ++attempt) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+
+    // Read back up to three bytes of the version file and compare them.
+    std::ifstream version_file(cache_base_path + "/version", std::ios::binary);
+    ASSERT_TRUE(version_file.good());
+    char version_bytes[3] = {0};
+    version_file.read(version_bytes, 3);
+    const auto bytes_read = static_cast<size_t>(version_file.gcount());
+    ASSERT_EQ(std::string(version_bytes, bytes_read), "3.0");
+}
+
TEST_F(BlockFileCacheTest,
clear_retains_meta_directory_and_clears_meta_entries) {
config::enable_evict_file_cache_in_advance = false;
if (fs::exists(cache_base_path)) {
@@ -580,146 +618,6 @@ TEST_F(BlockFileCacheTest,
handle_already_loaded_block_updates_size_and_tablet)
fs::remove_all(cache_base_path);
}
}
-TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) {
- std::string test_dir = cache_base_path +
"/estimate_file_count_removed_dir";
- if (fs::exists(test_dir)) {
- fs::remove_all(test_dir);
- }
- auto keep_dir = fs::path(test_dir) / "keep";
- auto remove_dir = fs::path(test_dir) / "remove";
- fs::create_directories(keep_dir);
- fs::create_directories(remove_dir);
-
- auto keep_file = keep_dir / "data.bin";
- std::string one_mb(1024 * 1024, 'd');
- {
- std::ofstream ofs(keep_file, std::ios::binary);
- ASSERT_TRUE(ofs.good());
- for (int i = 0; i < 3; ++i) {
- ofs.write(one_mb.data(), one_mb.size());
- ASSERT_TRUE(ofs.good());
- }
- }
-
- FSFileCacheStorage storage;
- storage._cache_base_path = test_dir;
-
- const std::string sync_point_name =
- "FSFileCacheStorage::estimate_file_count_from_statfs::OnDirectory";
- auto* sync_point = doris::SyncPoint::get_instance();
- doris::SyncPoint::CallbackGuard guard(sync_point_name);
- sync_point->set_call_back(
- sync_point_name,
- [remove_dir](std::vector<std::any>&& args) {
- auto* path =
doris::try_any_cast<std::filesystem::path*>(args[0]);
- if (*path == remove_dir) {
- fs::remove_all(remove_dir);
- }
- },
- &guard);
- sync_point->enable_processing();
-
- size_t estimated_files = storage.estimate_file_count_from_statfs();
-
- sync_point->disable_processing();
-
- ASSERT_EQ(3, estimated_files);
- ASSERT_FALSE(fs::exists(remove_dir));
-
- if (fs::exists(test_dir)) {
- fs::remove_all(test_dir);
- }
-}
-
-TEST_F(BlockFileCacheTest, estimate_file_count_handles_stat_failure) {
- std::string test_dir = cache_base_path +
"/estimate_file_count_stat_failure";
- if (fs::exists(test_dir)) {
- fs::remove_all(test_dir);
- }
- fs::create_directories(test_dir);
-
- auto data_file = fs::path(test_dir) / "data.bin";
- std::string one_mb(1024 * 1024, 'x');
- {
- std::ofstream ofs(data_file, std::ios::binary);
- ASSERT_TRUE(ofs.good());
- ofs.write(one_mb.data(), one_mb.size());
- ASSERT_TRUE(ofs.good());
- }
-
- FSFileCacheStorage storage;
- storage._cache_base_path = test_dir;
-
- const std::string sync_point_name =
-
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterEntryStatus";
- auto* sync_point = doris::SyncPoint::get_instance();
- doris::SyncPoint::CallbackGuard guard(sync_point_name);
- sync_point->set_call_back(
- sync_point_name,
- [](std::vector<std::any>&& args) {
- auto* ec = doris::try_any_cast<std::error_code*>(args[0]);
- if (ec != nullptr) {
- *ec = std::make_error_code(std::errc::io_error);
- }
- },
- &guard);
- sync_point->enable_processing();
-
- size_t estimated_files = storage.estimate_file_count_from_statfs();
-
- sync_point->disable_processing();
-
- ASSERT_EQ(0, estimated_files);
-
- if (fs::exists(test_dir)) {
- fs::remove_all(test_dir);
- }
-}
-
-TEST_F(BlockFileCacheTest, estimate_file_count_handles_file_size_failure) {
- std::string test_dir = cache_base_path +
"/estimate_file_count_file_size_failure";
- if (fs::exists(test_dir)) {
- fs::remove_all(test_dir);
- }
- fs::create_directories(test_dir);
-
- auto data_file = fs::path(test_dir) / "data.bin";
- std::string one_mb(1024 * 1024, 'x');
- {
- std::ofstream ofs(data_file, std::ios::binary);
- ASSERT_TRUE(ofs.good());
- ofs.write(one_mb.data(), one_mb.size());
- ASSERT_TRUE(ofs.good());
- }
-
- FSFileCacheStorage storage;
- storage._cache_base_path = test_dir;
-
- const std::string sync_point_name =
-
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterFileSize";
- auto* sync_point = doris::SyncPoint::get_instance();
- doris::SyncPoint::CallbackGuard guard(sync_point_name);
- sync_point->set_call_back(
- sync_point_name,
- [](std::vector<std::any>&& args) {
- auto* ec = doris::try_any_cast<std::error_code*>(args[0]);
- if (ec != nullptr) {
- *ec = std::make_error_code(std::errc::io_error);
- }
- },
- &guard);
- sync_point->enable_processing();
-
- size_t estimated_files = storage.estimate_file_count_from_statfs();
-
- sync_point->disable_processing();
-
- ASSERT_EQ(0, estimated_files);
-
- if (fs::exists(test_dir)) {
- fs::remove_all(test_dir);
- }
-}
//TODO(zhengyu): check lazy load
//TODO(zhengyu): check version2 start
diff --git a/be/test/io/cache/fs_file_cache_storage_leak_cleaner_test.cpp
b/be/test/io/cache/fs_file_cache_storage_leak_cleaner_test.cpp
new file mode 100644
index 00000000000..58ccc623e14
--- /dev/null
+++ b/be/test/io/cache/fs_file_cache_storage_leak_cleaner_test.cpp
@@ -0,0 +1,718 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <atomic>
+#include <cerrno>
+#include <chrono>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <stdexcept>
+#include <string>
+#include <thread>
+#include <unordered_set>
+
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wkeyword-macro"
+#endif
+#define private public
+#define protected public
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+#include "io/cache/block_file_cache.h"
+#include "io/cache/fs_file_cache_storage.h"
+#undef private
+#undef protected
+
+#include "block_file_cache_test_common.h"
+
+namespace doris::io {
+
+namespace fs = std::filesystem;
+
+// Snapshots the five leak-cleaner config values on construction and writes the saved
+// values back on destruction, so a test can overwrite them freely.
+class ScopedLeakCleanerConfig {
+public:
+    ScopedLeakCleanerConfig()
+            : _saved_ratio(config::file_cache_leak_fs_to_meta_ratio_threshold),
+              _saved_interval(config::file_cache_leak_scan_interval_seconds),
+              _saved_batch(config::file_cache_leak_scan_batch_files),
+              _saved_pause(config::file_cache_leak_scan_pause_ms),
+              _saved_grace(config::file_cache_leak_grace_seconds) {
+        // Zero grace window by default: the scan only applies its "young file" skip when
+        // the grace value is positive.
+        config::file_cache_leak_grace_seconds = 0;
+    }
+
+    ~ScopedLeakCleanerConfig() {
+        config::file_cache_leak_grace_seconds = _saved_grace;
+        config::file_cache_leak_scan_pause_ms = _saved_pause;
+        config::file_cache_leak_scan_batch_files = _saved_batch;
+        config::file_cache_leak_scan_interval_seconds = _saved_interval;
+        config::file_cache_leak_fs_to_meta_ratio_threshold = _saved_ratio;
+    }
+
+private:
+    // Values captured at construction time.
+    double _saved_ratio;
+    int64_t _saved_interval;
+    int32_t _saved_batch;
+    int32_t _saved_pause;
+    int64_t _saved_grace;
+};
+
+// Registers its own `hooks` member as the active inode-estimation test hooks for the
+// lifetime of the object and clears the registration (nullptr) on destruction.
+class ScopedInodeTestHooks {
+public:
+    ScopedInodeTestHooks() { FSFileCacheStorage::set_inode_estimation_test_hooks(&hooks); }
+    ~ScopedInodeTestHooks() { FSFileCacheStorage::set_inode_estimation_test_hooks(nullptr); }
+
+    // The address of `hooks` is handed to FSFileCacheStorage. A copy would carry a second
+    // `hooks` object that is never registered, and its destructor would clear the
+    // registration of the original, so copying is disabled.
+    ScopedInodeTestHooks(const ScopedInodeTestHooks&) = delete;
+    ScopedInodeTestHooks& operator=(const ScopedInodeTestHooks&) = delete;
+
+    // Fill in the individual overrides after constructing the guard.
+    FSFileCacheStorage::InodeEstimationTestHooks hooks;
+};
+
+// Fixture for the FSFileCacheStorage leak-cleaner tests. It supplies cache settings,
+// per-test scratch directories, a storage wired to a meta store, and helpers that create
+// metadata entries and on-disk files.
+class FSFileCacheLeakCleanerTest : public BlockFileCacheTest {
+protected:
+    // Disk-backed settings: 10 MiB capacity, 1 MiB max block size, every queue sized to
+    // the full capacity with 8 elements.
+    static FileCacheSettings default_settings() {
+        FileCacheSettings cfg;
+        cfg.storage = "disk";
+        cfg.capacity = 10 * 1024 * 1024;
+        cfg.max_file_block_size = 1 * 1024 * 1024;
+        cfg.max_query_cache_size = cfg.capacity;
+        cfg.disposable_queue_size = cfg.capacity;
+        cfg.index_queue_size = cfg.capacity;
+        cfg.query_queue_size = cfg.capacity;
+        cfg.ttl_queue_size = cfg.capacity;
+        cfg.disposable_queue_elements = 8;
+        cfg.index_queue_elements = 8;
+        cfg.query_queue_elements = 8;
+        cfg.ttl_queue_elements = 8;
+        return cfg;
+    }
+
+    // Returns caches_dir/leak_cleaner/<name> after wiping and re-creating it. Filesystem
+    // errors land in a local error_code and are not reported.
+    fs::path prepare_test_dir(const std::string& name) const {
+        std::error_code ignored;
+        fs::path scratch = caches_dir / "leak_cleaner" / name;
+        fs::remove_all(scratch, ignored);
+        fs::create_directories(scratch, ignored);
+        return scratch;
+    }
+
+    // Name of the currently running gtest case, or "unknown" when gtest reports none.
+    static std::string current_test_name() {
+        const auto* info = ::testing::UnitTest::GetInstance()->current_test_info();
+        return info != nullptr ? std::string(info->name()) : std::string("unknown");
+    }
+
+    // Scratch directory named after the running test.
+    fs::path prepare_test_dir() const { return prepare_test_dir(current_test_name()); }
+
+    // Points `storage` at `dir`, attaches `mgr`, creates and initializes a meta store
+    // under <dir>/meta, and writes the cache version file.
+    void setup_storage(FSFileCacheStorage& storage, BlockFileCache& mgr, const fs::path& dir) {
+        const std::string base_path = dir.string();
+        storage._cache_base_path = base_path;
+        storage._mgr = &mgr;
+        storage._meta_store = std::make_unique<CacheBlockMetaStore>(base_path + "/meta", 10000);
+        EXPECT_TRUE(storage._meta_store->init().ok());
+        EXPECT_TRUE(storage.write_file_cache_version().ok());
+    }
+
+    // Records (hash, offset) in the manager's in-memory file map and, when the storage has
+    // a meta store, also puts a NORMAL block meta entry there.
+    static void add_metadata_entry(BlockFileCache& mgr, FSFileCacheStorage& storage,
+                                   const UInt128Wrapper& hash, size_t offset) {
+        {
+            std::lock_guard<std::mutex> files_lock(mgr._mutex);
+            mgr._files[hash].try_emplace(offset);
+        }
+        if (!storage._meta_store) {
+            return;
+        }
+        BlockMetaKey meta_key(0, hash, offset);
+        BlockMeta meta_value(FileCacheType::NORMAL, 16, 0);
+        storage._meta_store->put(meta_key, meta_value);
+        // Wait for async write to complete for test stability: up to 100 polls of 50 ms
+        // while the write queue is non-empty.
+        int polls = 0;
+        while (polls < 100 && storage._meta_store->get_write_queue_size() > 0) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(50));
+            ++polls;
+        }
+    }
+
+    // Creates the parent directories and writes a 16-byte file filled with `fill`.
+    // Uses ASSERT_*, so a failure returns from this helper only.
+    static void create_regular_file(const std::string& path, char fill = 'x') {
+        fs::create_directories(fs::path(path).parent_path());
+        std::ofstream out(path, std::ios::binary | std::ios::trunc);
+        ASSERT_TRUE(out.good());
+        std::string content(16, fill);
+        out.write(content.data(), content.size());
+        out.close();
+        ASSERT_TRUE(std::filesystem::exists(path));
+    }
+};
+
+// With a scan interval of 0, start_leak_cleaner() must not spawn the cleaner thread or
+// touch the stop flag; stop_leak_cleaner() must still raise the stop flag.
+TEST_F(FSFileCacheLeakCleanerTest, disable_when_interval_non_positive) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 0;
+    auto test_dir = prepare_test_dir();
+
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir);
+
+    fs_storage.start_leak_cleaner(&cache_mgr);
+    const auto& stop_flag = fs_storage._stop_leak_cleaner;
+    EXPECT_FALSE(fs_storage._cache_leak_cleaner_thread.joinable());
+    EXPECT_FALSE(stop_flag.load(std::memory_order_relaxed));
+
+    fs_storage.stop_leak_cleaner();
+    EXPECT_TRUE(stop_flag.load(std::memory_order_relaxed));
+}
+
+// A positive scan interval makes start_leak_cleaner() spawn a joinable thread, and
+// stop_leak_cleaner() must raise the stop flag and leave the thread joined.
+TEST_F(FSFileCacheLeakCleanerTest, start_and_stop_thread) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_pause_ms = 0;
+    config::file_cache_leak_scan_batch_files = 4;
+    // NOTE(review): the huge ratio threshold presumably keeps any scan that happens to run
+    // from deleting anything -- confirm against run_leak_cleanup().
+    config::file_cache_leak_fs_to_meta_ratio_threshold = 1e12;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir);
+
+    add_metadata_entry(cache_mgr, fs_storage, BlockFileCache::hash("thread_guard"), 0);
+
+    fs_storage.start_leak_cleaner(&cache_mgr);
+    ASSERT_TRUE(fs_storage._cache_leak_cleaner_thread.joinable());
+
+    fs_storage.stop_leak_cleaner();
+    EXPECT_TRUE(fs_storage._stop_leak_cleaner.load(std::memory_order_relaxed));
+    EXPECT_FALSE(fs_storage._cache_leak_cleaner_thread.joinable());
+}
+
+// With an enormous ratio threshold, run_leak_cleanup() must leave an orphan file in
+// place, and the statvfs hook must have been consulted exactly once.
+TEST_F(FSFileCacheLeakCleanerTest, skip_cleanup_when_ratio_below_threshold) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_fs_to_meta_ratio_threshold = 1e12;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    ScopedInodeTestHooks hooks_guard;
+    auto& hooks = hooks_guard.hooks;
+    std::atomic<int> statvfs_calls {0};
+    // Fake filesystem stats: 100 inodes in total, 36 of them free.
+    hooks.statvfs_override = [&statvfs_calls](const std::string&, struct statvfs* vfs) {
+        statvfs_calls.fetch_add(1, std::memory_order_relaxed);
+        *vfs = {};
+        vfs->f_files = 100;
+        vfs->f_ffree = 36;
+        return 0;
+    };
+    hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 1;
+        return 0;
+    };
+    hooks.non_cache_override = [](const FSFileCacheStorage&) { return 0; };
+    hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 0; };
+
+    auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir);
+
+    // 64 metadata entries under a single hash, at offsets 0..63.
+    const auto metadata_hash = BlockFileCache::hash("metadata_key");
+    for (size_t offset = 0; offset < 64; ++offset) {
+        add_metadata_entry(cache_mgr, fs_storage, metadata_hash, offset);
+    }
+
+    // One on-disk block that no metadata entry refers to.
+    const auto orphan_hash = BlockFileCache::hash("ratio_skip_orphan");
+    const auto orphan_dir = fs_storage.get_path_in_local_cache_v3(orphan_hash);
+    create_regular_file(FSFileCacheStorage::get_path_in_local_cache_v3(orphan_dir, 0, false));
+
+    fs_storage.run_leak_cleanup(&cache_mgr);
+
+    const bool orphan_still_there = std::filesystem::exists(
+            FSFileCacheStorage::get_path_in_local_cache_v3(orphan_dir, 0, false));
+    EXPECT_TRUE(orphan_still_there);
+    EXPECT_EQ(1, statvfs_calls.load(std::memory_order_relaxed));
+}
+
+// run_leak_cleanup() must keep a block file that metadata references, and must delete
+// both a tmp file and a file with no metadata entry, including the orphan's then-empty
+// key directory.
+TEST_F(FSFileCacheLeakCleanerTest, remove_orphan_and_tmp_files) {
+    ScopedLeakCleanerConfig guard;
+    // Batch size 1 with no pause between batches.
+    config::file_cache_leak_scan_batch_files = 1;
+    config::file_cache_leak_scan_pause_ms = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    ScopedInodeTestHooks hooks_guard;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 100;
+        vfs->f_ffree = 10; // 90 used
+        return 0;
+    };
+    hooks_guard.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 0; };
+    hooks_guard.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 0; };
+
+    auto dir = prepare_test_dir();
+    FileCacheSettings settings = default_settings();
+    BlockFileCache mgr(dir.string(), settings);
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, dir);
+
+    // Offset 0 of kept_hash is the only block known to metadata.
+    auto kept_hash = BlockFileCache::hash("kept_hash");
+    add_metadata_entry(mgr, storage, kept_hash, 0);
+
+    auto kept_dir = storage.get_path_in_local_cache_v3(kept_hash);
+    auto kept_file = FSFileCacheStorage::get_path_in_local_cache_v3(kept_dir, 0, false);
+    auto tmp_file = FSFileCacheStorage::get_path_in_local_cache_v3(kept_dir, 8, true);
+    create_regular_file(kept_file, 'k');
+    create_regular_file(tmp_file, 't');
+
+    auto orphan_hash = BlockFileCache::hash("orphan_hash");
+    auto orphan_dir = storage.get_path_in_local_cache_v3(orphan_hash);
+    auto orphan_file = FSFileCacheStorage::get_path_in_local_cache_v3(orphan_dir, 4, false);
+    create_regular_file(orphan_file, 'o');
+
+    // The cleanup is invoked directly on this thread and deletes candidates before it
+    // returns, so the results can be checked right away; the former fixed 5-second sleep
+    // only slowed the test down.
+    storage.run_leak_cleanup(&mgr);
+
+    EXPECT_TRUE(std::filesystem::exists(kept_file));
+    EXPECT_FALSE(std::filesystem::exists(tmp_file));
+    EXPECT_FALSE(std::filesystem::exists(orphan_file));
+    EXPECT_FALSE(std::filesystem::exists(orphan_dir));
+
+    std::error_code ec;
+    fs::remove_all(dir, ec);
+}
+
+// snapshot_metadata_for_hash_offsets() must return nothing for a hash without metadata,
+// and every registered offset once entries exist.
+TEST_F(FSFileCacheLeakCleanerTest, snapshot_metadata_for_hash_offsets_handles_missing_hash) {
+    // Restore the leak-cleaner configs on exit; without the guard the interval written
+    // below would stay in effect for every test that runs afterwards.
+    ScopedLeakCleanerConfig guard;
+
+    auto dir = prepare_test_dir();
+    FileCacheSettings settings = default_settings();
+    BlockFileCache mgr(dir.string(), settings);
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, dir);
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto missing_hash = BlockFileCache::hash("missing_hash_case");
+    auto offsets = storage.snapshot_metadata_for_hash_offsets(&mgr, missing_hash);
+    EXPECT_TRUE(offsets.empty());
+
+    add_metadata_entry(mgr, storage, missing_hash, 7);
+    add_metadata_entry(mgr, storage, missing_hash, 3);
+
+    // The snapshot order is not assumed; sort before comparing.
+    offsets = storage.snapshot_metadata_for_hash_offsets(&mgr, missing_hash);
+    std::sort(offsets.begin(), offsets.end());
+    ASSERT_EQ(2, offsets.size());
+    EXPECT_EQ(3u, offsets[0]);
+    EXPECT_EQ(7u, offsets[1]);
+}
+
+// Injects a std::runtime_error at the loop's "before_run" sync point. The test passes
+// when the loop thread can be joined and the callback ran at least once; an exception
+// escaping the thread function would terminate the process instead.
+TEST_F(FSFileCacheLeakCleanerTest, leak_cleaner_loop_catches_std_exception) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir);
+
+    std::atomic<int> before_run_hits {0};
+    auto* sync_point = SyncPoint::get_instance();
+    // Both timing hooks write 0 through the int64_t* argument they receive.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::initial_delay",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::interval",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    // Count the hit, request a stop, then throw from inside the loop.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::before_run",
+                              [&fs_storage, &before_run_hits](auto&&) {
+                                  before_run_hits.fetch_add(1, std::memory_order_relaxed);
+                                  fs_storage._stop_leak_cleaner.store(true,
+                                                                      std::memory_order_relaxed);
+                                  fs_storage._leak_cleaner_cv.notify_all();
+                                  throw std::runtime_error("injected std exception");
+                              });
+    sync_point->enable_processing();
+
+    fs_storage._stop_leak_cleaner.store(false, std::memory_order_relaxed);
+    std::thread loop_thread([&fs_storage]() { fs_storage.leak_cleaner_loop(); });
+
+    // Nudge the loop for up to 100 rounds of 10 ms until the callback has fired.
+    for (int round = 0; round < 100 && before_run_hits.load(std::memory_order_relaxed) == 0;
+         ++round) {
+        fs_storage._leak_cleaner_cv.notify_all();
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    fs_storage._stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    fs_storage._leak_cleaner_cv.notify_all();
+    loop_thread.join();
+
+    // Drop the callbacks before the objects they capture go out of scope.
+    sync_point->disable_processing();
+    sync_point->clear_all_call_backs();
+
+    ASSERT_GE(before_run_hits.load(std::memory_order_relaxed), 1);
+}
+
+// Same scenario as the std::exception variant, but the injected exception is a type that
+// does not derive from std::exception.
+TEST_F(FSFileCacheLeakCleanerTest, leak_cleaner_loop_catches_unknown_exception) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir);
+
+    // Exception type unrelated to std::exception.
+    struct NonStdException {};
+
+    std::atomic<int> before_run_hits {0};
+    auto* sync_point = SyncPoint::get_instance();
+    // Both timing hooks write 0 through the int64_t* argument they receive.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::initial_delay",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::interval",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    // Count the hit, request a stop, then throw from inside the loop.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::before_run",
+                              [&fs_storage, &before_run_hits](auto&&) {
+                                  before_run_hits.fetch_add(1, std::memory_order_relaxed);
+                                  fs_storage._stop_leak_cleaner.store(true,
+                                                                      std::memory_order_relaxed);
+                                  fs_storage._leak_cleaner_cv.notify_all();
+                                  throw NonStdException {};
+                              });
+    sync_point->enable_processing();
+
+    fs_storage._stop_leak_cleaner.store(false, std::memory_order_relaxed);
+    std::thread loop_thread([&fs_storage]() { fs_storage.leak_cleaner_loop(); });
+
+    // Nudge the loop for up to 100 rounds of 10 ms until the callback has fired.
+    for (int round = 0; round < 100 && before_run_hits.load(std::memory_order_relaxed) == 0;
+         ++round) {
+        fs_storage._leak_cleaner_cv.notify_all();
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+
+    fs_storage._stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    fs_storage._leak_cleaner_cv.notify_all();
+    loop_thread.join();
+
+    // Drop the callbacks before the objects they capture go out of scope.
+    sync_point->disable_processing();
+    sync_point->clear_all_call_backs();
+
+    ASSERT_GE(before_run_hits.load(std::memory_order_relaxed), 1);
+}
+
+// With a low ratio threshold and a single unrelated metadata entry, run_leak_cleanup()
+// must delete the orphan block together with its key and prefix directories.
+TEST_F(FSFileCacheLeakCleanerTest, run_leak_cleanup_removes_orphan_when_metadata_missing) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_fs_to_meta_ratio_threshold = 0.5;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    ScopedInodeTestHooks hooks_guard;
+    auto& hooks = hooks_guard.hooks;
+    hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 1000;
+        vfs->f_ffree = 900; // 100 used
+        return 0;
+    };
+    hooks.non_cache_override = [](const FSFileCacheStorage&) { return 0; };
+    hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 0; };
+
+    auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir);
+
+    // On-disk block at offset 0 whose hash never gets a metadata entry.
+    auto orphan_hash = BlockFileCache::hash("zero_meta_orphan");
+    auto key_dir = fs_storage.get_path_in_local_cache_v3(orphan_hash);
+    fs::create_directories(key_dir);
+    auto orphan_path = FSFileCacheStorage::get_path_in_local_cache_v3(key_dir, 0, false);
+    create_regular_file(orphan_path, 'z');
+
+    // Metadata for a different hash only.
+    auto dummy_hash = BlockFileCache::hash("dummy");
+    add_metadata_entry(cache_mgr, fs_storage, dummy_hash, 0);
+
+    fs_storage.run_leak_cleanup(&cache_mgr);
+
+    auto prefix_dir = fs::path(key_dir).parent_path();
+    EXPECT_FALSE(fs::exists(orphan_path));
+    EXPECT_FALSE(fs::exists(key_dir));
+    EXPECT_FALSE(fs::exists(prefix_dir));
+}
+
+// cleanup_leaked_files() must return cleanly, and must not re-create the directory, when
+// the cache base path does not exist.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_handles_missing_base_directory) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache_mgr(test_dir.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache_mgr, test_dir / "missing_root");
+
+    // Whatever setup left under the base path is removed again, so the scan starts from
+    // a path that does not exist.
+    fs::path base_path(fs_storage._cache_base_path);
+    if (fs::exists(base_path)) {
+        fs::remove_all(base_path);
+    }
+
+    fs_storage.cleanup_leaked_files(&cache_mgr, 0);
+    EXPECT_FALSE(fs::exists(base_path));
+}
+
+// cleanup_leaked_files() must step over entries that do not look like cache prefixes or
+// key directories and must leave them on disk.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_skips_invalid_prefixes_and_keys) {
+    ScopedLeakCleanerConfig guard;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto dir = prepare_test_dir();
+    FileCacheSettings settings = default_settings();
+    BlockFileCache mgr(dir.string(), settings);
+
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, dir);
+
+    fs::create_directories(dir);
+    create_regular_file((dir / "root_file").string());               // non-directory prefix entry
+    fs::create_directories(dir / FSFileCacheStorage::META_DIR_NAME); // meta dir skip
+    fs::create_directories(dir / "abcd");                            // invalid prefix length
+
+    auto prefix_dir = dir / "abc";
+    fs::create_directories(prefix_dir);
+    create_regular_file((prefix_dir / "plain_file").string()); // !key_it->is_directory branch
+    fs::create_directories(prefix_dir / "deadbeef" /* missing '_' */);
+    // The scan requires '_' to sit right after the 32 hex characters of a 128-bit hash.
+    // These two short names are rejected by that position check and never reach the hex
+    // or expiration parsing.
+    fs::create_directories(prefix_dir / "zzzg000_0" /* '_' at the wrong position */);
+    fs::create_directories(prefix_dir / "123abc_bad" /* '_' at the wrong position */);
+
+    // Names with a full-width hash part, so the parsing steps are actually reached.
+    // The scan is only expected to survive the non-hex name; whether the hash parser
+    // rejects it is not asserted here.
+    const std::string non_hex_key = std::string(32, 'z') + "_0";
+    // A valid hash part followed by a suffix that std::stol cannot parse.
+    const std::string bad_expiration_key = "0123456789abcdef0123456789abcdef_bad";
+    fs::create_directories(prefix_dir / non_hex_key);
+    fs::create_directories(prefix_dir / bad_expiration_key);
+
+    storage.cleanup_leaked_files(&mgr, 0);
+
+    EXPECT_TRUE(fs::exists(prefix_dir));
+    EXPECT_TRUE(fs::exists(dir / "abcd"));
+    // Key directories holding no candidate file are never removed by the scan.
+    EXPECT_TRUE(fs::exists(prefix_dir / non_hex_key));
+    EXPECT_TRUE(fs::exists(prefix_dir / bad_expiration_key));
+}
+
+// A cache file whose hash was registered through add_metadata_entry must still
+// exist after cleanup_leaked_files runs.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_flush_candidates_when_empty) {
+    ScopedLeakCleanerConfig guard;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto dir = prepare_test_dir();
+    FileCacheSettings settings = default_settings();
+    BlockFileCache mgr(dir.string(), settings);
+
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, dir);
+
+    auto hash = BlockFileCache::hash("metadata_kept_hash");
+    add_metadata_entry(mgr, storage, hash, 0);
+
+    // Create the on-disk file for the same hash at offset 0.
+    auto key_dir = storage.get_path_in_local_cache_v3(hash);
+    fs::create_directories(key_dir);
+    auto file_path = FSFileCacheStorage::get_path_in_local_cache_v3(key_dir, 0, false);
+    create_regular_file(file_path, 'm');
+
+    // NOTE(review): the meaning of the second argument (1 here, 0 in sibling
+    // tests) is not visible in this chunk -- confirm against the declaration.
+    storage.cleanup_leaked_files(&mgr, 1);
+
+    EXPECT_TRUE(fs::exists(file_path));
+}
+
+// A cache file with no metadata entry is expected to be deleted by cleanup,
+// together with its key directory and the enclosing prefix directory.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_flush_candidates_remove_directories) {
+    ScopedLeakCleanerConfig guard;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_batch_files = 2;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto dir = prepare_test_dir();
+    FileCacheSettings settings = default_settings();
+    BlockFileCache mgr(dir.string(), settings);
+
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, dir);
+
+    // Only the on-disk file is created; no metadata entry is added for it.
+    auto hash = BlockFileCache::hash("cleanup_orphan_batch");
+    auto key_dir = storage.get_path_in_local_cache_v3(hash);
+    fs::create_directories(key_dir);
+    auto orphan_path = FSFileCacheStorage::get_path_in_local_cache_v3(key_dir, 4, false);
+    create_regular_file(orphan_path, 'c');
+
+    storage.cleanup_leaked_files(&mgr, 0);
+
+    auto prefix_dir = fs::path(key_dir).parent_path();
+    EXPECT_FALSE(fs::exists(orphan_path));
+    EXPECT_FALSE(fs::exists(key_dir));
+    EXPECT_FALSE(fs::exists(prefix_dir));
+}
+
+// When the statvfs hook reports failure (-1, errno = EIO) the inode-based
+// estimate is expected to be 0.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_statvfs_failure) {
+    // Same scoped-config pattern as the cleanup_* tests; presumably restores the
+    // leak-cleaner config on scope exit so the override below does not leak into
+    // later tests -- confirm against ScopedLeakCleanerConfig.
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        errno = EIO;
+        return -1;
+    };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/nonexistent_statvfs";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// statvfs succeeds but reports f_files == 0; the estimate is expected to be 0.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_zero_total_inodes) {
+    // Same scoped-config pattern as the cleanup_* tests; presumably restores the
+    // leak-cleaner config on scope exit -- confirm against ScopedLeakCleanerConfig.
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 0;
+        return 0;
+    };
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 1;
+        return 0;
+    };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_zero_inodes";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// statvfs succeeds (100 total / 10 free inodes) but the lstat hook fails with
+// ENOENT; the estimate is expected to be 0.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_lstat_failure) {
+    // Same scoped-config pattern as the cleanup_* tests; presumably restores the
+    // leak-cleaner config on scope exit -- confirm against ScopedLeakCleanerConfig.
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 100;
+        vfs->f_ffree = 10;
+        return 0;
+    };
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat*) {
+        errno = ENOENT;
+        return -1;
+    };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_lstat_failure";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// Used inodes (200 total - 150 free = 50) are fewer than the overridden
+// non-cache (80) and cache-directory (90) figures, so a naive subtraction would
+// go negative; the estimate is expected to be 0 instead.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_underflow) {
+    // Same scoped-config pattern as the cleanup_* tests; presumably restores the
+    // leak-cleaner config on scope exit -- confirm against ScopedLeakCleanerConfig.
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 200;
+        vfs->f_ffree = 150;
+        return 0;
+    };
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 9;
+        return 0;
+    };
+    hooks_guard.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 80; };
+    hooks_guard.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 90; };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_underflow";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// Happy path: with 500 total / 200 free inodes, a non-cache figure of 50 and a
+// cache-directory figure of 30, the expected estimate is 220
+// (consistent with 300 used - 50 - 30).
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_combines_counts) {
+    // Same scoped-config pattern as the cleanup_* tests; presumably restores the
+    // leak-cleaner config on scope exit -- confirm against ScopedLeakCleanerConfig.
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 500;
+        vfs->f_ffree = 200;
+        return 0;
+    };
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 7;
+        return 0;
+    };
+    hooks_guard.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 50; };
+    hooks_guard.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 30; };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_estimation";
+    EXPECT_EQ(220u, storage.estimate_file_count_from_inode());
+}
+
+// Builds a small tree under a fake mount root (cache/, others/nested/ and three
+// regular files) and checks the non-cache inode estimate.
+// NOTE(review): the expected 6 matches root + root_file.bin + others + leaf.txt
+// + nested + inner.bin with cache/ left out -- confirm against the implementation.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_non_cache_inode_usage_counts_other_paths) {
+    auto root = prepare_test_dir("inode_non_cache_root");
+    auto cache_dir = root / "cache";
+    auto other_dir = root / "others";
+    auto nested_dir = other_dir / "nested";
+    fs::create_directories(cache_dir);
+    fs::create_directories(nested_dir);
+    create_regular_file((root / "root_file.bin").string());
+    create_regular_file((other_dir / "leaf.txt").string());
+    create_regular_file((nested_dir / "inner.bin").string());
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = cache_dir.string();
+
+    // Pin the mount root to the test directory instead of the real filesystem.
+    ScopedInodeTestHooks hooks_guard;
+    hooks_guard.hooks.find_mount_root_override = [root](const FSFileCacheStorage&, dev_t) {
+        return root;
+    };
+
+    EXPECT_EQ(6u, storage.estimate_non_cache_inode_usage());
+}
+
+// Creates two three-character prefix dirs holding four key dirs in total, plus
+// the meta dir and an over-long "abcd" dir, then checks the estimate.
+// NOTE(review): the expected 6 matches 2 prefix dirs + 4 key dirs, with the meta
+// dir and "abcd" not counted -- confirm against the implementation.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_cache_directory_inode_usage_samples_prefixes) {
+    auto base = prepare_test_dir("inode_cache_directory");
+    auto prefix_a = base / "abc";
+    auto prefix_b = base / "def";
+    fs::create_directories(prefix_a);
+    fs::create_directories(prefix_b);
+    fs::create_directories(base / FSFileCacheStorage::META_DIR_NAME);
+    fs::create_directories(base / "abcd");
+    fs::create_directories(prefix_a / "deadbeef_0");
+    fs::create_directories(prefix_b / "feed000_0");
+    fs::create_directories(prefix_b / "feed000_1");
+    fs::create_directories(prefix_b / "feed000_2");
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = base.string();
+
+    EXPECT_EQ(6u, storage.estimate_cache_directory_inode_usage());
+}
+
+// Counts inodes under `base` while passing `exclude_dir` as the path to skip.
+// NOTE(review): the expected 4 matches base + root.bin + include + child.bin,
+// with exclude/ and skip.bin left out -- confirm against the implementation.
+TEST_F(FSFileCacheLeakCleanerTest, count_inodes_for_path_respects_exclusions) {
+    auto base = prepare_test_dir("inode_counting");
+    auto include_dir = base / "include";
+    auto exclude_dir = base / "exclude";
+    fs::create_directories(include_dir);
+    fs::create_directories(exclude_dir);
+    create_regular_file((base / "root.bin").string());
+    create_regular_file((include_dir / "child.bin").string());
+    create_regular_file((exclude_dir / "skip.bin").string());
+
+    FSFileCacheStorage storage;
+    // The device id of the base directory is handed to the counter.
+    struct stat st {};
+    ASSERT_EQ(0, lstat(base.c_str(), &st));
+    std::unordered_set<FSFileCacheStorage::InodeKey, FSFileCacheStorage::InodeKeyHash> visited;
+    size_t count = storage.count_inodes_for_path(base, st.st_dev, exclude_dir, visited);
+    EXPECT_EQ(4u, count);
+}
+
+// is_cache_prefix_directory must accept the three-character directory "abc" and
+// reject the four-character directory "abcd", the meta directory, and a regular
+// file.
+TEST_F(FSFileCacheLeakCleanerTest, is_cache_prefix_directory_filters_entries) {
+    auto base = prepare_test_dir("inode_prefix_filter");
+    auto valid = base / "abc";
+    auto invalid = base / "abcd";
+    auto meta_dir = base / FSFileCacheStorage::META_DIR_NAME;
+    fs::create_directories(valid);
+    fs::create_directories(invalid);
+    fs::create_directories(meta_dir);
+    create_regular_file((base / "plain_file").string());
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = base.string();
+
+    EXPECT_TRUE(storage.is_cache_prefix_directory(fs::directory_entry(valid)));
+    EXPECT_FALSE(storage.is_cache_prefix_directory(fs::directory_entry(invalid)));
+    EXPECT_FALSE(storage.is_cache_prefix_directory(fs::directory_entry(meta_dir)));
+    EXPECT_FALSE(storage.is_cache_prefix_directory(fs::directory_entry(base / "plain_file")));
+}
+
+} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]