This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 550bd2dc825 branch-4.1: [fix](filecache) add async lru update
mechanism and fix partial hit in cache reader (pick #61083) (#61812)
550bd2dc825 is described below
commit 550bd2dc825cde095a4d9031446b59c006c5b884
Author: zhengyu <[email protected]>
AuthorDate: Sat Mar 28 22:43:56 2026 +0800
branch-4.1: [fix](filecache) add async lru update mechanism and fix partial
hit in cache reader (pick #61083) (#61812)
- CachedRemoteFileReader::read_at_impl has incorrect initialization of
subsequent traversal start point and count after direct partial hit
(causes incorrect fallback / extra overhead)
- Proposed LRU ordering async update solution: Decoupled LRU update from
query read operations, slightly reducing read lock latency and laying
groundwork for subsequent lock splitting
- Established performance unit tests for cache lock
Signed-off-by: zhengyu <[email protected]>
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---------
Signed-off-by: zhengyu <[email protected]>
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/io/cache/block_file_cache.cpp | 23 +-
be/src/io/cache/cached_remote_file_reader.cpp | 6 +-
.../io/cache/block_file_cache_test_lru_dump.cpp | 107 ++++++
.../cached_remote_file_reader_lock_wait_test.cpp | 427 +++++++++++++++++++++
.../io/cache/cached_remote_file_reader_test.cpp | 124 ++++++
7 files changed, 683 insertions(+), 6 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 01c27c6e5f2..e919b644918 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1193,6 +1193,7 @@ DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
+DEFINE_mBool(enable_file_cache_async_touch_on_get_or_set, "false");
DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "180000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7ebf0091ca8..ab36e426f81 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1237,6 +1237,7 @@ DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
+DECLARE_mBool(enable_file_cache_async_touch_on_get_or_set);
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index a6e80b0766e..f6f641a0a25 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -516,7 +516,8 @@ void BlockFileCache::use_cell(const FileBlockCell& cell,
FileBlocks* result, boo
auto& queue = get_queue(cell.file_block->cache_type());
/// Move to the end of the queue. The iterator remains valid.
- if (cell.queue_iterator && move_iter_flag) {
+ if (!config::enable_file_cache_async_touch_on_get_or_set &&
cell.queue_iterator &&
+ move_iter_flag) {
queue.move_to_end(*cell.queue_iterator, cache_lock);
_lru_recorder->record_queue_event(cell.file_block->cache_type(),
CacheLRULogType::MOVETOBACK,
cell.file_block->_key.hash,
@@ -817,11 +818,13 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
DCHECK(stats != nullptr);
MonotonicStopWatch sw;
sw.start();
- std::lock_guard cache_lock(_mutex);
- stats->lock_wait_timer += sw.elapsed_time();
FileBlocks file_blocks;
+ std::vector<FileBlockSPtr> need_update_lru_blocks;
+ const bool async_touch_on_get_or_set =
config::enable_file_cache_async_touch_on_get_or_set;
int64_t duration = 0;
{
+ std::lock_guard cache_lock(_mutex);
+ stats->lock_wait_timer += sw.elapsed_time();
SCOPED_RAW_TIMER(&duration);
/// Get all blocks which intersect with the given range.
{
@@ -842,6 +845,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
if (!context.is_warmup) {
*_no_warmup_num_read_blocks << file_blocks.size();
}
+ if (async_touch_on_get_or_set) {
+ need_update_lru_blocks.reserve(file_blocks.size());
+ }
for (auto& block : file_blocks) {
size_t block_size = block->range().size();
*_total_read_size_metrics << block_size;
@@ -851,9 +857,20 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
if (!context.is_warmup) {
*_no_warmup_num_hit_blocks << 1;
}
+ if (async_touch_on_get_or_set &&
+ need_to_move(block->cache_type(), context.cache_type)) {
+ need_update_lru_blocks.emplace_back(block);
+ }
}
}
}
+
+ if (async_touch_on_get_or_set) {
+ for (auto& block : need_update_lru_blocks) {
+ add_need_update_lru_block(std::move(block));
+ }
+ }
+
*_get_or_set_latency_us << (duration / 1000);
return FileBlocksHolder(std::move(file_blocks));
}
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index e0970ab0cb9..8e4efaed97d 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -476,17 +476,17 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
}
}
- size_t current_offset = offset;
+ size_t current_offset = offset + already_read;
size_t end_offset = offset + bytes_req - 1;
bool need_self_heal = false;
- *bytes_read = 0;
+ *bytes_read = already_read;
for (auto& block : holder.file_blocks) {
if (current_offset > end_offset) {
break;
}
size_t left = block->range().left;
size_t right = block->range().right;
- if (right < offset) {
+ if (right < current_offset) {
continue;
}
size_t read_size =
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index e8a373568ef..9c31b0b1430 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -596,4 +596,111 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_order_check) {
FileCacheFactory::instance()->_capacity = 0;
}
+TEST_F(BlockFileCacheTest, get_or_set_hit_order_check) {
+ std::string cache_base_path = caches_dir /
"cache_get_or_set_hit_order_check" / "";
+
+ const auto old_async_touch =
config::enable_file_cache_async_touch_on_get_or_set;
+ const auto old_update_interval_ms =
config::file_cache_background_block_lru_update_interval_ms;
+ const auto old_update_qps_limit =
config::file_cache_background_block_lru_update_qps_limit;
+ Defer defer {[old_async_touch, old_update_interval_ms,
old_update_qps_limit] {
+ config::enable_file_cache_async_touch_on_get_or_set = old_async_touch;
+ config::file_cache_background_block_lru_update_interval_ms =
old_update_interval_ms;
+ config::file_cache_background_block_lru_update_qps_limit =
old_update_qps_limit;
+ }};
+
+ config::enable_file_cache_async_touch_on_get_or_set = true;
+ config::file_cache_background_block_lru_update_interval_ms = 200;
+ config::file_cache_background_block_lru_update_qps_limit = 100000;
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+
+ io::CacheContext context;
+ ReadStatistics stats;
+ context.stats = &stats;
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.query_id = query_id;
+
+ auto key = io::BlockFileCache::hash("get_or_set_hit_order_check_key");
+ constexpr size_t kBlockSize = 1024 * 1024;
+ std::vector<size_t> offsets {0, kBlockSize, 2 * kBlockSize};
+
+ for (size_t offset : offsets) {
+ auto holder = cache.get_or_set(key, offset, kBlockSize, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ ASSERT_EQ(blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
+ }
+
+ std::vector<size_t> initial_offsets;
+ for (auto it = cache._normal_queue.begin(); it !=
cache._normal_queue.end(); ++it) {
+ initial_offsets.push_back(it->offset);
+ }
+ ASSERT_EQ(initial_offsets.size(), 3);
+ ASSERT_EQ(initial_offsets[0], 0);
+ ASSERT_EQ(initial_offsets[1], kBlockSize);
+ ASSERT_EQ(initial_offsets[2], 2 * kBlockSize);
+
+ auto holder = cache.get_or_set(key, 0, kBlockSize, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ ASSERT_EQ(blocks[0]->state(), io::FileBlock::State::DOWNLOADED);
+
+ std::vector<size_t> before_updated_offsets;
+ for (auto it = cache._normal_queue.begin(); it !=
cache._normal_queue.end(); ++it) {
+ before_updated_offsets.push_back(it->offset);
+ }
+ ASSERT_EQ(before_updated_offsets.size(), 3);
+ ASSERT_EQ(before_updated_offsets[0], 0);
+ ASSERT_EQ(before_updated_offsets[1], kBlockSize);
+ ASSERT_EQ(before_updated_offsets[2], 2 * kBlockSize);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
+
+ std::vector<size_t> updated_offsets;
+ for (auto it = cache._normal_queue.begin(); it !=
cache._normal_queue.end(); ++it) {
+ updated_offsets.push_back(it->offset);
+ }
+ ASSERT_EQ(updated_offsets.size(), 3);
+ std::vector<size_t> sorted_before = before_updated_offsets;
+ std::vector<size_t> sorted_after = updated_offsets;
+ std::sort(sorted_before.begin(), sorted_before.end());
+ std::sort(sorted_after.begin(), sorted_after.end());
+ ASSERT_EQ(sorted_after, sorted_before);
+ ASSERT_EQ(updated_offsets.back(), 0);
+
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
} // namespace doris::io
diff --git a/be/test/io/cache/cached_remote_file_reader_lock_wait_test.cpp
b/be/test/io/cache/cached_remote_file_reader_lock_wait_test.cpp
new file mode 100644
index 00000000000..6b788534167
--- /dev/null
+++ b/be/test/io/cache/cached_remote_file_reader_lock_wait_test.cpp
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <numeric>
+#include <random>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/cached_remote_file_reader.h"
+#include "io/cache/file_cache_common.h"
+#include "io/fs/file_reader.h"
+#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris::io {
+
+namespace {
+
+class MockPatternFileReader final : public FileReader {
+public:
+ MockPatternFileReader(std::string path, size_t file_size, uint32_t
pattern_id)
+ : _path(std::move(path)), _size(file_size),
_pattern_id(pattern_id) {}
+
+ Status close() override {
+ _closed = true;
+ return Status::OK();
+ }
+
+ const Path& path() const override { return _path; }
+
+ size_t size() const override { return _size; }
+
+ bool closed() const override { return _closed; }
+
+ int64_t mtime() const override { return 0; }
+
+protected:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* /*io_ctx*/) override {
+ if (offset >= _size) {
+ *bytes_read = 0;
+ return Status::OK();
+ }
+ const size_t readable = std::min(result.size, _size - offset);
+ memset(result.data, static_cast<int>('a' + (_pattern_id % 26)),
readable);
+ *bytes_read = readable;
+ return Status::OK();
+ }
+
+private:
+ Path _path;
+ size_t _size;
+ uint32_t _pattern_id;
+ bool _closed {false};
+};
+
+struct LockWaitSummary {
+ int64_t total_ns {0};
+ int64_t max_ns {0};
+ int64_t p50_ns {0};
+ int64_t p95_ns {0};
+ int64_t p99_ns {0};
+ double avg_ns {0.0};
+ size_t non_zero_samples {0};
+};
+
+int64_t get_percentile_value(const std::vector<int64_t>& sorted_values, double
percentile) {
+ if (sorted_values.empty()) {
+ return 0;
+ }
+ const double rank = percentile * static_cast<double>(sorted_values.size()
- 1);
+ return sorted_values[static_cast<size_t>(rank)];
+}
+
+LockWaitSummary summarize_lock_wait(std::vector<int64_t>* values) {
+ LockWaitSummary summary;
+ if (values == nullptr || values->empty()) {
+ return summary;
+ }
+ std::sort(values->begin(), values->end());
+ summary.total_ns = std::accumulate(values->begin(), values->end(), int64_t
{0});
+ summary.max_ns = values->back();
+ summary.p50_ns = get_percentile_value(*values, 0.50);
+ summary.p95_ns = get_percentile_value(*values, 0.95);
+ summary.p99_ns = get_percentile_value(*values, 0.99);
+ summary.avg_ns = static_cast<double>(summary.total_ns) /
static_cast<double>(values->size());
+ summary.non_zero_samples =
+ std::count_if(values->begin(), values->end(), [](int64_t v) {
return v > 0; });
+ return summary;
+}
+
+struct LockWaitWorkloadConfig {
+ size_t file_count {2048};
+ size_t file_size {64 * 1024};
+ size_t warmup_read_bytes {4 * 1024};
+ size_t stress_read_bytes {4 * 1024};
+ size_t ops_per_thread {2000};
+ size_t thread_count {16};
+ uint32_t seed_base {20260228};
+ std::string file_prefix {"default"};
+};
+
+struct LockWaitWorkloadResult {
+ LockWaitSummary summary;
+ size_t populated_keys {0};
+ size_t warmup_failed_reads {0};
+ size_t failed_reads {0};
+ size_t sample_count {0};
+};
+
+size_t calc_thread_count() {
+ const size_t hw_threads =
+ std::thread::hardware_concurrency() == 0 ? 16 :
std::thread::hardware_concurrency();
+ return std::min<size_t>(48, std::max<size_t>(16, hw_threads));
+}
+
+} // namespace
+
+class CachedRemoteFileReaderLockWaitTest : public testing::Test {
+public:
+ static void SetUpTestSuite() {
+ auto* exec_env = ExecEnv::GetInstance();
+ if (exec_env->file_cache_factory() == nullptr) {
+ _suite_factory = std::make_unique<FileCacheFactory>();
+ exec_env->_file_cache_factory = _suite_factory.get();
+ _owns_factory = true;
+ }
+ if (!exec_env->_file_cache_open_fd_cache) {
+ exec_env->_file_cache_open_fd_cache = std::make_unique<FDCache>();
+ _owns_fd_cache = true;
+ }
+ }
+
+ static void TearDownTestSuite() {
+ auto* factory = ExecEnv::GetInstance()->file_cache_factory();
+ if (factory != nullptr) {
+ factory->clear_file_caches(true);
+ factory->_caches.clear();
+ factory->_path_to_cache.clear();
+ factory->_capacity = 0;
+ }
+ if (_owns_factory) {
+ ExecEnv::GetInstance()->_file_cache_factory = nullptr;
+ _suite_factory.reset();
+ _owns_factory = false;
+ }
+ if (_owns_fd_cache) {
+ ExecEnv::GetInstance()->_file_cache_open_fd_cache.reset(nullptr);
+ _owns_fd_cache = false;
+ }
+ }
+
+ void SetUp() override { recreate_memory_cache(); }
+
+ void TearDown() override {
+ auto* factory = FileCacheFactory::instance();
+ if (factory != nullptr) {
+ factory->clear_file_caches(true);
+ factory->_caches.clear();
+ factory->_path_to_cache.clear();
+ factory->_capacity = 0;
+ }
+ }
+
+protected:
+ void recreate_memory_cache() {
+ auto* factory = FileCacheFactory::instance();
+ ASSERT_NE(factory, nullptr);
+ factory->clear_file_caches(true);
+ factory->_caches.clear();
+ factory->_path_to_cache.clear();
+ factory->_capacity = 0;
+
+ constexpr size_t kCapacityBytes = 512ULL * 1024ULL * 1024ULL;
+ auto settings = get_file_cache_settings(kCapacityBytes, 0,
DEFAULT_NORMAL_PERCENT,
+ DEFAULT_DISPOSABLE_PERCENT,
DEFAULT_INDEX_PERCENT,
+ DEFAULT_TTL_PERCENT, "memory");
+ ASSERT_TRUE(factory->create_file_cache("memory", settings).ok());
+ _cache = factory->get_by_path(std::string("memory"));
+ ASSERT_NE(_cache, nullptr);
+ for (int i = 0; i < 100; ++i) {
+ if (_cache->get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(_cache->get_async_open_success());
+ }
+
+ LockWaitWorkloadResult run_lock_wait_workload(const
LockWaitWorkloadConfig& config) const {
+ DCHECK(_cache != nullptr);
+ DCHECK(config.file_count > 0);
+ DCHECK(config.file_size >= config.stress_read_bytes);
+ DCHECK(config.thread_count > 0);
+
+ LockWaitWorkloadResult result;
+
+ std::vector<std::shared_ptr<CachedRemoteFileReader>> readers;
+ readers.reserve(config.file_count);
+ std::vector<UInt128Wrapper> cache_keys;
+ cache_keys.reserve(config.file_count);
+
+ FileReaderOptions opts;
+ opts.cache_type = FileCachePolicy::FILE_BLOCK_CACHE;
+ opts.is_doris_table = true;
+
+ for (size_t i = 0; i < config.file_count; ++i) {
+ std::string path =
+ "/mock/" + config.file_prefix + "_file_" +
std::to_string(i) + ".dat";
+ auto remote_reader = std::make_shared<MockPatternFileReader>(path,
config.file_size, i);
+ auto cached_reader =
std::make_shared<CachedRemoteFileReader>(remote_reader, opts);
+ cache_keys.emplace_back(
+
BlockFileCache::hash(remote_reader->path().filename().native()));
+ readers.emplace_back(std::move(cached_reader));
+ }
+
+ std::vector<char> warmup_buffer(config.warmup_read_bytes, 0);
+ for (auto& reader : readers) {
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read = 0;
+ Status st = reader->read_at(0, Slice(warmup_buffer.data(),
warmup_buffer.size()),
+ &bytes_read, &io_ctx);
+ if (!st.ok() || bytes_read != warmup_buffer.size()) {
+ ++result.warmup_failed_reads;
+ }
+ }
+
+ for (const auto& hash : cache_keys) {
+ if (!_cache->get_blocks_by_key(hash).empty()) {
+ ++result.populated_keys;
+ }
+ }
+
+ std::atomic<size_t> ready_threads {0};
+ std::atomic<bool> start_flag {false};
+ std::atomic<size_t> failed_reads {0};
+ std::vector<std::vector<int64_t>> thread_samples(config.thread_count);
+ std::vector<std::thread> workers;
+ workers.reserve(config.thread_count);
+
+ for (size_t tid = 0; tid < config.thread_count; ++tid) {
+ workers.emplace_back([&, tid] {
+ SCOPED_INIT_THREAD_CONTEXT();
+ std::mt19937 gen(static_cast<uint32_t>(tid) +
config.seed_base);
+ std::uniform_int_distribution<size_t> reader_dist(0,
config.file_count - 1);
+ std::uniform_int_distribution<size_t> offset_dist(
+ 0, config.file_size - config.stress_read_bytes);
+ std::vector<char> buffer(config.stress_read_bytes, 0);
+ auto& samples = thread_samples[tid];
+ samples.reserve(config.ops_per_thread);
+
+ ready_threads.fetch_add(1, std::memory_order_release);
+ while (!start_flag.load(std::memory_order_acquire)) {
+ std::this_thread::yield();
+ }
+
+ for (size_t op = 0; op < config.ops_per_thread; ++op) {
+ const size_t reader_idx = reader_dist(gen);
+ const size_t offset = offset_dist(gen);
+
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read = 0;
+ Status st = readers[reader_idx]->read_at(
+ offset, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx);
+ if (!st.ok() || bytes_read != config.stress_read_bytes) {
+ failed_reads.fetch_add(1, std::memory_order_relaxed);
+ continue;
+ }
+ samples.push_back(stats.lock_wait_timer);
+ }
+ });
+ }
+
+ while (ready_threads.load(std::memory_order_acquire) <
config.thread_count) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ start_flag.store(true, std::memory_order_release);
+
+ for (auto& worker : workers) {
+ if (worker.joinable()) {
+ worker.join();
+ }
+ }
+
+ result.failed_reads = failed_reads.load(std::memory_order_relaxed);
+
+ std::vector<int64_t> merged_samples;
+ merged_samples.reserve(config.thread_count * config.ops_per_thread);
+ for (auto& per_thread : thread_samples) {
+ merged_samples.insert(merged_samples.end(), per_thread.begin(),
per_thread.end());
+ }
+
+ result.sample_count = merged_samples.size();
+ result.summary = summarize_lock_wait(&merged_samples);
+ return result;
+ }
+
+ inline static std::unique_ptr<FileCacheFactory> _suite_factory;
+ inline static bool _owns_factory = false;
+ inline static bool _owns_fd_cache = false;
+ BlockFileCache* _cache {nullptr};
+};
+
+TEST_F(CachedRemoteFileReaderLockWaitTest,
+
HighConcurrencyReadViaCachedRemoteFileReaderLockWaitManyFilesMemoryCache) {
+ constexpr size_t kOpsPerThread = 20000;
+
+ const bool original_direct_read = config::enable_read_cache_file_directly;
+ const bool original_async_touch =
config::enable_file_cache_async_touch_on_get_or_set;
+ Defer defer {[original_direct_read, original_async_touch] {
+ config::enable_read_cache_file_directly = original_direct_read;
+ config::enable_file_cache_async_touch_on_get_or_set =
original_async_touch;
+ }};
+ config::enable_read_cache_file_directly = false;
+ config::enable_file_cache_async_touch_on_get_or_set = false;
+
+ LockWaitWorkloadConfig workload;
+ workload.ops_per_thread = kOpsPerThread;
+ workload.thread_count = calc_thread_count();
+ workload.file_prefix = "perf_lock_wait";
+
+ LockWaitWorkloadResult result = run_lock_wait_workload(workload);
+
+ EXPECT_EQ(result.warmup_failed_reads, 0);
+ EXPECT_EQ(result.populated_keys, workload.file_count);
+ EXPECT_EQ(result.failed_reads, 0);
+ EXPECT_EQ(result.sample_count, workload.thread_count *
workload.ops_per_thread);
+
+ LOG(INFO) << "cached_remote_file_reader lock wait summary: samples=" <<
result.sample_count
+ << " total_ns=" << result.summary.total_ns << " avg_ns=" <<
result.summary.avg_ns
+ << " p50_ns=" << result.summary.p50_ns << " p95_ns=" <<
result.summary.p95_ns
+ << " p99_ns=" << result.summary.p99_ns << " max_ns=" <<
result.summary.max_ns
+ << " non_zero_samples=" << result.summary.non_zero_samples;
+
+ EXPECT_GT(result.summary.total_ns, 0);
+ EXPECT_GT(result.summary.non_zero_samples, 0);
+}
+
+TEST_F(CachedRemoteFileReaderLockWaitTest,
AsyncTouchOnGetOrSetReducesLockWait) {
+ const bool original_direct_read = config::enable_read_cache_file_directly;
+ const bool original_async_touch =
config::enable_file_cache_async_touch_on_get_or_set;
+ Defer defer {[original_direct_read, original_async_touch] {
+ config::enable_read_cache_file_directly = original_direct_read;
+ config::enable_file_cache_async_touch_on_get_or_set =
original_async_touch;
+ }};
+
+ config::enable_read_cache_file_directly = false;
+
+ LockWaitWorkloadConfig workload;
+ workload.file_count = 1536;
+ workload.ops_per_thread = 12000;
+ workload.thread_count = calc_thread_count();
+
+ config::enable_file_cache_async_touch_on_get_or_set = false;
+ workload.file_prefix = "sync_touch";
+ LockWaitWorkloadResult sync_result = run_lock_wait_workload(workload);
+
+ EXPECT_EQ(sync_result.warmup_failed_reads, 0);
+ EXPECT_EQ(sync_result.populated_keys, workload.file_count);
+ EXPECT_EQ(sync_result.failed_reads, 0);
+ EXPECT_EQ(sync_result.sample_count, workload.thread_count *
workload.ops_per_thread);
+
+ recreate_memory_cache();
+
+ config::enable_file_cache_async_touch_on_get_or_set = true;
+ workload.file_prefix = "async_touch";
+ LockWaitWorkloadResult async_result = run_lock_wait_workload(workload);
+
+ EXPECT_EQ(async_result.warmup_failed_reads, 0);
+ EXPECT_EQ(async_result.populated_keys, workload.file_count);
+ EXPECT_EQ(async_result.failed_reads, 0);
+ EXPECT_EQ(async_result.sample_count, workload.thread_count *
workload.ops_per_thread);
+
+ LOG(INFO) << "sync_touch lock wait: total_ns=" <<
sync_result.summary.total_ns
+ << " avg_ns=" << sync_result.summary.avg_ns
+ << " p95_ns=" << sync_result.summary.p95_ns
+ << " p99_ns=" << sync_result.summary.p99_ns
+ << " non_zero_samples=" << sync_result.summary.non_zero_samples;
+ LOG(INFO) << "async_touch lock wait: total_ns=" <<
async_result.summary.total_ns
+ << " avg_ns=" << async_result.summary.avg_ns
+ << " p95_ns=" << async_result.summary.p95_ns
+ << " p99_ns=" << async_result.summary.p99_ns
+ << " non_zero_samples=" << async_result.summary.non_zero_samples;
+
+ EXPECT_GT(sync_result.summary.total_ns, 0);
+ EXPECT_GT(async_result.summary.total_ns, 0);
+ EXPECT_GT(sync_result.summary.non_zero_samples, 0);
+ EXPECT_GT(async_result.summary.non_zero_samples, 0);
+ EXPECT_LT(async_result.summary.total_ns, sync_result.summary.total_ns);
+ EXPECT_LT(async_result.summary.p95_ns, sync_result.summary.p95_ns);
+}
+
+} // namespace doris::io
diff --git a/be/test/io/cache/cached_remote_file_reader_test.cpp
b/be/test/io/cache/cached_remote_file_reader_test.cpp
new file mode 100644
index 00000000000..39da1bfb6fd
--- /dev/null
+++ b/be/test/io/cache/cached_remote_file_reader_test.cpp
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "block_file_cache_test_common.h"
+
+namespace doris::io {
+
+TEST_F(BlockFileCacheTest,
+
direct_partial_hit_with_downloaded_remainder_should_not_read_remote_again) {
+ std::string local_cache_base_path =
+ caches_dir / "cache_direct_partial_downloaded_no_remote_read" / "";
+ config::enable_read_cache_file_directly = true;
+ if (fs::exists(local_cache_base_path)) {
+ fs::remove_all(local_cache_base_path);
+ }
+ fs::create_directories(local_cache_base_path);
+
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.query_id = query_id;
+ ASSERT_TRUE(
+
FileCacheFactory::instance()->create_file_cache(local_cache_base_path,
settings).ok());
+
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+
+ {
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file,
&local_reader));
+ auto seed_reader =
std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+ std::string buffer(64_kb, '\0');
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read {0};
+ ASSERT_TRUE(
+ seed_reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+ }
+
+ FileReaderSPtr stale_local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file,
&stale_local_reader));
+ auto stale_reader =
std::make_shared<CachedRemoteFileReader>(stale_local_reader, opts);
+ EXPECT_EQ(stale_reader->_cache_file_readers.size(), 1);
+
+ {
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file,
&local_reader));
+ auto updater_reader =
std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+ std::string buffer(64_kb, '\0');
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read {0};
+ ASSERT_TRUE(updater_reader
+ ->read_at(1_mb + 100, Slice(buffer.data(),
buffer.size()), &bytes_read,
+ &io_ctx)
+ .ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(64_kb, '1'), buffer);
+ }
+
+ EXPECT_EQ(stale_reader->_cache_file_readers.size(), 1);
+
+ std::string cross_block_buffer(64_kb, '\0');
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read {0};
+ ASSERT_TRUE(stale_reader
+ ->read_at(1_mb - 100,
+ Slice(cross_block_buffer.data(),
cross_block_buffer.size()),
+ &bytes_read, &io_ctx)
+ .ok());
+ EXPECT_EQ(bytes_read, 64_kb);
+ EXPECT_EQ(std::string(100, '0') + std::string(64_kb - 100, '1'),
cross_block_buffer);
+ EXPECT_EQ(stats.bytes_read_from_remote, 0);
+
+ EXPECT_TRUE(stale_reader->close().ok());
+ EXPECT_TRUE(stale_reader->closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(local_cache_base_path)) {
+ fs::remove_all(local_cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+ config::enable_read_cache_file_directly = false;
+}
+
+} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]