This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8c6809b4d3a [optimization](filecache) speed up filecache warm up
#51776 (#52556)
8c6809b4d3a is described below
commit 8c6809b4d3a280046d4f43f500a4e410f1a2a88b
Author: zhengyu <[email protected]>
AuthorDate: Tue Jul 1 12:07:00 2025 +0800
[optimization](filecache) speed up filecache warm up #51776 (#52556)
Cherry-pick of #51776.
This PR does the following:
- Makes the thread count of the file cache downloader worker pool configurable.
- Makes the warm-up job batch split size configurable.
- Splits large file download tasks into smaller chunks to keep the load
  balanced across threads, which improves concurrency.
- Uses rowset meta info to deduce inverted index file sizes, reducing
  S3 HEAD operations.
- Optimizes some log printing.
In our tests, this optimization improves file cache warm-up performance
by more than 3x.
Signed-off-by: zhengyu <[email protected]>
---
be/src/cloud/cloud_backend_service.cpp | 4 +-
be/src/cloud/cloud_tablet.cpp | 12 +-
be/src/cloud/cloud_warm_up_manager.cpp | 138 +++++++++++++--------
be/src/cloud/cloud_warm_up_manager.h | 6 +-
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/io/cache/block_file_cache_downloader.cpp | 4 +-
be/src/io/cache/block_file_cache_downloader.h | 2 +-
.../main/java/org/apache/doris/common/Config.java | 3 +
.../apache/doris/cloud/CacheHotspotManager.java | 2 +-
10 files changed, 116 insertions(+), 61 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index 265e6c44aac..f94807282b3 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -105,7 +105,9 @@ void
CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_BATCH")
.tag("job_id", request.job_id)
.tag("batch_id", request.batch_id)
- .tag("jobs size", request.job_metas.size());
+ .tag("jobs size", request.job_metas.size())
+ .tag("tablet num of first meta",
+ request.job_metas.empty() ? 0 :
request.job_metas[0].tablet_ids.size());
bool retry = false;
st = manager.check_and_set_batch_id(request.job_id, request.batch_id,
&retry);
if (!retry && st) {
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 4344b085a83..78663de6ed8 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -279,7 +279,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr>
to_add, bool version_
.expiration_time = expiration_time,
.is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
- .download_done {},
+ .download_done {[](Status st) {
+ if (!st) {
+ LOG_WARNING("add rowset warm up error
").error(st);
+ }
+ }},
});
auto download_idx_file = [&](const io::Path& idx_path) {
@@ -292,7 +296,11 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr>
to_add, bool version_
.expiration_time =
expiration_time,
.is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
- .download_done {},
+ .download_done {[](Status st) {
+ if (!st) {
+ LOG_WARNING("add rowset warm up error
").error(st);
+ }
+ }},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp
b/be/src/cloud/cloud_warm_up_manager.cpp
index d8bce097465..510c677f06f 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -17,7 +17,8 @@
#include "cloud/cloud_warm_up_manager.h"
-#include <bthread/countdown_event.h>
+#include <bvar/bvar.h>
+#include <bvar/reducer.h>
#include <algorithm>
#include <cstddef>
@@ -34,6 +35,8 @@
namespace doris {
+bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up",
"failed_task_num");
+
CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) :
_engine(engine) {
_download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
}
@@ -59,6 +62,52 @@ std::unordered_map<std::string, RowsetMetaSharedPtr>
snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}
+void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t
file_size,
+ io::FileSystemSPtr file_system,
+ int64_t expiration_time,
+
std::shared_ptr<bthread::CountdownEvent> wait) {
+ if (file_size < 0) {
+ auto st = file_system->file_size(path, &file_size);
+ if (!st.ok()) [[unlikely]] {
+ LOG(WARNING) << "get file size failed: " << path;
+ file_cache_warm_up_failed_task_num << 1;
+ return;
+ }
+ }
+
+ const int64_t chunk_size = 10 * 1024 * 1024; // 10MB
+ int64_t offset = 0;
+ int64_t remaining_size = file_size;
+
+ while (remaining_size > 0) {
+ int64_t current_chunk_size = std::min(chunk_size, remaining_size);
+ wait->add_count();
+
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path = path,
+ .file_size = file_size,
+ .offset = offset,
+ .download_size = current_chunk_size,
+ .file_system = file_system,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done =
+ [wait](Status st) {
+ if (!st) {
+ LOG_WARNING("Warm up error ").error(st);
+ }
+ wait->signal();
+ },
+ });
+
+ offset += current_chunk_size;
+ remaining_size -= current_chunk_size;
+ }
+}
+
void CloudWarmUpManager::handle_jobs() {
#ifndef BE_TEST
constexpr int WAIT_TIME_SECONDS = 600;
@@ -77,6 +126,10 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up job is null");
continue;
}
+
+ std::shared_ptr<bthread::CountdownEvent> wait =
+ std::make_shared<bthread::CountdownEvent>(0);
+
for (int64_t tablet_id : cur_job->tablet_ids) {
if (_cur_job_id == 0) { // The job is canceled
break;
@@ -92,8 +145,7 @@ void CloudWarmUpManager::handle_jobs() {
LOG_WARNING("Warm up error ").tag("tablet_id",
tablet_id).error(st);
continue;
}
- std::shared_ptr<bthread::CountdownEvent> wait =
- std::make_shared<bthread::CountdownEvent>(0);
+
auto tablet_meta = tablet->tablet_meta();
auto rs_metas = snapshot_rs_metas(tablet.get());
for (auto& [_, rs] : rs_metas) {
@@ -112,71 +164,51 @@ void CloudWarmUpManager::handle_jobs() {
expiration_time = 0;
}
- wait->add_count();
- // clang-format off
-
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
- .path =
storage_resource.value()->remote_segment_path(*rs, seg_id),
- .file_size = rs->segment_file_size(seg_id),
- .file_system = storage_resource.value()->fs,
- .ctx =
- {
- .expiration_time = expiration_time,
- .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
- },
- .download_done =
- [wait](Status st) {
- if (!st) {
- LOG_WARNING("Warm up error
").error(st);
- }
- wait->signal();
- },
- });
-
- auto download_idx_file = [&](const io::Path& idx_path) {
- io::DownloadFileMeta meta {
- .path = idx_path,
- .file_size = -1,
- .file_system = storage_resource.value()->fs,
- .ctx =
- {
- .expiration_time =
expiration_time,
- .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
- },
- .download_done =
- [wait](Status st) {
- if (!st) {
- LOG_WARNING("Warm up error
").error(st);
- }
- wait->signal();
- },
- };
- // clang-format on
-
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
- };
+ // 1st. download segment files
+ submit_download_tasks(
+ storage_resource.value()->remote_segment_path(*rs,
seg_id),
+ rs->segment_file_size(seg_id),
storage_resource.value()->fs,
+ expiration_time, wait);
+
+ // 2nd. download inverted index files
+ int64_t file_size = -1;
auto schema_ptr = rs->tablet_schema();
auto idx_version =
schema_ptr->get_inverted_index_storage_format();
+ const auto& idx_file_info =
rs->inverted_index_file_info(seg_id);
if (idx_version == InvertedIndexStorageFormatPB::V1) {
for (const auto& index :
schema_ptr->inverted_indexes()) {
- wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v1_path(
*rs, seg_id, index->index_id(),
index->get_index_suffix());
- download_idx_file(idx_path);
+ if (idx_file_info.index_info_size() > 0) {
+ for (const auto& idx_info :
idx_file_info.index_info()) {
+ if (index->index_id() ==
idx_info.index_id() &&
+ index->get_index_suffix() ==
idx_info.index_suffix()) {
+ file_size = idx_info.index_file_size();
+ break;
+ }
+ }
+ }
+ submit_download_tasks(idx_path, file_size,
storage_resource.value()->fs,
+ expiration_time, wait);
}
} else {
if (schema_ptr->has_inverted_index()) {
- wait->add_count();
auto idx_path =
storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
- download_idx_file(idx_path);
+ file_size = idx_file_info.has_index_size() ?
idx_file_info.index_size()
+ : -1;
+ submit_download_tasks(idx_path, file_size,
storage_resource.value()->fs,
+ expiration_time, wait);
}
}
}
}
- timespec time;
- time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
- if (!wait->timed_wait(time)) {
- LOG_WARNING("Warm up tablet {} take a long time",
tablet_meta->tablet_id());
- }
+ }
+
+ timespec time;
+ time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
+ if (wait->timed_wait(time)) {
+ LOG_WARNING("Warm up {} tablets take a long time",
cur_job->tablet_ids.size());
}
{
std::unique_lock lock(_mtx);
diff --git a/be/src/cloud/cloud_warm_up_manager.h
b/be/src/cloud/cloud_warm_up_manager.h
index 219dedc5806..356d7284f6f 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -17,6 +17,8 @@
#pragma once
+#include <bthread/countdown_event.h>
+
#include <condition_variable>
#include <deque>
#include <mutex>
@@ -69,7 +71,9 @@ public:
private:
void handle_jobs();
-
+ void submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
+ int64_t expiration_time,
+ std::shared_ptr<bthread::CountdownEvent> wait);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3df7058c12d..2fcedc2fd23 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1128,6 +1128,9 @@ DEFINE_mInt64(file_cache_background_monitor_interval_ms,
"5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
+DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
+DEFINE_Int32(file_cache_downloader_thread_num_max, "32");
+
DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
// inverted index searcher cache size
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0fa93d0f8f8..5f5a8f14c2d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1152,6 +1152,7 @@ DECLARE_mBool(enbale_dump_error_file);
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us);
DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
+
// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file
cache.
@@ -1165,6 +1166,8 @@
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
+DECLARE_Int32(file_cache_downloader_thread_num_min);
+DECLARE_Int32(file_cache_downloader_thread_num_max);
// inverted index searcher cache
// cache entry stay time after lookup
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index b9944e39989..05c18e0b945 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -43,8 +43,8 @@ namespace doris::io {
FileCacheBlockDownloader::FileCacheBlockDownloader(CloudStorageEngine& engine)
: _engine(engine) {
_poller = std::thread(&FileCacheBlockDownloader::polling_download_task,
this);
auto st = ThreadPoolBuilder("FileCacheBlockDownloader")
- .set_min_threads(4)
- .set_max_threads(16)
+
.set_min_threads(config::file_cache_downloader_thread_num_min)
+
.set_max_threads(config::file_cache_downloader_thread_num_max)
.build(&_workers);
CHECK(st.ok()) << "failed to create FileCacheBlockDownloader";
}
diff --git a/be/src/io/cache/block_file_cache_downloader.h
b/be/src/io/cache/block_file_cache_downloader.h
index 30827b69580..c9a46891673 100644
--- a/be/src/io/cache/block_file_cache_downloader.h
+++ b/be/src/io/cache/block_file_cache_downloader.h
@@ -92,7 +92,7 @@ private:
// tablet id -> inflight block num of tablet
std::unordered_map<int64_t, int64_t> _inflight_tablets;
- static inline constexpr size_t _max_size {10240};
+ static inline constexpr size_t _max_size {102400};
};
} // namespace doris::io
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index de2604ad011..b3d92cad6f7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3210,6 +3210,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int cloud_warm_up_job_scheduler_interval_millisecond = 1000;
// 1 seconds
+ @ConfField(mutable = true, masterOnly = true)
+ public static long cloud_warm_up_job_max_bytes_per_batch = 21474836480L;
// 20GB
+
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_fetch_cluster_cache_hotspot = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index b73e467836d..4e073b21473 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -365,7 +365,7 @@ public class CacheHotspotManager extends MasterDaemon {
}
private Map<Long, List<List<Long>>> splitBatch(Map<Long, List<Tablet>>
beToWarmUpTablets) {
- final Long maxSizePerBatch = 10737418240L; // 10G
+ final Long maxSizePerBatch =
Config.cloud_warm_up_job_max_bytes_per_batch;
Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
for (Map.Entry<Long, List<Tablet>> entry :
beToWarmUpTablets.entrySet()) {
List<List<Long>> batches = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]