This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 87badcdff31 branch-4.0: [fix](cloud) Fix read peer file cache some bugs #57838 #58063 (#58627)
87badcdff31 is described below
commit 87badcdff31f22ce006a802e019e57a53d25395d
Author: deardeng <[email protected]>
AuthorDate: Wed Dec 3 09:25:44 2025 +0800
branch-4.0: [fix](cloud) Fix read peer file cache some bugs #57838 #58063 (#58627)
cherry pick from #57838 #58063
---
be/src/cloud/cloud_internal_service.cpp | 234 +++++++++++++--------
be/src/io/cache/block_file_cache_downloader.cpp | 1 +
be/src/io/cache/cached_remote_file_reader.cpp | 19 +-
be/src/io/cache/peer_file_cache_reader.cpp | 8 +-
be/src/io/cache/peer_file_cache_reader.h | 5 +-
.../cloud_p0/balance/test_balance_warm_up.groovy | 2 +
6 files changed, 166 insertions(+), 103 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 94bb951b95d..2584ce8146b 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -170,110 +170,160 @@ void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
<< ", response=" << response->DebugString();
}
+namespace {
+// Helper functions for fetch_peer_data
+
+Status handle_peer_file_range_request(const std::string& path, PFetchPeerDataResponse* response) {
+ // Read specific range [file_offset, file_offset+file_size) across cached blocks
+ auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+ for (auto& cb : datas) {
+ *(response->add_datas()) = std::move(cb);
+ }
+ return Status::OK();
+}
+
+void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
+ response->mutable_status()->add_error_msgs(error_msg);
+ response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+}
+
+Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, size_t file_size,
+ doris::CacheBlockPB* output) {
+ std::string data;
+ // ATTN: calculate the rightmost boundary value of the block, due to inaccurate current block meta information.
+ // see CachedRemoteFileReader::read_at_impl for more details.
+ // Ensure file_size >= file_block->offset() to avoid underflow
+ if (file_size < file_block->offset()) {
+ LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
+ << file_block->offset() << ")";
+ return Status::InternalError<false>("file_size less than block offset");
+ }
+ size_t read_size = std::min(static_cast<size_t>(file_size - file_block->offset()),
+ file_block->range().size());
+ data.resize(read_size);
+
+ auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ Slice slice(data.data(), data.size());
+ Status read_st = file_block->read(slice, /*read_offset=*/0);
+
+ auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);
+
+ if (read_st.ok()) {
+ output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+ output->set_block_size(static_cast<int64_t>(read_size));
+ output->set_data(std::move(data));
+ return Status::OK();
+ } else {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed: " << read_st;
+ return read_st;
+ }
+}
+
+Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
+ PFetchPeerDataResponse* response) {
+ const auto& path = request->path();
+ auto hash = io::BlockFileCache::hash(path);
+ auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+ if (cache == nullptr) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ set_error_response(response, "can't get file cache instance");
+ return Status::InternalError<false>("can't get file cache instance");
+ }
+
+ io::CacheContext ctx {};
+ io::ReadStatistics local_stats;
+ ctx.stats = &local_stats;
+
+ for (const auto& cb_req : request->cache_req()) {
+ size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
+ size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
+ auto holder = cache->get_or_set(hash, offset, size, ctx);
+
+ for (auto& fb : holder.file_blocks) {
+ if (fb->state() != io::FileBlock::State::DOWNLOADED) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed, state=" << fb->state();
+ set_error_response(response, "read cache file error");
+ return Status::InternalError<false>("cache block not downloaded");
+ }
+
+ g_file_cache_get_by_peer_blocks_num << 1;
+ doris::CacheBlockPB* out = response->add_datas();
+ Status read_status = read_file_block(fb, request->file_size(), out);
+ if (!read_status.ok()) {
+ set_error_response(response, "read cache file error");
+ return read_status;
+ }
+ }
+ }
+
+ return Status::OK();
+}
+} // namespace
+
void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller
[[maybe_unused]],
const PFetchPeerDataRequest* request,
PFetchPeerDataResponse* response,
google::protobuf::Closure* done) {
- // TODO(dx): use async thread pool to handle the request, not AsyncIO
- brpc::ClosureGuard closure_guard(done);
- g_file_cache_get_by_peer_num << 1;
- if (!config::enable_file_cache) {
- LOG_WARNING("try to access file cache data, but file cache not enabled");
- return;
- }
- int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- const auto type = request->type();
- const auto& path = request->path();
- response->mutable_status()->set_status_code(TStatusCode::OK);
- if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
- // Read specific range [file_offset, file_offset+file_size) across cached blocks
- auto datas = io::FileCacheFactory::instance()->get_cache_data_by_path(path);
- for (auto& cb : datas) {
- *(response->add_datas()) = std::move(cb);
- }
- } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
- // Multiple specific blocks
- auto hash = io::BlockFileCache::hash(path);
- auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
- if (cache == nullptr) {
- g_file_cache_get_by_peer_failed_num << 1;
- response->mutable_status()->add_error_msgs("can't get file cache instance");
- response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ g_file_cache_get_by_peer_num << 1;
+
+ if (!config::enable_file_cache) {
+ LOG_WARNING("try to access file cache data, but file cache not enabled");
return;
}
- io::CacheContext ctx {};
- // ensure a valid stats pointer is provided to cache layer
- io::ReadStatistics local_stats;
- ctx.stats = &local_stats;
- for (const auto& cb_req : request->cache_req()) {
- size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
- size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size()));
- auto holder = cache->get_or_set(hash, offset, size, ctx);
- for (auto& fb : holder.file_blocks) {
- auto state = fb->state();
- if (state != io::FileBlock::State::DOWNLOADED) {
- g_file_cache_get_by_peer_failed_num << 1;
- LOG(WARNING) << "read cache block failed, state=" << state;
- response->mutable_status()->add_error_msgs("read cache file error");
- response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
- return;
- }
- g_file_cache_get_by_peer_blocks_num << 1;
- doris::CacheBlockPB* out = response->add_datas();
- out->set_block_offset(static_cast<int64_t>(fb->offset()));
- out->set_block_size(static_cast<int64_t>(fb->range().size()));
- std::string data;
- data.resize(fb->range().size());
- // Offload the file block read to a dedicated OS thread to avoid bthread IO
- Status read_st = Status::OK();
- // due to file_reader.cpp:33] Check failed: bthread_self() == 0
- int64_t begin_read_file_ts =
- std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- auto task = [&] {
- // Current thread not exist ThreadContext, usually after the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext and bind a Task.
- SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
- Slice slice(data.data(), data.size());
- read_st = fb->read(slice, /*read_offset=*/0);
- };
- AsyncIO::run_task(task, io::FileSystemType::LOCAL);
- int64_t end_read_file_ts =
- std::chrono::duration_cast<std::chrono::microseconds>(
+
+ auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
- g_file_cache_get_by_peer_read_cache_file_latency
- << (end_read_file_ts - begin_read_file_ts);
- if (read_st.ok()) {
- out->set_data(std::move(data));
- } else {
- g_file_cache_get_by_peer_failed_num << 1;
- LOG(WARNING) << "read cache block failed: " << read_st;
- response->mutable_status()->add_error_msgs("read cache file error");
- response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
- return;
- }
- }
+
+ const auto type = request->type();
+ const auto& path = request->path();
+ response->mutable_status()->set_status_code(TStatusCode::OK);
+
+ Status status = Status::OK();
+ if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+ status = handle_peer_file_range_request(path, response);
+ } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+ status = handle_peer_file_cache_block_request(request, response);
}
- }
- DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
- int st_us = dp->param<int>("sleep", 1000);
- LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
- // sleep us
- bthread_usleep(st_us);
- });
- int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
- std::chrono::steady_clock::now().time_since_epoch())
- .count();
- g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
- g_file_cache_get_by_peer_success_num << 1;
+ if (!status.ok()) {
+ LOG(WARNING) << "fetch peer data failed: " << status.to_string();
+ set_error_response(response, status.to_string());
+ }
- VLOG_DEBUG << "fetch cache request=" << request->DebugString()
- << ", response=" << response->DebugString();
+ DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+ int st_us = dp->param<int>("sleep", 1000);
+ LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
+ bthread_usleep(st_us);
+ });
+
+ auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
+ g_file_cache_get_by_peer_success_num << 1;
+
+ VLOG_DEBUG << "fetch cache request=" << request->DebugString()
+ << ", response=" << response->DebugString();
+ });
+
+ if (!ret) {
+ brpc::ClosureGuard closure_guard(done);
+ LOG(WARNING) << "fail to offer fetch peer data request to the work pool, pool="
+ << _heavy_work_pool.get_info();
+ }
}
#include "common/compile_check_end.h"
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 851ff386e4d..ba28d9f3479 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -284,6 +284,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
.expiration_time = meta.expiration_time(),
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+ .is_warmup = true,
},
.download_done = std::move(download_done),
};
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp
index 59ed51a6f6b..240515e94e1 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -191,10 +191,12 @@ std::pair<std::string, int> get_peer_connection_info(const std::string& file_pat
}
// Execute peer read with fallback to S3
+// file_size is the size of the file
+// used to calculate the rightmost boundary value of the block, due to inaccurate current block meta information.
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
size_t& size, std::unique_ptr<char[]>& buffer,
- const std::string& file_path, bool is_doris_table, ReadStatistics& stats,
- const IOContext* io_ctx) {
+ const std::string& file_path, size_t file_size, bool is_doris_table,
+ ReadStatistics& stats, const IOContext* io_ctx) {
auto [host, port] = get_peer_connection_info(file_path);
VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
<< ", file_path=" << file_path;
@@ -210,7 +212,7 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t
peer_read_counter << 1;
PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size,
- io_ctx);
+ file_size, io_ctx);
if (!st.ok()) {
LOG_WARNING("PeerFileCacheReader read from peer failed")
.tag("host", host)
@@ -252,19 +254,24 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockS
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
- _is_doris_table, stats, io_ctx);
+ this->size(), _is_doris_table, stats, io_ctx);
}
});
- if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+ if (!doris::config::is_cloud_mode() || !_is_doris_table || io_ctx->is_warmup ||
+ !doris::config::enable_cache_read_from_peer) {
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
// first try peer read, if peer failed, fallback to S3
// peer timeout is 5 seconds
// TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first
+ // ATTN: Save original size before peer read, as it may be modified by fetch_blocks, read peer ref size
+ size_t original_size = size;
auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
- _is_doris_table, stats, io_ctx);
+ this->size(), _is_doris_table, stats, io_ctx);
if (!st.ok()) {
+ // Restore original size for S3 fallback, as peer read may have modified it
+ size = original_size;
// Fallback to S3
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
}
diff --git a/be/src/io/cache/peer_file_cache_reader.cpp b/be/src/io/cache/peer_file_cache_reader.cpp
index c034cdce110..adb867f5c31 100644
--- a/be/src/io/cache/peer_file_cache_reader.cpp
+++ b/be/src/io/cache/peer_file_cache_reader.cpp
@@ -66,11 +66,12 @@ PeerFileCacheReader::~PeerFileCacheReader() {
}
Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off,
- Slice s, size_t* bytes_read, const IOContext* ctx) {
+ Slice s, size_t* bytes_read, size_t file_size,
+ const IOContext* ctx) {
VLOG_DEBUG << "enter PeerFileCacheReader::fetch_blocks, off=" << off
<< " bytes_read=" << *bytes_read;
- *bytes_read = 0;
if (blocks.empty()) {
+ *bytes_read = 0;
return Status::OK();
}
if (!_is_doris_table) {
@@ -80,6 +81,7 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
PFetchPeerDataRequest req;
req.set_type(PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK);
req.set_path(_path.filename().native());
+ req.set_file_size(static_cast<int64_t>(file_size));
for (const auto& blk : blocks) {
auto* cb = req.add_cache_req();
cb->set_block_offset(static_cast<int64_t>(blk->range().left));
@@ -154,13 +156,13 @@ Status PeerFileCacheReader::fetch_blocks(const std::vector<FileBlockSPtr>& block
}
VLOG_DEBUG << "peer cache read filled=" << filled;
peer_bytes_read_total << filled;
- *bytes_read = filled;
peer_bytes_per_read << filled;
if (filled != s.size) {
peer_cache_reader_failed_counter << 1;
return Status::InternalError("peer cache read incomplete: need={}, got={}", s.size, filled);
}
peer_cache_reader_succ_counter << 1;
+ *bytes_read = filled;
return Status::OK();
}
diff --git a/be/src/io/cache/peer_file_cache_reader.h b/be/src/io/cache/peer_file_cache_reader.h
index f028bc2cd91..49fe56336a2 100644
--- a/be/src/io/cache/peer_file_cache_reader.h
+++ b/be/src/io/cache/peer_file_cache_reader.h
@@ -61,7 +61,8 @@ public:
* - blocks: List of file blocks to fetch (global file offsets, inclusive ranges).
* - off: Base file offset corresponding to the start of Slice s.
* - s: Destination buffer; must be large enough to hold all requested block bytes.
- * - n: Output number of bytes successfully written.
+ * - bytes_read: Output number of bytes read.
+ * - file_size: Size of the file to be read.
* - ctx: IO context (kept for interface symmetry).
*
* Returns:
@@ -69,7 +70,7 @@ public:
* - NotSupported: The file is not a Doris table segment.
*/
Status fetch_blocks(const std::vector<FileBlockSPtr>& blocks, size_t off, Slice s,
- size_t* bytes_read, const IOContext* ctx);
+ size_t* bytes_read, size_t file_size, const IOContext* ctx);
private:
io::Path _path;
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
index 56eace67c8e..37eb309d866 100644
--- a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -215,6 +215,8 @@ suite('test_balance_warm_up', 'docker') {
// test expired be tablet cache info be removed
// after cache_read_from_peer_expired_seconds = 100s
assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "balance_tablet_be_mapping_size"))
+ assert(0 == getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_peer_read"))
+ assert(0 != getBrpcMetrics(newAddBe.Host, newAddBe.BrpcPort, "cached_remote_reader_s3_read"))
}
docker(options) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]