This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9abe0c55d03 [optimization](filecache) speed up filecache warm up 
(#51776)
9abe0c55d03 is described below

commit 9abe0c55d03a794ebe3a07068fd625673e81fbb7
Author: zhengyu <[email protected]>
AuthorDate: Fri Jun 27 11:04:53 2025 +0800

    [optimization](filecache) speed up filecache warm up (#51776)
    
    this pr does the following:
    1. make file cache downloader worker pool thread num configurable
    2. make warm up job split batch size configurable
    3. split large file download tasks into smaller ones to maintain load
    balance between threads, thus improving concurrency
    4. use meta info to deduce the size of inverted index files to reduce S3
    HEAD ops
    5. some log print optimization
    
    in our tests, this optimization improves file cache warm-up performance
    by more than 3x
    
    Signed-off-by: zhengyu <[email protected]>
---
 be/src/cloud/cloud_backend_service.cpp             |   4 +-
 be/src/cloud/cloud_tablet.cpp                      |  12 +-
 be/src/cloud/cloud_warm_up_manager.cpp             | 138 +++++++++++++--------
 be/src/cloud/cloud_warm_up_manager.h               |   6 +-
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/io/cache/block_file_cache_downloader.cpp    |   4 +-
 be/src/io/cache/block_file_cache_downloader.h      |   2 +-
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../apache/doris/cloud/CacheHotspotManager.java    |   2 +-
 10 files changed, 116 insertions(+), 61 deletions(-)

diff --git a/be/src/cloud/cloud_backend_service.cpp 
b/be/src/cloud/cloud_backend_service.cpp
index 63f76632d79..de5c77208c5 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -107,7 +107,9 @@ void 
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
                 .tag("request_type", "SET_BATCH")
                 .tag("job_id", request.job_id)
                 .tag("batch_id", request.batch_id)
-                .tag("jobs size", request.job_metas.size());
+                .tag("jobs size", request.job_metas.size())
+                .tag("tablet num of first meta",
+                     request.job_metas.empty() ? 0 : 
request.job_metas[0].tablet_ids.size());
         bool retry = false;
         st = manager.check_and_set_batch_id(request.job_id, request.batch_id, 
&retry);
         if (!retry && st) {
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 1c046eeac8f..673d721d84e 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -286,7 +286,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> 
to_add, bool version_
                                             .expiration_time = expiration_time,
                                             .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
                                     },
-                            .download_done {},
+                            .download_done {[](Status st) {
+                                if (!st) {
+                                    LOG_WARNING("add rowset warm up error 
").error(st);
+                                }
+                            }},
                     });
 
                     auto download_idx_file = [&](const io::Path& idx_path) {
@@ -299,7 +303,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> 
to_add, bool version_
                                                 .expiration_time = 
expiration_time,
                                                 .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
                                         },
-                                .download_done {},
+                                .download_done {[](Status st) {
+                                    if (!st) {
+                                        LOG_WARNING("add rowset warm up error 
").error(st);
+                                    }
+                                }},
                         };
                         
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
                     };
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp 
b/be/src/cloud/cloud_warm_up_manager.cpp
index d8bce097465..510c677f06f 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -17,7 +17,8 @@
 
 #include "cloud/cloud_warm_up_manager.h"
 
-#include <bthread/countdown_event.h>
+#include <bvar/bvar.h>
+#include <bvar/reducer.h>
 
 #include <algorithm>
 #include <cstddef>
@@ -34,6 +35,8 @@
 
 namespace doris {
 
+bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", 
"failed_task_num");
+
 CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : 
_engine(engine) {
     _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
 }
@@ -59,6 +62,52 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> 
snapshot_rs_metas(BaseTable
     return id_to_rowset_meta_map;
 }
 
+void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t 
file_size,
+                                               io::FileSystemSPtr file_system,
+                                               int64_t expiration_time,
+                                               
std::shared_ptr<bthread::CountdownEvent> wait) {
+    if (file_size < 0) {
+        auto st = file_system->file_size(path, &file_size);
+        if (!st.ok()) [[unlikely]] {
+            LOG(WARNING) << "get file size failed: " << path;
+            file_cache_warm_up_failed_task_num << 1;
+            return;
+        }
+    }
+
+    const int64_t chunk_size = 10 * 1024 * 1024; // 10MB
+    int64_t offset = 0;
+    int64_t remaining_size = file_size;
+
+    while (remaining_size > 0) {
+        int64_t current_chunk_size = std::min(chunk_size, remaining_size);
+        wait->add_count();
+
+        
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
+                .path = path,
+                .file_size = file_size,
+                .offset = offset,
+                .download_size = current_chunk_size,
+                .file_system = file_system,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done =
+                        [wait](Status st) {
+                            if (!st) {
+                                LOG_WARNING("Warm up error ").error(st);
+                            }
+                            wait->signal();
+                        },
+        });
+
+        offset += current_chunk_size;
+        remaining_size -= current_chunk_size;
+    }
+}
+
 void CloudWarmUpManager::handle_jobs() {
 #ifndef BE_TEST
     constexpr int WAIT_TIME_SECONDS = 600;
@@ -77,6 +126,10 @@ void CloudWarmUpManager::handle_jobs() {
             LOG_WARNING("Warm up job is null");
             continue;
         }
+
+        std::shared_ptr<bthread::CountdownEvent> wait =
+                std::make_shared<bthread::CountdownEvent>(0);
+
         for (int64_t tablet_id : cur_job->tablet_ids) {
             if (_cur_job_id == 0) { // The job is canceled
                 break;
@@ -92,8 +145,7 @@ void CloudWarmUpManager::handle_jobs() {
                 LOG_WARNING("Warm up error ").tag("tablet_id", 
tablet_id).error(st);
                 continue;
             }
-            std::shared_ptr<bthread::CountdownEvent> wait =
-                    std::make_shared<bthread::CountdownEvent>(0);
+
             auto tablet_meta = tablet->tablet_meta();
             auto rs_metas = snapshot_rs_metas(tablet.get());
             for (auto& [_, rs] : rs_metas) {
@@ -112,71 +164,51 @@ void CloudWarmUpManager::handle_jobs() {
                         expiration_time = 0;
                     }
 
-                    wait->add_count();
-                    // clang-format off
-                    
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
-                            .path = 
storage_resource.value()->remote_segment_path(*rs, seg_id),
-                            .file_size = rs->segment_file_size(seg_id),
-                            .file_system = storage_resource.value()->fs,
-                            .ctx =
-                                    {
-                                            .expiration_time = expiration_time,
-                                            .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
-                                    },
-                            .download_done =
-                                    [wait](Status st) {
-                                        if (!st) {
-                                            LOG_WARNING("Warm up error 
").error(st);
-                                        }
-                                        wait->signal();
-                                    },
-                    });
-
-                    auto download_idx_file = [&](const io::Path& idx_path) {
-                        io::DownloadFileMeta meta {
-                                .path = idx_path,
-                                .file_size = -1,
-                                .file_system = storage_resource.value()->fs,
-                                .ctx =
-                                        {
-                                                .expiration_time = 
expiration_time,
-                                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
-                                        },
-                                .download_done =
-                                        [wait](Status st) {
-                                            if (!st) {
-                                                LOG_WARNING("Warm up error 
").error(st);
-                                            }
-                                            wait->signal();
-                                        },
-                        };
-                        // clang-format on
-                        
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
-                    };
+                    // 1st. download segment files
+                    submit_download_tasks(
+                            storage_resource.value()->remote_segment_path(*rs, 
seg_id),
+                            rs->segment_file_size(seg_id), 
storage_resource.value()->fs,
+                            expiration_time, wait);
+
+                    // 2nd. download inverted index files
+                    int64_t file_size = -1;
                     auto schema_ptr = rs->tablet_schema();
                     auto idx_version = 
schema_ptr->get_inverted_index_storage_format();
+                    const auto& idx_file_info = 
rs->inverted_index_file_info(seg_id);
                     if (idx_version == InvertedIndexStorageFormatPB::V1) {
                         for (const auto& index : 
schema_ptr->inverted_indexes()) {
-                            wait->add_count();
                             auto idx_path = 
storage_resource.value()->remote_idx_v1_path(
                                     *rs, seg_id, index->index_id(), 
index->get_index_suffix());
-                            download_idx_file(idx_path);
+                            if (idx_file_info.index_info_size() > 0) {
+                                for (const auto& idx_info : 
idx_file_info.index_info()) {
+                                    if (index->index_id() == 
idx_info.index_id() &&
+                                        index->get_index_suffix() == 
idx_info.index_suffix()) {
+                                        file_size = idx_info.index_file_size();
+                                        break;
+                                    }
+                                }
+                            }
+                            submit_download_tasks(idx_path, file_size, 
storage_resource.value()->fs,
+                                                  expiration_time, wait);
                         }
                     } else {
                         if (schema_ptr->has_inverted_index()) {
-                            wait->add_count();
                             auto idx_path =
                                     
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
-                            download_idx_file(idx_path);
+                            file_size = idx_file_info.has_index_size() ? 
idx_file_info.index_size()
+                                                                       : -1;
+                            submit_download_tasks(idx_path, file_size, 
storage_resource.value()->fs,
+                                                  expiration_time, wait);
                         }
                     }
                 }
             }
-            timespec time;
-            time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
-            if (!wait->timed_wait(time)) {
-                LOG_WARNING("Warm up tablet {} take a long time", 
tablet_meta->tablet_id());
-            }
+        }
+
+        timespec time;
+        time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
+        if (wait->timed_wait(time)) {
+            LOG_WARNING("Warm up {} tablets take a long time", 
cur_job->tablet_ids.size());
         }
         {
             std::unique_lock lock(_mtx);
diff --git a/be/src/cloud/cloud_warm_up_manager.h 
b/be/src/cloud/cloud_warm_up_manager.h
index 219dedc5806..356d7284f6f 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <bthread/countdown_event.h>
+
 #include <condition_variable>
 #include <deque>
 #include <mutex>
@@ -69,7 +71,9 @@ public:
 
 private:
     void handle_jobs();
-
+    void submit_download_tasks(io::Path path, int64_t file_size, 
io::FileSystemSPtr file_system,
+                               int64_t expiration_time,
+                               std::shared_ptr<bthread::CountdownEvent> wait);
     std::mutex _mtx;
     std::condition_variable _cond;
     int64_t _cur_job_id {0};
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dd3be9f1520..2badc8ca97f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1114,6 +1114,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms, 
"5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
 DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
 
+DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
+DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
+
 DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
 DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
 DEFINE_mBool(enable_write_index_searcher_cache, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 150e4e602fb..23e4a94c9ea 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1152,6 +1152,9 @@ 
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_batch);
+
+DECLARE_Int32(file_cache_downloader_thread_num_min);
+DECLARE_Int32(file_cache_downloader_thread_num_max);
 // inverted index searcher cache
 // cache entry stay time after lookup
 DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp 
b/be/src/io/cache/block_file_cache_downloader.cpp
index b9944e39989..05c18e0b945 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -43,8 +43,8 @@ namespace doris::io {
 FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine) 
: _engine(engine) {
     _poller = std::thread(&FileCacheBlockDownloader::polling_download_task, 
this);
     auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
-                      .set_min_threads(4)
-                      .set_max_threads(16)
+                      
.set_min_threads(config::file_cache_downloader_thread_num_min)
+                      
.set_max_threads(config::file_cache_downloader_thread_num_max)
                       .build(&_workers);
     CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
 }
diff --git a/be/src/io/cache/block_file_cache_downloader.h 
b/be/src/io/cache/block_file_cache_downloader.h
index 30827b69580..c9a46891673 100644
--- a/be/src/io/cache/block_file_cache_downloader.h
+++ b/be/src/io/cache/block_file_cache_downloader.h
@@ -92,7 +92,7 @@ private:
     // tablet id -> inflight block num of tablet
     std::unordered_map<int64_t, int64_t> _inflight_tablets;
 
-    static inline constexpr size_t _max_size {10240};
+    static inline constexpr size_t _max_size {102400};
 };
 
 } // namespace doris::io
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c59e8adc3eb..39d25ea6cf9 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3338,6 +3338,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int cloud_warm_up_job_scheduler_interval_millisecond = 1000; 
// 1 seconds
 
+    @ConfField(mutable = true, masterOnly = true)
+    public static long cloud_warm_up_job_max_bytes_per_batch = 21474836480L; 
// 20GB
+
     @ConfField(mutable = true, masterOnly = true)
     public static boolean enable_fetch_cluster_cache_hotspot = true;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index 30cc76b2a6b..48771d36240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -365,7 +365,7 @@ public class CacheHotspotManager extends MasterDaemon {
     }
 
     private Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>> 
beToWarmUpTablets) {
-        final Long maxSizePerBatch = 10737418240L; // 10G
+        final Long maxSizePerBatch = 
Config.cloud_warm_up_job_max_bytes_per_batch;
         Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
         for (Map.Entry<Long, List<Tablet>> entry : 
beToWarmUpTablets.entrySet()) {
             List<List<Long>> batches = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to