This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9dab3cbfd2f branch-3.1: [enhancement](filecache) fix
read_cache_file_directly #54503 (#56378)
9dab3cbfd2f is described below
commit 9dab3cbfd2f1a99c9a9dcd8c41dfd663dcf0e141
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 25 17:40:41 2025 +0800
branch-3.1: [enhancement](filecache) fix read_cache_file_directly #54503
(#56378)
Cherry-picked from #54503
Signed-off-by: zhengyu <[email protected]>
Signed-off-by: freemandealer <[email protected]>
Co-authored-by: zhengyu <[email protected]>
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/io/cache/block_file_cache.cpp | 81 ++++++++--
be/src/io/cache/block_file_cache.h | 83 ++++++----
be/src/io/cache/cached_remote_file_reader.cpp | 53 ++++++-
be/src/io/cache/file_block.h | 8 +
be/test/io/cache/block_file_cache_test.cpp | 176 +++++++++++++++++++++
.../io/cache/block_file_cache_test_lru_dump.cpp | 99 ++++++++++++
8 files changed, 457 insertions(+), 47 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5f0d43ea54c..18116506856 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1138,6 +1138,8 @@
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
+DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
+DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 74ce6bb8ab9..7b76d436694 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1172,6 +1172,8 @@ DECLARE_mBool(enable_file_cache_adaptive_write);
DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
+DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
+DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index d69c3e3a07f..e7b050d5ede 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -230,6 +230,10 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
_cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
_recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
_cache_base_path.c_str(), "file_cache_recycle_keys_length");
+ _need_update_lru_blocks_length_recorder =
std::make_shared<bvar::LatencyRecorder>(
+ _cache_base_path.c_str(),
"file_cache_need_update_lru_blocks_length");
+ _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
+ _cache_base_path.c_str(),
"file_cache_update_lru_blocks_latency_us");
_ttl_gc_latency_us =
std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
"file_cache_ttl_gc_latency_us");
_shadow_queue_levenshtein_distance =
std::make_shared<bvar::LatencyRecorder>(
@@ -349,6 +353,8 @@ Status
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
_cache_background_gc_thread =
std::thread(&BlockFileCache::run_background_gc, this);
_cache_background_evict_in_advance_thread =
std::thread(&BlockFileCache::run_background_evict_in_advance,
this);
+ _cache_background_block_lru_update_thread =
+ std::thread(&BlockFileCache::run_background_block_lru_update,
this);
// Initialize LRU dump thread and restore queues
_cache_background_lru_dump_thread =
std::thread(&BlockFileCache::run_background_lru_dump, this);
@@ -358,6 +364,21 @@ Status
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
return Status::OK();
}
+void BlockFileCache::update_block_lru(FileBlockSPtr block,
+ std::lock_guard<std::mutex>& cache_lock)
{
+ FileBlockCell* cell = block->cell;
+ if (cell) {
+ if (cell->queue_iterator) {
+ auto& queue = get_queue(block->cache_type());
+ queue.move_to_end(*cell->queue_iterator, cache_lock);
+ _lru_recorder->record_queue_event(block->cache_type(),
CacheLRULogType::MOVETOBACK,
+ block->_key.hash,
block->_key.offset,
+ block->_block_range.size());
+ }
+ cell->update_atime();
+ }
+}
+
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result,
bool move_iter_flag,
std::lock_guard<std::mutex>& cache_lock) {
if (result) {
@@ -378,8 +399,8 @@ void BlockFileCache::use_cell(const FileBlockCell& cell,
FileBlocks* result, boo
template <class T>
requires IsXLock<T>
-BlockFileCache::FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper&
hash, size_t offset,
- T& /* cache_lock */) {
+FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t
offset,
+ T& /* cache_lock */) {
auto it = _files.find(hash);
if (it == _files.end()) {
return nullptr;
@@ -570,6 +591,15 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
return result;
}
+void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
+ bool ret = _need_update_lru_blocks.enqueue(block);
+ if (ret) [[likely]] {
+ *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size_approx();
+ } else {
+ LOG_WARNING("Failed to push FileBlockSPtr to _need_update_lru_blocks");
+ }
+}
+
std::string BlockFileCache::clear_file_cache_async() {
LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
_lru_dumper->remove_lru_dump_files();
@@ -774,10 +804,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const
UInt128Wrapper& hash, size_t o
return FileBlocksHolder(std::move(file_blocks));
}
-BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper&
hash,
- const CacheContext&
context, size_t offset,
- size_t size,
FileBlock::State state,
-
std::lock_guard<std::mutex>& cache_lock) {
+FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const
CacheContext& context,
+ size_t offset, size_t size,
FileBlock::State state,
+ std::lock_guard<std::mutex>&
cache_lock) {
/// Create a file block cell and put it in `files` map by [hash][offset].
if (size == 0) {
return nullptr; /// Empty files are not cached.
@@ -1394,6 +1423,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T&
cache_lock, U& block_lo
auto type = file_block->cache_type();
auto expiration_time = file_block->expiration_time();
auto* cell = get_cell(hash, offset, cache_lock);
+ file_block->cell = nullptr;
DCHECK(cell);
DCHECK(cell->queue_iterator);
if (cell->queue_iterator) {
@@ -1493,9 +1523,9 @@ size_t
BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
return get_queue(cache_type).get_elements_num(cache_lock);
}
-BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
- std::lock_guard<std::mutex>&
cache_lock)
+FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
std::lock_guard<std::mutex>& cache_lock)
: file_block(file_block) {
+ file_block->cell = this;
/**
* Cell can be created with either DOWNLOADED or EMPTY file block's state.
* File block acquires DOWNLOADING state and creates LRUQueue iterator on
first
@@ -2014,6 +2044,37 @@ void BlockFileCache::run_background_evict_in_advance() {
}
}
+void BlockFileCache::run_background_block_lru_update() {
+ Thread::set_self_name("run_background_block_lru_update");
+ FileBlockSPtr block;
+ size_t batch_count = 0;
+ while (!_close) {
+ int64_t interval_ms =
config::file_cache_background_block_lru_update_interval_ms;
+ size_t batch_limit =
+ config::file_cache_background_block_lru_update_qps_limit *
interval_ms / 1000;
+ {
+ std::unique_lock close_lock(_close_mtx);
+ _close_cv.wait_for(close_lock,
std::chrono::milliseconds(interval_ms));
+ if (_close) {
+ break;
+ }
+ }
+
+ int64_t duration_ns = 0;
+ {
+ SCOPED_CACHE_LOCK(_mutex, this);
+ SCOPED_RAW_TIMER(&duration_ns);
+ while (batch_count < batch_limit &&
_need_update_lru_blocks.try_dequeue(block)) {
+ update_block_lru(block, cache_lock);
+ batch_count++;
+ }
+ }
+ *_update_lru_blocks_latency_us << (duration_ns / 1000);
+ *_need_update_lru_blocks_length_recorder <<
_need_update_lru_blocks.size_approx();
+ batch_count = 0;
+ }
+}
+
void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
uint64_t new_expiration_time) {
SCOPED_CACHE_LOCK(_mutex, this);
@@ -2211,10 +2272,8 @@ std::map<size_t, FileBlockSPtr>
BlockFileCache::get_blocks_by_key(const UInt128W
if (_files.contains(hash)) {
for (auto& [offset, cell] : _files[hash]) {
if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
+ cell.file_block->_owned_by_cached_reader = true;
offset_to_block.emplace(offset, cell.file_block);
- use_cell(cell, nullptr,
- need_to_move(cell.file_block->cache_type(),
FileCacheType::DISPOSABLE),
- cache_lock);
}
}
}
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index c0fe2cd2a09..d152e7403c0 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -79,6 +79,43 @@ class FSFileCacheStorage;
// The BlockFileCache is responsible for the management of the blocks
// The current strategies are lru and ttl.
+
+struct FileBlockCell {
+ friend class FileBlock;
+
+ FileBlockSPtr file_block;
+ /// Iterator is put here on first reservation attempt, if successful.
+ std::optional<LRUQueue::Iterator> queue_iterator;
+
+ mutable int64_t atime {0};
+ void update_atime() const {
+ atime = std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ }
+
+ /// Pointer to file block is always held by the cache itself.
+ /// Apart from pointer in cache, it can be held by cache users, when they
call
+ /// get_or_set(), but cache users always hold it via FileBlocksHolder.
+ bool releasable() const {
+ return (file_block.use_count() == 1 ||
+ (file_block.use_count() == 2 &&
file_block->_owned_by_cached_reader));
+ }
+
+ size_t size() const { return file_block->_block_range.size(); }
+
+ FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>&
cache_lock);
+ FileBlockCell(FileBlockCell&& other) noexcept
+ : file_block(std::move(other.file_block)),
+ queue_iterator(other.queue_iterator),
+ atime(other.atime) {
+ file_block->cell = this;
+ }
+
+ FileBlockCell& operator=(const FileBlockCell&) = delete;
+ FileBlockCell(const FileBlockCell&) = delete;
+};
+
class BlockFileCache {
friend class FSFileCacheStorage;
friend class MemFileCacheStorage;
@@ -86,6 +123,7 @@ class BlockFileCache {
friend struct FileBlocksHolder;
friend class CacheLRUDumper;
friend class LRUQueueRecorder;
+ friend struct FileBlockCell;
public:
// hash the file_name to uint128
@@ -117,6 +155,9 @@ public:
if (_cache_background_lru_log_replay_thread.joinable()) {
_cache_background_lru_log_replay_thread.join();
}
+ if (_cache_background_block_lru_update_thread.joinable()) {
+ _cache_background_block_lru_update_thread.join();
+ }
}
/// Restore cache from local filesystem.
@@ -145,6 +186,11 @@ public:
FileBlocksHolder get_or_set(const UInt128Wrapper& hash, size_t offset,
size_t size,
CacheContext& context);
+ /**
+ * record blocks read directly by CachedRemoteFileReader
+ */
+ void add_need_update_lru_block(FileBlockSPtr block);
+
/**
* Clear all cached data for this cache instance async
*
@@ -290,35 +336,6 @@ public:
}
private:
- struct FileBlockCell {
- FileBlockSPtr file_block;
- /// Iterator is put here on first reservation attempt, if successful.
- std::optional<LRUQueue::Iterator> queue_iterator;
-
- mutable int64_t atime {0};
- void update_atime() const {
- atime = std::chrono::duration_cast<std::chrono::seconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
- }
-
- /// Pointer to file block is always hold by the cache itself.
- /// Apart from pointer in cache, it can be hold by cache users, when
they call
- /// getorSet(), but cache users always hold it via FileBlocksHolder.
- bool releasable() const { return file_block.use_count() == 1; }
-
- size_t size() const { return file_block->_block_range.size(); }
-
- FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>&
cache_lock);
- FileBlockCell(FileBlockCell&& other) noexcept
- : file_block(std::move(other.file_block)),
- queue_iterator(other.queue_iterator),
- atime(other.atime) {}
-
- FileBlockCell& operator=(const FileBlockCell&) = delete;
- FileBlockCell(const FileBlockCell&) = delete;
- };
-
LRUQueue& get_queue(FileCacheType type);
const LRUQueue& get_queue(FileCacheType type) const;
@@ -339,6 +356,8 @@ private:
Status initialize_unlocked(std::lock_guard<std::mutex>& cache_lock);
+ void update_block_lru(FileBlockSPtr block, std::lock_guard<std::mutex>&
cache_lock);
+
void use_cell(const FileBlockCell& cell, FileBlocks* result, bool
not_need_move,
std::lock_guard<std::mutex>& cache_lock);
@@ -397,6 +416,7 @@ private:
void run_background_lru_dump();
void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
void run_background_evict_in_advance();
+ void run_background_block_lru_update();
bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
std::vector<FileCacheType> other_cache_types,
@@ -449,6 +469,7 @@ private:
std::thread _cache_background_evict_in_advance_thread;
std::thread _cache_background_lru_dump_thread;
std::thread _cache_background_lru_log_replay_thread;
+ std::thread _cache_background_block_lru_update_thread;
std::atomic_bool _async_open_done {false};
// disk space or inode is less than the specified value
bool _disk_resource_limit_mode {false};
@@ -524,6 +545,8 @@ private:
std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
+ std::shared_ptr<bvar::LatencyRecorder> _update_lru_blocks_latency_us;
+ std::shared_ptr<bvar::LatencyRecorder>
_need_update_lru_blocks_length_recorder;
std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;
std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
@@ -533,6 +556,8 @@ private:
// so join this async load thread first
std::unique_ptr<FileCacheStorage> _storage;
std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
+
+ moodycamel::ConcurrentQueue<FileBlockSPtr> _need_update_lru_blocks;
};
} // namespace doris::io
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index b89bdcf2f6d..a9b87734222 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -47,6 +47,22 @@ bvar::LatencyRecorder
g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
"cached_remote_reader_skip_local_cache_io_sum_bytes");
+bvar::Adder<uint64_t>
g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num");
+bvar::Adder<uint64_t> g_read_cache_direct_partial_num(
+ "cached_remote_reader_cache_direct_partial_num");
+bvar::Adder<uint64_t>
g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num");
+bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes(
+ "cached_remote_reader_cache_direct_whole_bytes");
+bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
+ "cached_remote_reader_cache_direct_partial_bytes");
+bvar::Adder<uint64_t>
g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
+bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
+ "cached_remote_reader_cache_indirect_total_bytes");
+bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
+ "cached_remote_reader_indirect_bytes_1min_window",
&g_read_cache_indirect_bytes, 60);
+bvar::Window<bvar::Adder<uint64_t>>
g_read_cache_indirect_total_bytes_1min_window(
+ "cached_remote_reader_indirect_total_bytes_1min_window",
&g_read_cache_indirect_total_bytes,
+ 60);
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
const FileReaderOptions& opts)
@@ -56,6 +72,7 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader
_cache_hash = BlockFileCache::hash(path().filename().native());
_cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
if (config::enable_read_cache_file_directly) {
+ // this is designed for and tested on Doris tables; external tables
need extra tests
_cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
}
} else {
@@ -78,14 +95,18 @@
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
}
void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
- if (config::enable_read_cache_file_directly) {
+ if (_is_doris_table && config::enable_read_cache_file_directly) {
std::lock_guard lock(_mtx);
DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
+ file_block->_owned_by_cached_reader = true;
_cache_file_readers.emplace(file_block->offset(),
std::move(file_block));
}
}
CachedRemoteFileReader::~CachedRemoteFileReader() {
+ for (auto& it : _cache_file_readers) {
+ it.second->_owned_by_cached_reader = false;
+ }
static_cast<void>(close());
}
@@ -112,6 +133,7 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result,
size_t* bytes_read,
const IOContext* io_ctx) {
+ size_t already_read = 0;
const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
DCHECK(io_ctx);
@@ -139,7 +161,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01,
std::move(defer_func));
stats.bytes_read += bytes_req;
- if (config::enable_read_cache_file_directly) {
+ if (_is_doris_table && config::enable_read_cache_file_directly) {
// read directly
SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
size_t need_read_size = bytes_req;
@@ -166,23 +188,33 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
if (!iter->second
->read(Slice(result.data + (cur_offset -
offset), reserve_bytes),
file_offset)
- .ok()) {
+ .ok()) { // TODO: the read may have failed because the
block was evicted; this error should be handled
break;
}
}
+ _cache->add_need_update_lru_block(iter->second);
need_read_size -= reserve_bytes;
cur_offset += reserve_bytes;
+ already_read += reserve_bytes;
iter++;
}
if (need_read_size == 0) {
*bytes_read = bytes_req;
stats.hit_cache = true;
+ g_read_cache_direct_whole_num << 1;
+ g_read_cache_direct_whole_bytes << bytes_req;
return Status::OK();
+ } else {
+ g_read_cache_direct_partial_num << 1;
+ g_read_cache_direct_partial_bytes << already_read;
}
}
}
// read from cache or remote
- auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
+ g_read_cache_indirect_num << 1;
+ size_t indirect_read_bytes = 0;
+ auto [align_left, align_size] =
+ s_align_size(offset + already_read, bytes_req - already_read,
size());
CacheContext cache_context(io_ctx);
cache_context.stats = &stats;
MonotonicStopWatch sw;
@@ -210,6 +242,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
stats.hit_cache = false;
break;
case FileBlock::State::DOWNLOADED:
+ _insert_file_reader(block);
break;
}
}
@@ -246,13 +279,14 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
}
// copy from memory directly
size_t right_offset = offset + bytes_req - 1;
- if (empty_start <= right_offset && empty_end >= offset && !is_dryrun) {
- size_t copy_left_offset = offset < empty_start ? empty_start :
offset;
- size_t copy_right_offset = right_offset < empty_end ? right_offset
: empty_end;
+ if (empty_start <= right_offset && empty_end >= offset + already_read
&& !is_dryrun) {
+ size_t copy_left_offset = std::max(offset + already_read,
empty_start);
+ size_t copy_right_offset = std::min(right_offset, empty_end);
char* dst = result.data + (copy_left_offset - offset);
char* src = buffer.get() + (copy_left_offset - empty_start);
size_t copy_size = copy_right_offset - copy_left_offset + 1;
memcpy(dst, src, copy_size);
+ indirect_read_bytes += copy_size;
}
}
@@ -307,6 +341,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
SCOPED_RAW_TIMER(&stats.local_read_timer);
st = block->read(Slice(result.data + (current_offset -
offset), read_size),
file_offset);
+ indirect_read_bytes += read_size;
}
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
@@ -319,12 +354,16 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
RETURN_IF_ERROR(_remote_file_reader->read_at(
current_offset, Slice(result.data + (current_offset -
offset), read_size),
&bytes_read));
+ indirect_read_bytes += read_size;
DCHECK(bytes_read == read_size);
}
}
*bytes_read += read_size;
current_offset = right + 1;
}
+ g_read_cache_indirect_bytes << indirect_read_bytes;
+ g_read_cache_indirect_total_bytes << *bytes_read;
+
DCHECK(*bytes_read == bytes_req);
return Status::OK();
}
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 38c57ce9358..9cc4f06c6f0 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -37,11 +37,13 @@ namespace io {
struct FileBlocksHolder;
class BlockFileCache;
+struct FileBlockCell;
class FileBlock {
friend struct FileBlocksHolder;
friend class BlockFileCache;
friend class CachedRemoteFileReader;
+ friend struct FileBlockCell;
public:
enum class State {
@@ -135,6 +137,10 @@ public:
void set_deleting() { _is_deleting = true; }
bool is_deleting() const { return _is_deleting; };
+public:
+ std::atomic<bool> _owned_by_cached_reader {
+ false}; // possessed by CachedRemoteFileReader::_cache_file_readers
+
private:
std::string get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock)
const;
@@ -161,6 +167,8 @@ private:
FileCacheKey _key;
size_t _downloaded_size {0};
bool _is_deleting {false};
+
+ FileBlockCell* cell;
};
extern std::ostream& operator<<(std::ostream& os, const FileBlock::State&
value);
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 743abeb8986..e8df6843a9b 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3668,6 +3668,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
+ std::string cache_base_path = caches_dir / "cached_remote_file_reader" /
"";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3776,6 +3777,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_tail) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_tail" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3838,6 +3840,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_tail) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_error_handle" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3919,6 +3922,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_init" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3980,6 +3984,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_init) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_concurrent) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_concurrent" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4055,6 +4060,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_concurrent) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_concurrent_2) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_concurrent_2" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -4567,6 +4573,7 @@ TEST_F(BlockFileCacheTest,
remove_if_cached_when_isnt_releasable) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_opt_lock" / "";
config::enable_read_cache_file_directly = true;
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -7184,6 +7191,7 @@ TEST_F(BlockFileCacheTest, validate_get_or_set_crash) {
extern bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes;
TEST_F(BlockFileCacheTest, reader_dryrun_when_download_file_cache) {
+ std::string cache_base_path = caches_dir /
"reader_dryrun_when_download_file_cache" / "";
bool org = config::enable_reader_dryrun_when_download_file_cache;
config::enable_reader_dryrun_when_download_file_cache = true;
if (fs::exists(cache_base_path)) {
@@ -7661,6 +7669,7 @@ TEST_F(BlockFileCacheTest,
test_upgrade_cache_dir_version) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_ttl_index" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -7737,6 +7746,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_ttl_index) {
}
TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) {
+ std::string cache_base_path = caches_dir /
"cached_remote_file_reader_normal_index" / "";
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -7890,4 +7900,170 @@ TEST_F(BlockFileCacheTest, test_reset_capacity) {
FileCacheFactory::instance()->_capacity = 0;
}
+TEST_F(BlockFileCacheTest,
cached_remote_file_reader_direct_read_and_evict_cache) {
+ config::enable_read_cache_file_directly = true;
+ std::string cache_base_path = caches_dir / "cache_direct_read" / "";
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.query_id = query_id;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read {0};
+ ASSERT_TRUE(
+ reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ EXPECT_GT(cache->_cur_cache_size, 0);
+
+ auto ret_str = FileCacheFactory::instance()->clear_file_caches(
+ /*sync*/ false); // use async to evict cache
+ std::cout << ret_str << std::endl;
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+
+ // eviction should succeed even if one reference is held by the reader
+ EXPECT_EQ(cache->_cur_cache_size, 0);
+
+ // try to read something
+ ASSERT_TRUE(
+ reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ EXPECT_TRUE(reader->close().ok());
+ EXPECT_TRUE(reader->closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
+extern bvar::Adder<uint64_t> g_read_cache_direct_whole_num;
+extern bvar::Adder<uint64_t> g_read_cache_direct_partial_num;
+extern bvar::Adder<uint64_t> g_read_cache_indirect_num;
+extern bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes;
+extern bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes;
+extern bvar::Adder<uint64_t> g_read_cache_indirect_bytes;
+
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) {
+ std::string cache_base_path = caches_dir / "cache_direct_read_bytes_check"
/ "";
+ uint64_t org_g_read_cache_direct_whole_num =
g_read_cache_direct_whole_num.get_value();
+ uint64_t org_g_read_cache_direct_whole_bytes =
g_read_cache_direct_whole_bytes.get_value();
+ uint64_t org_g_read_cache_direct_partial_num =
g_read_cache_direct_partial_num.get_value();
+ uint64_t org_g_read_cache_indirect_num =
g_read_cache_indirect_num.get_value();
+ uint64_t org_g_read_cache_direct_partial_bytes =
g_read_cache_direct_partial_bytes.get_value();
+ uint64_t org_g_read_cache_indirect_bytes =
g_read_cache_indirect_bytes.get_value();
+
+ config::enable_read_cache_file_directly = true;
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.query_id = query_id;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read {0};
+ // read offset 100 size 64k
+ ASSERT_TRUE(
+ reader->read_at(100, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ EXPECT_EQ(cache->_cur_cache_size, 1048576);
+ EXPECT_EQ(g_read_cache_indirect_num.get_value() -
org_g_read_cache_indirect_num, 1);
+ EXPECT_EQ(g_read_cache_indirect_bytes.get_value() -
org_g_read_cache_indirect_bytes, 64_kb);
+
+ // read offset 640k size 64k
+ ASSERT_TRUE(
+ reader->read_at(10 * 64_kb, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
+ EXPECT_EQ(std::string(64_kb, '0'), buffer);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ EXPECT_EQ(cache->_cur_cache_size, 1048576);
+ EXPECT_EQ(g_read_cache_direct_whole_num.get_value() -
org_g_read_cache_direct_whole_num, 1);
+ EXPECT_EQ(g_read_cache_direct_whole_bytes.get_value() -
org_g_read_cache_direct_whole_bytes,
+ 64_kb);
+
+ // try to read first two blocks
+ ASSERT_TRUE(reader->read_at(1048576 - 100, Slice(buffer.data(),
buffer.size()), &bytes_read,
+ &io_ctx)
+ .ok());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ EXPECT_EQ(cache->_cur_cache_size, 2097152);
+ EXPECT_EQ(g_read_cache_direct_partial_num.get_value() -
org_g_read_cache_direct_partial_num, 1);
+ EXPECT_EQ(g_read_cache_direct_partial_bytes.get_value() -
org_g_read_cache_direct_partial_bytes,
+ 100);
+ EXPECT_EQ(g_read_cache_indirect_bytes.get_value() -
org_g_read_cache_indirect_bytes,
+ 64_kb + 64_kb - 100);
+
+ EXPECT_TRUE(reader->close().ok());
+ EXPECT_TRUE(reader->closed());
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
} // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index baa953ac7ff..99c2c780bed 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -493,4 +493,103 @@ TEST_F(BlockFileCacheTest,
test_lru_duplicate_queue_entry_restore) {
}
}
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) { // expects re-reads to reorder _normal_queue only after the delayed LRU update
+ std::string cache_base_path = caches_dir / "cache_direct_read_order_check"
/ "";
+ config::enable_read_cache_file_directly = true;
+ config::file_cache_background_block_lru_update_interval_ms = 1000; // NOTE(review): neither config override is restored by the cleanup at the end of this test
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ size_t bytes_read = 0;
+
+ // read 64KB at offsets 0, 1MB and 2MB; the checks below expect one _normal_queue entry per offset
+ ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+ ASSERT_TRUE(
+ reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
+ ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(),
buffer.size()), &bytes_read,
+ &io_ctx)
+ .ok());
+
+ // check initial order: _normal_queue is expected to list the offsets in first-read order (0, 1MB, 2MB)
+ std::vector<size_t> initial_offsets;
+ for (auto it = cache->_normal_queue.begin(); it !=
cache->_normal_queue.end(); ++it) {
+ initial_offsets.push_back(it->offset);
+ }
+ ASSERT_EQ(initial_offsets.size(), 3);
+ ASSERT_EQ(initial_offsets[0], 0);
+ ASSERT_EQ(initial_offsets[1], 1024 * 1024);
+ ASSERT_EQ(initial_offsets[2], 1024 * 1024 * 2);
+
+ // read the same three offsets again, this time in reverse order (2MB, 1MB, 0)
+ ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(),
buffer.size()), &bytes_read,
+ &io_ctx)
+ .ok());
+ ASSERT_TRUE(
+ reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
+ ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx).ok());
+
+ std::vector<size_t> before_updated_offsets; // snapshot taken right after the re-reads; order is expected to be unchanged
+ for (auto it = cache->_normal_queue.begin(); it !=
cache->_normal_queue.end(); ++it) {
+ before_updated_offsets.push_back(it->offset);
+ }
+ ASSERT_EQ(before_updated_offsets.size(), 3);
+ ASSERT_EQ(before_updated_offsets[0], 0);
+ ASSERT_EQ(before_updated_offsets[1], 1024 * 1024);
+ ASSERT_EQ(before_updated_offsets[2], 1024 * 1024 * 2);
+
+ // sleep for twice the LRU update interval configured above; presumably enough for the background update to run - TODO confirm
+ std::this_thread::sleep_for(std::chrono::milliseconds(
+ 2 * config::file_cache_background_block_lru_update_interval_ms));
+
+ // check order after update: the queue is now expected to follow the second read order (2MB, 1MB, 0)
+ std::vector<size_t> updated_offsets;
+ for (auto it = cache->_normal_queue.begin(); it !=
cache->_normal_queue.end(); ++it) {
+ updated_offsets.push_back(it->offset);
+ }
+ ASSERT_EQ(updated_offsets.size(), 3);
+ ASSERT_EQ(updated_offsets[0], 1024 * 1024 * 2);
+ ASSERT_EQ(updated_offsets[1], 1024 * 1024);
+ ASSERT_EQ(updated_offsets[2], 0);
+
+ EXPECT_TRUE(reader->close().ok());
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]