This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8e003e69e00 [Opt](warmup) Limit the download rate of warmup task
(#60180)
8e003e69e00 is described below
commit 8e003e69e0060d1fc00e63f1f6e05f3e191248f6
Author: bobhan1 <[email protected]>
AuthorDate: Wed Feb 25 20:29:43 2026 +0800
[Opt](warmup) Limit the download rate of warmup task (#60180)
### What problem does this PR solve?
Summary
This PR adds rate-limiting support for warmup download tasks in the file
cache and introduces a generic DEFINE_ON_UPDATE macro for config-update
callbacks.
Key changes:
- Add new config file_cache_warmup_download_rate_limit_bytes_per_second
(default 100MB/s, ≤0 means no limit)
- Introduce DEFINE_ON_UPDATE macro to register callbacks triggered when
config values change at runtime
- Add an S3RateLimiterHolder instance to ExecEnv for node-level warmup
download rate limiting
- Add bvar metrics for monitoring: warmup_download_rate_limit_latency,
warmup_download_rate_limit_ns, warmup_download_rate_limit_exceed_req_num
- The rate limit can be updated dynamically at runtime without a restart
Problem & Solution
When file cache warmup tasks download large amounts of data from remote
storage (e.g., S3), they can consume excessive bandwidth and impact
normal query performance. This PR introduces a configurable rate limiter
specifically for warmup downloads to prevent bandwidth saturation.
The DEFINE_ON_UPDATE mechanism lets runtime config changes take effect
immediately by triggering registered callbacks, so the rate limiter can
be adjusted dynamically without a service restart.
Test plan
- Added unit tests for DEFINE_ON_UPDATE callback mechanism
(config_on_update_test.cpp)
- Tests cover Int64, String, and Bool config types with callback
invocation verification
---
be/src/cloud/config.cpp | 4 +
be/src/cloud/config.h | 4 +
be/src/common/config.cpp | 9 ++
be/src/common/config.h | 39 +++++++
be/src/io/cache/cached_remote_file_reader.cpp | 11 ++
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 46 ++++++++
be/test/common/config_on_update_test.cpp | 147 ++++++++++++++++++++++++++
8 files changed, 263 insertions(+)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index f5f62df8cfb..8c148982a7b 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -161,6 +161,10 @@
DEFINE_mDouble(compaction_rw_separation_version_threshold_ratio, "0.8");
DEFINE_mBool(enable_cache_read_from_peer, "true");
+// Rate limit for warmup download in bytes per second, default 100MB/s
+// <= 0 means no limit
+DEFINE_mInt64(file_cache_warmup_download_rate_limit_bytes_per_second,
"104857600");
+
// Cache the expiration time of the peer address.
// This can be configured to be less than the
`rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
// If the value is -1, use the `rehash_tablet_after_be_dead_seconds` setting
in the `fe` configuration as the expiration time.
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 20c15c13d7f..6a88831ba7f 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -204,6 +204,10 @@
DECLARE_mDouble(compaction_rw_separation_version_threshold_ratio);
DECLARE_mBool(enable_cache_read_from_peer);
+// Rate limit for warmup download in bytes per second, default 100MB/s
+// <= 0 means no limit
+DECLARE_mInt64(file_cache_warmup_download_rate_limit_bytes_per_second);
+
DECLARE_mInt64(cache_read_from_peer_expired_seconds);
// Base compaction output: only write index files to file cache, not data files
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index d384a7672fe..d7ec359ca55 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1719,6 +1719,8 @@ DEFINE_String(test_s3_prefix, "prefix");
std::map<std::string, Register::Field>* Register::_s_field_map = nullptr;
std::map<std::string, std::function<bool()>>*
RegisterConfValidator::_s_field_validator = nullptr;
+std::map<std::string, RegisterConfUpdateCallback::CallbackFunc>*
+ RegisterConfUpdateCallback::_s_field_update_callback = nullptr;
std::map<std::string, std::string>* full_conf_map = nullptr;
std::mutex custom_conf_lock;
@@ -2077,6 +2079,13 @@ bool init(const char* conf_file, bool fill_conf_map,
bool must_exist, bool set_t
if (PERSIST) {
\
RETURN_IF_ERROR(persist_config(std::string((FIELD).name), VALUE));
\
}
\
+ if (RegisterConfUpdateCallback::_s_field_update_callback != nullptr) {
\
+ auto callback_it =
\
+
RegisterConfUpdateCallback::_s_field_update_callback->find((FIELD).name); \
+ if (callback_it !=
RegisterConfUpdateCallback::_s_field_update_callback->end()) { \
+ callback_it->second(&old_value, &new_value);
\
+ }
\
+ }
\
update_config(std::string((FIELD).name), VALUE);
\
return Status::OK();
\
}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a0c1de11177..c47e8f1dcb6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -56,6 +56,25 @@
static RegisterConfValidator reg_validator_##FIELD_NAME( \
#FIELD_NAME, []() -> bool { return
validator_##FIELD_NAME(FIELD_NAME); });
+// DEFINE_ON_UPDATE macro is used to register a callback function that will be
called
+// when the config field is updated at runtime.
+// The callback function signature is: void callback(T old_value, T new_value)
+// where T is the type of the config field.
+// Example:
+// DEFINE_ON_UPDATE(my_config, [](int64_t old_val, int64_t new_val) {
+// LOG(INFO) << "my_config changed from " << old_val << " to " <<
new_val;
+// });
+#define DEFINE_ON_UPDATE_IMPL(FIELD_NAME, CALLBACK)
\
+ static auto on_update_callback_##FIELD_NAME = CALLBACK;
\
+ static RegisterConfUpdateCallback reg_update_callback_##FIELD_NAME(
\
+ #FIELD_NAME, [](const void* old_ptr, const void* new_ptr) {
\
+ on_update_callback_##FIELD_NAME(
\
+ *reinterpret_cast<const
decltype(FIELD_NAME)*>(old_ptr), \
+ *reinterpret_cast<const
decltype(FIELD_NAME)*>(new_ptr)); \
+ });
+
+#define DEFINE_ON_UPDATE(name, callback) DEFINE_ON_UPDATE_IMPL(name, callback)
+
#define DEFINE_Int16(name, defaultstr) DEFINE_FIELD(int16_t, name, defaultstr,
false)
#define DEFINE_Bools(name, defaultstr) DEFINE_FIELD(std::vector<bool>, name,
defaultstr, false)
#define DEFINE_Doubles(name, defaultstr) DEFINE_FIELD(std::vector<double>,
name, defaultstr, false)
@@ -1826,6 +1845,26 @@ public:
}
};
+// RegisterConfUpdateCallback class is used to store callback functions that
will be called
+// when a config field is updated at runtime.
+// The callback function takes two const void pointers, to the old and new values.
+// The actual type casting is done in the DEFINE_ON_UPDATE macro.
+class RegisterConfUpdateCallback {
+public:
+ using CallbackFunc = std::function<void(const void* old_ptr, const void*
new_ptr)>;
+ // Callback map for each config name.
+ static std::map<std::string, CallbackFunc>* _s_field_update_callback;
+
+public:
+ RegisterConfUpdateCallback(const char* fname, const CallbackFunc&
callback) {
+ if (_s_field_update_callback == nullptr) {
+ _s_field_update_callback = new std::map<std::string,
CallbackFunc>();
+ }
+ // register callback to _s_field_update_callback
+ _s_field_update_callback->insert(std::make_pair(std::string(fname),
callback));
+ }
+};
+
// configuration properties load from config file.
class Properties {
public:
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index bd7e8c83309..ce2cfbcf283 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -35,8 +35,10 @@
#include <vector>
#include "cloud/cloud_warm_up_manager.h"
+#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
+#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
@@ -435,6 +437,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
size_t size = empty_end - empty_start + 1;
std::unique_ptr<char[]> buffer(new char[size]);
+ // Apply rate limiting for warmup download tasks (node level)
+ // Rate limiting is applied before remote read to limit both S3 read
and local cache write
+ if (io_ctx->is_warmup) {
+ auto* rate_limiter =
ExecEnv::GetInstance()->warmup_download_rate_limiter();
+ if (rate_limiter != nullptr) {
+ rate_limiter->add(size);
+ }
+ }
+
// Determine read type and execute remote read
RETURN_IF_ERROR(
_execute_remote_read(empty_blocks, empty_start, size, buffer,
stats, io_ctx));
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f65aae6f62f..e4ff416e639 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -61,6 +61,7 @@ struct RuntimeFilterTimerQueue;
} // namespace pipeline
class WorkloadGroupMgr;
struct WriteCooldownMetaExecutors;
+class S3RateLimiterHolder;
namespace io {
class FileCacheFactory;
class HdfsMgr;
@@ -309,6 +310,7 @@ public:
io::HdfsMgr* hdfs_mgr() { return _hdfs_mgr; }
io::PackedFileManager* packed_file_manager() { return
_packed_file_manager; }
IndexPolicyMgr* index_policy_mgr() { return _index_policy_mgr; }
+ S3RateLimiterHolder* warmup_download_rate_limiter() { return
_warmup_download_rate_limiter; }
#ifdef BE_TEST
void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs>
tmp_file_dirs) {
@@ -569,6 +571,7 @@ private:
kerberos::KerberosTicketMgr* _kerberos_ticket_mgr = nullptr;
io::HdfsMgr* _hdfs_mgr = nullptr;
io::PackedFileManager* _packed_file_manager = nullptr;
+ S3RateLimiterHolder* _warmup_download_rate_limiter = nullptr;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 4857a9acf43..d8f5ddee5b5 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -43,6 +43,7 @@
#include "common/kerberos/kerberos_ticket_mgr.h"
#include "common/logging.h"
#include "common/status.h"
+#include "cpp/s3_rate_limiter.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/fs_file_cache_storage.h"
@@ -141,6 +142,12 @@ namespace doris {
class PBackendService_Stub;
class PFunctionService_Stub;
+// Warmup download rate limiter metrics
+bvar::LatencyRecorder
warmup_download_rate_limit_latency("warmup_download_rate_limit_latency");
+bvar::Adder<int64_t>
warmup_download_rate_limit_ns("warmup_download_rate_limit_ns");
+bvar::Adder<int64_t> warmup_download_rate_limit_exceed_req_num(
+ "warmup_download_rate_limit_exceed_req_num");
+
static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
bool init_system_metrics = config::enable_system_metrics;
std::set<std::string> disk_devices;
@@ -428,6 +435,24 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_index_policy_mgr = new IndexPolicyMgr();
+ // Initialize warmup download rate limiter for cloud mode
+ // Always create the rate limiter in cloud mode to support dynamic rate
limit changes
+ if (config::is_cloud_mode()) {
+ int64_t rate_limit =
config::file_cache_warmup_download_rate_limit_bytes_per_second;
+ // When rate_limit <= 0, pass 0 to disable rate limiting
+ int64_t rate = rate_limit > 0 ? rate_limit : 0;
+ // max_burst is the same as rate (1 second burst)
+ // limit is 0 which means no total limit
+ // When rate is 0, S3RateLimiter will not throttle (no rate limiting)
+ _warmup_download_rate_limiter = new S3RateLimiterHolder(rate, rate, 0,
[&](int64_t ns) {
+ if (ns > 0) {
+ warmup_download_rate_limit_latency << ns / 1000;
+ warmup_download_rate_limit_ns << ns;
+ warmup_download_rate_limit_exceed_req_num << 1;
+ }
+ });
+ }
+
RETURN_IF_ERROR(_spill_stream_mgr->init());
RETURN_IF_ERROR(_runtime_query_statistics_mgr->start_report_thread());
_dict_factory = new doris::vectorized::DictionaryFactory();
@@ -940,6 +965,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_heap_profiler);
SAFE_DELETE(_index_policy_mgr);
+ SAFE_DELETE(_warmup_download_rate_limiter);
_s_tracking_memory = false;
@@ -949,3 +975,23 @@ void ExecEnv::destroy() {
}
} // namespace doris
+
+namespace doris::config {
+// Register a callback that resets the warmup download rate limiter on config change
+DEFINE_ON_UPDATE(file_cache_warmup_download_rate_limit_bytes_per_second,
+ [](int64_t old_val, int64_t new_val) {
+ auto* rate_limiter =
ExecEnv::GetInstance()->warmup_download_rate_limiter();
+ if (rate_limiter != nullptr && new_val != old_val) {
+ // Reset rate limiter with new rate limit value
+ // When new_val <= 0, pass 0 to disable rate limiting
+ int64_t rate = new_val > 0 ? new_val : 0;
+ rate_limiter->reset(rate, rate, 0);
+ if (rate > 0) {
+ LOG(INFO) << "Warmup download rate limiter
updated from " << old_val
+ << " to " << new_val << " bytes/s";
+ } else {
+ LOG(INFO) << "Warmup download rate limiter
disabled";
+ }
+ }
+ });
+} // namespace doris::config
diff --git a/be/test/common/config_on_update_test.cpp
b/be/test/common/config_on_update_test.cpp
new file mode 100644
index 00000000000..fcbb1a98d4f
--- /dev/null
+++ b/be/test/common/config_on_update_test.cpp
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "common/config.h"
+#include "common/status.h"
+
+namespace doris {
+using namespace config;
+
+// Test variables to track callback invocations
+static int64_t g_int64_old_val = 0;
+static int64_t g_int64_new_val = 0;
+static int g_int64_callback_count = 0;
+
+static std::string g_string_old_val;
+static std::string g_string_new_val;
+static int g_string_callback_count = 0;
+
+static bool g_bool_old_val = false;
+static bool g_bool_new_val = false;
+static int g_bool_callback_count = 0;
+
+// Define test configs with DEFINE_ON_UPDATE callbacks
+DEFINE_mInt64(cfg_on_update_int64, "100");
+DEFINE_ON_UPDATE(cfg_on_update_int64, [](int64_t old_val, int64_t new_val) {
+ g_int64_old_val = old_val;
+ g_int64_new_val = new_val;
+ g_int64_callback_count++;
+});
+
+DEFINE_mString(cfg_on_update_string, "default");
+DEFINE_ON_UPDATE(cfg_on_update_string, [](std::string old_val, std::string
new_val) {
+ g_string_old_val = old_val;
+ g_string_new_val = new_val;
+ g_string_callback_count++;
+});
+
+DEFINE_mBool(cfg_on_update_bool, "false");
+DEFINE_ON_UPDATE(cfg_on_update_bool, [](bool old_val, bool new_val) {
+ g_bool_old_val = old_val;
+ g_bool_new_val = new_val;
+ g_bool_callback_count++;
+});
+
+class ConfigOnUpdateTest : public testing::Test {
+protected:
+ void SetUp() override {
+ // Reset tracking variables before each test
+ g_int64_old_val = 0;
+ g_int64_new_val = 0;
+ g_int64_callback_count = 0;
+
+ g_string_old_val.clear();
+ g_string_new_val.clear();
+ g_string_callback_count = 0;
+
+ g_bool_old_val = false;
+ g_bool_new_val = false;
+ g_bool_callback_count = 0;
+ }
+};
+
+TEST_F(ConfigOnUpdateTest, Int64Callback) {
+ // Initial value should be 100
+ EXPECT_EQ(cfg_on_update_int64, 100);
+ EXPECT_EQ(g_int64_callback_count, 0);
+
+ // Update config to 200
+ Status s = config::set_config("cfg_on_update_int64", "200");
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(cfg_on_update_int64, 200);
+
+ // Verify callback was invoked with correct values
+ EXPECT_EQ(g_int64_callback_count, 1);
+ EXPECT_EQ(g_int64_old_val, 100);
+ EXPECT_EQ(g_int64_new_val, 200);
+
+ // Update again
+ s = config::set_config("cfg_on_update_int64", "300");
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(cfg_on_update_int64, 300);
+
+ EXPECT_EQ(g_int64_callback_count, 2);
+ EXPECT_EQ(g_int64_old_val, 200);
+ EXPECT_EQ(g_int64_new_val, 300);
+}
+
+TEST_F(ConfigOnUpdateTest, StringCallback) {
+ // Initial value should be "default"
+ EXPECT_EQ(cfg_on_update_string, "default");
+ EXPECT_EQ(g_string_callback_count, 0);
+
+ // Update config
+ Status s = config::set_config("cfg_on_update_string", "new_value");
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(cfg_on_update_string, "new_value");
+
+ // Verify callback was invoked with correct values
+ EXPECT_EQ(g_string_callback_count, 1);
+ EXPECT_EQ(g_string_old_val, "default");
+ EXPECT_EQ(g_string_new_val, "new_value");
+}
+
+TEST_F(ConfigOnUpdateTest, BoolCallback) {
+ // Initial value should be false
+ EXPECT_EQ(cfg_on_update_bool, false);
+ EXPECT_EQ(g_bool_callback_count, 0);
+
+ // Update config to true
+ Status s = config::set_config("cfg_on_update_bool", "true");
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(cfg_on_update_bool, true);
+
+ // Verify callback was invoked with correct values
+ EXPECT_EQ(g_bool_callback_count, 1);
+ EXPECT_EQ(g_bool_old_val, false);
+ EXPECT_EQ(g_bool_new_val, true);
+
+ // Update back to false
+ s = config::set_config("cfg_on_update_bool", "false");
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(cfg_on_update_bool, false);
+
+ EXPECT_EQ(g_bool_callback_count, 2);
+ EXPECT_EQ(g_bool_old_val, true);
+ EXPECT_EQ(g_bool_new_val, false);
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]