github-actions[bot] commented on code in PR #63818:
URL: https://github.com/apache/doris/pull/63818#discussion_r3393164064


##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -237,38 +401,228 @@ Status read_file_block(const 
std::shared_ptr<io::FileBlock>& file_block, size_t
     }
     size_t read_size = std::min(static_cast<size_t>(file_size - 
file_block->offset()),
                                 file_block->range().size());
-    data.resize(read_size);
+    output->set_block_offset(static_cast<int64_t>(file_block->offset()));
+    output->set_block_size(static_cast<int64_t>(read_size));
+    if (read_size == 0) {
+        return Status::OK();
+    }
 
-    auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
-                                      
std::chrono::steady_clock::now().time_since_epoch())
-                                      .count();
+    Status read_st = Status::OK();
+    // Attachment payload mode: protobuf carries metadata only, payload goes 
to attachment.
+    // This allows FS cache to use a file-descriptor->IOBuf path directly.
+    if (response_attachment != nullptr) {
+        size_t bytes_read = 0;
+        auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                          
std::chrono::steady_clock::now().time_since_epoch())
+                                          .count();
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+        read_st = file_block->read_to_iobuf(response_attachment, 
/*read_offset=*/0, read_size,
+                                            &bytes_read);
+        auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                        
std::chrono::steady_clock::now().time_since_epoch())
+                                        .count();
+        g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts 
- begin_read_file_ts);
 
-    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
-    Slice slice(data.data(), data.size());
-    Status read_st = file_block->read(slice, /*read_offset=*/0);
+        if (read_st.ok()) {
+            if (bytes_read != read_size) {
+                return Status::InternalError<false>(
+                        "peer cache read short data, expected={}, actual={}", 
read_size,
+                        bytes_read);
+            }
+            g_file_cache_get_by_peer_response_bytes_total << bytes_read;
+            return Status::OK();
+        }
+    } else {
+        std::string data;
+        data.resize(read_size);
+        auto begin_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                          
std::chrono::steady_clock::now().time_since_epoch())
+                                          .count();
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+        Slice slice(data.data(), data.size());
+        read_st = file_block->read(slice, /*read_offset=*/0);
+        auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
+                                        
std::chrono::steady_clock::now().time_since_epoch())
+                                        .count();
+        g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts 
- begin_read_file_ts);
+
+        if (read_st.ok()) {
+            auto set_data_start = std::chrono::steady_clock::now();
+            output->set_data(std::move(data));
+            set_data_us = elapsed_us(set_data_start);
+            g_file_cache_get_by_peer_response_bytes_total << read_size;
+            return Status::OK();
+        }
+    }
+
+    g_file_cache_get_by_peer_failed_num << 1;
+    LOG(WARNING) << "read cache block failed, file_size=" << file_size
+                 << ", block=" << file_block->get_info_for_log()
+                 << ", cache_file=" << file_block->get_cache_file() << ", 
err=" << read_st;
+    return read_st;
+}
 
-    auto end_read_file_ts = 
std::chrono::duration_cast<std::chrono::microseconds>(
-                                    
std::chrono::steady_clock::now().time_since_epoch())
-                                    .count();
-    g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - 
begin_read_file_ts);
+// Trigger S3 -> local cache fill for the given file block.
+// Returns OK when the block is DOWNLOADED after the fill.
+// Returns TOO_MANY_TASKS when the fill slot is exhausted (server healthy but 
overloaded):
+//   client should not rotate or evict, just fall back to S3 and retry same 
candidate later.
+// Returns NOT_FOUND for soft misses (tablet not found, fill incomplete, 
timeout):
+//   client should rotate the candidate to try a different CG next time.
+// The peer uses request.path as the full remote path. tablet_id/filename are 
kept for logging.
+Status trigger_peer_server_fill(io::FileBlockSPtr& fb, int64_t fill_tablet_id,
+                                const std::string& filename, const 
std::string& resource_id,
+                                const std::string& remote_path, int64_t 
file_size, int64_t offset,
+                                int64_t size, int32_t timeout_ms) {
+    g_peer_server_fill_requested << 1;
+
+    // Concurrency guard: atomically reserve a fill slot.
+    // Excess requests are rejected so the client falls back to its own S3 
read.
+    // Return NOT_FOUND so the client rotates the candidate instead of 
evicting it.
+    if (g_active_server_fills.fetch_add(1, std::memory_order_relaxed) >=
+        config::max_concurrent_peer_server_fills) {
+        g_active_server_fills.fetch_sub(1, std::memory_order_relaxed);
+        g_peer_server_fill_rejected << 1;
+        VLOG_DEBUG << "trigger_peer_server_fill: rejected (concurrency limit "
+                   << config::max_concurrent_peer_server_fills << "), 
tablet_id=" << fill_tablet_id;
+        // TOO_MANY_TASKS: server is healthy but overloaded. Client must not 
rotate or evict;
+        // just fall back to S3 for this request and retry the same candidate 
next time.
+        return Status::Error<ErrorCode::TOO_MANY_TASKS, false>("fill slot 
exhausted");
+    }
+    // RAII decrement: runs on every return path below.
+    Defer fill_guard {[]() { g_active_server_fills.fetch_sub(1, 
std::memory_order_relaxed); }};
 
-    if (read_st.ok()) {
-        output->set_block_offset(static_cast<int64_t>(file_block->offset()));
-        output->set_block_size(static_cast<int64_t>(read_size));
-        output->set_data(std::move(data));
+    if (remote_path.empty() || resource_id.empty()) {
+        const std::string ctx =
+                format_peer_fill_context(fb, fill_tablet_id, filename, 
resource_id, remote_path,
+                                         file_size, offset, size, timeout_ms);
+        LOG(WARNING) << "trigger_peer_server_fill: missing remote_path or 
resource_id, " << ctx;
+        g_peer_server_fill_rejected << 1;
+        return Status::NotFound<false>("fill: missing remote_path or 
resource_id, {}", ctx);
+    }
+    auto storage_resource = doris::get_storage_resource(resource_id);
+    if (!storage_resource.has_value()) {
+        const std::string ctx =
+                format_peer_fill_context(fb, fill_tablet_id, filename, 
resource_id, remote_path,
+                                         file_size, offset, size, timeout_ms);
+        LOG(WARNING) << "trigger_peer_server_fill: storage resource not found, 
" << ctx;
+        g_peer_server_fill_rejected << 1;
+        return Status::NotFound<false>("fill: storage resource not found, {}", 
ctx);
+    }
+    auto fs = storage_resource->first.fs;
+
+    const auto initial_state = fb->state();
+    if (initial_state == io::FileBlock::State::DOWNLOADING) {
+        // Another thread already owns the block downloader. Wait up to the 
request timeout instead
+        // of the shorter per-wait timeout in FileBlock::wait().
+        [[maybe_unused]] const bool completed = wait_for_file_block_state(fb, 
timeout_ms);
+        const std::string ctx =
+                format_peer_fill_context(fb, fill_tablet_id, filename, 
resource_id, remote_path,
+                                         file_size, offset, size, timeout_ms);
+        return fb->state() == io::FileBlock::State::DOWNLOADED
+                       ? Status::OK()
+                       : Status::NotFound<false>("fill: concurrent download 
incomplete, {}", ctx);
+    }
+    if (initial_state != io::FileBlock::State::EMPTY) {
+        const std::string ctx =
+                format_peer_fill_context(fb, fill_tablet_id, filename, 
resource_id, remote_path,
+                                         file_size, offset, size, timeout_ms);
+        return initial_state == io::FileBlock::State::DOWNLOADED
+                       ? Status::OK()
+                       : Status::NotFound<false>("fill: unexpected initial 
block state, {}", ctx);
+    }
+
+    auto fill_start = std::chrono::steady_clock::now();
+    auto fill_done = std::make_shared<bthread::CountdownEvent>(1);
+    auto fill_status = std::make_shared<Status>(Status::OK());
+    io::DownloadFileMeta download_meta {

Review Comment:
   This server-side pull-through fill path still bypasses packed-file metadata. 
This is separate from the already raised segment/index remote-path concern: 
even when `request.path` is the correct logical segment or index path, 
`storage_resource->first.fs` is the plain object-store filesystem and cannot 
map logical files into packed objects. Existing warm-up paths explicitly use 
`RowsetMeta::fs()` because it wraps the physical FS with `PackedFileSystem` 
from `packed_slice_locations`; the new peer-fill request only sends 
`tablet_id/resource_id` plus a dummy rowset id, so this code cannot reconstruct 
that wrapper. The new server-fill regression suites also set 
`enable_packed_file=false`, while cloud defaults it to true. In normal 
packed-file deployments a cold peer will fail this fill and fall back to the 
requester's S3 read instead of filling/serving from the designated CG. Please 
carry enough rowset metadata to reconstruct `RowsetMeta::fs()` or route the 
fill through the existing tabl
 et/rowset lookup path before opening the remote file.



##########
be/src/io/cache/peer_file_cache_reader.cpp:
##########
@@ -125,7 +234,9 @@ Status PeerFileCacheReader::fetch_blocks(const 
std::vector<FileBlockSPtr>& block
     }};
 

Review Comment:
   The fill RPC timeout is hard-coded to 7000ms, but the server waits according 
to the mutable `peer_server_cache_fill_timeout_ms` config. If an operator 
raises that server-side fill timeout above roughly 7s to tolerate slow object 
storage, the client will always time out first and treat the peer as an RPC 
failure even though the server is still within its configured deadline. Please 
derive the client timeout from `config::peer_server_cache_fill_timeout_ms` plus 
a small margin when `request_fill` is true, instead of baking in the 
default-only value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to