deardeng commented on code in PR #63818:
URL: https://github.com/apache/doris/pull/63818#discussion_r3393206965
##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -237,38 +401,228 @@ Status read_file_block(const
std::shared_ptr<io::FileBlock>& file_block, size_t
}
size_t read_size = std::min(static_cast<size_t>(file_size -
file_block->offset()),
file_block->range().size());
- data.resize(read_size);
+ output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+ output->set_block_size(static_cast<int64_t>(read_size));
+ if (read_size == 0) {
+ return Status::OK();
+ }
- auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
+ Status read_st = Status::OK();
+ // Attachment payload mode: protobuf carries metadata only, payload goes
to attachment.
+ // This allows FS cache to use a file-descriptor->IOBuf path directly.
+ if (response_attachment != nullptr) {
+ size_t bytes_read = 0;
+ auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ read_st = file_block->read_to_iobuf(response_attachment,
/*read_offset=*/0, read_size,
+ &bytes_read);
+ auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts
- begin_read_file_ts);
- SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
- Slice slice(data.data(), data.size());
- Status read_st = file_block->read(slice, /*read_offset=*/0);
+ if (read_st.ok()) {
+ if (bytes_read != read_size) {
+ return Status::InternalError<false>(
+ "peer cache read short data, expected={}, actual={}",
read_size,
+ bytes_read);
+ }
+ g_file_cache_get_by_peer_response_bytes_total << bytes_read;
+ return Status::OK();
+ }
+ } else {
+ std::string data;
+ data.resize(read_size);
+ auto begin_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ Slice slice(data.data(), data.size());
+ read_st = file_block->read(slice, /*read_offset=*/0);
+ auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts
- begin_read_file_ts);
+
+ if (read_st.ok()) {
+ auto set_data_start = std::chrono::steady_clock::now();
+ output->set_data(std::move(data));
+ set_data_us = elapsed_us(set_data_start);
+ g_file_cache_get_by_peer_response_bytes_total << read_size;
+ return Status::OK();
+ }
+ }
+
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed, file_size=" << file_size
+ << ", block=" << file_block->get_info_for_log()
+ << ", cache_file=" << file_block->get_cache_file() << ",
err=" << read_st;
+ return read_st;
+}
- auto end_read_file_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
-
std::chrono::steady_clock::now().time_since_epoch())
- .count();
- g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts -
begin_read_file_ts);
+// Trigger S3 -> local cache fill for the given file block.
+// Returns OK when the block is DOWNLOADED after the fill.
+// Returns TOO_MANY_TASKS when the fill slot is exhausted (server healthy but
overloaded):
+// client should not rotate or evict, just fall back to S3 and retry same
candidate later.
+// Returns NOT_FOUND for soft misses (tablet not found, fill incomplete,
timeout):
+// client should rotate the candidate to try a different CG next time.
+// The peer uses request.path as the full remote path. tablet_id/filename are
kept for logging.
+Status trigger_peer_server_fill(io::FileBlockSPtr& fb, int64_t fill_tablet_id,
+ const std::string& filename, const
std::string& resource_id,
+ const std::string& remote_path, int64_t
file_size, int64_t offset,
+ int64_t size, int32_t timeout_ms) {
+ g_peer_server_fill_requested << 1;
+
+ // Concurrency guard: atomically reserve a fill slot.
+ // Excess requests are rejected so the client falls back to its own S3
read.
+ // Return TOO_MANY_TASKS (not NOT_FOUND) so the client neither rotates nor
evicts the candidate.
+ if (g_active_server_fills.fetch_add(1, std::memory_order_relaxed) >=
+ config::max_concurrent_peer_server_fills) {
+ g_active_server_fills.fetch_sub(1, std::memory_order_relaxed);
+ g_peer_server_fill_rejected << 1;
+ VLOG_DEBUG << "trigger_peer_server_fill: rejected (concurrency limit "
+ << config::max_concurrent_peer_server_fills << "),
tablet_id=" << fill_tablet_id;
+ // TOO_MANY_TASKS: server is healthy but overloaded. Client must not
rotate or evict;
+ // just fall back to S3 for this request and retry the same candidate
next time.
+ return Status::Error<ErrorCode::TOO_MANY_TASKS, false>("fill slot
exhausted");
+ }
+ // RAII decrement: runs on every return path below.
+ Defer fill_guard {[]() { g_active_server_fills.fetch_sub(1,
std::memory_order_relaxed); }};
- if (read_st.ok()) {
- output->set_block_offset(static_cast<int64_t>(file_block->offset()));
- output->set_block_size(static_cast<int64_t>(read_size));
- output->set_data(std::move(data));
+ if (remote_path.empty() || resource_id.empty()) {
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ LOG(WARNING) << "trigger_peer_server_fill: missing remote_path or
resource_id, " << ctx;
+ g_peer_server_fill_rejected << 1;
+ return Status::NotFound<false>("fill: missing remote_path or
resource_id, {}", ctx);
+ }
+ auto storage_resource = doris::get_storage_resource(resource_id);
+ if (!storage_resource.has_value()) {
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ LOG(WARNING) << "trigger_peer_server_fill: storage resource not found,
" << ctx;
+ g_peer_server_fill_rejected << 1;
+ return Status::NotFound<false>("fill: storage resource not found, {}",
ctx);
+ }
+ auto fs = storage_resource->first.fs;
+
+ const auto initial_state = fb->state();
+ if (initial_state == io::FileBlock::State::DOWNLOADING) {
+ // Another thread already owns the block downloader. Wait up to the
request timeout instead
+ // of the shorter per-wait timeout in FileBlock::wait().
+ [[maybe_unused]] const bool completed = wait_for_file_block_state(fb,
timeout_ms);
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ return fb->state() == io::FileBlock::State::DOWNLOADED
+ ? Status::OK()
+ : Status::NotFound<false>("fill: concurrent download
incomplete, {}", ctx);
+ }
+ if (initial_state != io::FileBlock::State::EMPTY) {
+ const std::string ctx =
+ format_peer_fill_context(fb, fill_tablet_id, filename,
resource_id, remote_path,
+ file_size, offset, size, timeout_ms);
+ return initial_state == io::FileBlock::State::DOWNLOADED
+ ? Status::OK()
+ : Status::NotFound<false>("fill: unexpected initial
block state, {}", ctx);
+ }
+
+ auto fill_start = std::chrono::steady_clock::now();
+ auto fill_done = std::make_shared<bthread::CountdownEvent>(1);
+ auto fill_status = std::make_shared<Status>(Status::OK());
+ io::DownloadFileMeta download_meta {
Review Comment:
This is a known limitation with packed files. We will address it in a
follow-up PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]