This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new bee8e3edb47 branch-3.0: [fix](cloud) fix file cache types priority order #51463 (#51603)
bee8e3edb47 is described below
commit bee8e3edb47f623d0fe7b60987fa452c109a3fd6
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 10:35:16 2025 +0800
branch-3.0: [fix](cloud) fix file cache types priority order #51463 (#51603)
Cherry-picked from #51463
Signed-off-by: zhengyu <[email protected]>
Co-authored-by: zhengyu <[email protected]>
---
be/src/io/cache/file_cache_common.h | 8 +-
be/test/io/cache/block_file_cache_test.cpp | 153 +++++++++++++++++++++++++++++
2 files changed, 157 insertions(+), 4 deletions(-)
diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h
index 25df07b5ddf..6e9396fb11a 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -128,13 +128,13 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach
struct CacheContext {
CacheContext(const IOContext* io_context) {
- if (io_context->is_index_data) {
+ if (io_context->expiration_time != 0) {
+ cache_type = FileCacheType::TTL;
+ expiration_time = io_context->expiration_time;
+ } else if (io_context->is_index_data) {
cache_type = FileCacheType::INDEX;
} else if (io_context->is_disposable) {
cache_type = FileCacheType::DISPOSABLE;
- } else if (io_context->expiration_time != 0) {
- cache_type = FileCacheType::TTL;
- expiration_time = io_context->expiration_time;
} else {
cache_type = FileCacheType::NORMAL;
}
diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp
index d0546787026..0d6883e4c8b 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -7752,4 +7752,157 @@ TEST_F(BlockFileCacheTest, test_upgrade_cache_dir_version) {
}
}
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) { // A read flagged as index data that also carries a non-zero expiration_time is expected to be accounted to the TTL queue, not the index queue.
+ if (fs::exists(cache_base_path)) { // start from an empty cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576; // same value the TTL queue size is compared against below
+ settings.max_query_cache_size = 0;
+ io::CacheContext context; // NOTE(review): context and rstats are populated here but not referenced again in this test
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.query_id = query_id;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ BlockFileCache* cache =
FileCacheFactory::instance()->get_by_path(cache_base_path);
+
+ for (int i = 0; i < 100; i++) { // poll get_async_open_success() up to 100 times, 1 ms apart; the test proceeds even if it never returns true
+ if (cache->get_async_open_success()) {
+ break;
+ }; // NOTE(review): the ';' after '}' is an empty statement
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ CachedRemoteFileReader reader(local_reader, opts); // wraps local_reader; the checks below compare the wrapper against it
+ auto key = io::BlockFileCache::hash("tmp_file");
+ EXPECT_EQ(reader._cache_hash, key); // the reader's cache hash is expected to equal the hash of the literal "tmp_file"
+ EXPECT_EQ(local_reader->path().native(), reader.path().native());
+ EXPECT_EQ(local_reader->size(), reader.size());
+ EXPECT_FALSE(reader.closed());
+ EXPECT_EQ(local_reader->path().native(),
reader.get_remote_reader()->path().native());
+ {
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ io_ctx.is_index_data = true; // index flag and a non-zero expiration are both set on the same IOContext
+ int64_t cur_time = UnixSeconds();
+ io_ctx.expiration_time = cur_time + 120; // 120 past cur_time (seconds, going by the UnixSeconds() name)
+ size_t bytes_read {0};
+ EXPECT_TRUE(
+ reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(3)); // NOTE(review): fixed 3 s wait before inspecting queue sizes; presumably lets asynchronous cache bookkeeping settle - confirm
+ LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size; // log all four queue sizes before asserting
+ LOG(INFO) << "index:" << cache->_index_queue.cache_size;
+ LOG(INFO) << "normal:" << cache->_normal_queue.cache_size;
+ LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size;
+ EXPECT_EQ(cache->_ttl_queue.cache_size, 1048576); // the cached bytes are expected in the TTL queue
+ EXPECT_EQ(cache->_index_queue.cache_size, 0); // and nothing in the index queue, although is_index_data was set
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1)); // NOTE(review): fixed 1 s pause before teardown; purpose not visible in this hunk
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear(); // clears the factory's cache list, path map and capacity directly; presumably so later tests start clean
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) { // A read flagged as index data with no expiration_time assigned is expected to be accounted to the index queue, leaving the TTL queue empty.
+ if (fs::exists(cache_base_path)) { // start from an empty cache directory
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576; // same value the index queue size is compared against below
+ settings.max_query_cache_size = 0;
+ io::CacheContext context; // NOTE(review): context and rstats are populated here but not referenced again in this test
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.query_id = query_id;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ BlockFileCache* cache =
FileCacheFactory::instance()->get_by_path(cache_base_path);
+
+ for (int i = 0; i < 100; i++) { // poll get_async_open_success() up to 100 times, 1 ms apart; the test proceeds even if it never returns true
+ if (cache->get_async_open_success()) {
+ break;
+ }; // NOTE(review): the ';' after '}' is an empty statement
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ CachedRemoteFileReader reader(local_reader, opts); // wraps local_reader; the checks below compare the wrapper against it
+ auto key = io::BlockFileCache::hash("tmp_file");
+ EXPECT_EQ(reader._cache_hash, key); // the reader's cache hash is expected to equal the hash of the literal "tmp_file"
+ EXPECT_EQ(local_reader->path().native(), reader.path().native());
+ EXPECT_EQ(local_reader->size(), reader.size());
+ EXPECT_FALSE(reader.closed());
+ EXPECT_EQ(local_reader->path().native(),
reader.get_remote_reader()->path().native());
+
+ {
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ io_ctx.is_index_data = true; // only the index flag is set; expiration_time is never assigned in this test
+ // int64_t cur_time = UnixSeconds();
+ // io_ctx.expiration_time = cur_time + 120;
+ size_t bytes_read {0};
+ EXPECT_TRUE(
+ reader.read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(3)); // NOTE(review): fixed 3 s wait before inspecting queue sizes; presumably lets asynchronous cache bookkeeping settle - confirm
+ LOG(INFO) << "ttl:" << cache->_ttl_queue.cache_size; // log all four queue sizes before asserting
+ LOG(INFO) << "index:" << cache->_index_queue.cache_size;
+ LOG(INFO) << "normal:" << cache->_normal_queue.cache_size;
+ LOG(INFO) << "disp:" << cache->_disposable_queue.cache_size;
+ EXPECT_EQ(cache->_ttl_queue.cache_size, 0); // with no expiration assigned, nothing is expected in the TTL queue
+ EXPECT_EQ(cache->_index_queue.cache_size, 1048576); // the cached bytes are expected in the index queue instead
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1)); // NOTE(review): fixed 1 s pause before teardown; purpose not visible in this hunk
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear(); // clears the factory's cache list, path map and capacity directly; presumably so later tests start clean
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]