This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bc65a9c9ee2 branch-3.1: [optimization](filecache) speed up filecache
warm up #51776 (#52626)
bc65a9c9ee2 is described below
commit bc65a9c9ee24f7da2a15f91742493bf8d3ce109c
Author: zhengyu <[email protected]>
AuthorDate: Wed Jul 2 18:07:17 2025 +0800
branch-3.1: [optimization](filecache) speed up filecache warm up #51776
(#52626)
Cherry-picked from #51776
Signed-off-by: zhengyu <[email protected]>
---
be/src/cloud/cloud_backend_service.cpp | 4 +-
be/src/cloud/cloud_warm_up_manager.cpp | 159 +++++++++++++-----------
be/src/cloud/cloud_warm_up_manager.h | 5 +
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/io/cache/block_file_cache_downloader.cpp | 4 +-
be/src/io/cache/block_file_cache_downloader.h | 2 +-
7 files changed, 105 insertions(+), 75 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index d260e256afc..a50d0e36419 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -115,7 +115,9 @@ void
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_BATCH")
.tag("job_id", request.job_id)
.tag("batch_id", request.batch_id)
- .tag("jobs size", request.job_metas.size());
+ .tag("jobs size", request.job_metas.size())
+ .tag("tablet num of first meta",
+ request.job_metas.empty() ? 0 :
request.job_metas[0].tablet_ids.size());
bool retry = false;
st = manager.check_and_set_batch_id(request.job_id, request.batch_id,
&retry);
if (!retry && st) {
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp
b/be/src/cloud/cloud_warm_up_manager.cpp
index 15c346e465d..7a304a872a4 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -17,7 +17,8 @@
#include "cloud/cloud_warm_up_manager.h"
-#include <bthread/countdown_event.h>
+#include <bvar/bvar.h>
+#include <bvar/reducer.h>
#include <algorithm>
#include <cstddef>
@@ -69,6 +70,7 @@ bvar::Adder<uint64_t>
g_file_cache_recycle_cache_requested_index_num(
"file_cache_recycle_cache_requested_index_num");
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
"file_cache_warm_up_rowset_last_call_unix_ts", 0);
+bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up",
"failed_task_num"); // incremented in submit_download_tasks when a file's size cannot be fetched
CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) :
_engine(engine) {
_download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
@@ -95,6 +97,52 @@ std::unordered_map<std::string, RowsetMetaSharedPtr>
snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}
+void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t
file_size, // a negative file_size means "unknown": it is looked up via file_system below
+ io::FileSystemSPtr file_system,
+ int64_t expiration_time,
+
std::shared_ptr<bthread::CountdownEvent> wait) { // submits chunked download tasks for one remote file; each chunk holds one count on `wait`
+ if (file_size < 0) { // callers pass -1 when rowset/index meta carries no size
+ auto st = file_system->file_size(path, &file_size);
+ if (!st.ok()) [[unlikely]] {
+ LOG(WARNING) << "get file size failed: " << path;
+ file_cache_warm_up_failed_task_num << 1;
+ return; // nothing was submitted, so `wait` is left untouched
+ }
+ }
+ // Split the file into chunk_size pieces; each piece becomes its own download task.
+ const int64_t chunk_size = 10 * 1024 * 1024; // 10MB per download task
+ int64_t offset = 0;
+ int64_t remaining_size = file_size;
+
+ while (remaining_size > 0) { // a zero-sized file submits no task
+ int64_t current_chunk_size = std::min(chunk_size, remaining_size); // the last chunk may be shorter
+ wait->add_count(); // balanced by wait->signal() in download_done below
+ // NOTE(review): if the downloader drops a task without invoking download_done, `wait` never reaches zero and the caller blocks until its timeout.
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path = path,
+ .file_size = file_size, // size of the whole file; the chunk itself is offset/download_size
+ .offset = offset,
+ .download_size = current_chunk_size,
+ .file_system = file_system,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done =
+ [wait](Status st) { // captures the shared_ptr by value so the event outlives this call
+ if (!st) {
+ LOG_WARNING("Warm up error ").error(st);
+ }
+ wait->signal(); // signalled on success and failure alike
+ },
+ });
+
+ offset += current_chunk_size;
+ remaining_size -= current_chunk_size;
+ }
+}
+
void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
@@ -116,6 +164,10 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up job is null");
continue;
}
+
+ std::shared_ptr<bthread::CountdownEvent> wait =
+ std::make_shared<bthread::CountdownEvent>(0);
+
for (int64_t tablet_id : cur_job->tablet_ids) {
if (_cur_job_id == 0) { // The job is canceled
break;
@@ -131,8 +183,7 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up error ").tag("tablet_id",
tablet_id).error(st);
continue;
}
- std::shared_ptr<bthread::CountdownEvent> wait =
- std::make_shared<bthread::CountdownEvent>(0);
+
auto tablet_meta = tablet->tablet_meta();
auto rs_metas = snapshot_rs_metas(tablet.get());
for (auto& [_, rs] : rs_metas) {
@@ -151,96 +202,62 @@ void CloudWarmUpManager::handle_jobs() {
expiration_time = 0;
}
- wait->add_count();
g_file_cache_once_or_periodic_warm_up_submitted_segment_num << 1;
if (rs->segment_file_size(seg_id) > 0) {
g_file_cache_once_or_periodic_warm_up_submitted_segment_size
<< rs->segment_file_size(seg_id);
}
-
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
- .path =
storage_resource.value()->remote_segment_path(*rs, seg_id),
- .file_size = rs->segment_file_size(seg_id),
- .file_system = storage_resource.value()->fs,
- .ctx =
- {
- .expiration_time = expiration_time,
- .is_dryrun = config::
-
enable_reader_dryrun_when_download_file_cache,
- },
- .download_done =
- [wait](Status st) {
- if (!st) {
- LOG_WARNING("Warm up error
").error(st);
- }
- wait->signal();
- },
- });
-
- auto download_idx_file = [&](const io::Path& idx_path,
int64_t idx_size) {
- io::DownloadFileMeta meta {
- .path = idx_path,
- .file_size = idx_size,
- .file_system = storage_resource.value()->fs,
- .ctx =
- {
- .expiration_time =
expiration_time,
- .is_dryrun = config::
-
enable_reader_dryrun_when_download_file_cache,
- },
- .download_done =
- [wait](Status st) {
- if (!st) {
- LOG_WARNING("Warm up error
").error(st);
- }
- wait->signal();
- },
- };
- // clang-format on
-
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
-
g_file_cache_once_or_periodic_warm_up_submitted_index_size << idx_size;
-
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
- };
+ // 1st. download segment files
+ submit_download_tasks(
+ storage_resource.value()->remote_segment_path(*rs,
seg_id),
+ rs->segment_file_size(seg_id),
storage_resource.value()->fs,
+ expiration_time, wait);
+
+ // 2nd. download inverted index files
+ int64_t file_size = -1;
auto schema_ptr = rs->tablet_schema();
auto idx_version =
schema_ptr->get_inverted_index_storage_format();
+ const auto& idx_file_info =
rs->inverted_index_file_info(seg_id);
if (idx_version == InvertedIndexStorageFormatPB::V1) {
auto&& inverted_index_info =
rs->inverted_index_file_info(seg_id);
std::unordered_map<int64_t, int64_t> index_size_map;
- for (const auto& info :
inverted_index_info.index_info()) {
- if (info.index_file_size() != -1) {
- index_size_map[info.index_id()] =
info.index_file_size();
- } else {
- VLOG_DEBUG << "Invalid index_file_size for
segment_id " << seg_id
- << ", index_id " << info.index_id();
- }
- }
for (const auto& index :
schema_ptr->inverted_indexes()) {
- wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index->index_id(),
index->get_index_suffix());
- download_idx_file(idx_path,
index_size_map[index->index_id()]);
+ if (idx_file_info.index_info_size() > 0) {
+ for (const auto& idx_info :
idx_file_info.index_info()) {
+ if (index->index_id() ==
idx_info.index_id() &&
+ index->get_index_suffix() ==
idx_info.index_suffix()) {
+ file_size = idx_info.index_file_size();
+ break;
+ }
+ }
+ }
+ submit_download_tasks(idx_path, file_size,
storage_resource.value()->fs,
+ expiration_time, wait);
+
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
+
g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
}
} else {
if (schema_ptr->has_inverted_index()) {
- auto&& inverted_index_info =
rs->inverted_index_file_info(seg_id);
- int64_t idx_size = 0;
- if (inverted_index_info.has_index_size()) {
- idx_size = inverted_index_info.index_size();
- } else {
- VLOG_DEBUG << "index_size is not set for
segment " << seg_id;
- }
- wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
- download_idx_file(idx_path, idx_size);
+ file_size = idx_file_info.has_index_size() ?
idx_file_info.index_size()
+ : -1;
+ submit_download_tasks(idx_path, file_size,
storage_resource.value()->fs,
+ expiration_time, wait);
+
g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
+
g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
}
}
}
}
- timespec time;
- time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
- if (!wait->timed_wait(time)) {
- LOG_WARNING("Warm up {} tablets take a long time",
cur_job->tablet_ids.size());
- }
+ }
+
+ timespec time;
+ time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
+ if (wait->timed_wait(time)) {
+ LOG_WARNING("Warm up {} tablets take a long time",
cur_job->tablet_ids.size());
}
{
std::unique_lock lock(_mtx);
diff --git a/be/src/cloud/cloud_warm_up_manager.h
b/be/src/cloud/cloud_warm_up_manager.h
index 55fbcc476da..85a460fda1e 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -17,6 +17,8 @@
#pragma once
+#include <bthread/countdown_event.h>
+
#include <condition_variable>
#include <deque>
#include <mutex>
@@ -79,6 +81,9 @@ private:
void handle_jobs();
std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id);
+ void submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
+ int64_t expiration_time,
+ std::shared_ptr<bthread::CountdownEvent> wait);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ff3caf0ea67..b06decc84cb 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1131,6 +1131,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms,
"5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
+DEFINE_Int32(file_cache_downloader_thread_num_min, "32"); // min worker threads of the FileCacheBlockDownloader pool
+DEFINE_Int32(file_cache_downloader_thread_num_max, "32"); // max worker threads of that pool; DEFINE_ (not DEFINE_m), presumably because the pool is sized once in its constructor
+
DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
// inverted index searcher cache size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index c5d048c0295..6d85765c071 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1155,6 +1155,7 @@ DECLARE_mBool(enbale_dump_error_file);
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us);
DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
+
// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file
cache.
@@ -1168,6 +1169,8 @@
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
+DECLARE_Int32(file_cache_downloader_thread_num_min);
+DECLARE_Int32(file_cache_downloader_thread_num_max);
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index dc4c622e807..1732197e5b4 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -49,8 +49,8 @@ bvar::Adder<uint64_t>
g_file_cache_download_failed_num("file_cache_download_fail
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine)
: _engine(engine) {
_poller = std::thread(&FileCacheBlockDownloader::polling_download_task,
this);
auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
- .set_min_threads(4)
- .set_max_threads(16)
+
.set_min_threads(config::file_cache_downloader_thread_num_min) // previously hard-coded to 4
+
.set_max_threads(config::file_cache_downloader_thread_num_max) // previously 16; NOTE(review): presumably build() fails (and the CHECK aborts) if min > max — confirm
.build(&_workers);
CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
}
diff --git a/be/src/io/cache/block_file_cache_downloader.h
b/be/src/io/cache/block_file_cache_downloader.h
index 30827b69580..c9a46891673 100644
--- a/be/src/io/cache/block_file_cache_downloader.h
+++ b/be/src/io/cache/block_file_cache_downloader.h
@@ -92,7 +92,7 @@ private:
// tablet id -> inflight block num of tablet
std::unordered_map<int64_t, int64_t> _inflight_tablets;
- static inline constexpr size_t _max_size {10240};
+ static inline constexpr size_t _max_size {102400};
};
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]