This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e010d8ddbca [enhancement](filecache) add filesystem leak cleaner 
(#59269)
e010d8ddbca is described below

commit e010d8ddbcab2f66512742151cd1d210eb56c009
Author: zhengyu <[email protected]>
AuthorDate: Fri Feb 6 11:47:42 2026 +0800

    [enhancement](filecache) add filesystem leak cleaner (#59269)
    
    The cache directory can become inconsistent with the file cache meta
    store, so extend FSFileCacheStorage with inode-based stats, a
    leak-cleaner thread, and orphan cleanup helpers to clean up such leaks.
---
 be/src/common/config.cpp                           |   5 +
 be/src/common/config.h                             |   5 +
 be/src/io/cache/cache_block_meta_store.cpp         |  27 +
 be/src/io/cache/cache_block_meta_store.h           |   3 +
 be/src/io/cache/fs_file_cache_storage.cpp          | 771 +++++++++++++++++++--
 be/src/io/cache/fs_file_cache_storage.h            |  69 +-
 .../io/cache/block_file_cache_test_meta_store.cpp  | 178 +----
 .../fs_file_cache_storage_leak_cleaner_test.cpp    | 718 +++++++++++++++++++
 8 files changed, 1556 insertions(+), 220 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1648c502f09..051e431c4f0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1201,6 +1201,11 @@ 
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
 // if difference below this threshold, we consider cache's progressive 
upgrading (2.0->3.0) successful
 DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
 DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");
+DEFINE_mDouble(file_cache_leak_fs_to_meta_ratio_threshold, "1.3");
+DEFINE_mInt64(file_cache_leak_scan_interval_seconds, "86400");
+DEFINE_mInt32(file_cache_leak_scan_batch_files, "2048");
+DEFINE_mInt32(file_cache_leak_scan_pause_ms, "500");
+DEFINE_mInt64(file_cache_leak_grace_seconds, "3600");
 
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4e1ce9c96fc..5dec9c3f765 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1230,6 +1230,11 @@ DECLARE_mBool(enable_file_cache_adaptive_write);
 DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
 DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
 DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
+DECLARE_mDouble(file_cache_leak_fs_to_meta_ratio_threshold);
+DECLARE_mInt64(file_cache_leak_scan_interval_seconds);
+DECLARE_mInt32(file_cache_leak_scan_batch_files);
+DECLARE_mInt32(file_cache_leak_scan_pause_ms);
+DECLARE_mInt64(file_cache_leak_grace_seconds);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
diff --git a/be/src/io/cache/cache_block_meta_store.cpp 
b/be/src/io/cache/cache_block_meta_store.cpp
index 472886152c7..c42dd3f8003 100644
--- a/be/src/io/cache/cache_block_meta_store.cpp
+++ b/be/src/io/cache/cache_block_meta_store.cpp
@@ -332,6 +332,33 @@ std::unique_ptr<BlockMetaIterator> 
CacheBlockMetaStore::get_all() {
     return std::unique_ptr<BlockMetaIterator>(new RocksDBIterator(iter));
 }
 
+size_t CacheBlockMetaStore::approximate_entry_count() const {
+    if (!_db) {
+        LOG(WARNING) << "Database not initialized when counting entries";
+        return 0;
+    }
+
+    rocksdb::ReadOptions read_options;
+    std::unique_ptr<rocksdb::Iterator> iter(
+            _db->NewIterator(read_options, _file_cache_meta_cf_handle.get()));
+    if (!iter) {
+        LOG(WARNING) << "Failed to create iterator when counting entries";
+        return 0;
+    }
+
+    size_t count = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        ++count;
+    }
+
+    if (!iter->status().ok()) {
+        LOG(WARNING) << "Iterator encountered error when counting entries: "
+                     << iter->status().ToString();
+    }
+
+    return count;
+}
+
 void CacheBlockMetaStore::delete_key(const BlockMetaKey& key) {
     std::string key_str = serialize_key(key);
 
diff --git a/be/src/io/cache/cache_block_meta_store.h 
b/be/src/io/cache/cache_block_meta_store.h
index bd3c6501c14..1a6659596e1 100644
--- a/be/src/io/cache/cache_block_meta_store.h
+++ b/be/src/io/cache/cache_block_meta_store.h
@@ -111,6 +111,9 @@ public:
     // Get the approximate size of the write queue
     size_t get_write_queue_size() const;
 
+    // Count entries stored in rocksdb (ignoring pending writes)
+    size_t approximate_entry_count() const;
+
 private:
     void async_write_worker();
 
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp 
b/be/src/io/cache/fs_file_cache_storage.cpp
index 53e4f0c4dd9..fe06505df65 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -21,13 +21,27 @@
 #include <rapidjson/document.h>
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/writer.h>
+#include <sys/stat.h>
 #include <sys/statvfs.h>
-
+#include <unistd.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cctype>
+#include <chrono>
+#include <cmath>
+#include <condition_variable>
+#include <ctime>
 #include <filesystem>
+#include <limits>
 #include <mutex>
+#include <random>
 #include <system_error>
+#include <thread>
+#include <unordered_set>
 #include <vector>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "cpp/sync_point.h"
@@ -46,6 +60,21 @@
 
 namespace doris::io {
 
+#ifdef BE_TEST
+namespace {
+FSFileCacheStorage::InodeEstimationTestHooks* g_inode_estimation_hooks = 
nullptr;
+
+FSFileCacheStorage::InodeEstimationTestHooks* inode_test_hooks() {
+    return g_inode_estimation_hooks;
+}
+} // namespace
+
+void FSFileCacheStorage::set_inode_estimation_test_hooks(
+        FSFileCacheStorage::InodeEstimationTestHooks* hooks) {
+    g_inode_estimation_hooks = hooks;
+}
+#endif
+
 struct BatchLoadArgs {
     UInt128Wrapper hash;
     CacheContext ctx;
@@ -111,12 +140,16 @@ size_t FDCache::file_reader_cache_size() {
     return _file_reader_list.size();
 }
 
-Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
+Status FSFileCacheStorage::init(BlockFileCache* mgr) {
+    const char* metrics_prefix = mgr->_cache_base_path.c_str();
     _iterator_dir_retry_cnt = std::make_shared<bvar::LatencyRecorder>(
-            _cache_base_path.c_str(), 
"file_cache_fs_storage_iterator_dir_retry_cnt");
-    _cache_base_path = _mgr->_cache_base_path;
+            metrics_prefix, "file_cache_fs_storage_iterator_dir_retry_cnt");
+    _leak_scan_removed_files = std::make_shared<bvar::Adder<size_t>>(
+            metrics_prefix, "file_cache_leak_removed_files_cnt");
+    _cache_base_path = mgr->_cache_base_path;
+    _mgr = mgr;
     _meta_store = std::make_unique<CacheBlockMetaStore>(_cache_base_path + 
"/meta", 10000);
-    _cache_background_load_thread = std::thread([this, mgr = _mgr]() {
+    _cache_background_load_thread = std::thread([this, mgr]() {
         try {
             auto mem_tracker = MemTrackerLimiter::create_shared(
                     MemTrackerLimiter::Type::OTHER, 
fmt::format("FileCacheVersionReader"));
@@ -142,6 +175,7 @@ Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
             load_cache_info_into_memory(mgr);
             mgr->_async_open_done = true;
             LOG_INFO("file cache {} lazy load done.", _cache_base_path);
+            start_leak_cleaner(mgr);
         } catch (const std::exception& e) {
             LOG(ERROR) << "Background cache loading thread failed with 
exception: " << e.what();
         } catch (...) {
@@ -608,12 +642,12 @@ bool FSFileCacheStorage::handle_already_loaded_block(
     return true;
 }
 
-void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* 
_mgr) const {
+void FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* 
mgr) const {
     int scan_length = 10000;
     std::vector<BatchLoadArgs> batch_load_buffer;
     batch_load_buffer.reserve(scan_length);
     auto add_cell_batch_func = [&]() {
-        SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
+        SCOPED_CACHE_LOCK(mgr->_mutex, mgr);
 
         auto f = [&](const BatchLoadArgs& args) {
             // in async load mode, a cell may be added twice.
@@ -623,8 +657,8 @@ void 
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
             }
             // if the file is tmp, it means it is the old file and it should 
be removed
             if (!args.is_tmp) {
-                _mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
-                               FileBlock::State::DOWNLOADED, cache_lock);
+                mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
+                              FileBlock::State::DOWNLOADED, cache_lock);
                 return;
             }
             std::error_code ec;
@@ -642,7 +676,9 @@ void 
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
         for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
             auto key_with_suffix = key_it->path().filename().native();
             auto delim_pos = key_with_suffix.find('_');
-            DCHECK(delim_pos != std::string::npos);
+            if (delim_pos == std::string::npos || delim_pos != 
sizeof(uint128_t) * 2) {
+                continue;
+            }
             std::string key_str = key_with_suffix.substr(0, delim_pos);
             std::string expiration_time_str = key_with_suffix.substr(delim_pos 
+ 1);
             auto hash = 
UInt128Wrapper(vectorized::unhex_uint<uint128_t>(key_str.c_str()));
@@ -699,7 +735,7 @@ void 
FSFileCacheStorage::load_cache_info_into_memory_from_fs(BlockFileCache* _mg
             // skip version file
             continue;
         }
-        if (key_prefix_it->path().filename().native() == "meta") {
+        if (key_prefix_it->path().filename().native() == META_DIR_NAME) {
             // skip rocksdb dir
             continue;
         }
@@ -756,7 +792,9 @@ Status 
FSFileCacheStorage::get_file_cache_infos(std::vector<FileCacheInfo>& info
         for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
             auto key_with_suffix = key_it->path().filename().native();
             auto delim_pos = key_with_suffix.find('_');
-            DCHECK(delim_pos != std::string::npos);
+            if (delim_pos == std::string::npos || delim_pos != 
sizeof(uint128_t) * 2) {
+                continue;
+            }
             std::string key_str = key_with_suffix.substr(0, delim_pos);
             std::string expiration_time_str = key_with_suffix.substr(delim_pos 
+ 1);
             long expiration_time = std::stoul(expiration_time_str);
@@ -789,12 +827,13 @@ Status 
FSFileCacheStorage::get_file_cache_infos(std::vector<FileCacheInfo>& info
     return Status::OK();
 }
 
-void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* 
_mgr) const {
+void FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* 
mgr) const {
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile1");
     int scan_length = 10000;
     std::vector<BatchLoadArgs> batch_load_buffer;
     batch_load_buffer.reserve(scan_length);
     auto add_cell_batch_func = [&]() {
-        SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
+        SCOPED_CACHE_LOCK(mgr->_mutex, mgr);
 
         auto f = [&](const BatchLoadArgs& args) {
             // in async load mode, a cell may be added twice.
@@ -802,8 +841,8 @@ void 
FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg
                                             args.ctx.tablet_id, cache_lock)) {
                 return;
             }
-            _mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
-                           FileBlock::State::DOWNLOADED, cache_lock);
+            mgr->add_cell(args.hash, args.ctx, args.offset, args.size, 
FileBlock::State::DOWNLOADED,
+                          cache_lock);
             return;
         };
         std::for_each(batch_load_buffer.begin(), batch_load_buffer.end(), f);
@@ -868,9 +907,9 @@ void 
FSFileCacheStorage::load_cache_info_into_memory_from_db(BlockFileCache* _mg
     TEST_SYNC_POINT_CALLBACK("BlockFileCache::TmpFile2");
 }
 
-void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) 
const {
+void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* mgr) 
const {
     // First load from database
-    load_cache_info_into_memory_from_db(_mgr);
+    load_cache_info_into_memory_from_db(mgr);
 
     std::string version;
     auto st = read_file_cache_version(&version);
@@ -882,17 +921,45 @@ void 
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
         return;
     }
 
+    // If cache directory is effectively empty (no cache data entries), write 
version hint and
+    // return directly.
+    auto is_cache_base_path_empty = [&]() -> bool {
+        std::error_code ec;
+        std::filesystem::directory_iterator it {_cache_base_path, ec};
+        if (ec) {
+            LOG(WARNING) << "Failed to list cache directory: " << 
_cache_base_path
+                         << ", error: " << ec.message();
+            return false;
+        }
+
+        for (; it != std::filesystem::directory_iterator(); ++it) {
+            auto name = it->path().filename().native();
+            if (name == META_DIR_NAME || name == "version") {
+                continue;
+            }
+            return false;
+        }
+        return true;
+    };
+
+    if (is_cache_base_path_empty()) {
+        if (st = write_file_cache_version(); !st.ok()) {
+            LOG(WARNING) << "Failed to write version hints for file cache, 
err=" << st.to_string();
+        }
+        return;
+    }
+
     // Count blocks loaded from database
     size_t db_block_count = 0;
     {
-        std::lock_guard<std::mutex> lock(_mgr->_mutex);
-        for (const auto& hash_entry : _mgr->_files) {
+        std::lock_guard<std::mutex> lock(mgr->_mutex);
+        for (const auto& hash_entry : mgr->_files) {
             db_block_count += hash_entry.second.size();
         }
     }
 
     // Estimate file count from filesystem using statfs
-    size_t estimated_file_count = estimate_file_count_from_statfs();
+    size_t estimated_file_count = estimate_file_count_from_inode();
 
     LOG(INFO) << "Cache loading statistics - DB blocks: " << db_block_count
               << ", Estimated FS files: " << estimated_file_count;
@@ -965,7 +1032,7 @@ Status FSFileCacheStorage::clear(std::string& msg) {
     auto t0 = std::chrono::steady_clock::now();
     for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
         if (!key_it->is_directory()) continue; // all file cache data is in 
sub-directories
-        if (key_it->path().filename().native() == "meta") continue;
+        if (key_it->path().filename().native() == META_DIR_NAME) continue;
         ++total;
         std::string cache_key = key_it->path().string();
         auto st = global_local_filesystem()->delete_directory(cache_key);
@@ -996,82 +1063,632 @@ FSFileCacheStorage::~FSFileCacheStorage() {
     if (_cache_background_load_thread.joinable()) {
         _cache_background_load_thread.join();
     }
+    stop_leak_cleaner();
 }
 
-size_t FSFileCacheStorage::estimate_file_count_from_statfs() const {
-    struct statvfs vfs;
-    if (statvfs(_cache_base_path.c_str(), &vfs) != 0) {
-        LOG(WARNING) << "Failed to get filesystem statistics for path: " << 
_cache_base_path
-                     << ", error: " << strerror(errno);
+size_t FSFileCacheStorage::estimate_file_count_from_inode() const {
+    int64_t duration_ns = 0;
+    size_t cache_files = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        do {
+            struct statvfs vfs {};
+            int statvfs_res = 0;
+#ifdef BE_TEST
+            if (auto* hooks = inode_test_hooks(); hooks && 
hooks->statvfs_override) {
+                statvfs_res = hooks->statvfs_override(_cache_base_path, &vfs);
+            } else
+#endif
+            {
+                statvfs_res = statvfs(_cache_base_path.c_str(), &vfs);
+            }
+            if (statvfs_res != 0) {
+                LOG(WARNING) << "Failed to get filesystem statistics for path: 
" << _cache_base_path
+                             << ", error: " << strerror(errno);
+                break;
+            }
+
+            if (vfs.f_files == 0) {
+                LOG(WARNING) << "Filesystem returned zero total inodes for 
path "
+                             << _cache_base_path;
+                break;
+            }
+
+            struct stat cache_stat {};
+            int lstat_res = 0;
+#ifdef BE_TEST
+            if (auto* hooks = inode_test_hooks(); hooks && 
hooks->lstat_override) {
+                lstat_res = hooks->lstat_override(_cache_base_path, 
&cache_stat);
+            } else
+#endif
+            {
+                lstat_res = lstat(_cache_base_path.c_str(), &cache_stat);
+            }
+            if (lstat_res != 0) {
+                LOG(WARNING) << "Failed to stat cache base path " << 
_cache_base_path << ": "
+                             << strerror(errno);
+                break;
+            }
+
+            size_t total_inodes_used = vfs.f_files - vfs.f_ffree;
+            size_t non_cache_inodes = estimate_non_cache_inode_usage();
+            size_t directory_inodes = estimate_cache_directory_inode_usage();
+
+            if (total_inodes_used > non_cache_inodes + directory_inodes) {
+                cache_files = total_inodes_used - non_cache_inodes - 
directory_inodes;
+            } else {
+                LOG(WARNING) << fmt::format(
+                        "Inode subtraction underflow: total={} non_cache={} 
directory={}",
+                        total_inodes_used, non_cache_inodes, directory_inodes);
+            }
+
+            LOG(INFO) << fmt::format(
+                    "Cache inode estimation: total_used={}, non_cache={}, 
directories≈{}, files≈{}",
+                    total_inodes_used, non_cache_inodes, directory_inodes, 
cache_files);
+        } while (false);
+    }
+    const double duration_ms = static_cast<double>(duration_ns) / 1'000'000.0;
+    LOG(INFO) << fmt::format("estimate_file_count_from_inode 
duration_ms={:.3f}, files={}",
+                             duration_ms, cache_files);
+    return cache_files;
+}
+
+size_t FSFileCacheStorage::count_inodes_for_path(
+        const std::filesystem::path& path, dev_t target_dev,
+        const std::filesystem::path& excluded_root,
+        std::unordered_set<InodeKey, InodeKeyHash>& visited) const {
+#ifdef BE_TEST
+    if (auto* hooks = inode_test_hooks(); hooks && 
hooks->count_inodes_override) {
+        return hooks->count_inodes_override(*this, path, target_dev, 
excluded_root, visited);
+    }
+#endif
+    if (!excluded_root.empty()) {
+        std::error_code eq_ec;
+        bool is_excluded = std::filesystem::equivalent(path, excluded_root, 
eq_ec);
+        if (eq_ec) {
+            LOG(WARNING) << "Failed to compare " << path << " with " << 
excluded_root << ": "
+                         << eq_ec.message();
+        } else if (is_excluded) {
+            return 0;
+        }
+    }
+
+    struct stat st {};
+    if (lstat(path.c_str(), &st) != 0) {
+        LOG(WARNING) << "Failed to stat path " << path << ": " << 
strerror(errno);
+        return 0;
+    }
+    if (st.st_dev != target_dev) {
+        return 0;
+    }
+    InodeKey key {st.st_dev, st.st_ino};
+    if (!visited.insert(key).second) {
         return 0;
     }
 
-    // Get total size of cache directory to estimate file count
+    size_t count = 1;
+    if (S_ISDIR(st.st_mode)) {
+        std::error_code ec;
+        for (std::filesystem::directory_iterator it {path, ec};
+             !ec && it != std::filesystem::directory_iterator(); ++it) {
+            count += count_inodes_for_path(it->path(), target_dev, 
excluded_root, visited);
+        }
+        if (ec) {
+            LOG(WARNING) << "Failed to iterate directory " << path << ": " << 
ec.message();
+        }
+    }
+    return count;
+}
+
+bool FSFileCacheStorage::is_cache_prefix_directory(
+        const std::filesystem::directory_entry& entry) const {
+    if (!entry.is_directory()) {
+        return false;
+    }
+    auto name = entry.path().filename().native();
+    if (name == META_DIR_NAME || name.empty()) {
+        return false;
+    }
+    if (name.size() != KEY_PREFIX_LENGTH) {
+        return false;
+    }
+    return std::all_of(name.begin(), name.end(), [](unsigned char c) { return 
std::isxdigit(c); });
+}
+
+std::filesystem::path FSFileCacheStorage::find_mount_root(dev_t cache_dev) 
const {
+#ifdef BE_TEST
+    if (auto* hooks = inode_test_hooks(); hooks && 
hooks->find_mount_root_override) {
+        return hooks->find_mount_root_override(*this, cache_dev);
+    }
+#endif
     std::error_code ec;
-    uintmax_t total_size = 0;
-    std::vector<std::filesystem::path> pending_dirs 
{std::filesystem::path(_cache_base_path)};
-    while (!pending_dirs.empty()) {
-        auto current_dir = pending_dirs.back();
-        pending_dirs.pop_back();
+    std::filesystem::path current = 
std::filesystem::absolute(_cache_base_path, ec);
+    if (ec) {
+        LOG(WARNING) << "Failed to resolve absolute cache base path " << 
_cache_base_path << ": "
+                     << ec.message();
+        current = _cache_base_path;
+    }
 
-        std::filesystem::directory_iterator it(current_dir, ec);
-        if (ec) {
-            LOG(WARNING) << "Failed to list directory while estimating file 
count, dir="
-                         << current_dir << ", err=" << ec.message();
-            ec.clear();
+    std::filesystem::path result = current;
+    while (result.has_parent_path()) {
+        auto parent = result.parent_path();
+        if (parent.empty() || parent == result) {
+            break;
+        }
+        struct stat st {};
+        if (lstat(parent.c_str(), &st) != 0) {
+            LOG(WARNING) << "Failed to stat parent path " << parent << ": " << 
strerror(errno);
+            break;
+        }
+        if (st.st_dev != cache_dev) {
+            break;
+        }
+        result = parent;
+    }
+    return result;
+}
+
+size_t FSFileCacheStorage::estimate_non_cache_inode_usage() const {
+#ifdef BE_TEST
+    if (auto* hooks = inode_test_hooks(); hooks && hooks->non_cache_override) {
+        return hooks->non_cache_override(*this);
+    }
+#endif
+    struct stat cache_stat {};
+    if (lstat(_cache_base_path.c_str(), &cache_stat) != 0) {
+        LOG(WARNING) << "Failed to stat cache base path " << _cache_base_path 
<< ": "
+                     << strerror(errno);
+        return 0;
+    }
+
+    auto mount_root = find_mount_root(cache_stat.st_dev);
+    if (mount_root.empty()) {
+        LOG(WARNING) << "Failed to determine mount root for cache path " << 
_cache_base_path;
+        return 0;
+    }
+
+    std::unordered_set<InodeKey, InodeKeyHash> visited;
+    std::error_code abs_ec;
+    std::filesystem::path excluded = 
std::filesystem::absolute(_cache_base_path, abs_ec);
+    if (abs_ec) {
+        LOG(WARNING) << "Failed to get absolute cache base path " << 
_cache_base_path << ": "
+                     << abs_ec.message();
+        excluded = _cache_base_path;
+    }
+
+    return count_inodes_for_path(mount_root, cache_stat.st_dev, excluded, 
visited);
+}
+
+size_t FSFileCacheStorage::estimate_cache_directory_inode_usage() const {
+#ifdef BE_TEST
+    if (auto* hooks = inode_test_hooks(); hooks && hooks->cache_dir_override) {
+        return hooks->cache_dir_override(*this);
+    }
+#endif
+    constexpr size_t kSampleLimit = 3;
+    size_t prefix_dirs = 0;
+    std::vector<std::filesystem::path> samples;
+
+    std::error_code ec;
+    std::filesystem::directory_iterator it {_cache_base_path, ec};
+    if (ec) {
+        LOG(WARNING) << "Failed to list cache base path for directory 
estimation: " << ec.message();
+        return 0;
+    }
+
+    for (; it != std::filesystem::directory_iterator(); ++it) {
+        if (!is_cache_prefix_directory(*it)) {
             continue;
         }
+        ++prefix_dirs;
+        if (samples.size() < kSampleLimit) {
+            samples.emplace_back(it->path());
+        }
+    }
 
-        for (; it != std::filesystem::directory_iterator(); ++it) {
-            std::error_code status_ec;
-            auto entry_status = it->symlink_status(status_ec);
-            TEST_SYNC_POINT_CALLBACK(
-                    
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterEntryStatus",
-                    &status_ec);
-            if (status_ec) {
-                LOG(WARNING) << "Failed to stat entry while estimating file 
count, path="
-                             << it->path() << ", err=" << status_ec.message();
-                continue;
+    if (prefix_dirs == 0 || samples.empty()) {
+        return 0;
+    }
+
+    size_t sampled_second_level = 0;
+    for (const auto& prefix_path : samples) {
+        size_t local_count = 0;
+        std::error_code sample_ec;
+        for (std::filesystem::directory_iterator prefix_it {prefix_path, 
sample_ec};
+             !sample_ec && prefix_it != std::filesystem::directory_iterator(); 
++prefix_it) {
+            if (prefix_it->is_directory()) {
+                ++local_count;
             }
+        }
+        if (sample_ec) {
+            LOG(WARNING) << "Failed to enumerate prefix directory " << 
prefix_path << ": "
+                         << sample_ec.message();
+            sample_ec.clear();
+        }
+        sampled_second_level += local_count;
+    }
 
-            if (std::filesystem::is_directory(entry_status)) {
-                auto next_dir = it->path();
-                TEST_SYNC_POINT_CALLBACK(
-                        
"FSFileCacheStorage::estimate_file_count_from_statfs::OnDirectory",
-                        &next_dir);
-                pending_dirs.emplace_back(next_dir);
-                continue;
+    double average_second_level = static_cast<double>(sampled_second_level) / 
samples.size();
+    size_t estimated_second_level =
+            static_cast<size_t>(std::llround(average_second_level * 
prefix_dirs));
+    return prefix_dirs + estimated_second_level;
+}
+
+size_t FSFileCacheStorage::snapshot_metadata_block_count(BlockFileCache* 
/*mgr*/) const {
+    // TODO(zhengyu): if the cache_lock problem is solved, we can then use _mgr
+    int64_t duration_ns = 0;
+    size_t block_count = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        if (_meta_store) {
+            block_count = _meta_store->approximate_entry_count();
+        } else {
+            LOG(INFO) << "snapshot_metadata_block_count skipped because meta 
store is null";
+            block_count = 0;
+        }
+    }
+    const double duration_ms = static_cast<double>(duration_ns) / 1'000'000.0;
+    LOG(INFO) << fmt::format("snapshot_metadata_block_count 
duration_ms={:.3f}, blocks={}",
+                             duration_ms, block_count);
+    return block_count;
+}
+
+std::vector<size_t> FSFileCacheStorage::snapshot_metadata_for_hash_offsets(
+        BlockFileCache* mgr, const UInt128Wrapper& hash) const {
+    std::vector<size_t> offsets;
+    std::lock_guard<std::mutex> lock(mgr->_mutex);
+    auto it = mgr->_files.find(hash);
+    if (it == mgr->_files.end()) {
+        return offsets;
+    }
+    offsets.reserve(it->second.size());
+    for (const auto& [offset, _] : it->second) {
+        offsets.push_back(offset);
+    }
+    return offsets;
+}
+
+void FSFileCacheStorage::start_leak_cleaner(BlockFileCache* mgr) {
+    if (config::file_cache_leak_scan_interval_seconds <= 0) {
+        LOG(WARNING) << "File cache leak cleaner disabled because interval <= 
0";
+        return;
+    }
+
+    // If the file cache version is not 3.0, return without cleaning anything.
+    std::string version;
+    if (auto st = read_file_cache_version(&version); !st.ok()) {
+        LOG(WARNING) << "Failed to read file cache version: " << 
st.to_string();
+        return;
+    }
+    if (version != "3.0") {
+        LOG(WARNING) << "File cache leak cleaner skipped because version is 
not 3.0";
+        return;
+    }
+
+    _stop_leak_cleaner.store(false, std::memory_order_relaxed);
+    _cache_leak_cleaner_thread = std::thread([this]() { leak_cleaner_loop(); 
});
+}
+
+void FSFileCacheStorage::stop_leak_cleaner() {
+    _stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    _leak_cleaner_cv.notify_all();
+    if (_cache_leak_cleaner_thread.joinable()) {
+        _cache_leak_cleaner_thread.join();
+    }
+}
+
+void FSFileCacheStorage::leak_cleaner_loop() {
+    Thread::set_self_name("leak_cleaner_loop");
+
+    // Waiting a random delay before starting the loop helps avoid a
+    // thundering-herd problem across all storages.
+    const int64_t interval_seconds =
+            std::max<int64_t>(1, 
config::file_cache_leak_scan_interval_seconds);
+    std::mt19937_64 rng(std::random_device {}());
+    std::uniform_int_distribution<int64_t> dist(0, interval_seconds);
+    int64_t initial_delay = dist(rng);
+    
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::leak_cleaner_loop::initial_delay",
+                             &initial_delay);
+    if (initial_delay > 0) {
+        std::unique_lock<std::mutex> lock(_leak_cleaner_mutex);
+        _leak_cleaner_cv.wait_for(lock, std::chrono::seconds(initial_delay), 
[this]() {
+            return _stop_leak_cleaner.load(std::memory_order_relaxed);
+        });
+        lock.unlock();
+        if (_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+            return;
+        }
+    }
+
+    while (!_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+        int64_t interval_s = interval_seconds;
+        
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::leak_cleaner_loop::interval", 
&interval_s);
+        auto interval = std::chrono::seconds(interval_s);
+        std::unique_lock<std::mutex> lock(_leak_cleaner_mutex);
+        _leak_cleaner_cv.wait_for(lock, interval, [this]() {
+            return _stop_leak_cleaner.load(std::memory_order_relaxed);
+        });
+        lock.unlock();
+        if (_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+            break;
+        }
+        try {
+            
TEST_SYNC_POINT_CALLBACK("FSFileCacheStorage::leak_cleaner_loop::before_run");
+            run_leak_cleanup(_mgr);
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "File cache leak cleaner encountered exception: " 
<< e.what();
+        } catch (...) {
+            LOG(WARNING) << "File cache leak cleaner encountered unknown 
exception";
+        }
+    }
+}
+
+// One leak-scan pass. Compares the inode-based estimate of files on disk with
+// the number of blocks tracked in metadata and only starts the orphan cleanup
+// when fs_files / metadata_blocks is above
+// config::file_cache_leak_fs_to_meta_ratio_threshold.
+void FSFileCacheStorage::run_leak_cleanup(BlockFileCache* mgr) {
+    const size_t meta_block_count = snapshot_metadata_block_count(mgr);
+    if (meta_block_count == 0) {
+        // No baseline to compare against (and the ratio would divide by zero).
+        LOG(INFO) << "file cache leak scan found zero metadata blocks, skip cleanup";
+        return;
+    }
+
+    const size_t fs_file_count = estimate_file_count_from_inode();
+    const double ratio =
+            static_cast<double>(fs_file_count) / static_cast<double>(meta_block_count);
+    LOG(INFO) << fmt::format(
+            "file cache leak scan stats: fs_files={}, metadata_blocks={}, ratio={:.4f}",
+            fs_file_count, meta_block_count, ratio);
+
+    const double threshold = config::file_cache_leak_fs_to_meta_ratio_threshold;
+    const bool within_threshold = ratio <= threshold;
+    if (!within_threshold) {
+        LOG(WARNING) << fmt::format(
+                "file cache leak ratio {0:.4f} exceeds threshold {1:.4f}, start cleanup", ratio,
+                threshold);
+        cleanup_leaked_files(mgr, meta_block_count);
+        return;
+    }
+
+    LOG_INFO("file cache leak ratio {0:.4f} within threshold {1:.4f}, no cleanup needed", ratio,
+             threshold);
+}
+
+// Scans <cache_base>/<prefix>/<hash>_<expiration>/<file> and deletes files that
+// metadata does not reference.
+//
+// Steps:
+//   1. Copy the hash keys of mgr->_files while holding mgr->_mutex.
+//   2. Build an index of (hash, offset) pairs through
+//      snapshot_metadata_for_hash_offsets().
+//   3. Walk the prefix directories (META_DIR_NAME and names whose length is not
+//      KEY_PREFIX_LENGTH are skipped), then the key directories, then the
+//      regular files inside them. A file becomes a removal candidate when it is
+//      a tmp file or its (hash, offset) is absent from the index.
+//   4. When file_cache_leak_grace_seconds > 0, candidates younger than the grace
+//      window (by st_mtime) are kept. Files whose age cannot be determined are
+//      kept as well and get re-evaluated by the next scan.
+//   5. Candidates are deleted in batches of file_cache_leak_scan_batch_files,
+//      sleeping file_cache_leak_scan_pause_ms after each batch; key and prefix
+//      directories left empty are removed too.
+//
+// mgr                  - cache whose metadata is the source of truth.
+// metadata_block_count - only used as a capacity hint for the index.
+//
+// Note: "examined_files" in the summary log counts files queued for removal,
+// not every file visited.
+void FSFileCacheStorage::cleanup_leaked_files(BlockFileCache* mgr, size_t 
metadata_block_count) {
+    const size_t batch_size = std::max<int32_t>(1, 
config::file_cache_leak_scan_batch_files);
+    const size_t pause_ms = std::max<int32_t>(0, 
config::file_cache_leak_scan_pause_ms);
+
+    int64_t cleanup_wall_time_ns = 0;
+    int64_t metadata_hash_time_ns = 0;
+    int64_t metadata_index_time_ns = 0;
+    int64_t remove_candidates_time_ns = 0;
+    int64_t directory_loop_time_ns = 0;
+    size_t removed_files = 0;
+    size_t examined_files = 0;
+
+    std::vector<UInt128Wrapper> hash_keys;
+
+    {
+        SCOPED_RAW_TIMER(&cleanup_wall_time_ns);
+        {
+            SCOPED_RAW_TIMER(&metadata_hash_time_ns);
+            std::lock_guard<std::mutex> lock(mgr->_mutex);
+            hash_keys.reserve(mgr->_files.size());
+            for (const auto& [hash, _] : mgr->_files) {
+                hash_keys.push_back(hash);
             }
+        }
 
-            if (std::filesystem::is_regular_file(entry_status)) {
-                std::error_code size_ec;
-                auto file_size = it->file_size(size_ec);
-                TEST_SYNC_POINT_CALLBACK(
-                        
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterFileSize",
-                        &size_ec);
-                if (size_ec) {
-                    LOG(WARNING) << "Failed to get file size while estimating 
file count, path="
-                                 << it->path() << ", err=" << 
size_ec.message();
-                    continue;
+        std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> 
metadata_index;
+        if (metadata_block_count > 0) {
+            metadata_index.reserve(metadata_block_count * 2);
+        }
+
+        {
+            SCOPED_RAW_TIMER(&metadata_index_time_ns);
+            for (const auto& hash : hash_keys) {
+                auto offsets = snapshot_metadata_for_hash_offsets(mgr, hash);
+                for (const auto& offset : offsets) {
+                    metadata_index.emplace(hash, offset);
                 }
-                total_size += file_size;
             }
         }
-    }
 
-    if (total_size == 0) {
-        return 0;
-    }
+        struct OrphanCandidate {
+            std::string path;
+            UInt128Wrapper hash;
+            size_t offset;
+            std::string key_dir;
+        };
+
+        // Deletes `dir` only when listing it succeeds and yields no entries.
+        auto try_remove_empty_directory = [&](const std::string& dir) {
+            std::error_code ec;
+            std::filesystem::directory_iterator it(dir, ec);
+            if (ec || it != std::filesystem::directory_iterator()) {
+                return;
+            }
+            auto st = fs->delete_directory(dir);
+            if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+                LOG_WARNING("delete_directory {} failed", dir).error(st);
+            }
+        };
+
+        std::vector<OrphanCandidate> candidates;
+        candidates.reserve(batch_size);
+
+        // Flushes the current batch: deletes every queued file, prunes the
+        // directories it emptied, then pauses for pause_ms.
+        auto remove_candidates = [&]() {
+            if (candidates.empty()) {
+                return;
+            }
+            int64_t remove_once_ns = 0;
+            {
+                SCOPED_RAW_TIMER(&remove_once_ns);
+                for (auto& candidate : candidates) {
+                    auto st = fs->delete_file(candidate.path);
+                    if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+                        LOG_WARNING("delete orphan cache file {} failed", 
candidate.path).error(st);
+                        continue;
+                    }
+                    removed_files++;
+                    try_remove_empty_directory(candidate.key_dir);
+                    auto prefix_dir =
+                            
std::filesystem::path(candidate.key_dir).parent_path().string();
+                    try_remove_empty_directory(prefix_dir);
+                }
+                candidates.clear();
+            }
+            remove_candidates_time_ns += remove_once_ns;
+            if (pause_ms > 0) {
+                
std::this_thread::sleep_for(std::chrono::milliseconds(pause_ms));
+            }
+        };
+
+        std::error_code ec;
+        std::filesystem::directory_iterator prefix_it {_cache_base_path, ec};
+        if (ec) {
+            LOG(WARNING) << "Leak scan failed to list cache directory: " << 
_cache_base_path
+                         << ", error: " << ec.message();
+            return;
+        }
+
+        for (; prefix_it != std::filesystem::directory_iterator(); 
++prefix_it) {
+            int64_t loop_once_ns = 0;
+            {
+                SCOPED_RAW_TIMER(&loop_once_ns);
+                std::string prefix_name = 
prefix_it->path().filename().native();
+                if (!prefix_it->is_directory() || prefix_name == META_DIR_NAME 
||
+                    prefix_name.size() != KEY_PREFIX_LENGTH) {
+                    continue;
+                }
+
+                std::filesystem::directory_iterator key_it {prefix_it->path(), 
ec};
+                if (ec) {
+                    LOG(WARNING) << "Leak scan failed to list prefix " << 
prefix_it->path().native()
+                                 << ", error: " << ec.message();
+                    continue;
+                }
 
-    // Estimate file count based on average file size
-    // Assuming average file size of 1MB for cache blocks
-    const uintmax_t average_file_size = 1024 * 1024; // 1MB
-    size_t estimated_file_count = total_size / average_file_size;
+                for (; key_it != std::filesystem::directory_iterator(); 
++key_it) {
+                    if (!key_it->is_directory()) {
+                        continue;
+                    }
+                    // Key directories are named "<hex hash>_<expiration>".
+                    auto key_with_suffix = key_it->path().filename().native();
+                    auto delim_pos = key_with_suffix.find('_');
+                    if (delim_pos == std::string::npos || delim_pos != 
sizeof(uint128_t) * 2) {
+                        continue;
+                    }
+
+                    UInt128Wrapper hash;
+                    try {
+                        hash = 
UInt128Wrapper(vectorized::unhex_uint<uint128_t>(
+                                key_with_suffix.substr(0, delim_pos).c_str()));
+                    } catch (...) {
+                        LOG(WARNING) << "Leak scan failed to parse hash from " 
<< key_with_suffix;
+                        continue;
+                    }
+
+                    long expiration = 0;
+                    try {
+                        expiration = 
std::stol(key_with_suffix.substr(delim_pos + 1));
+                    } catch (...) {
+                        LOG(WARNING)
+                                << "Leak scan failed to parse expiration from 
" << key_with_suffix;
+                        continue;
+                    }
+
+                    std::filesystem::directory_iterator offset_it 
{key_it->path(), ec};
+                    if (ec) {
+                        LOG(WARNING) << "Leak scan failed to list key 
directory "
+                                     << key_it->path().native() << ", error: " 
<< ec.message();
+                        continue;
+                    }
+
+                    for (; offset_it != std::filesystem::directory_iterator(); 
++offset_it) {
+                        if (!offset_it->is_regular_file()) {
+                            continue;
+                        }
+                        const auto file_path = offset_it->path();
+                        const std::string file_path_str = file_path.string();
+                        size_t file_size = offset_it->file_size(ec);
+                        if (ec) {
+                            LOG(WARNING) << "Leak scan failed to fetch file 
size of "
+                                         << file_path.native() << ": " << 
ec.message();
+                            continue;
+                        }
+
+                        size_t offset = 0;
+                        bool is_tmp = false;
+                        FileCacheType cache_type = FileCacheType::NORMAL;
+                        Status st = parse_filename_suffix_to_cache_type(
+                                fs, offset_it->path().filename().native(), 
expiration, file_size,
+                                &offset, &is_tmp, &cache_type);
+                        if (!st.ok()) {
+                            continue;
+                        }
+
+                        AccessKeyAndOffset meta_key {hash, offset};
+
+                        // If the file is present in metadata and not a tmp 
file, skip it.
+                        if (!is_tmp && metadata_index.find(meta_key) != 
metadata_index.end()) {
+                            continue;
+                        }
+
+                        // For any file that is not referenced by metadata (or 
tmp files),
+                        // protect recently-created files from immediate 
deletion. This avoids
+                        // racing with writers. The grace window is configured 
by
+                        // file_cache_leak_grace_seconds and applies to all 
orphan files.
+                        const int64_t grace_seconds =
+                                std::max<int64_t>(0, 
config::file_cache_leak_grace_seconds);
+                        if (grace_seconds > 0) {
+                            struct stat st_buf {};
+                            if (::stat(file_path.c_str(), &st_buf) != 0) {
+                                LOG(WARNING) << "Leak scan failed to stat file 
" << file_path_str
+                                             << ": " << strerror(errno);
+                                // Age unknown: keep the file so the grace window
+                                // still holds; the next scan re-checks it.
+                                continue;
+                            } else {
+                                const std::time_t now = std::time(nullptr);
+                                if (now == static_cast<std::time_t>(-1)) {
+                                    LOG(WARNING)
+                                            << "Leak scan failed to get 
current time when checking "
+                                            << file_path_str;
+                                    // Age unknown: keep the file, same as above.
+                                    continue;
+                                } else {
+                                    const int64_t age_seconds =
+                                            static_cast<int64_t>(now) -
+                                            
static_cast<int64_t>(st_buf.st_mtime);
+                                    if (age_seconds < grace_seconds) {
+                                        VLOG_DEBUG << fmt::format(
+                                                "Leak scan skipping young 
orphan file {} because "
+                                                "age={}s < grace={}s",
+                                                file_path_str, age_seconds, 
grace_seconds);
+                                        continue;
+                                    }
+                                }
+                            }
+                        }
+
+                        candidates.emplace_back(file_path_str, hash, offset,
+                                                key_it->path().string());
+                        examined_files++;
+                        if (candidates.size() >= batch_size) {
+                            remove_candidates();
+                        }
+                    }
+                }
+            }
+            directory_loop_time_ns += loop_once_ns;
+        }
 
-    LOG(INFO) << "Estimated file count for cache path " << _cache_base_path
-              << ": total_size=" << total_size << ", estimated_files=" << 
estimated_file_count;
+        // Flush the last, possibly partial, batch.
+        remove_candidates();
+    }
 
-    return estimated_file_count;
+    auto ns_to_ms = [](int64_t ns) { return static_cast<double>(ns) / 
1'000'000.0; };
+
+    LOG(INFO) << fmt::format(
+            "file cache leak cleanup finished: examined_files={}, 
removed_orphans={}, "
+            "wall_time_ms={:.3f}, metadata_hash_time_ms={:.3f}, 
metadata_index_ms={:.3f}, "
+            "remove_candidates_ms={:.3f}, prefix_loop_ms={:.3f}",
+            examined_files, removed_files, ns_to_ms(cleanup_wall_time_ns),
+            ns_to_ms(metadata_hash_time_ns), ns_to_ms(metadata_index_time_ns),
+            ns_to_ms(remove_candidates_time_ns), 
ns_to_ms(directory_loop_time_ns));
+    if (_leak_scan_removed_files) {
+        *_leak_scan_removed_files << removed_files;
+    }
 }
 
 } // namespace doris::io
diff --git a/be/src/io/cache/fs_file_cache_storage.h 
b/be/src/io/cache/fs_file_cache_storage.h
index d486552d2b6..44b04805419 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -18,10 +18,21 @@
 #pragma once
 
 #include <bvar/bvar.h>
-
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <filesystem>
+#include <functional>
 #include <memory>
+#include <mutex>
 #include <shared_mutex>
 #include <thread>
+#include <unordered_set>
+#include <vector>
 
 #include "io/cache/cache_block_meta_store.h"
 #include "io/cache/file_cache_common.h"
@@ -58,6 +69,7 @@ public:
     /// version 1.0: cache_base_path / key / offset
     /// version 2.0: cache_base_path / key_prefix / key / offset
     static constexpr int KEY_PREFIX_LENGTH = 3;
+    static constexpr std::string META_DIR_NAME = "meta";
 
     FSFileCacheStorage() = default;
     ~FSFileCacheStorage() override;
@@ -90,6 +102,36 @@ public:
     // Get the meta store instance (only available for DISK storage type)
     CacheBlockMetaStore* get_meta_store() { return _meta_store.get(); }
 
+    // Identifies a filesystem object by the pair (device id, inode number).
+    struct InodeKey {
+        dev_t device;
+        ino_t inode;
+        // Two keys are equal only when both the device and the inode match.
+        bool operator==(const InodeKey& rhs) const {
+            if (device != rhs.device) {
+                return false;
+            }
+            return inode == rhs.inode;
+        }
+    };
+    // Hash functor that lets InodeKey be used in unordered containers.
+    struct InodeKeyHash {
+        size_t operator()(const InodeKey& key) const {
+            // Hash the two fields separately and mix them (hash_combine style).
+            // A plain (device << 32) ^ inode drops the device bits above 32 and
+            // lets inode bits >= 2^32 alias device bits, so distinct keys such as
+            // {1, 0} and {0, 1 << 32} would collide.
+            const size_t device_hash =
+                    std::hash<uint64_t>()(static_cast<uint64_t>(key.device));
+            const size_t inode_hash = std::hash<uint64_t>()(static_cast<uint64_t>(key.inode));
+            return device_hash ^
+                   (inode_hash + 0x9e3779b9 + (device_hash << 6) + (device_hash >> 2));
+        }
+    };
+
+#ifdef BE_TEST
+    // Test-only (BE_TEST) bundle of optional std::function callbacks. A test
+    // fills in the members it needs and installs the struct through
+    // set_inode_estimation_test_hooks(); passing nullptr there uninstalls it.
+    // NOTE(review): the member names suggest each one replaces the matching
+    // syscall or helper of the inode estimation code (statvfs, lstat, non-cache /
+    // cache-dir inode usage, find_mount_root, count_inodes_for_path) - confirm
+    // against the .cpp, which is not visible here.
+    struct InodeEstimationTestHooks {
+        std::function<int(const std::string&, struct statvfs*)> 
statvfs_override;
+        std::function<int(const std::string&, struct stat*)> lstat_override;
+        std::function<size_t(const FSFileCacheStorage&)> non_cache_override;
+        std::function<size_t(const FSFileCacheStorage&)> cache_dir_override;
+        std::function<std::filesystem::path(const FSFileCacheStorage&, dev_t)>
+                find_mount_root_override;
+        std::function<size_t(const FSFileCacheStorage&, const 
std::filesystem::path&, dev_t,
+                             const std::filesystem::path&,
+                             std::unordered_set<InodeKey, InodeKeyHash>&)>
+                count_inodes_override;
+    };
+    static void set_inode_estimation_test_hooks(InodeEstimationTestHooks* 
hooks);
+#endif
+
 private:
     void remove_old_version_directories();
 
@@ -116,8 +158,23 @@ private:
                                      std::lock_guard<std::mutex>& cache_lock) 
const;
 
 private:
-    // Helper function to count files in cache directory using statfs
-    size_t estimate_file_count_from_statfs() const;
+    // Helper function to count files in cache directory using inode stats
+    size_t estimate_file_count_from_inode() const;
+    size_t estimate_non_cache_inode_usage() const;
+    size_t estimate_cache_directory_inode_usage() const;
+    size_t count_inodes_for_path(const std::filesystem::path& path, dev_t 
target_dev,
+                                 const std::filesystem::path& excluded_root,
+                                 std::unordered_set<InodeKey, InodeKeyHash>& 
visited) const;
+    std::filesystem::path find_mount_root(dev_t cache_dev) const;
+    bool is_cache_prefix_directory(const std::filesystem::directory_entry& 
entry) const;
+    size_t snapshot_metadata_block_count(BlockFileCache* mgr) const;
+    std::vector<size_t> snapshot_metadata_for_hash_offsets(BlockFileCache* mgr,
+                                                           const 
UInt128Wrapper& hash) const;
+    void start_leak_cleaner(BlockFileCache* mgr);
+    void stop_leak_cleaner();
+    void leak_cleaner_loop();
+    void run_leak_cleanup(BlockFileCache* mgr);
+    void cleanup_leaked_files(BlockFileCache* mgr, size_t 
metadata_block_count);
     void load_cache_info_into_memory_from_fs(BlockFileCache* _mgr) const;
     void load_cache_info_into_memory_from_db(BlockFileCache* _mgr) const;
 
@@ -125,12 +182,18 @@ private:
                                 std::lock_guard<std::mutex>& cache_lock) const 
override;
 
     std::string _cache_base_path;
+    BlockFileCache* _mgr {nullptr};
     std::thread _cache_background_load_thread;
+    std::thread _cache_leak_cleaner_thread;
+    std::atomic<bool> _stop_leak_cleaner {false};
+    std::condition_variable _leak_cleaner_cv;
+    std::mutex _leak_cleaner_mutex;
     const std::shared_ptr<LocalFileSystem>& fs = global_local_filesystem();
     // TODO(Lchangliang): use a more efficient data structure
     std::mutex _mtx;
     std::unordered_map<FileWriterMapKey, FileWriterPtr, FileWriterMapKeyHash> 
_key_to_writer;
     std::shared_ptr<bvar::LatencyRecorder> _iterator_dir_retry_cnt;
+    std::shared_ptr<bvar::Adder<size_t>> _leak_scan_removed_files;
     std::unique_ptr<CacheBlockMetaStore> _meta_store;
 };
 
diff --git a/be/test/io/cache/block_file_cache_test_meta_store.cpp 
b/be/test/io/cache/block_file_cache_test_meta_store.cpp
index 585c359dd73..261efbc6072 100644
--- a/be/test/io/cache/block_file_cache_test_meta_store.cpp
+++ b/be/test/io/cache/block_file_cache_test_meta_store.cpp
@@ -430,6 +430,44 @@ TEST_F(BlockFileCacheTest, version3_add_remove_restart) {
     }
 }
 
+// A disk-backed cache initialized on an empty directory must leave a "version"
+// file whose first three bytes read "3.0".
+TEST_F(BlockFileCacheTest, version3_write_version_when_cache_dir_empty) {
+    // Start from a fresh, empty cache directory.
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    io::FileCacheSettings cache_settings;
+    cache_settings.storage = "disk";
+    cache_settings.capacity = 10_mb;
+    cache_settings.max_file_block_size = 1_mb;
+    cache_settings.max_query_cache_size = cache_settings.capacity;
+    // Every queue gets the whole capacity and room for 8 elements.
+    cache_settings.disposable_queue_size = cache_settings.capacity;
+    cache_settings.index_queue_size = cache_settings.capacity;
+    cache_settings.query_queue_size = cache_settings.capacity;
+    cache_settings.ttl_queue_size = cache_settings.capacity;
+    cache_settings.disposable_queue_elements = 8;
+    cache_settings.index_queue_elements = 8;
+    cache_settings.query_queue_elements = 8;
+    cache_settings.ttl_queue_elements = 8;
+
+    io::BlockFileCache cache(cache_base_path, cache_settings);
+    ASSERT_TRUE(cache.initialize());
+
+    // Poll up to 100 times, 10 ms apart, until the asynchronous open reports success.
+    for (int attempt = 0; attempt < 100 && !cache.get_async_open_success(); ++attempt) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+
+    std::ifstream version_file(cache_base_path + "/version", std::ios::binary);
+    ASSERT_TRUE(version_file.good());
+    char version_buf[3] = {0};
+    version_file.read(version_buf, 3);
+    const auto bytes_read = static_cast<size_t>(version_file.gcount());
+    ASSERT_EQ(std::string(version_buf, bytes_read), "3.0");
+}
+
 TEST_F(BlockFileCacheTest, 
clear_retains_meta_directory_and_clears_meta_entries) {
     config::enable_evict_file_cache_in_advance = false;
     if (fs::exists(cache_base_path)) {
@@ -580,146 +618,6 @@ TEST_F(BlockFileCacheTest, 
handle_already_loaded_block_updates_size_and_tablet)
         fs::remove_all(cache_base_path);
     }
 }
-TEST_F(BlockFileCacheTest, estimate_file_count_skips_removed_directory) {
-    std::string test_dir = cache_base_path + 
"/estimate_file_count_removed_dir";
-    if (fs::exists(test_dir)) {
-        fs::remove_all(test_dir);
-    }
-    auto keep_dir = fs::path(test_dir) / "keep";
-    auto remove_dir = fs::path(test_dir) / "remove";
-    fs::create_directories(keep_dir);
-    fs::create_directories(remove_dir);
-
-    auto keep_file = keep_dir / "data.bin";
-    std::string one_mb(1024 * 1024, 'd');
-    {
-        std::ofstream ofs(keep_file, std::ios::binary);
-        ASSERT_TRUE(ofs.good());
-        for (int i = 0; i < 3; ++i) {
-            ofs.write(one_mb.data(), one_mb.size());
-            ASSERT_TRUE(ofs.good());
-        }
-    }
-
-    FSFileCacheStorage storage;
-    storage._cache_base_path = test_dir;
-
-    const std::string sync_point_name =
-            "FSFileCacheStorage::estimate_file_count_from_statfs::OnDirectory";
-    auto* sync_point = doris::SyncPoint::get_instance();
-    doris::SyncPoint::CallbackGuard guard(sync_point_name);
-    sync_point->set_call_back(
-            sync_point_name,
-            [remove_dir](std::vector<std::any>&& args) {
-                auto* path = 
doris::try_any_cast<std::filesystem::path*>(args[0]);
-                if (*path == remove_dir) {
-                    fs::remove_all(remove_dir);
-                }
-            },
-            &guard);
-    sync_point->enable_processing();
-
-    size_t estimated_files = storage.estimate_file_count_from_statfs();
-
-    sync_point->disable_processing();
-
-    ASSERT_EQ(3, estimated_files);
-    ASSERT_FALSE(fs::exists(remove_dir));
-
-    if (fs::exists(test_dir)) {
-        fs::remove_all(test_dir);
-    }
-}
-
-TEST_F(BlockFileCacheTest, estimate_file_count_handles_stat_failure) {
-    std::string test_dir = cache_base_path + 
"/estimate_file_count_stat_failure";
-    if (fs::exists(test_dir)) {
-        fs::remove_all(test_dir);
-    }
-    fs::create_directories(test_dir);
-
-    auto data_file = fs::path(test_dir) / "data.bin";
-    std::string one_mb(1024 * 1024, 'x');
-    {
-        std::ofstream ofs(data_file, std::ios::binary);
-        ASSERT_TRUE(ofs.good());
-        ofs.write(one_mb.data(), one_mb.size());
-        ASSERT_TRUE(ofs.good());
-    }
-
-    FSFileCacheStorage storage;
-    storage._cache_base_path = test_dir;
-
-    const std::string sync_point_name =
-            
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterEntryStatus";
-    auto* sync_point = doris::SyncPoint::get_instance();
-    doris::SyncPoint::CallbackGuard guard(sync_point_name);
-    sync_point->set_call_back(
-            sync_point_name,
-            [](std::vector<std::any>&& args) {
-                auto* ec = doris::try_any_cast<std::error_code*>(args[0]);
-                if (ec != nullptr) {
-                    *ec = std::make_error_code(std::errc::io_error);
-                }
-            },
-            &guard);
-    sync_point->enable_processing();
-
-    size_t estimated_files = storage.estimate_file_count_from_statfs();
-
-    sync_point->disable_processing();
-
-    ASSERT_EQ(0, estimated_files);
-
-    if (fs::exists(test_dir)) {
-        fs::remove_all(test_dir);
-    }
-}
-
-TEST_F(BlockFileCacheTest, estimate_file_count_handles_file_size_failure) {
-    std::string test_dir = cache_base_path + 
"/estimate_file_count_file_size_failure";
-    if (fs::exists(test_dir)) {
-        fs::remove_all(test_dir);
-    }
-    fs::create_directories(test_dir);
-
-    auto data_file = fs::path(test_dir) / "data.bin";
-    std::string one_mb(1024 * 1024, 'x');
-    {
-        std::ofstream ofs(data_file, std::ios::binary);
-        ASSERT_TRUE(ofs.good());
-        ofs.write(one_mb.data(), one_mb.size());
-        ASSERT_TRUE(ofs.good());
-    }
-
-    FSFileCacheStorage storage;
-    storage._cache_base_path = test_dir;
-
-    const std::string sync_point_name =
-            
"FSFileCacheStorage::estimate_file_count_from_statfs::AfterFileSize";
-    auto* sync_point = doris::SyncPoint::get_instance();
-    doris::SyncPoint::CallbackGuard guard(sync_point_name);
-    sync_point->set_call_back(
-            sync_point_name,
-            [](std::vector<std::any>&& args) {
-                auto* ec = doris::try_any_cast<std::error_code*>(args[0]);
-                if (ec != nullptr) {
-                    *ec = std::make_error_code(std::errc::io_error);
-                }
-            },
-            &guard);
-    sync_point->enable_processing();
-
-    size_t estimated_files = storage.estimate_file_count_from_statfs();
-
-    sync_point->disable_processing();
-
-    ASSERT_EQ(0, estimated_files);
-
-    if (fs::exists(test_dir)) {
-        fs::remove_all(test_dir);
-    }
-}
 
 //TODO(zhengyu): check lazy load
 //TODO(zhengyu): check version2 start
diff --git a/be/test/io/cache/fs_file_cache_storage_leak_cleaner_test.cpp 
b/be/test/io/cache/fs_file_cache_storage_leak_cleaner_test.cpp
new file mode 100644
index 00000000000..58ccc623e14
--- /dev/null
+++ b/be/test/io/cache/fs_file_cache_storage_leak_cleaner_test.cpp
@@ -0,0 +1,718 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <atomic>
+#include <cerrno>
+#include <chrono>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <stdexcept>
+#include <string>
+#include <thread>
+#include <unordered_set>
+
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wkeyword-macro"
+#endif
+#define private public
+#define protected public
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+#include "io/cache/block_file_cache.h"
+#include "io/cache/fs_file_cache_storage.h"
+#undef private
+#undef protected
+
+#include "block_file_cache_test_common.h"
+
+namespace doris::io {
+
+namespace fs = std::filesystem;
+
+// RAII guard for the leak-cleaner config knobs used by these tests.
+// The constructor snapshots the five file_cache_leak_* values and then forces
+// file_cache_leak_grace_seconds to 0; the destructor writes the snapshot back,
+// undoing whatever the test changed in between.
+class ScopedLeakCleanerConfig {
+public:
+    ScopedLeakCleanerConfig()
+            : ratio(config::file_cache_leak_fs_to_meta_ratio_threshold),
+              interval(config::file_cache_leak_scan_interval_seconds),
+              batch(config::file_cache_leak_scan_batch_files),
+              pause(config::file_cache_leak_scan_pause_ms),
+              grace(config::file_cache_leak_grace_seconds) {
+        config::file_cache_leak_grace_seconds = 0;
+    }
+
+    // Non-copyable: a copy would write the same snapshot back a second time,
+    // possibly clobbering values another guard has set since.
+    ScopedLeakCleanerConfig(const ScopedLeakCleanerConfig&) = delete;
+    ScopedLeakCleanerConfig& operator=(const ScopedLeakCleanerConfig&) = delete;
+
+    ~ScopedLeakCleanerConfig() {
+        config::file_cache_leak_fs_to_meta_ratio_threshold = ratio;
+        config::file_cache_leak_scan_interval_seconds = interval;
+        config::file_cache_leak_scan_batch_files = batch;
+        config::file_cache_leak_scan_pause_ms = pause;
+        config::file_cache_leak_grace_seconds = grace;
+    }
+
+private:
+    // Values captured at construction time.
+    double ratio;
+    int64_t interval;
+    int32_t batch;
+    int32_t pause;
+    int64_t grace;
+};
+
+// RAII guard that installs `hooks` via
+// FSFileCacheStorage::set_inode_estimation_test_hooks() for its lifetime and
+// passes nullptr again on destruction. Tests assign the callbacks they need on
+// the public `hooks` member after constructing the guard.
+class ScopedInodeTestHooks {
+public:
+    ScopedInodeTestHooks() { FSFileCacheStorage::set_inode_estimation_test_hooks(&hooks); }
+    ~ScopedInodeTestHooks() { FSFileCacheStorage::set_inode_estimation_test_hooks(nullptr); }
+
+    // Non-copyable: the address of this object's `hooks` is what gets
+    // registered, so a copy would never be installed itself, yet its destructor
+    // would uninstall the original's hooks.
+    ScopedInodeTestHooks(const ScopedInodeTestHooks&) = delete;
+    ScopedInodeTestHooks& operator=(const ScopedInodeTestHooks&) = delete;
+
+    FSFileCacheStorage::InodeEstimationTestHooks hooks;
+};
+
+// Fixture for the FSFileCacheStorage leak-cleaner tests: builds cache settings,
+// per-test scratch directories, and hand-wired storage/metadata state.
+class FSFileCacheLeakCleanerTest : public BlockFileCacheTest {
+protected:
+    // Disk-backed settings: 10 MiB capacity, 1 MiB max block size, every queue
+    // sized to the full capacity with 8 elements.
+    static FileCacheSettings default_settings() {
+        FileCacheSettings s;
+        s.storage = "disk";
+        s.capacity = 10 * 1024 * 1024;
+        s.max_file_block_size = 1 * 1024 * 1024;
+        s.max_query_cache_size = s.capacity;
+        s.disposable_queue_size = s.capacity;
+        s.index_queue_size = s.capacity;
+        s.query_queue_size = s.capacity;
+        s.ttl_queue_size = s.capacity;
+        s.disposable_queue_elements = 8;
+        s.index_queue_elements = 8;
+        s.query_queue_elements = 8;
+        s.ttl_queue_elements = 8;
+        return s;
+    }
+
+    // Recreates caches_dir/leak_cleaner/<name> as an empty directory; filesystem
+    // errors are deliberately ignored.
+    fs::path prepare_test_dir(const std::string& name) const {
+        fs::path test_dir = caches_dir / "leak_cleaner" / name;
+        std::error_code ignored;
+        fs::remove_all(test_dir, ignored);
+        fs::create_directories(test_dir, ignored);
+        return test_dir;
+    }
+
+    // Name of the running gtest case, or "unknown" outside of a test.
+    static std::string current_test_name() {
+        const auto* test_info = ::testing::UnitTest::GetInstance()->current_test_info();
+        if (test_info == nullptr) {
+            return "unknown";
+        }
+        return std::string(test_info->name());
+    }
+
+    // Scratch directory named after the current test.
+    fs::path prepare_test_dir() const { return prepare_test_dir(current_test_name()); }
+
+    // Points `storage` at `dir` and `mgr`, gives it a meta store under
+    // <dir>/meta, and writes the cache version file.
+    void setup_storage(FSFileCacheStorage& storage, BlockFileCache& mgr, const fs::path& dir) {
+        const std::string base_path = dir.string();
+        storage._cache_base_path = base_path;
+        storage._mgr = &mgr;
+        storage._meta_store = std::make_unique<CacheBlockMetaStore>(base_path + "/meta", 10000);
+        EXPECT_TRUE(storage._meta_store->init().ok());
+        EXPECT_TRUE(storage.write_file_cache_version().ok());
+    }
+
+    // Registers (hash, offset) in the in-memory file map and, when a meta store
+    // exists, in the meta store as well.
+    static void add_metadata_entry(BlockFileCache& mgr, FSFileCacheStorage& storage,
+                                   const UInt128Wrapper& hash, size_t offset) {
+        {
+            std::lock_guard<std::mutex> cache_lock(mgr._mutex);
+            mgr._files[hash].try_emplace(offset);
+        }
+        if (!storage._meta_store) {
+            return;
+        }
+        BlockMetaKey meta_key(0, hash, offset);
+        BlockMeta block_meta(FileCacheType::NORMAL, 16, 0);
+        storage._meta_store->put(meta_key, block_meta);
+        // Wait for async write to complete for test stability
+        int polls = 0;
+        while (polls < 100 && storage._meta_store->get_write_queue_size() > 0) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(50));
+            ++polls;
+        }
+    }
+
+    // Creates `path` (and its parent directories) holding 16 bytes of `fill`.
+    static void create_regular_file(const std::string& path, char fill = 'x') {
+        const fs::path target(path);
+        fs::create_directories(target.parent_path());
+        std::ofstream out(path, std::ios::binary | std::ios::trunc);
+        ASSERT_TRUE(out.good());
+        const std::string payload(16, fill);
+        out.write(payload.data(), payload.size());
+        out.close();
+        ASSERT_TRUE(std::filesystem::exists(path));
+    }
+};
+
+// With the scan interval set to 0, start_leak_cleaner() must not spawn the
+// thread or touch the stop flag; stop_leak_cleaner() still raises the flag.
+TEST_F(FSFileCacheLeakCleanerTest, disable_when_interval_non_positive) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 0;
+    const auto test_dir = prepare_test_dir();
+
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache mgr(test_dir.string(), cache_settings);
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, test_dir);
+
+    storage.start_leak_cleaner(&mgr);
+    const bool thread_started = storage._cache_leak_cleaner_thread.joinable();
+    EXPECT_FALSE(thread_started);
+    EXPECT_FALSE(storage._stop_leak_cleaner.load(std::memory_order_relaxed));
+
+    storage.stop_leak_cleaner();
+    EXPECT_TRUE(storage._stop_leak_cleaner.load(std::memory_order_relaxed));
+}
+
+// With a positive interval the cleaner thread is started by
+// start_leak_cleaner() and is no longer joinable after stop_leak_cleaner().
+TEST_F(FSFileCacheLeakCleanerTest, start_and_stop_thread) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    config::file_cache_leak_scan_batch_files = 4;
+    config::file_cache_leak_scan_pause_ms = 0;
+    // Very large threshold so the ratio check cannot trigger a cleanup.
+    config::file_cache_leak_fs_to_meta_ratio_threshold = 1e12;
+
+    const auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache mgr(test_dir.string(), cache_settings);
+
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, test_dir);
+
+    const auto guard_hash = BlockFileCache::hash("thread_guard");
+    add_metadata_entry(mgr, storage, guard_hash, 0);
+
+    storage.start_leak_cleaner(&mgr);
+    ASSERT_TRUE(storage._cache_leak_cleaner_thread.joinable());
+
+    storage.stop_leak_cleaner();
+    EXPECT_TRUE(storage._stop_leak_cleaner.load(std::memory_order_relaxed));
+    EXPECT_FALSE(storage._cache_leak_cleaner_thread.joinable());
+}
+
+// With a threshold of 1e12 the fs/meta ratio stays below it, so
+// run_leak_cleanup() must leave an orphan file in place and call the statvfs
+// hook exactly once.
+TEST_F(FSFileCacheLeakCleanerTest, skip_cleanup_when_ratio_below_threshold) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_fs_to_meta_ratio_threshold = 1e12;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    ScopedInodeTestHooks hooks_guard;
+    std::atomic<int> statvfs_calls {0};
+    auto& hooks = hooks_guard.hooks;
+    // Fake filesystem: 100 inodes in total, 36 of them free.
+    hooks.statvfs_override = [&statvfs_calls](const std::string&, struct statvfs* vfs) {
+        statvfs_calls.fetch_add(1, std::memory_order_relaxed);
+        *vfs = {};
+        vfs->f_files = 100;
+        vfs->f_ffree = 36;
+        return 0;
+    };
+    hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 1;
+        return 0;
+    };
+    hooks.non_cache_override = [](const FSFileCacheStorage&) { return 0; };
+    hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 0; };
+
+    const auto test_dir = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache mgr(test_dir.string(), cache_settings);
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, test_dir);
+
+    // 64 metadata blocks under a single hash.
+    const auto metadata_hash = BlockFileCache::hash("metadata_key");
+    for (size_t block_offset = 0; block_offset < 64; ++block_offset) {
+        add_metadata_entry(mgr, storage, metadata_hash, block_offset);
+    }
+
+    // One file on disk that metadata does not know about.
+    const auto orphan_hash = BlockFileCache::hash("ratio_skip_orphan");
+    const auto orphan_dir = storage.get_path_in_local_cache_v3(orphan_hash);
+    const auto orphan_file = FSFileCacheStorage::get_path_in_local_cache_v3(orphan_dir, 0, false);
+    create_regular_file(orphan_file);
+
+    storage.run_leak_cleanup(&mgr);
+    EXPECT_TRUE(std::filesystem::exists(orphan_file));
+    EXPECT_EQ(1, statvfs_calls.load(std::memory_order_relaxed));
+}
+
+// run_leak_cleanup() must keep a block that has metadata, and delete both the tmp file next
+// to it and a block (plus its key directory) whose hash is unknown to the metadata.
+TEST_F(FSFileCacheLeakCleanerTest, remove_orphan_and_tmp_files) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_batch_files = 1;
+    config::file_cache_leak_scan_pause_ms = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    ScopedInodeTestHooks inode_hooks;
+    // Fake filesystem stats: 100 inodes in total, 10 free (90 used).
+    inode_hooks.hooks.statvfs_override = [](const std::string&, struct statvfs* out) {
+        *out = {};
+        out->f_files = 100;
+        out->f_ffree = 10;
+        return 0;
+    };
+    inode_hooks.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 0; };
+    inode_hooks.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 0; };
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    // Tracked hash: offset 0 has metadata; the tmp file at offset 8 does not.
+    auto tracked_hash = BlockFileCache::hash("kept_hash");
+    add_metadata_entry(cache, fs_storage, tracked_hash, 0);
+
+    auto tracked_dir = fs_storage.get_path_in_local_cache_v3(tracked_hash);
+    auto tracked_block = FSFileCacheStorage::get_path_in_local_cache_v3(tracked_dir, 0, false);
+    auto stale_tmp = FSFileCacheStorage::get_path_in_local_cache_v3(tracked_dir, 8, true);
+    create_regular_file(tracked_block, 'k');
+    create_regular_file(stale_tmp, 't');
+
+    // Untracked hash: a block file exists on disk without any metadata.
+    auto untracked_hash = BlockFileCache::hash("orphan_hash");
+    auto untracked_dir = fs_storage.get_path_in_local_cache_v3(untracked_hash);
+    auto untracked_block =
+            FSFileCacheStorage::get_path_in_local_cache_v3(untracked_dir, 4, false);
+    create_regular_file(untracked_block, 'o');
+
+    fs_storage.run_leak_cleanup(&cache);
+
+    // NOTE(review): sibling tests assert right after run_leak_cleanup(); confirm whether this
+    // 5s wait is still required before the checks below.
+    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
+
+    EXPECT_TRUE(std::filesystem::exists(tracked_block));
+    EXPECT_FALSE(std::filesystem::exists(stale_tmp));
+    EXPECT_FALSE(std::filesystem::exists(untracked_block));
+    EXPECT_FALSE(std::filesystem::exists(untracked_dir));
+
+    // Best-effort removal of the scratch directory; errors are deliberately ignored.
+    std::error_code ignored;
+    fs::remove_all(cache_root, ignored);
+}
+
+// snapshot_metadata_for_hash_offsets() must return an empty list for a hash without
+// metadata and, once entries are added, return exactly their offsets (in any order).
+TEST_F(FSFileCacheLeakCleanerTest, snapshot_metadata_for_hash_offsets_handles_missing_hash) {
+    // Same guard the sibling tests use around config:: mutations; presumably it restores the
+    // leak-cleaner settings on scope exit so the override below cannot leak into later tests
+    // (confirm against ScopedLeakCleanerConfig's definition).
+    ScopedLeakCleanerConfig guard;
+
+    auto dir = prepare_test_dir();
+    FileCacheSettings settings = default_settings();
+    BlockFileCache mgr(dir.string(), settings);
+    FSFileCacheStorage storage;
+    setup_storage(storage, mgr, dir);
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    // No metadata has been registered for this hash yet.
+    auto missing_hash = BlockFileCache::hash("missing_hash_case");
+    auto offsets = storage.snapshot_metadata_for_hash_offsets(&mgr, missing_hash);
+    EXPECT_TRUE(offsets.empty());
+
+    add_metadata_entry(mgr, storage, missing_hash, 7);
+    add_metadata_entry(mgr, storage, missing_hash, 3);
+
+    // The snapshot order is not asserted, so sort before comparing.
+    offsets = storage.snapshot_metadata_for_hash_offsets(&mgr, missing_hash);
+    std::sort(offsets.begin(), offsets.end());
+    ASSERT_EQ(2u, offsets.size());
+    EXPECT_EQ(3u, offsets[0]);
+    EXPECT_EQ(7u, offsets[1]);
+}
+
+// Throws a std::runtime_error from the "before_run" sync point and checks that the callback
+// fired at least once and that leak_cleaner_loop() returns so the worker can be joined.
+TEST_F(FSFileCacheLeakCleanerTest, leak_cleaner_loop_catches_std_exception) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    std::atomic<int> injected {0};
+    auto sync_point = SyncPoint::get_instance();
+    // Both hooks get an int64_t* in args[0]; storing 0 presumably removes the wait.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::initial_delay",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::interval",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    // Request a stop first, then throw, so the loop has a reason to exit afterwards.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::before_run",
+                              [&fs_storage, &injected](auto&&) {
+                                  injected.fetch_add(1, std::memory_order_relaxed);
+                                  fs_storage._stop_leak_cleaner.store(true,
+                                                                      std::memory_order_relaxed);
+                                  fs_storage._leak_cleaner_cv.notify_all();
+                                  throw std::runtime_error("injected std exception");
+                              });
+    sync_point->enable_processing();
+
+    fs_storage._stop_leak_cleaner.store(false, std::memory_order_relaxed);
+    std::thread loop_thread([&]() { fs_storage.leak_cleaner_loop(); });
+
+    // Poll up to 100 x 10ms for the injection, nudging the condition variable each round.
+    int attempts = 0;
+    while (attempts < 100 && injected.load(std::memory_order_relaxed) == 0) {
+        fs_storage._leak_cleaner_cv.notify_all();
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        ++attempts;
+    }
+
+    fs_storage._stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    fs_storage._leak_cleaner_cv.notify_all();
+    loop_thread.join();
+
+    // Tear the hooks down before asserting so a failure cannot leave them installed.
+    sync_point->disable_processing();
+    sync_point->clear_all_call_backs();
+
+    ASSERT_GE(injected.load(std::memory_order_relaxed), 1);
+}
+
+// Same scenario as the std::exception variant, but the injected object does not derive
+// from std::exception; the loop must still return so the worker can be joined.
+TEST_F(FSFileCacheLeakCleanerTest, leak_cleaner_loop_catches_unknown_exception) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    // Deliberately unrelated to std::exception.
+    struct NonStdException {};
+
+    std::atomic<int> injected {0};
+    auto sync_point = SyncPoint::get_instance();
+    // Both hooks get an int64_t* in args[0]; storing 0 presumably removes the wait.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::initial_delay",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::interval",
+                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 0; });
+    // Request a stop first, then throw, so the loop has a reason to exit afterwards.
+    sync_point->set_call_back("FSFileCacheStorage::leak_cleaner_loop::before_run",
+                              [&fs_storage, &injected](auto&&) {
+                                  injected.fetch_add(1, std::memory_order_relaxed);
+                                  fs_storage._stop_leak_cleaner.store(true,
+                                                                      std::memory_order_relaxed);
+                                  fs_storage._leak_cleaner_cv.notify_all();
+                                  throw NonStdException {};
+                              });
+    sync_point->enable_processing();
+
+    fs_storage._stop_leak_cleaner.store(false, std::memory_order_relaxed);
+    std::thread loop_thread([&]() { fs_storage.leak_cleaner_loop(); });
+
+    // Poll up to 100 x 10ms for the injection, nudging the condition variable each round.
+    int attempts = 0;
+    while (attempts < 100 && injected.load(std::memory_order_relaxed) == 0) {
+        fs_storage._leak_cleaner_cv.notify_all();
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        ++attempts;
+    }
+
+    fs_storage._stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    fs_storage._leak_cleaner_cv.notify_all();
+    loop_thread.join();
+
+    // Tear the hooks down before asserting so a failure cannot leave them installed.
+    sync_point->disable_processing();
+    sync_point->clear_all_call_backs();
+
+    ASSERT_GE(injected.load(std::memory_order_relaxed), 1);
+}
+
+// With a low ratio threshold and no grace period, run_leak_cleanup() must delete a block
+// whose hash has no metadata, together with its key directory and prefix directory.
+TEST_F(FSFileCacheLeakCleanerTest, run_leak_cleanup_removes_orphan_when_metadata_missing) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_fs_to_meta_ratio_threshold = 0.5;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    ScopedInodeTestHooks inode_hooks;
+    // Fake filesystem stats: 1000 inodes in total, 900 free (100 used).
+    inode_hooks.hooks.statvfs_override = [](const std::string&, struct statvfs* out) {
+        *out = {};
+        out->f_files = 1000;
+        out->f_ffree = 900;
+        return 0;
+    };
+    inode_hooks.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 0; };
+    inode_hooks.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 0; };
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    // On-disk block for a hash that never gets a metadata entry.
+    auto untracked_hash = BlockFileCache::hash("zero_meta_orphan");
+    auto untracked_dir = fs_storage.get_path_in_local_cache_v3(untracked_hash);
+    fs::create_directories(untracked_dir);
+    auto untracked_block = FSFileCacheStorage::get_path_in_local_cache_v3(untracked_dir, 0, false);
+    create_regular_file(untracked_block, 'z');
+
+    // A single unrelated metadata entry is registered as well.
+    auto unrelated_hash = BlockFileCache::hash("dummy");
+    add_metadata_entry(cache, fs_storage, unrelated_hash, 0);
+
+    fs_storage.run_leak_cleanup(&cache);
+
+    auto untracked_prefix = fs::path(untracked_dir).parent_path();
+    EXPECT_FALSE(fs::exists(untracked_block));
+    EXPECT_FALSE(fs::exists(untracked_dir));
+    EXPECT_FALSE(fs::exists(untracked_prefix));
+}
+
+// cleanup_leaked_files() on a storage whose base directory does not exist must return
+// without creating that directory.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_handles_missing_base_directory) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root / "missing_root");
+
+    // Make sure the base path is really absent before running the cleanup.
+    const fs::path absent_base(fs_storage._cache_base_path);
+    const bool base_present = fs::exists(absent_base);
+    if (base_present) {
+        fs::remove_all(absent_base);
+    }
+
+    fs_storage.cleanup_leaked_files(&cache, 0);
+    EXPECT_FALSE(fs::exists(absent_base));
+}
+
+// Populates the cache root with entries that do not look like valid prefix/key directories
+// and checks that cleanup_leaked_files() leaves the "abc" and "abcd" directories in place.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_skips_invalid_prefixes_and_keys) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    fs::create_directories(cache_root);
+    // Root-level entries: a plain file, the meta directory, and a 4-character directory name.
+    create_regular_file((cache_root / "root_file").string()); // non-directory prefix entry
+    fs::create_directories(cache_root / FSFileCacheStorage::META_DIR_NAME); // meta dir skip
+    const auto long_prefix = cache_root / "abcd";
+    fs::create_directories(long_prefix); // invalid prefix length
+
+    // Entries below the 3-character prefix directory "abc".
+    const auto short_prefix = cache_root / "abc";
+    fs::create_directories(short_prefix);
+    create_regular_file((short_prefix / "plain_file").string()); // !key_it->is_directory branch
+    // In order: missing '_', invalid hex, invalid expiration.
+    for (const char* bad_key : {"deadbeef", "zzzg000_0", "123abc_bad"}) {
+        fs::create_directories(short_prefix / bad_key);
+    }
+
+    fs_storage.cleanup_leaked_files(&cache, 0);
+
+    EXPECT_TRUE(fs::exists(short_prefix));
+    EXPECT_TRUE(fs::exists(long_prefix));
+}
+
+// A block file that is backed by a metadata entry must survive cleanup_leaked_files().
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_flush_candidates_when_empty) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    // Register metadata for offset 0, then materialize the matching file on disk.
+    auto tracked_hash = BlockFileCache::hash("metadata_kept_hash");
+    add_metadata_entry(cache, fs_storage, tracked_hash, 0);
+
+    auto tracked_dir = fs_storage.get_path_in_local_cache_v3(tracked_hash);
+    fs::create_directories(tracked_dir);
+    auto tracked_block = FSFileCacheStorage::get_path_in_local_cache_v3(tracked_dir, 0, false);
+    create_regular_file(tracked_block, 'm');
+
+    fs_storage.cleanup_leaked_files(&cache, 1);
+
+    EXPECT_TRUE(fs::exists(tracked_block));
+}
+
+// An on-disk block without metadata must be removed by cleanup_leaked_files(), along with
+// its key directory and the enclosing prefix directory.
+TEST_F(FSFileCacheLeakCleanerTest, cleanup_flush_candidates_remove_directories) {
+    ScopedLeakCleanerConfig config_guard;
+    config::file_cache_leak_grace_seconds = 0;
+    config::file_cache_leak_scan_batch_files = 2;
+    config::file_cache_leak_scan_interval_seconds = 1;
+
+    auto cache_root = prepare_test_dir();
+    FileCacheSettings cache_settings = default_settings();
+    BlockFileCache cache(cache_root.string(), cache_settings);
+
+    FSFileCacheStorage fs_storage;
+    setup_storage(fs_storage, cache, cache_root);
+
+    // No add_metadata_entry() call: the file at offset 4 exists only on disk.
+    auto untracked_hash = BlockFileCache::hash("cleanup_orphan_batch");
+    auto untracked_dir = fs_storage.get_path_in_local_cache_v3(untracked_hash);
+    fs::create_directories(untracked_dir);
+    auto untracked_block = FSFileCacheStorage::get_path_in_local_cache_v3(untracked_dir, 4, false);
+    create_regular_file(untracked_block, 'c');
+
+    fs_storage.cleanup_leaked_files(&cache, 0);
+
+    auto untracked_prefix = fs::path(untracked_dir).parent_path();
+    EXPECT_FALSE(fs::exists(untracked_block));
+    EXPECT_FALSE(fs::exists(untracked_dir));
+    EXPECT_FALSE(fs::exists(untracked_prefix));
+}
+
+// estimate_file_count_from_inode() must report 0 when the statvfs probe fails.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_statvfs_failure) {
+    // Same guard the sibling tests use around config:: mutations; presumably it restores the
+    // leak-cleaner settings on scope exit so the override below cannot leak into later tests
+    // (confirm against ScopedLeakCleanerConfig's definition).
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    // Simulate statvfs() failing with EIO.
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        errno = EIO;
+        return -1;
+    };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/nonexistent_statvfs";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// estimate_file_count_from_inode() must report 0 when statvfs says the filesystem has no
+// inodes at all (f_files == 0).
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_zero_total_inodes) {
+    // Same guard the sibling tests use around config:: mutations; presumably it restores the
+    // leak-cleaner settings on scope exit so the override below cannot leak into later tests
+    // (confirm against ScopedLeakCleanerConfig's definition).
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    // statvfs succeeds but reports zero total inodes.
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 0;
+        return 0;
+    };
+    // lstat succeeds and reports device id 1.
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 1;
+        return 0;
+    };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_zero_inodes";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// estimate_file_count_from_inode() must report 0 when lstat on the cache path fails, even
+// though statvfs itself succeeds.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_lstat_failure) {
+    // Same guard the sibling tests use around config:: mutations; presumably it restores the
+    // leak-cleaner settings on scope exit so the override below cannot leak into later tests
+    // (confirm against ScopedLeakCleanerConfig's definition).
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    // statvfs succeeds: 100 inodes in total, 10 free.
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 100;
+        vfs->f_ffree = 10;
+        return 0;
+    };
+    // Simulate lstat() failing with ENOENT.
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat*) {
+        errno = ENOENT;
+        return -1;
+    };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_lstat_failure";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// With 50 used inodes (200 total, 150 free) and overrides of 80 and 90 that together exceed
+// that figure, estimate_file_count_from_inode() must return 0 rather than wrap around.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_handles_underflow) {
+    // Same guard the sibling tests use around config:: mutations; presumably it restores the
+    // leak-cleaner settings on scope exit so the override below cannot leak into later tests
+    // (confirm against ScopedLeakCleanerConfig's definition).
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 200;
+        vfs->f_ffree = 150;
+        return 0;
+    };
+    // lstat succeeds and reports device id 9.
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 9;
+        return 0;
+    };
+    hooks_guard.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 80; };
+    hooks_guard.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 90; };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_underflow";
+    EXPECT_EQ(0u, storage.estimate_file_count_from_inode());
+}
+
+// Happy path: 300 used inodes (500 total, 200 free) with overrides of 50 and 30 must yield
+// an estimate of 220 (300 - 50 - 30).
+TEST_F(FSFileCacheLeakCleanerTest, estimate_file_count_combines_counts) {
+    // Same guard the sibling tests use around config:: mutations; presumably it restores the
+    // leak-cleaner settings on scope exit so the override below cannot leak into later tests
+    // (confirm against ScopedLeakCleanerConfig's definition).
+    ScopedLeakCleanerConfig guard;
+    ScopedInodeTestHooks hooks_guard;
+    config::file_cache_leak_scan_interval_seconds = 1;
+    hooks_guard.hooks.statvfs_override = [](const std::string&, struct statvfs* vfs) {
+        *vfs = {};
+        vfs->f_files = 500;
+        vfs->f_ffree = 200;
+        return 0;
+    };
+    // lstat succeeds and reports device id 7.
+    hooks_guard.hooks.lstat_override = [](const std::string&, struct stat* st) {
+        *st = {};
+        st->st_dev = 7;
+        return 0;
+    };
+    hooks_guard.hooks.non_cache_override = [](const FSFileCacheStorage&) { return 50; };
+    hooks_guard.hooks.cache_dir_override = [](const FSFileCacheStorage&) { return 30; };
+
+    FSFileCacheStorage storage;
+    storage._cache_base_path = "/tmp/cache_estimation";
+    EXPECT_EQ(220u, storage.estimate_file_count_from_inode());
+}
+
+// Builds a fake mount root holding the cache directory plus unrelated files/directories and
+// expects estimate_non_cache_inode_usage() to report 6 for that layout.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_non_cache_inode_usage_counts_other_paths) {
+    auto mount_root = prepare_test_dir("inode_non_cache_root");
+    auto cache_path = mount_root / "cache";
+    auto sibling_dir = mount_root / "others";
+    auto sibling_child = sibling_dir / "nested";
+    fs::create_directories(cache_path);
+    fs::create_directories(sibling_child);
+    // One regular file at each level outside the cache directory.
+    create_regular_file((mount_root / "root_file.bin").string());
+    create_regular_file((sibling_dir / "leaf.txt").string());
+    create_regular_file((sibling_child / "inner.bin").string());
+
+    FSFileCacheStorage fs_storage;
+    fs_storage._cache_base_path = cache_path.string();
+
+    // Pin the mount root to the scratch directory regardless of the device id passed in.
+    ScopedInodeTestHooks inode_hooks;
+    inode_hooks.hooks.find_mount_root_override = [mount_root](const FSFileCacheStorage&,
+                                                              dev_t) { return mount_root; };
+
+    EXPECT_EQ(6u, fs_storage.estimate_non_cache_inode_usage());
+}
+
+// Lays out two 3-character prefix directories with key directories, plus the meta directory
+// and a 4-character directory, and expects estimate_cache_directory_inode_usage() to be 6.
+TEST_F(FSFileCacheLeakCleanerTest, estimate_cache_directory_inode_usage_samples_prefixes) {
+    auto cache_root = prepare_test_dir("inode_cache_directory");
+    auto first_prefix = cache_root / "abc";
+    auto second_prefix = cache_root / "def";
+    fs::create_directories(first_prefix);
+    fs::create_directories(second_prefix);
+    fs::create_directories(cache_root / FSFileCacheStorage::META_DIR_NAME);
+    fs::create_directories(cache_root / "abcd");
+    // One key directory under "abc", three under "def".
+    fs::create_directories(first_prefix / "deadbeef_0");
+    for (const char* key_name : {"feed000_0", "feed000_1", "feed000_2"}) {
+        fs::create_directories(second_prefix / key_name);
+    }
+
+    FSFileCacheStorage fs_storage;
+    fs_storage._cache_base_path = cache_root.string();
+
+    EXPECT_EQ(6u, fs_storage.estimate_cache_directory_inode_usage());
+}
+
+// count_inodes_for_path() is handed an excluded directory; for this layout (a root file, an
+// "include" directory with one file, an "exclude" directory with one file) it must return 4.
+TEST_F(FSFileCacheLeakCleanerTest, count_inodes_for_path_respects_exclusions) {
+    auto scan_root = prepare_test_dir("inode_counting");
+    auto counted_dir = scan_root / "include";
+    auto skipped_dir = scan_root / "exclude";
+    fs::create_directories(counted_dir);
+    fs::create_directories(skipped_dir);
+    create_regular_file((scan_root / "root.bin").string());
+    create_regular_file((counted_dir / "child.bin").string());
+    create_regular_file((skipped_dir / "skip.bin").string());
+
+    FSFileCacheStorage fs_storage;
+    // The real device id of the scratch directory is passed to the counter.
+    struct stat root_stat {};
+    ASSERT_EQ(0, lstat(scan_root.c_str(), &root_stat));
+    using SeenInodes =
+            std::unordered_set<FSFileCacheStorage::InodeKey, FSFileCacheStorage::InodeKeyHash>;
+    SeenInodes seen;
+    size_t inode_total =
+            fs_storage.count_inodes_for_path(scan_root, root_stat.st_dev, skipped_dir, seen);
+    EXPECT_EQ(4u, inode_total);
+}
+
+// is_cache_prefix_directory() must accept only the 3-character directory and reject the
+// 4-character directory, the meta directory, and a regular file.
+TEST_F(FSFileCacheLeakCleanerTest, is_cache_prefix_directory_filters_entries) {
+    auto cache_root = prepare_test_dir("inode_prefix_filter");
+    auto good_prefix = cache_root / "abc";
+    auto long_name = cache_root / "abcd";
+    auto meta_path = cache_root / FSFileCacheStorage::META_DIR_NAME;
+    fs::create_directories(good_prefix);
+    fs::create_directories(long_name);
+    fs::create_directories(meta_path);
+    create_regular_file((cache_root / "plain_file").string());
+
+    FSFileCacheStorage fs_storage;
+    fs_storage._cache_base_path = cache_root.string();
+
+    EXPECT_TRUE(fs_storage.is_cache_prefix_directory(fs::directory_entry(good_prefix)));
+    EXPECT_FALSE(fs_storage.is_cache_prefix_directory(fs::directory_entry(long_name)));
+    EXPECT_FALSE(fs_storage.is_cache_prefix_directory(fs::directory_entry(meta_path)));
+    EXPECT_FALSE(
+            fs_storage.is_cache_prefix_directory(fs::directory_entry(cache_root / "plain_file")));
+}
+
+} // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to