gavinchou commented on code in PR #59269:
URL: https://github.com/apache/doris/pull/59269#discussion_r2644829813


##########
be/src/io/cache/fs_file_cache_storage.cpp:
##########
@@ -970,42 +981,317 @@ FSFileCacheStorage::~FSFileCacheStorage() {
     if (_cache_background_load_thread.joinable()) {
         _cache_background_load_thread.join();
     }
+    stop_leak_cleaner();
 }
 
-size_t FSFileCacheStorage::estimate_file_count_from_statfs() const {
+size_t FSFileCacheStorage::estimate_file_count_from_inode() const {
     struct statvfs vfs;
     if (statvfs(_cache_base_path.c_str(), &vfs) != 0) {
         LOG(WARNING) << "Failed to get filesystem statistics for path: " << 
_cache_base_path
                      << ", error: " << strerror(errno);
         return 0;
     }
 
-    // Get total size of cache directory to estimate file count
-    std::error_code ec;
-    uintmax_t total_size = 0;
-    for (const auto& entry : std::filesystem::recursive_directory_iterator(_cache_base_path, ec)) {
-        if (ec) {
-            LOG(WARNING) << "Error accessing directory entry: " << ec.message();
-            continue;
+    if (vfs.f_files == 0) {
+        LOG(WARNING) << "Filesystem returned zero total inodes for path " << _cache_base_path;
+        return 0;
+    }
+
+    size_t inode_used = vfs.f_files - vfs.f_ffree;
+    LOG(INFO) << "Inode usage for cache path " << _cache_base_path << ": total=" << vfs.f_files
+              << ", free=" << vfs.f_ffree << ", used=" << inode_used;
+    return inode_used;
+}
+
+size_t FSFileCacheStorage::snapshot_metadata_block_count(BlockFileCache* mgr) const {
+    if (mgr == nullptr) {
+        return 0;
+    }
+    std::lock_guard<std::mutex> lock(mgr->_mutex);
+    size_t total_blocks = 0;
+    for (const auto& [_, blocks_by_offset] : mgr->_files) {
+        total_blocks += blocks_by_offset.size();
+    }
+    return total_blocks;
+}
+
+std::vector<size_t> FSFileCacheStorage::snapshot_metadata_for_hash_offsets(
+        BlockFileCache* mgr, const UInt128Wrapper& hash) const {
+    std::vector<size_t> offsets;
+    if (mgr == nullptr) {
+        return offsets;
+    }
+    std::lock_guard<std::mutex> lock(mgr->_mutex);
+    auto it = mgr->_files.find(hash);
+    if (it == mgr->_files.end()) {
+        return offsets;
+    }
+    offsets.reserve(it->second.size());
+    for (const auto& [offset, _] : it->second) {
+        offsets.push_back(offset);
+    }
+    return offsets;
+}
+
+void FSFileCacheStorage::start_leak_cleaner(BlockFileCache* mgr) {
+    if (config::file_cache_leak_scan_interval_seconds <= 0) {
+        LOG(INFO) << "File cache leak cleaner disabled because interval <= 0";
+        return;
+    }
+    _stop_leak_cleaner.store(false, std::memory_order_relaxed);
+    _cache_leak_cleaner_thread = std::thread([this]() { leak_cleaner_loop(); });
+}
+
+void FSFileCacheStorage::stop_leak_cleaner() {
+    _stop_leak_cleaner.store(true, std::memory_order_relaxed);
+    _leak_cleaner_cv.notify_all();
+    if (_cache_leak_cleaner_thread.joinable()) {
+        _cache_leak_cleaner_thread.join();
+    }
+}
+
+void FSFileCacheStorage::leak_cleaner_loop() {
+    while (!_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+        auto interval = std::chrono::seconds(
+                std::max<int64_t>(1, config::file_cache_leak_scan_interval_seconds));
+        std::unique_lock<std::mutex> lock(_leak_cleaner_mutex);
+        _leak_cleaner_cv.wait_for(lock, interval, [this]() {
+            return _stop_leak_cleaner.load(std::memory_order_relaxed);
+        });
+        lock.unlock();
+        if (_stop_leak_cleaner.load(std::memory_order_relaxed)) {
+            break;
         }
-        if (entry.is_regular_file()) {
-            total_size += entry.file_size();
+        try {
+            run_leak_cleanup(_mgr);
+        } catch (const std::exception& e) {
+            LOG(WARNING) << "File cache leak cleaner encountered exception: " 
<< e.what();
+        } catch (...) {
+            LOG(WARNING) << "File cache leak cleaner encountered unknown 
exception";
         }
     }
+}
+
+void FSFileCacheStorage::run_leak_cleanup(BlockFileCache* mgr) {
+    if (mgr == nullptr) {
+        return;
+    }
 
-    if (total_size == 0) {
-        return 0;
+    size_t metadata_blocks = snapshot_metadata_block_count(mgr);
+    size_t fs_files = estimate_file_count_from_inode();
+
+    double ratio = 0.0;
+    if (metadata_blocks == 0) {
+        ratio = fs_files > 0 ? std::numeric_limits<double>::infinity() : 0.0;
+    } else {
+        ratio = static_cast<double>(fs_files) / static_cast<double>(metadata_blocks);
+    }
+
+    LOG(INFO) << fmt::format(
+            "file cache leak scan stats: fs_files={}, metadata_blocks={}, 
ratio={:.4f}", fs_files,
+            metadata_blocks, ratio);
+
+    double threshold = std::max(1.0, config::file_cache_leak_fs_to_meta_ratio_threshold);
+    if (ratio <= threshold) {
+        return;
+    }
+
+    LOG(WARNING) << fmt::format(
+            "file cache leak ratio {0:.4f} exceeds threshold {1:.4f}, start 
cleanup", ratio,
+            threshold);
+
+    cleanup_leaked_files(mgr, metadata_blocks);
+}
+
+void FSFileCacheStorage::cleanup_leaked_files(BlockFileCache* mgr, size_t metadata_block_count) {
+    if (mgr == nullptr) {
+        return;
+    }
+
+    const size_t batch_size = std::max<int32_t>(1, config::file_cache_leak_scan_batch_files);
+    const size_t pause_ms = std::max<int32_t>(0, config::file_cache_leak_scan_pause_ms);
+
+    std::vector<UInt128Wrapper> hash_keys;
+    {
+        std::lock_guard<std::mutex> lock(mgr->_mutex);
+        hash_keys.reserve(mgr->_files.size());
+        for (const auto& [hash, _] : mgr->_files) {
+            hash_keys.push_back(hash);
+        }
+    }
+
+    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> metadata_index;
+    if (metadata_block_count > 0) {
+        metadata_index.reserve(metadata_block_count * 2);
+    }
+
+    for (const auto& hash : hash_keys) {
+        auto offsets = snapshot_metadata_for_hash_offsets(mgr, hash);
+        for (const auto& offset : offsets) {
+            metadata_index.emplace(hash, offset);
+        }
+    }
+
+    struct OrphanCandidate {
+        std::string path;
+        UInt128Wrapper hash;
+        size_t offset;
+        std::string key_dir;
+    };
+
+    auto try_remove_empty_directory = [&](const std::string& dir) {
+        std::error_code ec;
+        std::filesystem::directory_iterator it(dir, ec);
+        if (ec || it != std::filesystem::directory_iterator()) {
+            return;
+        }
+        auto st = fs->delete_directory(dir);
+        if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+            LOG_WARNING("delete_directory {} failed", dir).error(st);
+        }
+    };
+
+    auto revalidate_with_metadata = [&](const UInt128Wrapper& hash, size_t offset) {
+        std::lock_guard<std::mutex> lock(mgr->_mutex);
+        auto it = mgr->_files.find(hash);
+        if (it == mgr->_files.end()) {
+            return true;
+        }
+        return !it->second.contains(offset);
+    };
+
+    std::vector<OrphanCandidate> candidates;
+    candidates.reserve(batch_size);
+
+    size_t removed_files = 0;
+    size_t examined_files = 0;
+
+    auto flush_candidates = [&]() {
+        if (candidates.empty()) {
+            return;
+        }
+        for (auto& candidate : candidates) {
+            if (!revalidate_with_metadata(candidate.hash, candidate.offset)) {
+                continue;
+            }
+            auto st = fs->delete_file(candidate.path);
+            if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
+                LOG_WARNING("delete orphan cache file {} failed", 
candidate.path).error(st);
+                continue;
+            }
+            removed_files++;
+            try_remove_empty_directory(candidate.key_dir);
+            auto prefix_dir = std::filesystem::path(candidate.key_dir).parent_path().string();
+            try_remove_empty_directory(prefix_dir);
+        }
+        candidates.clear();
+        if (pause_ms > 0) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(pause_ms));
+        }
+    };
+
+    std::error_code ec;
+    std::filesystem::directory_iterator prefix_it {_cache_base_path, ec};
+    if (ec) {
+        LOG(WARNING) << "Leak scan failed to list cache directory: " << 
_cache_base_path
+                     << ", error: " << ec.message();
+        return;
     }
 
-    // Estimate file count based on average file size
-    // Assuming average file size of 1MB for cache blocks
-    const uintmax_t average_file_size = 1024 * 1024; // 1MB
-    size_t estimated_file_count = total_size / average_file_size;
+    for (; prefix_it != std::filesystem::directory_iterator(); ++prefix_it) {
+        if (!prefix_it->is_directory()) {
+            continue;
+        }
+        std::string prefix_name = prefix_it->path().filename().native();
+        if (prefix_name == "meta") {
+            continue;
+        }
+        if (prefix_name.size() != KEY_PREFIX_LENGTH) {
+            continue;
+        }
+
+        std::filesystem::directory_iterator key_it {prefix_it->path(), ec};
+        if (ec) {
+            LOG(WARNING) << "Leak scan failed to list prefix " << 
prefix_it->path().native()
+                         << ", error: " << ec.message();
+            continue;
+        }
+
+        for (; key_it != std::filesystem::directory_iterator(); ++key_it) {
+            if (!key_it->is_directory()) {
+                continue;
+            }
+            auto key_with_suffix = key_it->path().filename().native();
+            auto delim_pos = key_with_suffix.find('_');
+            if (delim_pos == std::string::npos) {
+                continue;
+            }
+
+            UInt128Wrapper hash;
+            try {
+                hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(
+                        key_with_suffix.substr(0, delim_pos).c_str()));
+            } catch (...) {
+                LOG(WARNING) << "Leak scan failed to parse hash from " << 
key_with_suffix;

Review Comment:
   Better to catch `const std::exception&` and print `e.what()` here, so the log shows why the hash failed to parse.
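   For example, a minimal sketch of the suggested change (the trailing `continue;` is an assumption, since the loop body is truncated above):
   ```cpp
   UInt128Wrapper hash;
   try {
       hash = UInt128Wrapper(vectorized::unhex_uint<uint128_t>(
               key_with_suffix.substr(0, delim_pos).c_str()));
   } catch (const std::exception& e) {
       // Include the exception message so the parse failure cause is visible.
       LOG(WARNING) << "Leak scan failed to parse hash from " << key_with_suffix
                    << ", error: " << e.what();
       continue;
   } catch (...) {
       // Keep a catch-all for non-std exceptions; no message is available here.
       LOG(WARNING) << "Leak scan failed to parse hash from " << key_with_suffix;
       continue;
   }
   ```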



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

