This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9dab3cbfd2f branch-3.1: [enhancement](filecache) fix 
read_cache_file_directly #54503 (#56378)
9dab3cbfd2f is described below

commit 9dab3cbfd2f1a99c9a9dcd8c41dfd663dcf0e141
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Sep 25 17:40:41 2025 +0800

    branch-3.1: [enhancement](filecache) fix read_cache_file_directly #54503 
(#56378)
    
    Cherry-picked from #54503
    
    Signed-off-by: zhengyu <[email protected]>
    Signed-off-by: freemandealer <[email protected]>
    Co-authored-by: zhengyu <[email protected]>
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   2 +
 be/src/io/cache/block_file_cache.cpp               |  81 ++++++++--
 be/src/io/cache/block_file_cache.h                 |  83 ++++++----
 be/src/io/cache/cached_remote_file_reader.cpp      |  53 ++++++-
 be/src/io/cache/file_block.h                       |   8 +
 be/test/io/cache/block_file_cache_test.cpp         | 176 +++++++++++++++++++++
 .../io/cache/block_file_cache_test_lru_dump.cpp    |  99 ++++++++++++
 8 files changed, 457 insertions(+), 47 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5f0d43ea54c..18116506856 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1138,6 +1138,8 @@ 
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
 
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
+DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
+DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
 DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
 DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 74ce6bb8ab9..7b76d436694 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1172,6 +1172,8 @@ DECLARE_mBool(enable_file_cache_adaptive_write);
 DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
+DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
+DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
 DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
diff --git a/be/src/io/cache/block_file_cache.cpp 
b/be/src/io/cache/block_file_cache.cpp
index d69c3e3a07f..e7b050d5ede 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -230,6 +230,10 @@ BlockFileCache::BlockFileCache(const std::string& 
cache_base_path,
             _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
     _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
             _cache_base_path.c_str(), "file_cache_recycle_keys_length");
+    _need_update_lru_blocks_length_recorder = 
std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_need_update_lru_blocks_length");
+    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
+            _cache_base_path.c_str(), 
"file_cache_update_lru_blocks_latency_us");
     _ttl_gc_latency_us = 
std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
                                                                  
"file_cache_ttl_gc_latency_us");
     _shadow_queue_levenshtein_distance = 
std::make_shared<bvar::LatencyRecorder>(
@@ -349,6 +353,8 @@ Status 
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
     _cache_background_gc_thread = 
std::thread(&BlockFileCache::run_background_gc, this);
     _cache_background_evict_in_advance_thread =
             std::thread(&BlockFileCache::run_background_evict_in_advance, 
this);
+    _cache_background_block_lru_update_thread =
+            std::thread(&BlockFileCache::run_background_block_lru_update, 
this);
 
     // Initialize LRU dump thread and restore queues
     _cache_background_lru_dump_thread = 
std::thread(&BlockFileCache::run_background_lru_dump, this);
@@ -358,6 +364,21 @@ Status 
BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
     return Status::OK();
 }
 
+void BlockFileCache::update_block_lru(FileBlockSPtr block,
+                                      std::lock_guard<std::mutex>& cache_lock) 
{ // Marks `block` as most recently used: moves it to the end of its LRU queue and refreshes the cell's atime.
+    FileBlockCell* cell = block->cell; // back-pointer to the owning cell; remove() resets it to nullptr
+    if (cell) {
+        if (cell->queue_iterator) { // only cells that already hold a queue position can be moved
+            auto& queue = get_queue(block->cache_type());
+            queue.move_to_end(*cell->queue_iterator, cache_lock);
+            _lru_recorder->record_queue_event(block->cache_type(), 
CacheLRULogType::MOVETOBACK,
+                                              block->_key.hash, 
block->_key.offset,
+                                              block->_block_range.size());
+        }
+        cell->update_atime(); // atime is refreshed even when the cell has no queue position
+    }
+}
+
 void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, 
bool move_iter_flag,
                               std::lock_guard<std::mutex>& cache_lock) {
     if (result) {
@@ -378,8 +399,8 @@ void BlockFileCache::use_cell(const FileBlockCell& cell, 
FileBlocks* result, boo
 
 template <class T>
     requires IsXLock<T>
-BlockFileCache::FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& 
hash, size_t offset,
-                                                        T& /* cache_lock */) {
+FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t 
offset,
+                                        T& /* cache_lock */) {
     auto it = _files.find(hash);
     if (it == _files.end()) {
         return nullptr;
@@ -570,6 +591,15 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& 
hash, const CacheConte
     return result;
 }
 
+/// Queues `block` for a deferred LRU update; the queue is drained by
+/// run_background_block_lru_update(). This function does not take the cache lock itself.
+/// On success the approximate queue length is recorded; on failure a warning is logged and
+/// the update for this block is dropped.
+void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
+    // `block` is already a private copy (taken by value), so move it into the queue instead of
+    // copying it again; this avoids one extra atomic refcount increment/decrement per call.
+    bool ret = _need_update_lru_blocks.enqueue(std::move(block));
+    if (ret) [[likely]] {
+        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size_approx();
+    } else {
+        LOG_WARNING("Failed to push FileBlockSPtr to _need_update_lru_blocks");
+    }
+}
+
 std::string BlockFileCache::clear_file_cache_async() {
     LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
     _lru_dumper->remove_lru_dump_files();
@@ -774,10 +804,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const 
UInt128Wrapper& hash, size_t o
     return FileBlocksHolder(std::move(file_blocks));
 }
 
-BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& 
hash,
-                                                        const CacheContext& 
context, size_t offset,
-                                                        size_t size, 
FileBlock::State state,
-                                                        
std::lock_guard<std::mutex>& cache_lock) {
+FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const 
CacheContext& context,
+                                        size_t offset, size_t size, 
FileBlock::State state,
+                                        std::lock_guard<std::mutex>& 
cache_lock) {
     /// Create a file block cell and put it in `files` map by [hash][offset].
     if (size == 0) {
         return nullptr; /// Empty files are not cached.
@@ -1394,6 +1423,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& 
cache_lock, U& block_lo
     auto type = file_block->cache_type();
     auto expiration_time = file_block->expiration_time();
     auto* cell = get_cell(hash, offset, cache_lock);
+    file_block->cell = nullptr;
     DCHECK(cell);
     DCHECK(cell->queue_iterator);
     if (cell->queue_iterator) {
@@ -1493,9 +1523,9 @@ size_t 
BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
     return get_queue(cache_type).get_elements_num(cache_lock);
 }
 
-BlockFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block,
-                                             std::lock_guard<std::mutex>& 
cache_lock)
+FileBlockCell::FileBlockCell(FileBlockSPtr file_block, 
std::lock_guard<std::mutex>& cache_lock)
         : file_block(file_block) {
+    file_block->cell = this;
     /**
      * Cell can be created with either DOWNLOADED or EMPTY file block's state.
      * File block acquires DOWNLOADING state and creates LRUQueue iterator on 
first
@@ -2014,6 +2044,37 @@ void BlockFileCache::run_background_evict_in_advance() {
     }
 }
 
+/// Background worker that applies the LRU updates queued by add_need_update_lru_block().
+///
+/// Each round waits on `_close_cv` for `file_cache_background_block_lru_update_interval_ms`
+/// (the wait can end early if the condition variable is notified), exits if `_close` is set,
+/// and otherwise, while holding the cache lock, processes at most
+/// `file_cache_background_block_lru_update_qps_limit * interval_ms / 1000` queued blocks.
+/// The time spent under the lock and the remaining queue length are recorded after each round.
+void BlockFileCache::run_background_block_lru_update() {
+    Thread::set_self_name("run_background_block_lru_update");
+    FileBlockSPtr block;
+    size_t batch_count = 0;
+    while (!_close) {
+        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
+        size_t batch_limit =
+                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
+        {
+            std::unique_lock close_lock(_close_mtx);
+            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
+            if (_close) {
+                break;
+            }
+        }
+
+        int64_t duration_ns = 0;
+        {
+            SCOPED_CACHE_LOCK(_mutex, this);
+            SCOPED_RAW_TIMER(&duration_ns);
+            while (batch_count < batch_limit && _need_update_lru_blocks.try_dequeue(block)) {
+                update_block_lru(block, cache_lock);
+                batch_count++;
+            }
+        }
+        // Drop the reference to the last dequeued block before sleeping again. Otherwise this
+        // thread would keep the block's use_count() raised for a whole interval, which makes
+        // FileBlockCell::releasable() report the block as still in use.
+        block.reset();
+        *_update_lru_blocks_latency_us << (duration_ns / 1000);
+        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size_approx();
+        batch_count = 0;
+    }
+}
+
 void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash,
                                             uint64_t new_expiration_time) {
     SCOPED_CACHE_LOCK(_mutex, this);
@@ -2211,10 +2272,8 @@ std::map<size_t, FileBlockSPtr> 
BlockFileCache::get_blocks_by_key(const UInt128W
     if (_files.contains(hash)) {
         for (auto& [offset, cell] : _files[hash]) {
             if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
+                cell.file_block->_owned_by_cached_reader = true;
                 offset_to_block.emplace(offset, cell.file_block);
-                use_cell(cell, nullptr,
-                         need_to_move(cell.file_block->cache_type(), 
FileCacheType::DISPOSABLE),
-                         cache_lock);
             }
         }
     }
diff --git a/be/src/io/cache/block_file_cache.h 
b/be/src/io/cache/block_file_cache.h
index c0fe2cd2a09..d152e7403c0 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -79,6 +79,43 @@ class FSFileCacheStorage;
 
 // The BlockFileCache is responsible for the management of the blocks
 // The current strategies are lru and ttl.
+
+struct FileBlockCell { // Per-block bookkeeping: the block itself, its optional LRU queue position and its last access time.
+    friend class FileBlock;
+
+    FileBlockSPtr file_block;
+    /// Iterator is put here on first reservation attempt, if successful.
+    std::optional<LRUQueue::Iterator> queue_iterator;
+
+    mutable int64_t atime {0}; // last access time in steady_clock seconds; mutable so const cells can be touched
+    void update_atime() const {
+        atime = std::chrono::duration_cast<std::chrono::seconds>(
+                        std::chrono::steady_clock::now().time_since_epoch())
+                        .count();
+    }
+
+    /// A pointer to the file block is always held by the cache itself.
+    /// Apart from the pointer in the cache, it can be held by cache users when they 
call
+    /// get_or_set(), but cache users always hold it via FileBlocksHolder.
+    bool releasable() const {
+        return (file_block.use_count() == 1 ||
+                (file_block.use_count() == 2 && 
file_block->_owned_by_cached_reader)); // one extra reference is tolerated when the block is flagged as owned by a cached reader
+    }
+
+    size_t size() const { return file_block->_block_range.size(); }
+
+    FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& 
cache_lock);
+    FileBlockCell(FileBlockCell&& other) noexcept
+            : file_block(std::move(other.file_block)),
+              queue_iterator(other.queue_iterator),
+              atime(other.atime) {
+        file_block->cell = this; // re-point the block's back-pointer at this cell's new address
+    }
+
+    FileBlockCell& operator=(const FileBlockCell&) = delete;
+    FileBlockCell(const FileBlockCell&) = delete;
+};
+
 class BlockFileCache {
     friend class FSFileCacheStorage;
     friend class MemFileCacheStorage;
@@ -86,6 +123,7 @@ class BlockFileCache {
     friend struct FileBlocksHolder;
     friend class CacheLRUDumper;
     friend class LRUQueueRecorder;
+    friend struct FileBlockCell;
 
 public:
     // hash the file_name to uint128
@@ -117,6 +155,9 @@ public:
         if (_cache_background_lru_log_replay_thread.joinable()) {
             _cache_background_lru_log_replay_thread.join();
         }
+        if (_cache_background_block_lru_update_thread.joinable()) {
+            _cache_background_block_lru_update_thread.join();
+        }
     }
 
     /// Restore cache from local filesystem.
@@ -145,6 +186,11 @@ public:
     FileBlocksHolder get_or_set(const UInt128Wrapper& hash, size_t offset, 
size_t size,
                                 CacheContext& context);
 
+    /**
+     * record blocks read directly by CachedRemoteFileReader
+     */
+    void add_need_update_lru_block(FileBlockSPtr block);
+
     /**
      * Clear all cached data for this cache instance async
      *
@@ -290,35 +336,6 @@ public:
     }
 
 private:
-    struct FileBlockCell {
-        FileBlockSPtr file_block;
-        /// Iterator is put here on first reservation attempt, if successful.
-        std::optional<LRUQueue::Iterator> queue_iterator;
-
-        mutable int64_t atime {0};
-        void update_atime() const {
-            atime = std::chrono::duration_cast<std::chrono::seconds>(
-                            
std::chrono::steady_clock::now().time_since_epoch())
-                            .count();
-        }
-
-        /// Pointer to file block is always hold by the cache itself.
-        /// Apart from pointer in cache, it can be hold by cache users, when 
they call
-        /// getorSet(), but cache users always hold it via FileBlocksHolder.
-        bool releasable() const { return file_block.use_count() == 1; }
-
-        size_t size() const { return file_block->_block_range.size(); }
-
-        FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& 
cache_lock);
-        FileBlockCell(FileBlockCell&& other) noexcept
-                : file_block(std::move(other.file_block)),
-                  queue_iterator(other.queue_iterator),
-                  atime(other.atime) {}
-
-        FileBlockCell& operator=(const FileBlockCell&) = delete;
-        FileBlockCell(const FileBlockCell&) = delete;
-    };
-
     LRUQueue& get_queue(FileCacheType type);
     const LRUQueue& get_queue(FileCacheType type) const;
 
@@ -339,6 +356,8 @@ private:
 
     Status initialize_unlocked(std::lock_guard<std::mutex>& cache_lock);
 
+    void update_block_lru(FileBlockSPtr block, std::lock_guard<std::mutex>& 
cache_lock);
+
     void use_cell(const FileBlockCell& cell, FileBlocks* result, bool 
not_need_move,
                   std::lock_guard<std::mutex>& cache_lock);
 
@@ -397,6 +416,7 @@ private:
     void run_background_lru_dump();
     void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
     void run_background_evict_in_advance();
+    void run_background_block_lru_update();
 
     bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
                                                        
std::vector<FileCacheType> other_cache_types,
@@ -449,6 +469,7 @@ private:
     std::thread _cache_background_evict_in_advance_thread;
     std::thread _cache_background_lru_dump_thread;
     std::thread _cache_background_lru_log_replay_thread;
+    std::thread _cache_background_block_lru_update_thread;
     std::atomic_bool _async_open_done {false};
     // disk space or inode is less than the specified value
     bool _disk_resource_limit_mode {false};
@@ -524,6 +545,8 @@ private:
     std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
     std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
+    std::shared_ptr<bvar::LatencyRecorder> _update_lru_blocks_latency_us;
+    std::shared_ptr<bvar::LatencyRecorder> 
_need_update_lru_blocks_length_recorder;
     std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;
 
     std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
@@ -533,6 +556,8 @@ private:
     // so join this async load thread first
     std::unique_ptr<FileCacheStorage> _storage;
     std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
+
+    moodycamel::ConcurrentQueue<FileBlockSPtr> _need_update_lru_blocks;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index b89bdcf2f6d..a9b87734222 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -47,6 +47,22 @@ bvar::LatencyRecorder 
g_skip_cache_num("cached_remote_reader_skip_cache_num");
 bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
 bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
         "cached_remote_reader_skip_local_cache_io_sum_bytes");
+bvar::Adder<uint64_t> 
g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num");
+bvar::Adder<uint64_t> g_read_cache_direct_partial_num(
+        "cached_remote_reader_cache_direct_partial_num");
+bvar::Adder<uint64_t> 
g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num");
+bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes(
+        "cached_remote_reader_cache_direct_whole_bytes");
+bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
+        "cached_remote_reader_cache_direct_partial_bytes");
+bvar::Adder<uint64_t> 
g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
+bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
+        "cached_remote_reader_cache_indirect_total_bytes");
+bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
+        "cached_remote_reader_indirect_bytes_1min_window", 
&g_read_cache_indirect_bytes, 60);
+bvar::Window<bvar::Adder<uint64_t>> 
g_read_cache_indirect_total_bytes_1min_window(
+        "cached_remote_reader_indirect_total_bytes_1min_window", 
&g_read_cache_indirect_total_bytes,
+        60);
 
 CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr 
remote_file_reader,
                                                const FileReaderOptions& opts)
@@ -56,6 +72,7 @@ CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr 
remote_file_reader
         _cache_hash = BlockFileCache::hash(path().filename().native());
         _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
         if (config::enable_read_cache_file_directly) {
+            // This is designed for and tested with Doris tables; external tables need extra tests.
             _cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
         }
     } else {
@@ -78,14 +95,18 @@ 
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader
 }
 
 void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
-    if (config::enable_read_cache_file_directly) {
+    if (_is_doris_table && config::enable_read_cache_file_directly) {
         std::lock_guard lock(_mtx);
         DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
+        file_block->_owned_by_cached_reader = true;
         _cache_file_readers.emplace(file_block->offset(), 
std::move(file_block));
     }
 }
 
 CachedRemoteFileReader::~CachedRemoteFileReader() {
+    for (auto& it : _cache_file_readers) { // clear the ownership flag on every block this reader still references
+        it.second->_owned_by_cached_reader = false;
+    }
     static_cast<void>(close()); // the result of close() is deliberately discarded
 }
 
@@ -112,6 +133,7 @@ std::pair<size_t, size_t> 
CachedRemoteFileReader::s_align_size(size_t offset, si
 
 Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
                                             const IOContext* io_ctx) {
+    size_t already_read = 0;
     const bool is_dryrun = io_ctx->is_dryrun;
     DCHECK(!closed());
     DCHECK(io_ctx);
@@ -139,7 +161,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
     };
     std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, 
std::move(defer_func));
     stats.bytes_read += bytes_req;
-    if (config::enable_read_cache_file_directly) {
+    if (_is_doris_table && config::enable_read_cache_file_directly) {
         // read directly
         SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
         size_t need_read_size = bytes_req;
@@ -166,23 +188,33 @@ Status CachedRemoteFileReader::read_at_impl(size_t 
offset, Slice result, size_t*
                     if (!iter->second
                                  ->read(Slice(result.data + (cur_offset - 
offset), reserve_bytes),
                                         file_offset)
-                                 .ok()) {
+                                 .ok()) { //TODO: maybe read failed because 
block evict, should handle error
                         break;
                     }
                 }
+                _cache->add_need_update_lru_block(iter->second);
                 need_read_size -= reserve_bytes;
                 cur_offset += reserve_bytes;
+                already_read += reserve_bytes;
                 iter++;
             }
             if (need_read_size == 0) {
                 *bytes_read = bytes_req;
                 stats.hit_cache = true;
+                g_read_cache_direct_whole_num << 1;
+                g_read_cache_direct_whole_bytes << bytes_req;
                 return Status::OK();
+            } else {
+                g_read_cache_direct_partial_num << 1;
+                g_read_cache_direct_partial_bytes << already_read;
             }
         }
     }
     // read from cache or remote
-    auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
+    g_read_cache_indirect_num << 1;
+    size_t indirect_read_bytes = 0;
+    auto [align_left, align_size] =
+            s_align_size(offset + already_read, bytes_req - already_read, 
size());
     CacheContext cache_context(io_ctx);
     cache_context.stats = &stats;
     MonotonicStopWatch sw;
@@ -210,6 +242,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
             stats.hit_cache = false;
             break;
         case FileBlock::State::DOWNLOADED:
+            _insert_file_reader(block);
             break;
         }
     }
@@ -246,13 +279,14 @@ Status CachedRemoteFileReader::read_at_impl(size_t 
offset, Slice result, size_t*
         }
         // copy from memory directly
         size_t right_offset = offset + bytes_req - 1;
-        if (empty_start <= right_offset && empty_end >= offset && !is_dryrun) {
-            size_t copy_left_offset = offset < empty_start ? empty_start : 
offset;
-            size_t copy_right_offset = right_offset < empty_end ? right_offset 
: empty_end;
+        if (empty_start <= right_offset && empty_end >= offset + already_read 
&& !is_dryrun) {
+            size_t copy_left_offset = std::max(offset + already_read, 
empty_start);
+            size_t copy_right_offset = std::min(right_offset, empty_end);
             char* dst = result.data + (copy_left_offset - offset);
             char* src = buffer.get() + (copy_left_offset - empty_start);
             size_t copy_size = copy_right_offset - copy_left_offset + 1;
             memcpy(dst, src, copy_size);
+            indirect_read_bytes += copy_size;
         }
     }
 
@@ -307,6 +341,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
                     SCOPED_RAW_TIMER(&stats.local_read_timer);
                     st = block->read(Slice(result.data + (current_offset - 
offset), read_size),
                                      file_offset);
+                    indirect_read_bytes += read_size;
                 }
             }
             if (!st || block_state != FileBlock::State::DOWNLOADED) {
@@ -319,12 +354,16 @@ Status CachedRemoteFileReader::read_at_impl(size_t 
offset, Slice result, size_t*
                 RETURN_IF_ERROR(_remote_file_reader->read_at(
                         current_offset, Slice(result.data + (current_offset - 
offset), read_size),
                         &bytes_read));
+                indirect_read_bytes += read_size;
                 DCHECK(bytes_read == read_size);
             }
         }
         *bytes_read += read_size;
         current_offset = right + 1;
     }
+    g_read_cache_indirect_bytes << indirect_read_bytes;
+    g_read_cache_indirect_total_bytes << *bytes_read;
+
     DCHECK(*bytes_read == bytes_req);
     return Status::OK();
 }
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index 38c57ce9358..9cc4f06c6f0 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -37,11 +37,13 @@ namespace io {
 
 struct FileBlocksHolder;
 class BlockFileCache;
+struct FileBlockCell;
 
 class FileBlock {
     friend struct FileBlocksHolder;
     friend class BlockFileCache;
     friend class CachedRemoteFileReader;
+    friend struct FileBlockCell;
 
 public:
     enum class State {
@@ -135,6 +137,10 @@ public:
     void set_deleting() { _is_deleting = true; }
     bool is_deleting() const { return _is_deleting; };
 
+public:
+    std::atomic<bool> _owned_by_cached_reader {
+            false}; // possessed by CachedRemoteFileReader::_cache_file_readers
+
 private:
     std::string get_info_for_log_impl(std::lock_guard<std::mutex>& block_lock) 
const;
 
@@ -161,6 +167,8 @@ private:
     FileCacheKey _key;
     size_t _downloaded_size {0};
     bool _is_deleting {false};
+
+    // Back-pointer to the cache cell that owns this block. It is assigned by FileBlockCell's
+    // constructors and reset in BlockFileCache::remove(). Default to nullptr so the null check
+    // in BlockFileCache::update_block_lru() never reads an indeterminate pointer for a block
+    // that has not been attached to a cell.
+    FileBlockCell* cell {nullptr};
 };
 
 extern std::ostream& operator<<(std::ostream& os, const FileBlock::State& 
value);
diff --git a/be/test/io/cache/block_file_cache_test.cpp 
b/be/test/io/cache/block_file_cache_test.cpp
index 743abeb8986..e8df6843a9b 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -3668,6 +3668,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
+    std::string cache_base_path = caches_dir / "cached_remote_file_reader" / 
"";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3776,6 +3777,7 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_tail) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_tail" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3838,6 +3840,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_tail) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_error_handle) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_error_handle" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3919,6 +3922,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_error_handle) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_init) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_init" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -3980,6 +3984,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_init) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_concurrent) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_concurrent" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4055,6 +4060,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_concurrent) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_concurrent_2) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_concurrent_2" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -4567,6 +4573,7 @@ TEST_F(BlockFileCacheTest, 
remove_if_cached_when_isnt_releasable) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_opt_lock) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_opt_lock" / "";
     config::enable_read_cache_file_directly = true;
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
@@ -7184,6 +7191,7 @@ TEST_F(BlockFileCacheTest, validate_get_or_set_crash) {
 extern bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes;
 
 TEST_F(BlockFileCacheTest, reader_dryrun_when_download_file_cache) {
+    std::string cache_base_path = caches_dir / 
"reader_dryrun_when_download_file_cache" / "";
     bool org = config::enable_reader_dryrun_when_download_file_cache;
     config::enable_reader_dryrun_when_download_file_cache = true;
     if (fs::exists(cache_base_path)) {
@@ -7661,6 +7669,7 @@ TEST_F(BlockFileCacheTest, 
test_upgrade_cache_dir_version) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_ttl_index) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_ttl_index" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -7737,6 +7746,7 @@ TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_ttl_index) {
 }
 
 TEST_F(BlockFileCacheTest, cached_remote_file_reader_normal_index) {
+    std::string cache_base_path = caches_dir / 
"cached_remote_file_reader_normal_index" / "";
     if (fs::exists(cache_base_path)) {
         fs::remove_all(cache_base_path);
     }
@@ -7890,4 +7900,170 @@ TEST_F(BlockFileCacheTest, test_reset_capacity) {
     FileCacheFactory::instance()->_capacity = 0;
 }
 
+TEST_F(BlockFileCacheTest, 
cached_remote_file_reader_direct_read_and_evict_cache) { // reads must still succeed after the cache is cleared
+    config::enable_read_cache_file_directly = true; // NOTE(review): not restored at the end of this test
+    std::string cache_base_path = caches_dir / "cache_direct_read" / "";
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    io::CacheContext context; // NOTE(review): populated below but not passed to any call in this test
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.query_id = query_id;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+    // first read through the cached reader: 64 KB starting at offset 100, expected to be all '0'
+    std::string buffer;
+    buffer.resize(64_kb);
+    IOContext io_ctx;
+    FileCacheStatistics stats;
+    io_ctx.file_cache_stats = &stats;
+    size_t bytes_read {0};
+    ASSERT_TRUE(
+            reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    // the first read must have left data in the cache created above
+    auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+    EXPECT_GT(cache->_cur_cache_size, 0);
+
+    auto ret_str = FileCacheFactory::instance()->clear_file_caches(
+            /*sync*/ false); // use async to evict cache
+    std::cout << ret_str << std::endl;
+    std::this_thread::sleep_for(std::chrono::seconds(5)); // fixed wait; presumably gives the async clear time to finish
+
+    // eviction is expected to succeed even though the reader still holds a reference
+    EXPECT_EQ(cache->_cur_cache_size, 0);
+
+    // read the same range again after the clear; it must still succeed with the same content
+    ASSERT_TRUE(
+            reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+    EXPECT_TRUE(reader->close().ok());
+    EXPECT_TRUE(reader->closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear(); // drop the caches registered with the factory and zero its capacity
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+// bvar counters declared here and defined elsewhere; the test below snapshots them and asserts on their deltas
+extern bvar::Adder<uint64_t> g_read_cache_direct_whole_num;
+extern bvar::Adder<uint64_t> g_read_cache_direct_partial_num;
+extern bvar::Adder<uint64_t> g_read_cache_indirect_num;
+extern bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes;
+extern bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes;
+extern bvar::Adder<uint64_t> g_read_cache_indirect_bytes;
+// Checks the direct/indirect read counters and the cached size across three 64 KB reads.
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) {
+    std::string cache_base_path = caches_dir / "cache_direct_read_bytes_check" 
/ "";
+    uint64_t org_g_read_cache_direct_whole_num = 
g_read_cache_direct_whole_num.get_value();
+    uint64_t org_g_read_cache_direct_whole_bytes = 
g_read_cache_direct_whole_bytes.get_value();
+    uint64_t org_g_read_cache_direct_partial_num = 
g_read_cache_direct_partial_num.get_value();
+    uint64_t org_g_read_cache_indirect_num = 
g_read_cache_indirect_num.get_value();
+    uint64_t org_g_read_cache_direct_partial_bytes = 
g_read_cache_direct_partial_bytes.get_value();
+    uint64_t org_g_read_cache_indirect_bytes = 
g_read_cache_indirect_bytes.get_value();
+    // the counters are globals, so every assertion below compares against the snapshots taken above
+    config::enable_read_cache_file_directly = true; // NOTE(review): not restored at the end of this test
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    io::CacheContext context; // NOTE(review): populated below but not passed to any call in this test
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.query_id = query_id;
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+
+    std::string buffer;
+    buffer.resize(64_kb);
+    IOContext io_ctx;
+    FileCacheStatistics stats;
+    io_ctx.file_cache_stats = &stats;
+    size_t bytes_read {0};
+    // read offset 100 size 64k: expected to be counted as one indirect read of 64 KB
+    ASSERT_TRUE(
+            reader->read_at(100, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+
+    auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    EXPECT_EQ(cache->_cur_cache_size, 1048576); // equals settings.max_file_block_size
+    EXPECT_EQ(g_read_cache_indirect_num.get_value() - 
org_g_read_cache_indirect_num, 1);
+    EXPECT_EQ(g_read_cache_indirect_bytes.get_value() - 
org_g_read_cache_indirect_bytes, 64_kb);
+
+    // read offset 640k size 64k: still inside the first 1 MB; expected to count as one whole direct read with the cached size unchanged
+    ASSERT_TRUE(
+            reader->read_at(10 * 64_kb, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                    .ok());
+    EXPECT_EQ(std::string(64_kb, '0'), buffer);
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    EXPECT_EQ(cache->_cur_cache_size, 1048576);
+    EXPECT_EQ(g_read_cache_direct_whole_num.get_value() - 
org_g_read_cache_direct_whole_num, 1);
+    EXPECT_EQ(g_read_cache_direct_whole_bytes.get_value() - 
org_g_read_cache_direct_whole_bytes,
+              64_kb);
+    // read 64k starting 100 bytes before the 1 MB boundary, so the range spans the first two blocks;
+    // expected: 100 bytes counted as a partial direct read, the remaining 64k - 100 added to the indirect bytes
+    ASSERT_TRUE(reader->read_at(1048576 - 100, Slice(buffer.data(), 
buffer.size()), &bytes_read,
+                                &io_ctx)
+                        .ok());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    EXPECT_EQ(cache->_cur_cache_size, 2097152); // twice the 1048576 expected after the earlier reads
+    EXPECT_EQ(g_read_cache_direct_partial_num.get_value() - 
org_g_read_cache_direct_partial_num, 1);
+    EXPECT_EQ(g_read_cache_direct_partial_bytes.get_value() - 
org_g_read_cache_direct_partial_bytes,
+              100);
+    EXPECT_EQ(g_read_cache_indirect_bytes.get_value() - 
org_g_read_cache_indirect_bytes,
+              64_kb + 64_kb - 100);
+
+    EXPECT_TRUE(reader->close().ok());
+    EXPECT_TRUE(reader->closed());
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear(); // drop the caches registered with the factory and zero its capacity
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
 } // namespace doris::io
diff --git a/be/test/io/cache/block_file_cache_test_lru_dump.cpp 
b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
index baa953ac7ff..99c2c780bed 100644
--- a/be/test/io/cache/block_file_cache_test_lru_dump.cpp
+++ b/be/test/io/cache/block_file_cache_test_lru_dump.cpp
@@ -493,4 +493,103 @@ TEST_F(BlockFileCacheTest, 
test_lru_duplicate_queue_entry_restore) {
     }
 }
 
+TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_order_check) { // _normal_queue order must follow re-read order once the update interval has passed
+    std::string cache_base_path = caches_dir / "cache_direct_read_order_check" 
/ "";
+    config::enable_read_cache_file_directly = true; // NOTE(review): not restored at the end of this test
+    config::file_cache_background_block_lru_update_interval_ms = 1000; // the final wait below is 2x this value; NOTE(review): not restored either
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+
+    TUniqueId query_id; // NOTE(review): populated but never used in this test
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+
+    
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, 
settings).ok());
+    auto cache = FileCacheFactory::instance()->_path_to_cache[cache_base_path];
+
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    auto reader = std::make_shared<CachedRemoteFileReader>(local_reader, opts);
+
+    std::string buffer;
+    buffer.resize(64_kb);
+    IOContext io_ctx;
+    FileCacheStatistics stats;
+    io_ctx.file_cache_stats = &stats;
+    size_t bytes_read = 0;
+
+    // read 64k at offsets 0, 1 MB and 2 MB, in ascending order
+    ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    ASSERT_TRUE(
+            reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                    .ok());
+    ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), 
buffer.size()), &bytes_read,
+                                &io_ctx)
+                        .ok());
+
+    // check initial order: iterating _normal_queue yields the offsets in the order they were first read
+    std::vector<size_t> initial_offsets;
+    for (auto it = cache->_normal_queue.begin(); it != 
cache->_normal_queue.end(); ++it) {
+        initial_offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(initial_offsets.size(), 3);
+    ASSERT_EQ(initial_offsets[0], 0);
+    ASSERT_EQ(initial_offsets[1], 1024 * 1024);
+    ASSERT_EQ(initial_offsets[2], 1024 * 1024 * 2);
+
+    // read the same three ranges again, in reverse order
+    ASSERT_TRUE(reader->read_at(1024 * 1024 * 2, Slice(buffer.data(), 
buffer.size()), &bytes_read,
+                                &io_ctx)
+                        .ok());
+    ASSERT_TRUE(
+            reader->read_at(1024 * 1024, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx)
+                    .ok());
+    ASSERT_TRUE(reader->read_at(0, Slice(buffer.data(), buffer.size()), 
&bytes_read, &io_ctx).ok());
+    // immediately after the re-reads the queue order is expected to be unchanged
+    std::vector<size_t> before_updated_offsets;
+    for (auto it = cache->_normal_queue.begin(); it != 
cache->_normal_queue.end(); ++it) {
+        before_updated_offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(before_updated_offsets.size(), 3);
+    ASSERT_EQ(before_updated_offsets[0], 0);
+    ASSERT_EQ(before_updated_offsets[1], 1024 * 1024);
+    ASSERT_EQ(before_updated_offsets[2], 1024 * 1024 * 2);
+
+    // wait two update intervals so the LRU update can take effect
+    std::this_thread::sleep_for(std::chrono::milliseconds(
+            2 * config::file_cache_background_block_lru_update_interval_ms));
+
+    // check order after update: it now matches the order of the second round of reads
+    std::vector<size_t> updated_offsets;
+    for (auto it = cache->_normal_queue.begin(); it != 
cache->_normal_queue.end(); ++it) {
+        updated_offsets.push_back(it->offset);
+    }
+    ASSERT_EQ(updated_offsets.size(), 3);
+    ASSERT_EQ(updated_offsets[0], 1024 * 1024 * 2);
+    ASSERT_EQ(updated_offsets[1], 1024 * 1024);
+    ASSERT_EQ(updated_offsets[2], 0);
+
+    EXPECT_TRUE(reader->close().ok());
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear(); // drop the caches registered with the factory and zero its capacity
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+}
+
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to