This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 87badcdff31 branch-4.0: [fix](cloud) Fix read peer file cache some 
bugs #57838 #58063 (#58627)
87badcdff31 is described below

commit 87badcdff31f22ce006a802e019e57a53d25395d
Author: deardeng <[email protected]>
AuthorDate: Wed Dec 3 09:25:44 2025 +0800

    branch-4.0: [fix](cloud) Fix read peer file cache some bugs #57838 #58063 
(#58627)
    
    cherry pick from #57838 #58063
---
 be/src/cloud/cloud_internal_service.cpp            | 234 +++++++++++++--------
 be/src/io/cache/block_file_cache_downloader.cpp    |   1 +
 be/src/io/cache/cached_remote_file_reader.cpp      |  19 +-
 be/src/io/cache/peer_file_cache_reader.cpp         |   8 +-
 be/src/io/cache/peer_file_cache_reader.h           |   5 +-
 .../cloud_p0/balance/test_balance_warm_up.groovy   |   2 +
 6 files changed, 166 insertions(+), 103 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 94bb951b95d..2584ce8146b 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -170,110 +170,160 @@ void 
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
                << ", response=" << response->DebugString();
 }
 
+namespace {
+// Helper functions for fetch_peer_data
+
+Status handle_peer_file_range_request(const std::string& path, 
PFetchPeerDataResponse* response) {
+    // Return all cached data found for `path`; this helper takes no offset/size,
+    // so it does not narrow the result to a [file_offset, file_offset+file_size) range.
+    auto datas = 
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+    for (auto& cb : datas) {
+        *(response->add_datas()) = std::move(cb);
+    }
+    return Status::OK();
+}
+
+void set_error_response(PFetchPeerDataResponse* response, const std::string& 
error_msg) {
+    response->mutable_status()->add_error_msgs(error_msg);
+    response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+}
+
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, 
size_t file_size,
+                       doris::CacheBlockPB* output) {
+    std::string data;
+    // ATTN: the block's recorded range can extend past the end of the file because the
+    // current block meta information is inaccurate, so the read length is clamped to
+    // file_size below. See CachedRemoteFileReader::read_at_impl for more details.
+    // Reject file_size < file_block->offset() first so the subtraction cannot underflow.
+    if (file_size < file_block->offset()) {
+        LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
+                     << file_block->offset() << ")";
+        return Status::InternalError<false>("file_size less than block 
offset");
+    }
+    size_t read_size = std::min(static_cast<size_t>(file_size - 
file_block->offset()),
+                                file_block->range().size());
+    data.resize(read_size);
+
+    auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                      
std::chrono::steady_clock::now().time_since_epoch())
+                                      .count();
+
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+    Slice slice(data.data(), data.size());
+    Status read_st = file_block->read(slice, /*read_offset=*/0);
+
+    auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                    
std::chrono::steady_clock::now().time_since_epoch())
+                                    .count();
+    g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - 
begin_read_file_ts);
+
+    if (read_st.ok()) {
+        output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+        output->set_block_size(static_cast<int64_t>(read_size));
+        output->set_data(std::move(data));
+        return Status::OK();
+    } else {
+        g_file_cache_get_by_peer_failed_num << 1;
+        LOG(WARNING) << "read cache block failed: " << read_st;
+        return read_st;
+    }
+}
+
+Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* 
request,
+                                            PFetchPeerDataResponse* response) {
+    const auto& path = request->path();
+    auto hash = io::BlockFileCache::hash(path);
+    auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+    if (cache == nullptr) {
+        g_file_cache_get_by_peer_failed_num << 1;
+        set_error_response(response, "can't get file cache instance");
+        return Status::InternalError<false>("can't get file cache instance");
+    }
+
+    io::CacheContext ctx {};
+    io::ReadStatistics local_stats;
+    ctx.stats = &local_stats;
+
+    for (const auto& cb_req : request->cache_req()) {
+        size_t offset = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_offset()));
+        size_t size = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_size()));
+        auto holder = cache->get_or_set(hash, offset, size, ctx);
+
+        for (auto& fb : holder.file_blocks) {
+            if (fb->state() != io::FileBlock::State::DOWNLOADED) {
+                g_file_cache_get_by_peer_failed_num << 1;
+                LOG(WARNING) << "read cache block failed, state=" << 
fb->state();
+                set_error_response(response, "read cache file error");
+                return Status::InternalError<false>("cache block not 
downloaded");
+            }
+
+            g_file_cache_get_by_peer_blocks_num << 1;
+            doris::CacheBlockPB* out = response->add_datas();
+            Status read_status = read_file_block(fb, request->file_size(), 
out);
+            if (!read_status.ok()) {
+                set_error_response(response, "read cache file error");
+                return read_status;
+            }
+        }
+    }
+
+    return Status::OK();
+}
+} // namespace
+
 void 
CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* 
controller
                                                [[maybe_unused]],
                                                const PFetchPeerDataRequest* 
request,
                                                PFetchPeerDataResponse* 
response,
                                                google::protobuf::Closure* 
done) {
-    // TODO(dx): use async thread pool to handle the request, not AsyncIO
-    brpc::ClosureGuard closure_guard(done);
-    g_file_cache_get_by_peer_num << 1;
-    if (!config::enable_file_cache) {
-        LOG_WARNING("try to access file cache data, but file cache not 
enabled");
-        return;
-    }
-    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                               
std::chrono::steady_clock::now().time_since_epoch())
-                               .count();
-    const auto type = request->type();
-    const auto& path = request->path();
-    response->mutable_status()->set_status_code(TStatusCode::OK);
-    if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
-        // Read specific range [file_offset, file_offset+file_size) across 
cached blocks
-        auto datas = 
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
-        for (auto& cb : datas) {
-            *(response->add_datas()) = std::move(cb);
-        }
-    } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
-        // Multiple specific blocks
-        auto hash = io::BlockFileCache::hash(path);
-        auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
-        if (cache == nullptr) {
-            g_file_cache_get_by_peer_failed_num << 1;
-            response->mutable_status()->add_error_msgs("can't get file cache 
instance");
-            
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        g_file_cache_get_by_peer_num << 1;
+
+        if (!config::enable_file_cache) {
+            LOG_WARNING("try to access file cache data, but file cache not 
enabled");
             return;
         }
-        io::CacheContext ctx {};
-        // ensure a valid stats pointer is provided to cache layer
-        io::ReadStatistics local_stats;
-        ctx.stats = &local_stats;
-        for (const auto& cb_req : request->cache_req()) {
-            size_t offset = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_offset()));
-            size_t size = static_cast<size_t>(std::max<int64_t>(0, 
cb_req.block_size()));
-            auto holder = cache->get_or_set(hash, offset, size, ctx);
-            for (auto& fb : holder.file_blocks) {
-                auto state = fb->state();
-                if (state != io::FileBlock::State::DOWNLOADED) {
-                    g_file_cache_get_by_peer_failed_num << 1;
-                    LOG(WARNING) << "read cache block failed, state=" << state;
-                    response->mutable_status()->add_error_msgs("read cache 
file error");
-                    
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
-                    return;
-                }
-                g_file_cache_get_by_peer_blocks_num << 1;
-                doris::CacheBlockPB* out = response->add_datas();
-                out->set_block_offset(static_cast<int64_t>(fb->offset()));
-                out->set_block_size(static_cast<int64_t>(fb->range().size()));
-                std::string data;
-                data.resize(fb->range().size());
-                // Offload the file block read to a dedicated OS thread to 
avoid bthread IO
-                Status read_st = Status::OK();
-                // due to file_reader.cpp:33] Check failed: bthread_self() == 0
-                int64_t begin_read_file_ts =
-                        std::chrono::duration_cast<std::chrono::microseconds>(
-                                
std::chrono::steady_clock::now().time_since_epoch())
-                                .count();
-                auto task = [&] {
-                    // Current thread not exist ThreadContext, usually after 
the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext 
and bind a Task.
-                    
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
-                    Slice slice(data.data(), data.size());
-                    read_st = fb->read(slice, /*read_offset=*/0);
-                };
-                AsyncIO::run_task(task, io::FileSystemType::LOCAL);
-                int64_t end_read_file_ts =
-                        std::chrono::duration_cast<std::chrono::microseconds>(
+
+        auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                 
std::chrono::steady_clock::now().time_since_epoch())
                                 .count();
-                g_file_cache_get_by_peer_read_cache_file_latency
-                        << (end_read_file_ts - begin_read_file_ts);
-                if (read_st.ok()) {
-                    out->set_data(std::move(data));
-                } else {
-                    g_file_cache_get_by_peer_failed_num << 1;
-                    LOG(WARNING) << "read cache block failed: " << read_st;
-                    response->mutable_status()->add_error_msgs("read cache 
file error");
-                    
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
-                    return;
-                }
-            }
+
+        const auto type = request->type();
+        const auto& path = request->path();
+        response->mutable_status()->set_status_code(TStatusCode::OK);
+
+        Status status = Status::OK();
+        if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+            status = handle_peer_file_range_request(path, response);
+        } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+            status = handle_peer_file_cache_block_request(request, response);
         }
-    }
-    DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
-        int st_us = dp->param<int>("sleep", 1000);
-        
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", 
st_us);
-        // sleep us
-        bthread_usleep(st_us);
-    });
 
-    int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
-                             
std::chrono::steady_clock::now().time_since_epoch())
-                             .count();
-    g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
-    g_file_cache_get_by_peer_success_num << 1;
+        if (!status.ok()) {
+            LOG(WARNING) << "fetch peer data failed: " << status.to_string();
+            set_error_response(response, status.to_string());
+        }
 
-    VLOG_DEBUG << "fetch cache request=" << request->DebugString()
-               << ", response=" << response->DebugString();
+        DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+            int st_us = dp->param<int>("sleep", 1000);
+            
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", 
st_us);
+            bthread_usleep(st_us);
+        });
+
+        auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+                              
std::chrono::steady_clock::now().time_since_epoch())
+                              .count();
+        g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
+        g_file_cache_get_by_peer_success_num << 1;
+
+        VLOG_DEBUG << "fetch cache request=" << request->DebugString()
+                   << ", response=" << response->DebugString();
+    });
+
+    if (!ret) {
+        brpc::ClosureGuard closure_guard(done);
+        LOG(WARNING) << "fail to offer fetch peer data request to the work 
pool, pool="
+                     << _heavy_work_pool.get_info();
+    }
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp 
b/be/src/io/cache/block_file_cache_downloader.cpp
index 851ff386e4d..ba28d9f3479 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -284,6 +284,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
                                 .is_index_data = meta.cache_type() == 
::doris::FileCacheType::INDEX,
                                 .expiration_time = meta.expiration_time(),
                                 .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                                .is_warmup = true,
                         },
                 .download_done = std::move(download_done),
         };
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index 59ed51a6f6b..240515e94e1 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -191,10 +191,12 @@ std::pair<std::string, int> 
get_peer_connection_info(const std::string& file_pat
 }
 
 // Execute peer read with fallback to S3
+// file_size is the total size of the file being read. It is forwarded to the peer so that
+// the right boundary of each block can be computed there, because the current block meta
+// information is inaccurate.
 Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, 
size_t empty_start,
                          size_t& size, std::unique_ptr<char[]>& buffer,
-                         const std::string& file_path, bool is_doris_table, 
ReadStatistics& stats,
-                         const IOContext* io_ctx) {
+                         const std::string& file_path, size_t file_size, bool 
is_doris_table,
+                         ReadStatistics& stats, const IOContext* io_ctx) {
     auto [host, port] = get_peer_connection_info(file_path);
     VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", 
port=" << port
                << ", file_path=" << file_path;
@@ -210,7 +212,7 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& 
empty_blocks, size_t
     peer_read_counter << 1;
     PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
     auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, 
Slice(buffer.get(), size), &size,
-                                       io_ctx);
+                                       file_size, io_ctx);
     if (!st.ok()) {
         LOG_WARNING("PeerFileCacheReader read from peer failed")
                 .tag("host", host)
@@ -252,19 +254,24 @@ Status CachedRemoteFileReader::_execute_remote_read(const 
std::vector<FileBlockS
             return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
         } else {
             return execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                     _is_doris_table, stats, io_ctx);
+                                     this->size(), _is_doris_table, stats, 
io_ctx);
         }
     });
 
-    if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+    if (!doris::config::is_cloud_mode() || !_is_doris_table || 
io_ctx->is_warmup ||
+        !doris::config::enable_cache_read_from_peer) {
         return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
     } else {
         // first try peer read, if peer failed, fallback to S3
         // peer timeout is 5 seconds
         // TODO(dx): here peer and s3 reader need to get data in parallel, and 
take the one that is correct and returns first
+        // ATTN: save the original size before the peer read; execute_peer_read takes `size`
+        // by reference and fetch_blocks may modify it.
+        size_t original_size = size;
         auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                    _is_doris_table, stats, io_ctx);
+                                    this->size(), _is_doris_table, stats, 
io_ctx);
         if (!st.ok()) {
+            // Restore original size for S3 fallback, as peer read may have 
modified it
+            size = original_size;
             // Fallback to S3
             return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
         }
diff --git a/be/src/io/cache/peer_file_cache_reader.cpp 
b/be/src/io/cache/peer_file_cache_reader.cpp
index c034cdce110..adb867f5c31 100644
--- a/be/src/io/cache/peer_file_cache_reader.cpp
+++ b/be/src/io/cache/peer_file_cache_reader.cpp
@@ -66,11 +66,12 @@ PeerFileCacheReader::~PeerFileCacheReader() {
 }
 
 Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& 
blocks, size_t off,
-                                         Slice s, size_t* bytes_read, const 
IOContext* ctx) {
+                                         Slice s, size_t* bytes_read, size_t 
file_size,
+                                         const IOContext* ctx) {
     VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
                << " bytes_read=" << *bytes_read;
-    *bytes_read = 0;
     if (blocks.empty()) {
+        *bytes_read = 0;
         return Status::OK();
     }
     if (!_is_doris_table) {
@@ -80,6 +81,7 @@ Status PeerFileCacheReader::fetch_blocks(const 
std::vector<FileBlockSPtr>& block
     PFetchPeerDataRequest req;
     req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
     req.set_path(_path.filename().native());
+    req.set_file_size(static_cast<int64_t>(file_size));
     for (const auto& blk : blocks) {
         auto* cb = req.add_cache_req();
         cb->set_block_offset(static_cast<int64_t>(blk->range().left));
@@ -154,13 +156,13 @@ Status PeerFileCacheReader::fetch_blocks(const 
std::vector<FileBlockSPtr>& block
     }
     VLOG_DEBUG << "peer cache read filled=" << filled;
     peer_bytes_read_total << filled;
-    *bytes_read = filled;
     peer_bytes_per_read << filled;
     if (filled != s.size) {
         peer_cache_reader_failed_counter << 1;
         return Status::InternalError("peer cache read incomplete: need={}, 
got={}", s.size, filled);
     }
     peer_cache_reader_succ_counter << 1;
+    *bytes_read = filled;
     return Status::OK();
 }
 
diff --git a/be/src/io/cache/peer_file_cache_reader.h 
b/be/src/io/cache/peer_file_cache_reader.h
index f028bc2cd91..49fe56336a2 100644
--- a/be/src/io/cache/peer_file_cache_reader.h
+++ b/be/src/io/cache/peer_file_cache_reader.h
@@ -61,7 +61,8 @@ public:
      * - blocks: List of file blocks to fetch (global file offsets, inclusive 
ranges).
      * - off: Base file offset corresponding to the start of Slice s.
      * - s: Destination buffer; must be large enough to hold all requested 
block bytes.
-     * - n: Output number of bytes successfully written.
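+     * - bytes_read: Output number of bytes read. Set to 0 when blocks is empty and to the
+     *   number of bytes filled on success; an incomplete read returns an error without
+     *   updating it.
+     * - file_size: Total size of the file being read; it is sent to the peer in the request.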
      * - ctx: IO context (kept for interface symmetry).
      *
      * Returns:
@@ -69,7 +70,7 @@ public:
      * - NotSupported: The file is not a Doris table segment.
      */
     Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, 
Slice s,
-                        size_t* bytes_read, const IOContext* ctx);
+                        size_t* bytes_read, size_t file_size, const IOContext* 
ctx);
 
 private:
     io::Path _path;
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 56eace67c8e..37eb309d866 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -215,6 +215,8 @@ suite('test_balance_warm_up', 'docker') {
         // test expired be tablet cache info be removed
         // after cache_read_from_peer_expired_seconds = 100s
         assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, 
"balance_tablet_be_mapping_size"))
+        assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, 
"cached_remote_reader_peer_read"))
+        assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, 
"cached_remote_reader_s3_read"))
     }
 
     docker(options) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to