github-actions[bot] commented on code in PR #63818:
URL: https://github.com/apache/doris/pull/63818#discussion_r3393164064
##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -237,38 +401,228 @@ Status read_file_block(const
std::shared_ptr<io::FileBlock>& file_block, size_t
}
size_t read_size = std::min(static_cast<size_t>(file_size -
file_block->offset()),
file_block->range().size());
- data.resize(read_size);
+ output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+ output->set_block_size(static_cast<int64_t>(read_size));
+ if (read_size == 0) {
+ return Status::OK();
+ }
- auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ Status read_st = Status::OK();
+ // Attachment payload mode: protobuf carries metadata only, payload goes
to attachment.
+ // This allows FS cache to use a file-descriptor->IOBuf path directly.
+ if (response_attachment != nullptr) {
+ size_t bytes_read = 0;
+ auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ read_st = file_block->read_to_iobuf(response_attachment,
/*read_offset=*/0, read_size,
+ &bytes_read);
+ auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts
- begin_read_file_ts);
- SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
- Slice slice(data.data(), data.size());
- Status read_st = file_block->read(slice, /*read_offset=*/0);
+ if (read_st.ok()) {
+ if (bytes_read != read_size) {
+ return Status::InternalError<false>(
+ "peer cache read short data, expected={}, actual={}",
read_size,
+ bytes_read);
+ }
+ g_file_cache_get_by_peer_response_bytes_total << bytes_read;
+ return Status::OK();
+ }
+ } else {
+ std::string data;
+ data.resize(read_size);
+ auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ Slice slice(data.data(), data.size());
+ read_st = file_block->read(slice, /*read_offset=*/0);
+ auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts
- begin_read_file_ts);
+
+ if (read_st.ok()) {
+ auto set_data_start = std::chrono::steady_clock::now();
+ output->set_data(std::move(data));
+ set_data_us = elapsed_us(set_data_start);
+ g_file_cache_get_by_peer_response_bytes_total << read_size;
+ return Status::OK();
+ }
+ }
+
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed, file_size=" << file_size
+ << ", block=" << file_block->get_info_for_log()
+ << ", cache_file=" << file_block->get_cache_file() << ",
err=" << read_st;
+ return read_st;
+}
- auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
- g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts -
begin_read_file_ts);
+// Trigger S3 -> local cache fill for the given file block.
+// Returns OK when the block is DOWNLOADED after the fill.
+// Returns TOO_MANY_TASKS when the fill slot is exhausted (server healthy but
overloaded):
+// client should not rotate or evict, just fall back to S3 and retry same
candidate later.
+// Returns NOT_FOUND for soft misses (tablet not found, fill incomplete,
timeout):
+// client should rotate the candidate to try a different CG next time.
+// The peer uses request.path as the full remote path. tablet_id/filename are
kept for logging.
+Status trigger_peer_server_fill(io::FileBlockSPtr& fb, int64_t fill_tablet_id,
+ const std::string& filename, const
std::string& resource_id,
+ const std::string& remote_path, int64_t
file_size, int64_t offset,
+ int64_t size, int32_t timeout_ms) {
+ g_peer_server_fill_requested << 1;
+
+ // Concurrency guard: atomically reserve a fill slot.
+ // Excess requests are rejected so the client falls back to its own S3
read.
+ // Return NOT_FOUND so the client rotates the candidate instead of
evicting it.
+ if (g_active_server_fills.fetch_add(1, std::memory_order_relaxed) >=
+ config::max_concurrent_peer_server_fills) {
+ g_active_server_fills.fetch_sub(1, std::memory_order_relaxed);
+ g_peer_server_fill_rejected << 1;
+ VLOG_DEBUG << "trigger_peer_server_fill: rejected (concurrency limit "
+ << config::max_concurrent_peer_server_fills << "),
tablet_id=" << fill_tablet_id;
+ // TOO_MANY_TASKS: server is healthy but overloaded. Client must not
rotate or evict;
+ // just fall back to S3 for this request and retry the same candidate
next time.
+ return Status::Error<ErrorCode::TOO_MANY_TASKS, false>("fill slot
exhausted");
+ }
+ // RAII decrement: runs on every return path below.
+ Defer fill_guard {[]() { g_active_server_fills.fetch_sub(1,
std::memory_order_relaxed); }};
- if (read_st.ok()) {
- output->set_block_offset(static_cast<int64_t>(file_block->offset()));
- output->set_block_size(static_cast<int64_t>(read_size));
- output->set_data(std::move(data));
+ if (remote_path.empty() || resource_id.empty()) {
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ LOG(WARNING) << "trigger_peer_server_fill: missing remote_path or
resource_id, " << ctx;
+ g_peer_server_fill_rejected << 1;
+ return Status::NotFound<false>("fill: missing remote_path or
resource_id, {}", ctx);
+ }
+ auto storage_resource = doris::get_storage_resource(resource_id);
+ if (!storage_resource.has_value()) {
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ LOG(WARNING) << "trigger_peer_server_fill: storage resource not found,
" << ctx;
+ g_peer_server_fill_rejected << 1;
+ return Status::NotFound<false>("fill: storage resource not found, {}",
ctx);
+ }
+ auto fs = storage_resource->first.fs;
+
+ const auto initial_state = fb->state();
+ if (initial_state == io::FileBlock::State::DOWNLOADING) {
+ // Another thread already owns the block downloader. Wait up to the
request timeout instead
+ // of the shorter per-wait timeout in FileBlock::wait().
+ [[maybe_unused]] const bool completed = wait_for_file_block_state(fb,
timeout_ms);
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ return fb->state() == io::FileBlock::State::DOWNLOADED
+ ? Status::OK()
+ : Status::NotFound<false>("fill: concurrent download
incomplete, {}", ctx);
+ }
+ if (initial_state != io::FileBlock::State::EMPTY) {
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ return initial_state == io::FileBlock::State::DOWNLOADED
+ ? Status::OK()
+ : Status::NotFound<false>("fill: unexpected initial
block state, {}", ctx);
+ }
+
+ auto fill_start = std::chrono::steady_clock::now();
+ auto fill_done = std::make_shared<bthread::CountdownEvent>(1);
+ auto fill_status = std::make_shared<Status>(Status::OK());
+ io::DownloadFileMeta download_meta {
Review Comment:
This server-side pull-through fill path still bypasses packed-file metadata.
This is separate from the already raised segment/index remote-path concern:
even when `request.path` is the correct logical segment or index path,
`storage_resource->first.fs` is the plain object-store filesystem and cannot
map logical files into packed objects. Existing warm-up paths explicitly use
`RowsetMeta::fs()` because it wraps the physical FS with `PackedFileSystem`
built from `packed_slice_locations`; the new peer-fill request only sends
`tablet_id/resource_id` plus a dummy rowset id, so this code cannot reconstruct
that wrapper. The new server-fill regression suites also set
`enable_packed_file=false`, whereas the cloud default is true. In normal
packed-file deployments, a cold peer will therefore fail this fill and fall back
to the requester's S3 read instead of filling and serving from the designated CG.
Please carry enough rowset metadata to reconstruct `RowsetMeta::fs()`, or route
the fill through the existing tablet/rowset lookup path before opening the
remote file.
##########
be/src/io/cache/peer_file_cache_reader.cpp:
##########
@@ -125,7 +234,9 @@ Status PeerFileCacheReader::fetch_blocks(const
std::vector<FileBlockSPtr>& block
}};
Review Comment:
The fill RPC timeout is hard-coded to 7000ms, but the server waits according
to the mutable `peer_server_cache_fill_timeout_ms` config. If an operator
raises that server-side fill timeout above roughly 7s to tolerate slow object
storage, the client RPC will always time out first and treat the peer as an RPC
failure, even though the server is still within its configured deadline. Please
derive the client timeout from `config::peer_server_cache_fill_timeout_ms` plus
a small margin when `request_fill` is true, instead of hard-coding a value that
only matches the default.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]