This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8804f8ba3 [fix](cloud) Fix read peer file cache some bugs (#57838)
7a8804f8ba3 is described below

commit 7a8804f8ba38c5ec5187a808a1237e1a5d464eea
Author: deardeng <[email protected]>
AuthorDate: Tue Nov 11 16:38:43 2025 +0800

    [fix](cloud) Fix read peer file cache some bugs (#57838)
    
    1. Fixed an issue where falling back from a peer read to S3 resulted in an
incorrect size (caused by pass-by-reference parameter handling).
    2. Fixed excessive reads from the peer during warm-up.
    3. Fixed inaccurate block metadata, which caused an error when reading data
whose last block is smaller than 1 MB.
---
 be/src/cloud/cloud_internal_service.cpp             | 21 +++++++++++++++------
 be/src/io/cache/block_file_cache_downloader.cpp     |  1 +
 be/src/io/cache/cached_remote_file_reader.cpp       | 18 ++++++++++++------
 be/src/io/cache/peer_file_cache_reader.cpp          |  8 +++++---
 be/src/io/cache/peer_file_cache_reader.h            |  5 +++--
 .../cloud_p0/balance/test_balance_warm_up.groovy    |  2 ++
 6 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index 5363fc7479a..2584ce8146b 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -187,10 +187,20 @@ void set_error_response(PFetchPeerDataResponse* response, 
const std::string& err
     response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
 }
 
-Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, 
size_t file_size,
                        doris::CacheBlockPB* output) {
     std::string data;
-    data.resize(file_block->range().size());
+    // ATTN: calculate the rightmost boundary value of the block, due to 
inaccurate current block meta information.
+    // see CachedRemoteFileReader::read_at_impl for more details.
+    // Ensure file_size >= file_block->offset() to avoid underflow
+    if (file_size < file_block->offset()) {
+        LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
+                     << file_block->offset() << ")";
+        return Status::InternalError<false>("file_size less than block 
offset");
+    }
+    size_t read_size = std::min(static_cast<size_t>(file_size - 
file_block->offset()),
+                                file_block->range().size());
+    data.resize(read_size);
 
     auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
                                       
std::chrono::steady_clock::now().time_since_epoch())
@@ -206,6 +216,8 @@ Status read_file_block(const 
std::shared_ptr<io::FileBlock>& file_block,
     g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - 
begin_read_file_ts);
 
     if (read_st.ok()) {
+        output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+        output->set_block_size(static_cast<int64_t>(read_size));
         output->set_data(std::move(data));
         return Status::OK();
     } else {
@@ -245,10 +257,7 @@ Status handle_peer_file_cache_block_request(const 
PFetchPeerDataRequest* request
 
             g_file_cache_get_by_peer_blocks_num << 1;
             doris::CacheBlockPB* out = response->add_datas();
-            out->set_block_offset(static_cast<int64_t>(fb->offset()));
-            out->set_block_size(static_cast<int64_t>(fb->range().size()));
-
-            Status read_status = read_file_block(fb, out);
+            Status read_status = read_file_block(fb, request->file_size(), 
out);
             if (!read_status.ok()) {
                 set_error_response(response, "read cache file error");
                 return read_status;
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp 
b/be/src/io/cache/block_file_cache_downloader.cpp
index a529a8df625..27f046140b3 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -284,6 +284,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
                                 .is_index_data = meta.cache_type() == 
::doris::FileCacheType::INDEX,
                                 .expiration_time = meta.expiration_time(),
                                 .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                                .is_warmup = true,
                         },
                 .download_done = std::move(download_done),
         };
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index cce96adb3e5..0fdcce45c8a 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -192,10 +192,12 @@ std::pair<std::string, int> 
get_peer_connection_info(const std::string& file_pat
 }
 
 // Execute peer read with fallback to S3
+// file_size is the size of the file
+// used to calculate the rightmost boundary value of the block, due to 
inaccurate current block meta information.
 Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, 
size_t empty_start,
                          size_t& size, std::unique_ptr<char[]>& buffer,
-                         const std::string& file_path, bool is_doris_table, 
ReadStatistics& stats,
-                         const IOContext* io_ctx) {
+                         const std::string& file_path, size_t file_size, bool 
is_doris_table,
+                         ReadStatistics& stats, const IOContext* io_ctx) {
     auto [host, port] = get_peer_connection_info(file_path);
     VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", 
port=" << port
                << ", file_path=" << file_path;
@@ -211,7 +213,7 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& 
empty_blocks, size_t
     peer_read_counter << 1;
     PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
     auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, 
Slice(buffer.get(), size), &size,
-                                       io_ctx);
+                                       file_size, io_ctx);
     if (!st.ok()) {
         LOG_WARNING("PeerFileCacheReader read from peer failed")
                 .tag("host", host)
@@ -253,19 +255,23 @@ Status CachedRemoteFileReader::_execute_remote_read(const 
std::vector<FileBlockS
             return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
         } else {
             return execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                     _is_doris_table, stats, io_ctx);
+                                     this->size(), _is_doris_table, stats, 
io_ctx);
         }
     });
 
-    if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+    if (!_is_doris_table || io_ctx->is_warmup || 
!doris::config::enable_cache_read_from_peer) {
         return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
     } else {
         // first try peer read, if peer failed, fallback to S3
         // peer timeout is 5 seconds
         // TODO(dx): here peer and s3 reader need to get data in parallel, and 
take the one that is correct and returns first
+        // ATTN: Save original size before peer read, as it may be modified by 
fetch_blocks, read peer ref size
+        size_t original_size = size;
         auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, 
path().native(),
-                                    _is_doris_table, stats, io_ctx);
+                                    this->size(), _is_doris_table, stats, 
io_ctx);
         if (!st.ok()) {
+            // Restore original size for S3 fallback, as peer read may have 
modified it
+            size = original_size;
             // Fallback to S3
             return execute_s3_read(empty_start, size, buffer, stats, io_ctx, 
_remote_file_reader);
         }
diff --git a/be/src/io/cache/peer_file_cache_reader.cpp 
b/be/src/io/cache/peer_file_cache_reader.cpp
index a3ba09cb2b4..255a2629b1a 100644
--- a/be/src/io/cache/peer_file_cache_reader.cpp
+++ b/be/src/io/cache/peer_file_cache_reader.cpp
@@ -66,11 +66,12 @@ PeerFileCacheReader::~PeerFileCacheReader() {
 }
 
 Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& 
blocks, size_t off,
-                                         Slice s, size_t* bytes_read, const 
IOContext* ctx) {
+                                         Slice s, size_t* bytes_read, size_t 
file_size,
+                                         const IOContext* ctx) {
     VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
                << " bytes_read=" << *bytes_read;
-    *bytes_read = 0;
     if (blocks.empty()) {
+        *bytes_read = 0;
         return Status::OK();
     }
     if (!_is_doris_table) {
@@ -80,6 +81,7 @@ Status PeerFileCacheReader::fetch_blocks(const 
std::vector<FileBlockSPtr>& block
     PFetchPeerDataRequest req;
     req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
     req.set_path(_path.filename().native());
+    req.set_file_size(static_cast<int64_t>(file_size));
     for (const auto& blk : blocks) {
         auto* cb = req.add_cache_req();
         cb->set_block_offset(static_cast<int64_t>(blk->range().left));
@@ -154,7 +156,6 @@ Status PeerFileCacheReader::fetch_blocks(const 
std::vector<FileBlockSPtr>& block
     }
     VLOG_DEBUG << "peer cache read filled=" << filled;
     peer_bytes_read_total << filled;
-    *bytes_read = filled;
     peer_bytes_per_read << filled;
     if (filled != s.size) {
         peer_cache_reader_failed_counter << 1;
@@ -162,6 +163,7 @@ Status PeerFileCacheReader::fetch_blocks(const 
std::vector<FileBlockSPtr>& block
                                             filled);
     }
     peer_cache_reader_succ_counter << 1;
+    *bytes_read = filled;
     return Status::OK();
 }
 
diff --git a/be/src/io/cache/peer_file_cache_reader.h 
b/be/src/io/cache/peer_file_cache_reader.h
index f028bc2cd91..49fe56336a2 100644
--- a/be/src/io/cache/peer_file_cache_reader.h
+++ b/be/src/io/cache/peer_file_cache_reader.h
@@ -61,7 +61,8 @@ public:
      * - blocks: List of file blocks to fetch (global file offsets, inclusive 
ranges).
      * - off: Base file offset corresponding to the start of Slice s.
      * - s: Destination buffer; must be large enough to hold all requested 
block bytes.
-     * - n: Output number of bytes successfully written.
+     * - bytes_read: Output number of bytes read.
+     * - file_size: Size of the file to be read.
      * - ctx: IO context (kept for interface symmetry).
      *
      * Returns:
@@ -69,7 +70,7 @@ public:
      * - NotSupported: The file is not a Doris table segment.
      */
     Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, 
Slice s,
-                        size_t* bytes_read, const IOContext* ctx);
+                        size_t* bytes_read, size_t file_size, const IOContext* 
ctx);
 
 private:
     io::Path _path;
diff --git 
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy 
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 56eace67c8e..37eb309d866 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -215,6 +215,8 @@ suite('test_balance_warm_up', 'docker') {
         // test expired be tablet cache info be removed
         // after cache_read_from_peer_expired_seconds = 100s
         assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, 
"balance_tablet_be_mapping_size"))
+        assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, 
"cached_remote_reader_peer_read"))
+        assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, 
"cached_remote_reader_s3_read"))
     }
 
     docker(options) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to