This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 1741dc695a1 [fix](filecache) self-heal stale DOWNLOADED entries on local NOT_FOUND (#60977) (#61205)
1741dc695a1 is described below

commit 1741dc695a10d3705a1a2f439ab4552d977f1f9c
Author: zhengyu <[email protected]>
AuthorDate: Sat Mar 14 00:12:58 2026 +0800

    [fix](filecache) self-heal stale DOWNLOADED entries on local NOT_FOUND (#60977) (#61205)
    
    Problem:
    In a rare restart window, BE can rebuild file-cache metadata in memory
    while the corresponding cache files are not yet durable on disk. If that
    metadata is also restored via LRU dump/load, blocks may appear as
    DOWNLOADED even though the local files are missing. Subsequent reads then
    produce false-positive cache hits, fail on the local read, and repeatedly
    fall back to S3. This preserves correctness but causes avoidable cache
    thrashing and latency jitter.
    
    Root cause:
    The read path treated DOWNLOADED as a valid local-hit source and fell back
    to remote reads on failure, but it did not actively invalidate the stale
    metadata when the local cache file was gone.
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #60977
    
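    Problem Summary:
    When a local cache read of a block in DOWNLOADED state fails with
    NOT_FOUND, CachedRemoteFileReader::read_at_impl now:
    - increments the new `cached_remote_reader_self_heal_on_not_found` bvar;
    - logs a rate-limited warning;
    - once all requested blocks have been processed, calls
      `remove_if_cached_async` on the file's cache hash.
    Dropping the stale entry means later lookups no longer report a
    false-positive DOWNLOADED hit. Other local read errors (e.g. injected IO
    errors) do not trigger this cleanup. Both cases are covered by new unit
    tests in block_file_cache_test.cpp.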
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
    
    ---------
    
    Signed-off-by: zhengyu <[email protected]>
---
 be/src/io/cache/cached_remote_file_reader.cpp |  15 +++
 be/test/io/cache/block_file_cache_test.cpp    | 185 ++++++++++++++++++++++++++
 2 files changed, 200 insertions(+)

diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index bcddd63e7be..c2c51ac4518 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -79,6 +79,8 @@ bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
 bvar::Adder<uint64_t> 
g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
 bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
         "cached_remote_reader_cache_indirect_total_bytes");
+bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
+        "cached_remote_reader_self_heal_on_not_found");
 bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
         "cached_remote_reader_indirect_bytes_1min_window", 
&g_read_cache_indirect_bytes, 60);
 bvar::Window<bvar::Adder<uint64_t>> 
g_read_cache_indirect_total_bytes_1min_window(
@@ -473,6 +475,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
 
     size_t current_offset = offset;
     size_t end_offset = offset + bytes_req - 1;
+    bool need_self_heal = false;
     *bytes_read = 0;
     for (auto& block : holder.file_blocks) {
         if (current_offset > end_offset) {
@@ -527,6 +530,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                 }
             }
             if (!st || block_state != FileBlock::State::DOWNLOADED) {
+                if (block_state == FileBlock::State::DOWNLOADED && 
st.is<ErrorCode::NOT_FOUND>()) {
+                    need_self_heal = true;
+                    g_read_cache_self_heal_on_not_found << 1;
+                    LOG_EVERY_N(WARNING, 100)
+                            << "Cache block file is missing, will self-heal by 
clearing cache "
+                               "hash. "
+                            << "path=" << path().native() << ", hash=" << 
_cache_hash.to_string()
+                            << ", offset=" << left << ", err=" << st.msg();
+                }
                 LOG(WARNING) << "Read data failed from file cache downloaded 
by others. err="
                              << st.msg() << ", block state=" << block_state;
                 size_t bytes_read {0};
@@ -544,6 +556,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
         *bytes_read += read_size;
         current_offset = right + 1;
     }
+    if (need_self_heal && _cache != nullptr) {
+        _cache->remove_if_cached_async(_cache_hash);
+    }
     g_read_cache_indirect_bytes << indirect_read_bytes;
     g_read_cache_indirect_total_bytes << *bytes_read;
 
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index a34b29b0050..333619a0bf8 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3943,6 +3943,191 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_error_handle) {
     FileCacheFactory::instance()->_capacity = 0;
 }
 
+extern bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found;
+
+TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_self_heal_on_downloaded_not_found) {
+    bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+    config::enable_read_cache_file_directly = false;
+    Defer reset_direct_read {
+            [&] { config::enable_read_cache_file_directly = 
origin_enable_direct_read; }};
+
+    std::string cache_base_path =
+            caches_dir / 
"cached_remote_reader_self_heal_on_downloaded_not_found" / "";
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+    for (int i = 0; i < 100; i++) {
+        if (cache->get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    CachedRemoteFileReader reader(local_reader, opts);
+
+    uint64_t before_self_heal = 
g_read_cache_self_heal_on_not_found.get_value();
+
+    std::string buffer(64_kb, '\0');
+    IOContext io_ctx;
+    FileCacheStatistics stats;
+    io_ctx.file_cache_stats = &stats;
+    size_t bytes_read = 0;
+    ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+    auto key = io::BlockFileCache::hash("tmp_file");
+    {
+        io::CacheContext inspect_ctx;
+        ReadStatistics inspect_stats;
+        inspect_ctx.stats = &inspect_stats;
+        inspect_ctx.cache_type = io::FileCacheType::NORMAL;
+        auto inspect_holder = cache->get_or_set(key, 0, 64_kb, inspect_ctx);
+        auto inspect_blocks = fromHolder(inspect_holder);
+        ASSERT_EQ(inspect_blocks.size(), 1);
+        ASSERT_EQ(inspect_blocks[0]->state(), 
io::FileBlock::State::DOWNLOADED);
+        std::string cache_file = inspect_blocks[0]->get_cache_file();
+        ASSERT_TRUE(fs::exists(cache_file));
+        ASSERT_TRUE(global_local_filesystem()->delete_file(cache_file).ok());
+        ASSERT_FALSE(fs::exists(cache_file));
+    }
+
+    ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+    bool self_healed = false;
+    for (int i = 0; i < 100; ++i) {
+        io::CacheContext verify_ctx;
+        ReadStatistics verify_stats;
+        verify_ctx.stats = &verify_stats;
+        verify_ctx.cache_type = io::FileCacheType::NORMAL;
+        auto verify_holder = cache->get_or_set(key, 0, 64_kb, verify_ctx);
+        auto verify_blocks = fromHolder(verify_holder);
+        if (verify_blocks.size() == 1 && verify_blocks[0]->state() == 
io::FileBlock::State::EMPTY) {
+            self_healed = true;
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    EXPECT_TRUE(self_healed);
+    EXPECT_EQ(g_read_cache_self_heal_on_not_found.get_value(), 
before_self_heal + 1);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
+TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_no_self_heal_on_non_not_found_error) {
+    bool origin_enable_direct_read = config::enable_read_cache_file_directly;
+    config::enable_read_cache_file_directly = false;
+    Defer reset_direct_read {
+            [&] { config::enable_read_cache_file_directly = 
origin_enable_direct_read; }};
+
+    std::string cache_base_path =
+            caches_dir / 
"cached_remote_reader_no_self_heal_on_non_not_found_error" / "";
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+    for (int i = 0; i < 100; i++) {
+        if (cache->get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    CachedRemoteFileReader reader(local_reader, opts);
+
+    std::string buffer(64_kb, '\0');
+    IOContext io_ctx;
+    FileCacheStatistics stats;
+    io_ctx.file_cache_stats = &stats;
+    size_t bytes_read = 0;
+    ASSERT_TRUE(reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+
+    uint64_t before_self_heal = 
g_read_cache_self_heal_on_not_found.get_value();
+    auto* sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    Defer defer {[&] {
+        sp->clear_call_back("LocalFileReader::read_at_impl");
+        sp->disable_processing();
+    }};
+    sp->set_call_back("LocalFileReader::read_at_impl", [&](auto&& values) {
+        std::pair<Status, bool>* pair = try_any_cast<std::pair<Status, 
bool>*>(values.back());
+        pair->first = Status::IOError("inject io error for cache read");
+        pair->second = true;
+    });
+
+    auto st = reader.read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx);
+    ASSERT_FALSE(st.ok());
+    EXPECT_EQ(g_read_cache_self_heal_on_not_found.get_value(), 
before_self_heal);
+
+    sp->clear_call_back("LocalFileReader::read_at_impl");
+    sp->disable_processing();
+
+    io::CacheContext verify_ctx;
+    ReadStatistics verify_stats;
+    verify_ctx.stats = &verify_stats;
+    verify_ctx.cache_type = io::FileCacheType::NORMAL;
+    auto key = io::BlockFileCache::hash("tmp_file");
+    auto verify_holder = cache->get_or_set(key, 0, 64_kb, verify_ctx);
+    auto verify_blocks = fromHolder(verify_holder);
+    ASSERT_EQ(verify_blocks.size(), 1);
+    EXPECT_EQ(verify_blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
     std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_init" / "";
     if (fs::exists(cache_base_path)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to