This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 550bd2dc825 branch-4.1: [fix](filecache) add async lru update 
mechanism and fix partial hit in cache reader (pick #61083) (#61812)
550bd2dc825 is described below

commit 550bd2dc825cde095a4d9031446b59c006c5b884
Author: zhengyu <[email protected]>
AuthorDate: Sat Mar 28 22:43:56 2026 +0800

    branch-4.1: [fix](filecache) add async lru update mechanism and fix partial 
hit in cache reader (pick #61083) (#61812)
    
    - CachedRemoteFileReader::read_at_impl has incorrect initialization of
    subsequent traversal start point and count after direct partial hit
    (causes incorrect fallback / extra overhead)
    - Proposed an async LRU-ordering update solution: decoupled the LRU update
    from query read operations, slightly reducing read-lock latency and laying
    the groundwork for subsequent lock splitting
    - Established performance unit tests for cache lock
    
    Signed-off-by: zhengyu <[email protected]>
    
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ---------
    
    Signed-off-by: zhengyu <[email protected]>
---
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/io/cache/block_file_cache.cpp               |  23 +-
 be/src/io/cache/cached_remote_file_reader.cpp      |   6 +-
 .../io/cache/block_file_cache_test_lru_dump.cpp    | 107 ++++++
 .../cached_remote_file_reader_lock_wait_test.cpp   | 427 +++++++++++++++++++++
 .../io/cache/cached_remote_file_reader_test.cpp    | 124 ++++++
 7 files changed, 683 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 01c27c6e5f2..e919b644918 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1193,6 +1193,7 @@ DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
 DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
+DEFINE_mBool(enable_file_cache_async_touch_on_get_or_set, "false");
 DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
 DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "180000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7ebf0091ca8..ab36e426f81 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1237,6 +1237,7 @@ DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
 DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
+DECLARE_mBool(enable_file_cache_async_touch_on_get_or_set);
 DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index a6e80b0766e..f6f641a0a25 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -516,7 +516,8 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, 
FileBlocks* result, boo
 
     auto& queue = get_queue(cell.file_block->cache_type());
     /// Move to the end of the queue. The iterator remains valid.
-    if (cell.queue_iterator && move_iter_flag) {
+    if (!config::enable_file_cache_async_touch_on_get_or_set && 
cell.queue_iterator &&
+        move_iter_flag) {
         queue.move_to_end(*cell.queue_iterator, cache_lock);
         _lru_recorder->record_queue_event(cell.file_block->cache_type(),
                                           CacheLRULogType::MOVETOBACK, 
cell.file_block->_key.hash,
@@ -817,11 +818,13 @@ FileBlocksHolder BlockFileCache::get_or_set(const 
UInt128Wrapper& hash, size_t o
     DCHECK(stats != nullptr);
     MonotonicStopWatch sw;
     sw.start();
-    std::lock_guard cache_lock(_mutex);
-    stats->lock_wait_timer += sw.elapsed_time();
     FileBlocks file_blocks;
+    std::vector<FileBlockSPtr> need_update_lru_blocks;
+    const bool async_touch_on_get_or_set = 
config::enable_file_cache_async_touch_on_get_or_set;
     int64_t duration = 0;
     {
+        std::lock_guard cache_lock(_mutex);
+        stats->lock_wait_timer += sw.elapsed_time();
         SCOPED_RAW_TIMER(&duration);
         /// Get all blocks which intersect with the given range.
         {
@@ -842,6 +845,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const 
UInt128Wrapper& hash, size_t o
         if (!context.is_warmup) {
             *_no_warmup_num_read_blocks << file_blocks.size();
         }
+        if (async_touch_on_get_or_set) {
+            need_update_lru_blocks.reserve(file_blocks.size());
+        }
         for (auto& block : file_blocks) {
             size_t block_size = block->range().size();
             *_total_read_size_metrics << block_size;
@@ -851,9 +857,20 @@ FileBlocksHolder BlockFileCache::get_or_set(const 
UInt128Wrapper& hash, size_t o
                 if (!context.is_warmup) {
                     *_no_warmup_num_hit_blocks << 1;
                 }
+                if (async_touch_on_get_or_set &&
+                    need_to_move(block->cache_type(), context.cache_type)) {
+                    need_update_lru_blocks.emplace_back(block);
+                }
             }
         }
     }
+
+    if (async_touch_on_get_or_set) {
+        for (auto& block : need_update_lru_blocks) {
+            add_need_update_lru_block(std::move(block));
+        }
+    }
+
     *_get_or_set_latency_us << (duration / 1000);
     return FileBlocksHolder(std::move(file_blocks));
 }
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index e0970ab0cb9..8e4efaed97d 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -476,17 +476,17 @@ Status CachedRemoteFileReader::read_at_impl(size_t 
offset, Slice result, size_t*
         }
     }
 
-    size_t current_offset = offset;
+    size_t current_offset = offset + already_read;
     size_t end_offset = offset + bytes_req - 1;
     bool need_self_heal = false;
-    *bytes_read = 0;
+    *bytes_read = already_read;
     for (auto& block : holder.file_blocks) {
         if (current_offset > end_offset) {
             break;
         }
         size_t left = block->range().left;
         size_t right = block->range().right;
-        if (right < offset) {
+        if (right < current_offset) {
             continue;
         }
         size_t read_size =
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp 
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index e8a373568ef..9c31b0b1430 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -596,4 +596,111 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_direct_read_order_check) {
     FileCacheFactory::instance()->_capacity = 0;
 }
 
+TEST_F(BlockFileCacheTest, get_or_set_hit_order_check) {
+    std::string cache_base_path = caches_dir / 
"cache_get_or_set_hit_order_check" / "";
+
+    const auto old_async_touch = 
config::enable_file_cache_async_touch_on_get_or_set;
+    const auto old_update_interval_ms = 
config::file_cache_background_block_lru_update_interval_ms;
+    const auto old_update_qps_limit = 
config::file_cache_background_block_lru_update_qps_limit;
+    Defer defer {[old_async_touch, old_update_interval_ms, 
old_update_qps_limit] {
+        config::enable_file_cache_async_touch_on_get_or_set = old_async_touch;
+        config::file_cache_background_block_lru_update_interval_ms = 
old_update_interval_ms;
+        config::file_cache_background_block_lru_update_qps_limit = 
old_update_qps_limit;
+    }};
+
+    config::enable_file_cache_async_touch_on_get_or_set = true;
+    config::file_cache_background_block_lru_update_interval_ms = 200;
+    config::file_cache_background_block_lru_update_qps_limit = 100000;
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+
+    io::BlockFileCache cache(cache_base_path, settings);
+    ASSERT_TRUE(cache.initialize());
+    for (int i = 0; i < 100; i++) {
+        if (cache.get_async_open_success()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_TRUE(cache.get_async_open_success());
+
+    io::CacheContext context;
+    ReadStatistics stats;
+    context.stats = &stats;
+    context.cache_type = io::FileCacheType::NORMAL;
+    context.query_id = query_id;
+
+    auto key = io::BlockFileCache::hash("get_or_set_hit_order_check_key");
+    constexpr size_t kBlockSize = 1024 * 1024;
+    std::vector<size_t> offsets {0, kBlockSize, 2 * kBlockSize};
+
+    for (size_t offset : offsets) {
+        auto holder = cache.get_or_set(key, offset, kBlockSize, context);
+        auto blocks = fromHolder(holder);
+        ASSERT_EQ(blocks.size(), 1);
+        ASSERT_TRUE(blocks[0]->get_or_set_downloader() == 
io::FileBlock::get_caller_id());
+        download_into_memory(blocks[0]);
+        ASSERT_EQ(blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
+    }
+
+    std::vector<size_t> initial_offsets;
+    for (auto it = cache._normal_queue.begin(); it != 
cache._normal_queue.end(); ++it) {
+        initial_offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(initial_offsets.size(), 3);
+    ASSERT_EQ(initial_offsets[0], 0);
+    ASSERT_EQ(initial_offsets[1], kBlockSize);
+    ASSERT_EQ(initial_offsets[2], 2 * kBlockSize);
+
+    auto holder = cache.get_or_set(key, 0, kBlockSize, context);
+    auto blocks = fromHolder(holder);
+    ASSERT_EQ(blocks.size(), 1);
+    ASSERT_EQ(blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
+
+    std::vector<size_t> before_updated_offsets;
+    for (auto it = cache._normal_queue.begin(); it != 
cache._normal_queue.end(); ++it) {
+        before_updated_offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(before_updated_offsets.size(), 3);
+    ASSERT_EQ(before_updated_offsets[0], 0);
+    ASSERT_EQ(before_updated_offsets[1], kBlockSize);
+    ASSERT_EQ(before_updated_offsets[2], 2 * kBlockSize);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
+
+    std::vector<size_t> updated_offsets;
+    for (auto it = cache._normal_queue.begin(); it != 
cache._normal_queue.end(); ++it) {
+        updated_offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(updated_offsets.size(), 3);
+    std::vector<size_t> sorted_before = before_updated_offsets;
+    std::vector<size_t> sorted_after = updated_offsets;
+    std::sort(sorted_before.begin(), sorted_before.end());
+    std::sort(sorted_after.begin(), sorted_after.end());
+    ASSERT_EQ(sorted_after, sorted_before);
+    ASSERT_EQ(updated_offsets.back(), 0);
+
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
 } // namespace doris::io
diff --git a/be/test/io/cache/cached_remote_file_reader_lock_wait_test.cpp 
b/be/test/io/cache/cached_remote_file_reader_lock_wait_test.cpp
new file mode 100644
index 00000000000..6b788534167
--- /dev/null
+++ b/be/test/io/cache/cached_remote_file_reader_lock_wait_test.cpp
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <numeric>
+#include <random>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/cached_remote_file_reader.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_reader.h"
+#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris::io {
+
+namespace {
+
+class MockPatternFileReader final : public FileReader {
+public:
+    MockPatternFileReader(std::string path, size_t file_size, uint32_t 
pattern_id)
+            : _path(std::move(path)), _size(file_size), 
_pattern_id(pattern_id) {}
+
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    const Path& path() const override { return _path; }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    int64_t mtime() const override { return 0; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* /*io_ctx*/) override {
+        if (offset >= _size) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+        const size_t readable = std::min(result.size, _size - offset);
+        memset(result.data, static_cast<int>('a' + (_pattern_id % 26)), 
readable);
+        *bytes_read = readable;
+        return Status::OK();
+    }
+
+private:
+    Path _path;
+    size_t _size;
+    uint32_t _pattern_id;
+    bool _closed {false};
+};
+
+struct LockWaitSummary {
+    int64_t total_ns {0};
+    int64_t max_ns {0};
+    int64_t p50_ns {0};
+    int64_t p95_ns {0};
+    int64_t p99_ns {0};
+    double avg_ns {0.0};
+    size_t non_zero_samples {0};
+};
+
+int64_t get_percentile_value(const std::vector<int64_t>& sorted_values, double 
percentile) {
+    if (sorted_values.empty()) {
+        return 0;
+    }
+    const double rank = percentile * static_cast<double>(sorted_values.size() 
- 1);
+    return sorted_values[static_cast<size_t>(rank)];
+}
+
+LockWaitSummary summarize_lock_wait(std::vector<int64_t>* values) {
+    LockWaitSummary summary;
+    if (values == nullptr || values->empty()) {
+        return summary;
+    }
+    std::sort(values->begin(), values->end());
+    summary.total_ns = std::accumulate(values->begin(), values->end(), int64_t 
{0});
+    summary.max_ns = values->back();
+    summary.p50_ns = get_percentile_value(*values, 0.50);
+    summary.p95_ns = get_percentile_value(*values, 0.95);
+    summary.p99_ns = get_percentile_value(*values, 0.99);
+    summary.avg_ns = static_cast<double>(summary.total_ns) / 
static_cast<double>(values->size());
+    summary.non_zero_samples =
+            std::count_if(values->begin(), values->end(), [](int64_t v) { 
return v > 0; });
+    return summary;
+}
+
+struct LockWaitWorkloadConfig {
+    size_t file_count {2048};
+    size_t file_size {64 * 1024};
+    size_t warmup_read_bytes {4 * 1024};
+    size_t stress_read_bytes {4 * 1024};
+    size_t ops_per_thread {2000};
+    size_t thread_count {16};
+    uint32_t seed_base {20260228};
+    std::string file_prefix {"default"};
+};
+
+struct LockWaitWorkloadResult {
+    LockWaitSummary summary;
+    size_t populated_keys {0};
+    size_t warmup_failed_reads {0};
+    size_t failed_reads {0};
+    size_t sample_count {0};
+};
+
+size_t calc_thread_count() {
+    const size_t hw_threads =
+            std::thread::hardware_concurrency() == 0 ? 16 : 
std::thread::hardware_concurrency();
+    return std::min<size_t>(48, std::max<size_t>(16, hw_threads));
+}
+
+} // namespace
+
+class CachedRemoteFileReaderLockWaitTest : public testing::Test {
+public:
+    static void SetUpTestSuite() {
+        auto* exec_env = ExecEnv::GetInstance();
+        if (exec_env->file_cache_factory() == nullptr) {
+            _suite_factory = std::make_unique<FileCacheFactory>();
+            exec_env->_file_cache_factory = _suite_factory.get();
+            _owns_factory = true;
+        }
+        if (!exec_env->_file_cache_open_fd_cache) {
+            exec_env->_file_cache_open_fd_cache = std::make_unique<FDCache>();
+            _owns_fd_cache = true;
+        }
+    }
+
+    static void TearDownTestSuite() {
+        auto* factory = ExecEnv::GetInstance()->file_cache_factory();
+        if (factory != nullptr) {
+            factory->clear_file_caches(true);
+            factory->_caches.clear();
+            factory->_path_to_cache.clear();
+            factory->_capacity = 0;
+        }
+        if (_owns_factory) {
+            ExecEnv::GetInstance()->_file_cache_factory = nullptr;
+            _suite_factory.reset();
+            _owns_factory = false;
+        }
+        if (_owns_fd_cache) {
+            ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr);
+            _owns_fd_cache = false;
+        }
+    }
+
+    void SetUp() override { recreate_memory_cache(); }
+
+    void TearDown() override {
+        auto* factory = FileCacheFactory::instance();
+        if (factory != nullptr) {
+            factory->clear_file_caches(true);
+            factory->_caches.clear();
+            factory->_path_to_cache.clear();
+            factory->_capacity = 0;
+        }
+    }
+
+protected:
+    void recreate_memory_cache() {
+        auto* factory = FileCacheFactory::instance();
+        ASSERT_NE(factory, nullptr);
+        factory->clear_file_caches(true);
+        factory->_caches.clear();
+        factory->_path_to_cache.clear();
+        factory->_capacity = 0;
+
+        constexpr size_t kCapacityBytes = 512ULL * 1024ULL * 1024ULL;
+        auto settings = get_file_cache_settings(kCapacityBytes, 0, 
DEFAULT_NORMAL_PERCENT,
+                                                DEFAULT_DISPOSABLE_PERCENT, 
DEFAULT_INDEX_PERCENT,
+                                                DEFAULT_TTL_PERCENT, "memory");
+        ASSERT_TRUE(factory->create_file_cache("memory", settings).ok());
+        _cache = factory->get_by_path(std::string("memory"));
+        ASSERT_NE(_cache, nullptr);
+        for (int i = 0; i < 100; ++i) {
+            if (_cache->get_async_open_success()) {
+                break;
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        ASSERT_TRUE(_cache->get_async_open_success());
+    }
+
+    LockWaitWorkloadResult run_lock_wait_workload(const 
LockWaitWorkloadConfig& config) const {
+        DCHECK(_cache != nullptr);
+        DCHECK(config.file_count > 0);
+        DCHECK(config.file_size >= config.stress_read_bytes);
+        DCHECK(config.thread_count > 0);
+
+        LockWaitWorkloadResult result;
+
+        std::vector<std::shared_ptr<CachedRemoteFileReader>> readers;
+        readers.reserve(config.file_count);
+        std::vector<UInt128Wrapper> cache_keys;
+        cache_keys.reserve(config.file_count);
+
+        FileReaderOptions opts;
+        opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
+        opts.is_doris_table = true;
+
+        for (size_t i = 0; i < config.file_count; ++i) {
+            std::string path =
+                    "/mock/" + config.file_prefix + "_file_" + 
std::to_string(i) + ".dat";
+            auto remote_reader = std::make_shared<MockPatternFileReader>(path, 
config.file_size, i);
+            auto cached_reader = 
std::make_shared<CachedRemoteFileReader>(remote_reader, opts);
+            cache_keys.emplace_back(
+                    
BlockFileCache::hash(remote_reader->path().filename().native()));
+            readers.emplace_back(std::move(cached_reader));
+        }
+
+        std::vector<char> warmup_buffer(config.warmup_read_bytes, 0);
+        for (auto& reader : readers) {
+            IOContext io_ctx;
+            FileCacheStatistics stats;
+            io_ctx.file_cache_stats = &stats;
+            size_t bytes_read = 0;
+            Status st = reader->read_at(0, Slice(warmup_buffer.data(), 
warmup_buffer.size()),
+                                        &bytes_read, &io_ctx);
+            if (!st.ok() || bytes_read != warmup_buffer.size()) {
+                ++result.warmup_failed_reads;
+            }
+        }
+
+        for (const auto& hash : cache_keys) {
+            if (!_cache->get_blocks_by_key(hash).empty()) {
+                ++result.populated_keys;
+            }
+        }
+
+        std::atomic<size_t> ready_threads {0};
+        std::atomic<bool> start_flag {false};
+        std::atomic<size_t> failed_reads {0};
+        std::vector<std::vector<int64_t>> thread_samples(config.thread_count);
+        std::vector<std::thread> workers;
+        workers.reserve(config.thread_count);
+
+        for (size_t tid = 0; tid < config.thread_count; ++tid) {
+            workers.emplace_back([&, tid] {
+                SCOPED_INIT_THREAD_CONTEXT();
+                std::mt19937 gen(static_cast<uint32_t>(tid) + 
config.seed_base);
+                std::uniform_int_distribution<size_t> reader_dist(0, 
config.file_count - 1);
+                std::uniform_int_distribution<size_t> offset_dist(
+                        0, config.file_size - config.stress_read_bytes);
+                std::vector<char> buffer(config.stress_read_bytes, 0);
+                auto& samples = thread_samples[tid];
+                samples.reserve(config.ops_per_thread);
+
+                ready_threads.fetch_add(1, std::memory_order_release);
+                while (!start_flag.load(std::memory_order_acquire)) {
+                    std::this_thread::yield();
+                }
+
+                for (size_t op = 0; op < config.ops_per_thread; ++op) {
+                    const size_t reader_idx = reader_dist(gen);
+                    const size_t offset = offset_dist(gen);
+
+                    IOContext io_ctx;
+                    FileCacheStatistics stats;
+                    io_ctx.file_cache_stats = &stats;
+                    size_t bytes_read = 0;
+                    Status st = readers[reader_idx]->read_at(
+                            offset, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx);
+                    if (!st.ok() || bytes_read != config.stress_read_bytes) {
+                        failed_reads.fetch_add(1, std::memory_order_relaxed);
+                        continue;
+                    }
+                    samples.push_back(stats.lock_wait_timer);
+                }
+            });
+        }
+
+        while (ready_threads.load(std::memory_order_acquire) < 
config.thread_count) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+        start_flag.store(true, std::memory_order_release);
+
+        for (auto& worker : workers) {
+            if (worker.joinable()) {
+                worker.join();
+            }
+        }
+
+        result.failed_reads = failed_reads.load(std::memory_order_relaxed);
+
+        std::vector<int64_t> merged_samples;
+        merged_samples.reserve(config.thread_count * config.ops_per_thread);
+        for (auto& per_thread : thread_samples) {
+            merged_samples.insert(merged_samples.end(), per_thread.begin(), 
per_thread.end());
+        }
+
+        result.sample_count = merged_samples.size();
+        result.summary = summarize_lock_wait(&merged_samples);
+        return result;
+    }
+
+    inline static std::unique_ptr<FileCacheFactory> _suite_factory;
+    inline static bool _owns_factory = false;
+    inline static bool _owns_fd_cache = false;
+    BlockFileCache* _cache {nullptr};
+};
+
+TEST_F(CachedRemoteFileReaderLockWaitTest,
+       
HighConcurrencyReadViaCachedRemoteFileReaderLockWaitManyFilesMemoryCache) {
+    constexpr size_t kOpsPerThread = 20000;
+
+    const bool original_direct_read = config::enable_read_cache_file_directly;
+    const bool original_async_touch = 
config::enable_file_cache_async_touch_on_get_or_set;
+    Defer defer {[original_direct_read, original_async_touch] {
+        config::enable_read_cache_file_directly = original_direct_read;
+        config::enable_file_cache_async_touch_on_get_or_set = 
original_async_touch;
+    }};
+    config::enable_read_cache_file_directly = false;
+    config::enable_file_cache_async_touch_on_get_or_set = false;
+
+    LockWaitWorkloadConfig workload;
+    workload.ops_per_thread = kOpsPerThread;
+    workload.thread_count = calc_thread_count();
+    workload.file_prefix = "perf_lock_wait";
+
+    LockWaitWorkloadResult result = run_lock_wait_workload(workload);
+
+    EXPECT_EQ(result.warmup_failed_reads, 0);
+    EXPECT_EQ(result.populated_keys, workload.file_count);
+    EXPECT_EQ(result.failed_reads, 0);
+    EXPECT_EQ(result.sample_count, workload.thread_count * 
workload.ops_per_thread);
+
+    LOG(INFO) << "cached_remote_file_reader lock wait summary: samples=" << 
result.sample_count
+              << " total_ns=" << result.summary.total_ns << " avg_ns=" << 
result.summary.avg_ns
+              << " p50_ns=" << result.summary.p50_ns << " p95_ns=" << 
result.summary.p95_ns
+              << " p99_ns=" << result.summary.p99_ns << " max_ns=" << 
result.summary.max_ns
+              << " non_zero_samples=" << result.summary.non_zero_samples;
+
+    EXPECT_GT(result.summary.total_ns, 0);
+    EXPECT_GT(result.summary.non_zero_samples, 0);
+}
+
+TEST_F(CachedRemoteFileReaderLockWaitTest, 
AsyncTouchOnGetOrSetReducesLockWait) {
+    const bool original_direct_read = config::enable_read_cache_file_directly;
+    const bool original_async_touch = 
config::enable_file_cache_async_touch_on_get_or_set;
+    Defer defer {[original_direct_read, original_async_touch] {
+        config::enable_read_cache_file_directly = original_direct_read;
+        config::enable_file_cache_async_touch_on_get_or_set = 
original_async_touch;
+    }};
+
+    config::enable_read_cache_file_directly = false;
+
+    LockWaitWorkloadConfig workload;
+    workload.file_count = 1536;
+    workload.ops_per_thread = 12000;
+    workload.thread_count = calc_thread_count();
+
+    config::enable_file_cache_async_touch_on_get_or_set = false;
+    workload.file_prefix = "sync_touch";
+    LockWaitWorkloadResult sync_result = run_lock_wait_workload(workload);
+
+    EXPECT_EQ(sync_result.warmup_failed_reads, 0);
+    EXPECT_EQ(sync_result.populated_keys, workload.file_count);
+    EXPECT_EQ(sync_result.failed_reads, 0);
+    EXPECT_EQ(sync_result.sample_count, workload.thread_count * 
workload.ops_per_thread);
+
+    recreate_memory_cache();
+
+    config::enable_file_cache_async_touch_on_get_or_set = true;
+    workload.file_prefix = "async_touch";
+    LockWaitWorkloadResult async_result = run_lock_wait_workload(workload);
+
+    EXPECT_EQ(async_result.warmup_failed_reads, 0);
+    EXPECT_EQ(async_result.populated_keys, workload.file_count);
+    EXPECT_EQ(async_result.failed_reads, 0);
+    EXPECT_EQ(async_result.sample_count, workload.thread_count * 
workload.ops_per_thread);
+
+    LOG(INFO) << "sync_touch lock wait: total_ns=" << 
sync_result.summary.total_ns
+              << " avg_ns=" << sync_result.summary.avg_ns
+              << " p95_ns=" << sync_result.summary.p95_ns
+              << " p99_ns=" << sync_result.summary.p99_ns
+              << " non_zero_samples=" << sync_result.summary.non_zero_samples;
+    LOG(INFO) << "async_touch lock wait: total_ns=" << 
async_result.summary.total_ns
+              << " avg_ns=" << async_result.summary.avg_ns
+              << " p95_ns=" << async_result.summary.p95_ns
+              << " p99_ns=" << async_result.summary.p99_ns
+              << " non_zero_samples=" << async_result.summary.non_zero_samples;
+
+    EXPECT_GT(sync_result.summary.total_ns, 0);
+    EXPECT_GT(async_result.summary.total_ns, 0);
+    EXPECT_GT(sync_result.summary.non_zero_samples, 0);
+    EXPECT_GT(async_result.summary.non_zero_samples, 0);
+    EXPECT_LT(async_result.summary.total_ns, sync_result.summary.total_ns);
+    EXPECT_LT(async_result.summary.p95_ns, sync_result.summary.p95_ns);
+}
+
+} // namespace doris::io
diff --git a/be/test/io/cache/cached_remote_file_reader_test.cpp 
b/be/test/io/cache/cached_remote_file_reader_test.cpp
new file mode 100644
index 00000000000..39da1bfb6fd
--- /dev/null
+++ b/be/test/io/cache/cached_remote_file_reader_test.cpp
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "block_file_cache_test_common.h"
+
+namespace doris::io {
+
+TEST_F(BlockFileCacheTest,
+       
direct_partial_hit_with_downloaded_remainder_should_not_read_remote_again) {
+    std::string local_cache_base_path =
+            caches_dir / "cache_direct_partial_downloaded_no_remote_read" / "";
+    config::enable_read_cache_file_directly = true;
+    if (fs::exists(local_cache_base_path)) {
+        fs::remove_all(local_cache_base_path);
+    }
+    fs::create_directories(local_cache_base_path);
+
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.query_id = query_id;
+    ASSERT_TRUE(
+            
FileCacheFactory::instance()->create_file_cache(local_cache_base_path, 
settings).ok());
+
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+
+    {
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&local_reader));
+        auto seed_reader = 
std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+        std::string buffer(64_kb, '\0');
+        IOContext io_ctx;
+        FileCacheStatistics stats;
+        io_ctx.file_cache_stats = &stats;
+        size_t bytes_read {0};
+        ASSERT_TRUE(
+                seed_reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                        .ok());
+        EXPECT_EQ(bytes_read, 64_kb);
+        EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    }
+
+    FileReaderSPtr stale_local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&stale_local_reader));
+    auto stale_reader = 
std::make_shared<CachedRemoteFileReader>(stale_local_reader, opts);
+    EXPECT_EQ(stale_reader->_cache_file_readers.size(), 1);
+
+    {
+        FileReaderSPtr local_reader;
+        ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, 
&local_reader));
+        auto updater_reader = 
std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+        std::string buffer(64_kb, '\0');
+        IOContext io_ctx;
+        FileCacheStatistics stats;
+        io_ctx.file_cache_stats = &stats;
+        size_t bytes_read {0};
+        ASSERT_TRUE(updater_reader
+                            ->read_at(1_mb + 100, Slice(buffer.data(), 
buffer.size()), &bytes_read,
+                                      &io_ctx)
+                            .ok());
+        EXPECT_EQ(bytes_read, 64_kb);
+        EXPECT_EQ(std::string(64_kb, '1'), buffer);
+    }
+
+    EXPECT_EQ(stale_reader->_cache_file_readers.size(), 1);
+
+    std::string cross_block_buffer(64_kb, '\0');
+    IOContext io_ctx;
+    FileCacheStatistics stats;
+    io_ctx.file_cache_stats = &stats;
+    size_t bytes_read {0};
+    ASSERT_TRUE(stale_reader
+                        ->read_at(1_mb - 100,
+                                  Slice(cross_block_buffer.data(), 
cross_block_buffer.size()),
+                                  &bytes_read, &io_ctx)
+                        .ok());
+    EXPECT_EQ(bytes_read, 64_kb);
+    EXPECT_EQ(std::string(100, '0') + std::string(64_kb - 100, '1'), 
cross_block_buffer);
+    EXPECT_EQ(stats.bytes_read_from_remote, 0);
+
+    EXPECT_TRUE(stale_reader->close().ok());
+    EXPECT_TRUE(stale_reader->closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(local_cache_base_path)) {
+        fs::remove_all(local_cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+    config::enable_read_cache_file_directly = false;
+}
+
+} // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to