dataroaring commented on code in PR #57946:
URL: https://github.com/apache/doris/pull/57946#discussion_r2519391728


##########
be/src/pipeline/exec/olap_scan_operator.cpp:
##########
@@ -597,10 +656,101 @@ Status OlapScanLocalState::prepare(RuntimeState* state) {
                 cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
                 _scan_ranges.size());
     }
+
+    // Prefetch segment footers in parallel to warm up file cache
+    if (config::prefetch_segment_footer && config::is_cloud_mode()) {
+        SCOPED_TIMER(_prefetch_segment_footer_timer);
+        RETURN_IF_ERROR(_prefetch_segment_footers());
+    }
+
     _prepared = true;
     return Status::OK();
 }
 
+Status OlapScanLocalState::_prefetch_segment_footers() {
+    std::vector<std::shared_ptr<std::promise<Status>>> proms;
+    auto* pool = ExecEnv::GetInstance()->scanner_scheduler()->get_remote_scan_thread_pool();
+
+    for (const auto& read_source : _read_sources) {
+        for (const auto& rs_split : read_source.rs_splits) {
+            auto rowset = rs_split.rs_reader->rowset();
+
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            if (!beta_rowset) {
+                continue;
+            }
+
+            int64_t num_segments = rowset->num_segments();
+            for (int64_t seg_id = 0; seg_id < num_segments; seg_id++) {
+                auto prom = std::make_shared<std::promise<Status>>();
+                proms.emplace_back(prom);
+
+                auto st = pool->submit_scan_task(vectorized::SimplifiedScanTask(
+                        [rowset, seg_id, p = std::move(prom)] {
+                            auto task_st = [&]() -> Status {
+                                auto fs = rowset->rowset_meta()->fs();
+                                if (!fs) {
+                                    return Status::InternalError<false>("get fs failed");
+                                }
+
+                                auto seg_path = rowset->segment_path(seg_id);
+                                if (!seg_path.has_value()) {
+                                    return seg_path.error();
+                                }
+
+                                int64_t file_size =
+                                        rowset->rowset_meta()->segment_file_size(seg_id);
+                                if (file_size < 12) {
+                                    return Status::OK(); // Skip invalid segments
+                                }
+
+                                io::FileReaderOptions reader_options {
+                                        .cache_type =
+                                                config::enable_file_cache
+                                                        ? io::FileCachePolicy::FILE_BLOCK_CACHE
+                                                        : io::FileCachePolicy::NO_CACHE,
+                                        .is_doris_table = true,
+                                        .cache_base_path = "",
+                                        .file_size = file_size,
+                                };
+
+                                io::FileReaderSPtr file_reader;
+                                RETURN_IF_ERROR(fs->open_file(seg_path.value(), &file_reader,
+                                                              &reader_options));
+
+                                // due to file block alignment, this will prefetch segment footer into cache
+                                uint8_t fixed_buf[12];
+                                size_t bytes_read = 0;
+                                io::IOContext io_ctx {.is_index_data = true, .is_dryrun = true};
+                                RETURN_IF_ERROR(file_reader->read_at(file_size - 12,
+                                                                     Slice(fixed_buf, 12),
+                                                                     &bytes_read, &io_ctx));
+
+                                return Status::OK();
+                            }();
+                            Defer defer([p, &task_st] { p->set_value(task_st); });
+                        },
+                        nullptr));
+
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to submit prefetch task, err=" << st;
+                    return st;
+                }
+            }
+        }
+    }
+
+    // Wait for all prefetch tasks to complete
+    for (auto& p : proms) {
+        auto st = p->get_future().get();

Review Comment:
   Do we really need to wait here?
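For illustration only (not part of the PR): a minimal fire-and-forget sketch of what dropping the wait could look like, reusing the submit_scan_task/SimplifiedScanTask call shape from the patch above. prefetch_one_footer is a hypothetical helper standing in for the open_file/read_at body of the lambda; since a failed prefetch only means the footer stays cold in the file cache, errors are merely logged instead of being funneled back through promises.

    // Hypothetical fire-and-forget variant: no promise per segment and no
    // blocking get() in prepare(); a failed prefetch only leaves the footer cold.
    auto st = pool->submit_scan_task(vectorized::SimplifiedScanTask(
            [rowset, seg_id] {
                // prefetch_one_footer is a placeholder for the open_file/read_at
                // logic shown in the lambda above.
                Status task_st = prefetch_one_footer(rowset, seg_id);
                if (!task_st.ok()) {
                    LOG(WARNING) << "segment footer prefetch failed, err=" << task_st;
                }
            },
            nullptr));
    if (!st.ok()) {
        LOG(WARNING) << "failed to submit segment footer prefetch task, err=" << st;
    }

The trade-off is that prepare() would no longer surface prefetch errors, and _prefetch_segment_footer_timer would only cover task submission rather than the prefetch itself.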



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
