This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bc65a9c9ee2 branch-3.1: [optimization](filecache) speed up filecache warm up #51776 (#52626)
bc65a9c9ee2 is described below

commit bc65a9c9ee24f7da2a15f91742493bf8d3ce109c
Author: zhengyu <[email protected]>
AuthorDate: Wed Jul 2 18:07:17 2025 +0800

    branch-3.1: [optimization](filecache) speed up filecache warm up #51776 (#52626)
    
    Cherry-picked from #51776
    
    Signed-off-by: zhengyu <[email protected]>
---
 be/src/cloud/cloud_backend_service.cpp          |   4 +-
 be/src/cloud/cloud_warm_up_manager.cpp          | 159 +++++++++++++-----------
 be/src/cloud/cloud_warm_up_manager.h            |   5 +
 be/src/common/config.cpp                        |   3 +
 be/src/common/config.h                          |   3 +
 be/src/io/cache/block_file_cache_downloader.cpp |   4 +-
 be/src/io/cache/block_file_cache_downloader.h   |   2 +-
 7 files changed, 105 insertions(+), 75 deletions(-)

diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp
index d260e256afc..a50d0e36419 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -115,7 +115,9 @@ void 
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
                 .tag("request_type", "SET_BATCH")
                 .tag("job_id", request.job_id)
                 .tag("batch_id", request.batch_id)
-                .tag("jobs size", request.job_metas.size());
+                .tag("jobs size", request.job_metas.size())
+                .tag("tablet num of first meta",
+                     request.job_metas.empty() ? 0 : 
request.job_metas[0].tablet_ids.size());
         bool retry = false;
         st = manager.check_and_set_batch_id(request.job_id, request.batch_id, 
&retry);
         if (!retry && st) {
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 15c346e465d..7a304a872a4 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -17,7 +17,8 @@
 
 #include "cloud/cloud_warm_up_manager.h"
 
-#include <bthread/countdown_event.h>
+#include <bvar/bvar.h>
+#include <bvar/reducer.h>
 
 #include <algorithm>
 #include <cstddef>
@@ -69,6 +70,7 @@ bvar::Adder<uint64_t> 
g_file_cache_recycle_cache_requested_index_num(
         "file_cache_recycle_cache_requested_index_num");
 bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
         "file_cache_warm_up_rowset_last_call_unix_ts", 0);
+bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", 
"failed_task_num");
 
 CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : 
_engine(engine) {
     _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
@@ -95,6 +97,52 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> 
snapshot_rs_metas(BaseTable
     return id_to_rowset_meta_map;
 }
 
+void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t 
file_size,
+                                               io::FileSystemSPtr file_system,
+                                               int64_t expiration_time,
+                                               
std::shared_ptr<bthread::CountdownEvent> wait) {
+    if (file_size < 0) {
+        auto st = file_system->file_size(path, &file_size);
+        if (!st.ok()) [[unlikely]] {
+            LOG(WARNING) << "get file size failed: " << path;
+            file_cache_warm_up_failed_task_num << 1;
+            return;
+        }
+    }
+
+    const int64_t chunk_size = 10 * 1024 * 1024; // 10MB
+    int64_t offset = 0;
+    int64_t remaining_size = file_size;
+
+    while (remaining_size > 0) {
+        int64_t current_chunk_size = std::min(chunk_size, remaining_size);
+        wait->add_count();
+
+        
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
+                .path = path,
+                .file_size = file_size,
+                .offset = offset,
+                .download_size = current_chunk_size,
+                .file_system = file_system,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done =
+                        [wait](Status st) {
+                            if (!st) {
+                                LOG_WARNING("Warm up error ").error(st);
+                            }
+                            wait->signal();
+                        },
+        });
+
+        offset += current_chunk_size;
+        remaining_size -= current_chunk_size;
+    }
+}
+
 void CloudWarmUpManager::handle_jobs() {
 #ifndef BE_TEST
     constexpr int WAIT_TIME_SECONDS = 600;
@@ -116,6 +164,10 @@ void CloudWarmUpManager::handle_jobs() {
             LOG_WARNING("Warm up job is null");
             continue;
         }
+
+        std::shared_ptr<bthread::CountdownEvent> wait =
+                std::make_shared<bthread::CountdownEvent>(0);
+
         for (int64_t tablet_id : cur_job->tablet_ids) {
             if (_cur_job_id == 0) { // The job is canceled
                 break;
@@ -131,8 +183,7 @@ void CloudWarmUpManager::handle_jobs() {
                 LOG_WARNING("Warm up error ").tag("tablet_id", 
tablet_id).error(st);
                 continue;
             }
-            std::shared_ptr<bthread::CountdownEvent> wait =
-                    std::make_shared<bthread::CountdownEvent>(0);
+
             auto tablet_meta = tablet->tablet_meta();
             auto rs_metas = snapshot_rs_metas(tablet.get());
             for (auto& [_, rs] : rs_metas) {
@@ -151,96 +202,62 @@ void CloudWarmUpManager::handle_jobs() {
                         expiration_time = 0;
                     }
 
-                    wait->add_count();
                     
g_file_cache_once_or_periodic_warm_up_submitted_segment_num << 1;
                     if (rs->segment_file_size(seg_id) > 0) {
                         
g_file_cache_once_or_periodic_warm_up_submitted_segment_size
                                 << rs->segment_file_size(seg_id);
                     }
-                    
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
-                            .path = 
storage_resource.value()->remote_segment_path(*rs, seg_id),
-                            .file_size = rs->segment_file_size(seg_id),
-                            .file_system = storage_resource.value()->fs,
-                            .ctx =
-                                    {
-                                            .expiration_time = expiration_time,
-                                            .is_dryrun = config::
-                                                    
enable_reader_dryrun_when_download_file_cache,
-                                    },
-                            .download_done =
-                                    [wait](Status st) {
-                                        if (!st) {
-                                            LOG_WARNING("Warm up error 
").error(st);
-                                        }
-                                        wait->signal();
-                                    },
-                    });
-
-                    auto download_idx_file = [&](const io::Path& idx_path, 
int64_t idx_size) {
-                        io::DownloadFileMeta meta {
-                                .path = idx_path,
-                                .file_size = idx_size,
-                                .file_system = storage_resource.value()->fs,
-                                .ctx =
-                                        {
-                                                .expiration_time = 
expiration_time,
-                                                .is_dryrun = config::
-                                                        
enable_reader_dryrun_when_download_file_cache,
-                                        },
-                                .download_done =
-                                        [wait](Status st) {
-                                            if (!st) {
-                                                LOG_WARNING("Warm up error 
").error(st);
-                                            }
-                                            wait->signal();
-                                        },
-                        };
-                        // clang-format on
-                        
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
-                        
g_file_cache_once_or_periodic_warm_up_submitted_index_size << idx_size;
-                        
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
-                    };
+                    // 1st. download segment files
+                    submit_download_tasks(
+                            storage_resource.value()->remote_segment_path(*rs, 
seg_id),
+                            rs->segment_file_size(seg_id), 
storage_resource.value()->fs,
+                            expiration_time, wait);
+
+                    // 2nd. download inverted index files
+                    int64_t file_size = -1;
                     auto schema_ptr = rs->tablet_schema();
                     auto idx_version = 
schema_ptr->get_inverted_index_storage_format();
+                    const auto& idx_file_info = 
rs->inverted_index_file_info(seg_id);
                     if (idx_version == InvertedIndexStorageFormatPB::V1) {
                         auto&& inverted_index_info = 
rs->inverted_index_file_info(seg_id);
                         std::unordered_map<int64_t, int64_t> index_size_map;
-                        for (const auto& info : 
inverted_index_info.index_info()) {
-                            if (info.index_file_size() != -1) {
-                                index_size_map[info.index_id()] = 
info.index_file_size();
-                            } else {
-                                VLOG_DEBUG << "Invalid index_file_size for 
segment_id " << seg_id
-                                           << ", index_id " << info.index_id();
-                            }
-                        }
                         for (const auto& index : 
schema_ptr->inverted_indexes()) {
-                            wait->add_count();
                             auto idx_path = 
storage_resource.value()->remote_idx_v1_path(
                                     *rs, seg_id, index->index_id(), 
index->get_index_suffix());
-                            download_idx_file(idx_path, 
index_size_map[index->index_id()]);
+                            if (idx_file_info.index_info_size() > 0) {
+                                for (const auto& idx_info : 
idx_file_info.index_info()) {
+                                    if (index->index_id() == 
idx_info.index_id() &&
+                                        index->get_index_suffix() == 
idx_info.index_suffix()) {
+                                        file_size = idx_info.index_file_size();
+                                        break;
+                                    }
+                                }
+                            }
+                            submit_download_tasks(idx_path, file_size, 
storage_resource.value()->fs,
+                                                  expiration_time, wait);
+                            
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
+                            
g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
                         }
                     } else {
                         if (schema_ptr->has_inverted_index()) {
-                            auto&& inverted_index_info = 
rs->inverted_index_file_info(seg_id);
-                            int64_t idx_size = 0;
-                            if (inverted_index_info.has_index_size()) {
-                                idx_size = inverted_index_info.index_size();
-                            } else {
-                                VLOG_DEBUG << "index_size is not set for 
segment " << seg_id;
-                            }
-                            wait->add_count();
                             auto idx_path =
                                     
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
-                            download_idx_file(idx_path, idx_size);
+                            file_size = idx_file_info.has_index_size() ? 
idx_file_info.index_size()
+                                                                       : -1;
+                            submit_download_tasks(idx_path, file_size, 
storage_resource.value()->fs,
+                                                  expiration_time, wait);
+                            
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
+                            
g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
                         }
                     }
                 }
             }
-            timespec time;
-            time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
-            if (!wait->timed_wait(time)) {
-                LOG_WARNING("Warm up {} tablets take a long time", 
cur_job->tablet_ids.size());
-            }
+        }
+
+        timespec time;
+        time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
+        if (wait->timed_wait(time)) {
+            LOG_WARNING("Warm up {} tablets take a long time", 
cur_job->tablet_ids.size());
         }
         {
             std::unique_lock lock(_mtx);
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 55fbcc476da..85a460fda1e 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <bthread/countdown_event.h>
+
 #include <condition_variable>
 #include <deque>
 #include <mutex>
@@ -79,6 +81,9 @@ private:
     void handle_jobs();
     std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id);
 
+    void submit_download_tasks(io::Path path, int64_t file_size, 
io::FileSystemSPtr file_system,
+                               int64_t expiration_time,
+                               std::shared_ptr<bthread::CountdownEvent> wait);
     std::mutex _mtx;
     std::condition_variable _cond;
     int64_t _cur_job_id {0};
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ff3caf0ea67..b06decc84cb 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1131,6 +1131,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, 
"5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
 DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
 
+DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
+DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
+
 DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
 DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
 // inverted index searcher cache size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c5d048c0295..6d85765c071 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1155,6 +1155,7 @@ DECLARE_mBool(enbale_dump_error_file);
 DECLARE_mInt64(file_cache_error_log_limit_bytes);
 DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us);
 DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
+
 // Base compaction may retrieve and produce some less frequently accessed data,
 // potentially affecting the file cache hit rate.
 // This configuration determines whether to retain the output within the file 
cache.
@@ -1168,6 +1169,8 @@ 
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_batch);
+DECLARE_Int32(file_cache_downloader_thread_num_min);
+DECLARE_Int32(file_cache_downloader_thread_num_max);
 
 DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index dc4c622e807..1732197e5b4 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -49,8 +49,8 @@ bvar::Adder<uint64_t> 
g_file_cache_download_failed_num("file_cache_download_fail
 FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) 
: _engine(engine) {
     _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, 
this);
     auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
-                      .set_min_threads(4)
-                      .set_max_threads(16)
+                      
.set_min_threads(config::file_cache_downloader_thread_num_min)
+                      
.set_max_threads(config::file_cache_downloader_thread_num_max)
                       .build(&_workers);
     CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
 }
diff --git a/be/src/io/cache/block_file_cache_downloader.h b/be/src/io/cache/block_file_cache_downloader.h
index 30827b69580..c9a46891673 100644
--- a/be/src/io/cache/block_file_cache_downloader.h
+++ b/be/src/io/cache/block_file_cache_downloader.h
@@ -92,7 +92,7 @@ private:
     // tablet id -> inflight block num of tablet
     std::unordered_map<int64_t, int64_t> _inflight_tablets;
 
-    static inline constexpr size_t _max_size {10240};
+    static inline constexpr size_t _max_size {102400};
 };
 
 } // namespace doris::io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to