This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8804f8ba3 [fix](cloud) Fix read peer file cache some bugs (#57838)
7a8804f8ba3 is described below
commit 7a8804f8ba38c5ec5187a808a1237e1a5d464eea
Author: deardeng <[email protected]>
AuthorDate: Tue Nov 11 16:38:43 2025 +0800
[fix](cloud) Fix read peer file cache some bugs (#57838)
1. Fixed an issue where falling back from a peer read to S3 used an
incorrect size: the size is passed by reference and may be modified by
the failed peer read.
2. Fixed warm-up triggering a large number of reads from peer; warm-up
downloads now read directly from S3.
3. Fixed inaccurate block meta, which caused an error when reading data
whose last block is smaller than 1 MB (a sketch of the boundary clamp
follows below).
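For item 3, here is a minimal standalone sketch of the boundary clamp that
the new read_file_block() applies. The helper name clamp_block_read_size is
illustrative only; the real code works directly on file_block->offset() and
file_block->range().size(), logs a warning, and returns Status::InternalError
on underflow.

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    // Illustrative helper mirroring the clamp in read_file_block(): the
    // cached block meta may report a full-sized (e.g. 1 MB) range even for
    // the last block of a file, so the readable size is clamped against
    // the real file size.
    size_t clamp_block_read_size(size_t file_size, size_t block_offset,
                                 size_t block_range_size) {
        if (file_size < block_offset) {
            // The real code logs a warning and returns an error status here.
            return 0;
        }
        // Never read past the end of the file, even if the block meta
        // claims the block is larger.
        return std::min(file_size - block_offset, block_range_size);
    }

    int main() {
        // A 2.5 MB file cached in 1 MB blocks: the last block starts at
        // 2 MB but only 0.5 MB of real data remains.
        constexpr size_t kMB = size_t(1) << 20;
        std::cout << clamp_block_read_size(2 * kMB + kMB / 2, 2 * kMB, kMB)
                  << std::endl; // prints 524288
        return 0;
    }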
---
be/src/cloud/cloud_internal_service.cpp | 21 +++++++++++++++------
be/src/io/cache/block_file_cache_downloader.cpp | 1 +
be/src/io/cache/cached_remote_file_reader.cpp | 18 ++++++++++++------
be/src/io/cache/peer_file_cache_reader.cpp | 8 +++++---
be/src/io/cache/peer_file_cache_reader.h | 5 +++--
.../cloud_p0/balance/test_balance_warm_up.groovy | 2 ++
6 files changed, 38 insertions(+), 17 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 5363fc7479a..2584ce8146b 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -187,10 +187,20 @@ void set_error_response(PFetchPeerDataResponse* response, const std::string& err
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
}
-Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, size_t file_size,
doris::CacheBlockPB* output) {
std::string data;
- data.resize(file_block->range().size());
+ // ATTN: calculate the rightmost boundary value of the block, due to inaccurate current block meta information.
+ // see CachedRemoteFileReader::read_at_impl for more details.
+ // Ensure file_size >= file_block->offset() to avoid underflow
+ if (file_size < file_block->offset()) {
+ LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset(" << file_block->offset() << ")";
+ return Status::InternalError<false>("file_size less than block offset");
+ }
+ size_t read_size = std::min(static_cast<size_t>(file_size - file_block->offset()),
+ file_block->range().size());
+ data.resize(read_size);
auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
@@ -206,6 +216,8 @@ Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block,
g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);
if (read_st.ok()) {
+ output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+ output->set_block_size(static_cast<int64_t>(read_size));
output->set_data(std::move(data));
return Status::OK();
} else {
@@ -245,10 +257,7 @@ Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request
g_file_cache_get_by_peer_blocks_num << 1;
doris::CacheBlockPB* out = response->add_datas();
- out->set_block_offset(static_cast<int64_t>(fb->offset()));
- out->set_block_size(static_cast<int64_t>(fb->range().size()));
-
- Status read_status = read_file_block(fb, out);
+ Status read_status = read_file_block(fb, request->file_size(), out);
if (!read_status.ok()) {
set_error_response(response, "read cache file error");
return read_status;
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index a529a8df625..27f046140b3 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -284,6 +284,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
.expiration_time = meta.expiration_time(),
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+ .is_warmup = true,
},
.download_done = std::move(download_done),
};
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp
index cce96adb3e5..0fdcce45c8a 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -192,10 +192,12 @@ std::pair<std::string, int> get_peer_connection_info(const std::string& file_pat
}
// Execute peer read with fallback to S3
+// file_size is the size of the file
+// used to calculate the rightmost boundary value of the block, due to inaccurate current block meta information.
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
size_t& size, std::unique_ptr<char[]>& buffer,
- const std::string& file_path, bool is_doris_table, ReadStatistics& stats,
- const IOContext* io_ctx) {
+ const std::string& file_path, size_t file_size, bool is_doris_table,
+ ReadStatistics& stats, const IOContext* io_ctx) {
auto [host, port] = get_peer_connection_info(file_path);
VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ",
port=" << port
<< ", file_path=" << file_path;
@@ -211,7 +213,7 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t
peer_read_counter << 1;
PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size,
- io_ctx);
+ file_size, io_ctx);
if (!st.ok()) {
LOG_WARNING("PeerFileCacheReader read from peer failed")
.tag("host", host)
@@ -253,19 +255,23 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockS
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
- _is_doris_table, stats, io_ctx);
+ this->size(), _is_doris_table, stats, io_ctx);
}
});
- if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+ if (!_is_doris_table || io_ctx->is_warmup || !doris::config::enable_cache_read_from_peer) {
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
// first try peer read, if peer failed, fallback to S3
// peer timeout is 5 seconds
// TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first
+ // ATTN: Save original size before peer read, as it may be modified by fetch_blocks, read peer ref size
+ size_t original_size = size;
auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
- _is_doris_table, stats, io_ctx);
+ this->size(), _is_doris_table, stats, io_ctx);
if (!st.ok()) {
+ // Restore original size for S3 fallback, as peer read may have modified it
+ size = original_size;
// Fallback to S3
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
}
diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp
index a3ba09cb2b4..255a2629b1a 100644
--- a/be/src/io/cache/peer_file_cache_reader.cpp
+++ b/be/src/io/cache/peer_file_cache_reader.cpp
@@ -66,11 +66,12 @@ PeerFileCacheReader::~PeerFileCacheReader() {
}
Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
- Slice s, size_t* bytes_read, const IOContext* ctx) {
+ Slice s, size_t* bytes_read, size_t file_size,
+ const IOContext* ctx) {
VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
<< " bytes_read=" << *bytes_read;
- *bytes_read = 0;
if (blocks.empty()) {
+ *bytes_read = 0;
return Status::OK();
}
if (!_is_doris_table) {
@@ -80,6 +81,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
PFetchPeerDataRequest req;
req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
req.set_path(_path.filename().native());
+ req.set_file_size(static_cast<int64_t>(file_size));
for (const auto& blk : blocks) {
auto* cb = req.add_cache_req();
cb->set_block_offset(static_cast<int64_t>(blk->range().left));
@@ -154,7 +156,6 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
}
VLOG_DEBUG << "peer cache read filled=" << filled;
peer_bytes_read_total << filled;
- *bytes_read = filled;
peer_bytes_per_read << filled;
if (filled != s.size) {
peer_cache_reader_failed_counter << 1;
@@ -162,6 +163,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
filled);
}
peer_cache_reader_succ_counter << 1;
+ *bytes_read = filled;
return Status::OK();
}
diff --git a/be/src/io/cache/peer_file_cache_reader.h b/be/src/io/cache/peer_file_cache_reader.h
index f028bc2cd91..49fe56336a2 100644
--- a/be/src/io/cache/peer_file_cache_reader.h
+++ b/be/src/io/cache/peer_file_cache_reader.h
@@ -61,7 +61,8 @@ public:
* - blocks: List of file blocks to fetch (global file offsets, inclusive ranges).
* - off: Base file offset corresponding to the start of Slice s.
* - s: Destination buffer; must be large enough to hold all requested block bytes.
- * - n: Output number of bytes successfully written.
+ * - bytes_read: Output number of bytes read.
+ * - file_size: Size of the file to be read.
* - ctx: IO context (kept for interface symmetry).
*
* Returns:
@@ -69,7 +70,7 @@ public:
* - NotSupported: The file is not a Doris table segment.
*/
Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, Slice s,
- size_t* bytes_read, const IOContext* ctx);
+ size_t* bytes_read, size_t file_size, const IOContext* ctx);
private:
io::Path _path;
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 56eace67c8e..37eb309d866 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -215,6 +215,8 @@ suite('test_balance_warm_up', 'docker') {
// test expired be tablet cache info be removed
// after cache_read_from_peer_expired_seconds = 100s
assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "balance_tablet_be_mapping_size"))
+ assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"))
+ assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"))
}
docker(options) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]