This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e003e69e00 [Opt](warmup) Limit the download rate of warmup task 
(#60180)
8e003e69e00 is described below

commit 8e003e69e0060d1fc00e63f1f6e05f3e191248f6
Author: bobhan1 <[email protected]>
AuthorDate: Wed Feb 25 20:29:43 2026 +0800

    [Opt](warmup) Limit the download rate of warmup task (#60180)
    
    ### What problem does this PR solve?
    
      Summary
    
    This PR adds rate limiting support for warmup download tasks in file
    cache and introduces a generic DEFINE_ON_UPDATE macro for config update
    callbacks.
    
      Key changes:
    - Add new config file_cache_warmup_download_rate_limit_bytes_per_second
    (default 100MB/s, ≤0 means no limit)
    - Introduce DEFINE_ON_UPDATE macro to register callbacks triggered when
    config values change at runtime
    - Implement S3RateLimiterHolder in ExecEnv for node-level warmup
    download rate limiting
    - Add bvar metrics for monitoring: warmup_download_rate_limit_latency,
    warmup_download_rate_limit_ns, warmup_download_rate_limit_exceed_req_num
    - Rate limit can be dynamically updated at runtime without restart
    
      Problem & Solution
    
    When file cache warmup tasks download large amounts of data from remote
    storage (e.g., S3), they can consume excessive bandwidth and impact
    normal query performance. This PR introduces a configurable rate limiter
    specifically for warmup downloads to prevent bandwidth saturation.
    
    The DEFINE_ON_UPDATE mechanism enables runtime config changes to take
    effect immediately by triggering registered callbacks, allowing the rate
    limiter to be adjusted dynamically without service restart.
    
      Test plan
    
    - Added unit tests for DEFINE_ON_UPDATE callback mechanism
    (config_on_update_test.cpp)
    - Tests cover Int64, String, and Bool config types with callback
    invocation verification
---
 be/src/cloud/config.cpp                       |   4 +
 be/src/cloud/config.h                         |   4 +
 be/src/common/config.cpp                      |   9 ++
 be/src/common/config.h                        |  39 +++++++
 be/src/io/cache/cached_remote_file_reader.cpp |  11 ++
 be/src/runtime/exec_env.h                     |   3 +
 be/src/runtime/exec_env_init.cpp              |  46 ++++++++
 be/test/common/config_on_update_test.cpp      | 147 ++++++++++++++++++++++++++
 8 files changed, 263 insertions(+)

diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index f5f62df8cfb..8c148982a7b 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -161,6 +161,10 @@ 
DEFINE_mDouble(compaction_rw_separation_version_threshold_ratio, "0.8");
 
 DEFINE_mBool(enable_cache_read_from_peer, "true");
 
+// Node-level rate limit for file cache warmup downloads, in bytes per second.
+// Default is 104857600 (100 MiB/s); a value <= 0 means no limit.
+DEFINE_mInt64(file_cache_warmup_download_rate_limit_bytes_per_second, 
"104857600");
+
 // Cache the expiration time of the peer address.
 // This can be configured to be less than the 
`rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
 // If the value is -1, use the `rehash_tablet_after_be_dead_seconds` setting 
in the `fe` configuration as the expiration time.
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 20c15c13d7f..6a88831ba7f 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -204,6 +204,10 @@ 
DECLARE_mDouble(compaction_rw_separation_version_threshold_ratio);
 
 DECLARE_mBool(enable_cache_read_from_peer);
 
+// Node-level rate limit for file cache warmup downloads, in bytes per second.
+// Default is 104857600 (100 MiB/s); a value <= 0 means no limit.
+DECLARE_mInt64(file_cache_warmup_download_rate_limit_bytes_per_second);
+
 DECLARE_mInt64(cache_read_from_peer_expired_seconds);
 
 // Base compaction output: only write index files to file cache, not data files
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index d384a7672fe..d7ec359ca55 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1719,6 +1719,8 @@ DEFINE_String(test_s3_prefix, "prefix");
 
 std::map<std::string, Register::Field>* Register::_s_field_map = nullptr;
 std::map<std::string, std::function<bool()>>* 
RegisterConfValidator::_s_field_validator = nullptr;
+std::map<std::string, RegisterConfUpdateCallback::CallbackFunc>*
+        RegisterConfUpdateCallback::_s_field_update_callback = nullptr;
 std::map<std::string, std::string>* full_conf_map = nullptr;
 
 std::mutex custom_conf_lock;
@@ -2077,6 +2079,13 @@ bool init(const char* conf_file, bool fill_conf_map, 
bool must_exist, bool set_t
         if (PERSIST) {                                                         
                    \
             RETURN_IF_ERROR(persist_config(std::string((FIELD).name), VALUE)); 
                    \
         }                                                                      
                    \
+        if (RegisterConfUpdateCallback::_s_field_update_callback != nullptr) { 
                    \
+            auto callback_it =                                                 
                    \
+                    
RegisterConfUpdateCallback::_s_field_update_callback->find((FIELD).name);      \
+            if (callback_it != 
RegisterConfUpdateCallback::_s_field_update_callback->end()) {      \
+                callback_it->second(&old_value, &new_value);                   
                    \
+            }                                                                  
                    \
+        }                                                                      
                    \
         update_config(std::string((FIELD).name), VALUE);                       
                    \
         return Status::OK();                                                   
                    \
     }
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a0c1de11177..c47e8f1dcb6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -56,6 +56,25 @@
     static RegisterConfValidator reg_validator_##FIELD_NAME( \
             #FIELD_NAME, []() -> bool { return 
validator_##FIELD_NAME(FIELD_NAME); });
 
+// DEFINE_ON_UPDATE registers a callback that is invoked when the config field is
+// updated at runtime.
+// The callback is called as callback(old_value, new_value), where both arguments have
+// the type of the config field (decltype(FIELD_NAME)).
+//
+// How it works: CALLBACK is stored in a static variable, and a type-erased wrapper is
+// registered with RegisterConfUpdateCallback under the field's name. The wrapper receives
+// the old and new values as `const void*` and casts them back to the field type before
+// forwarding them to CALLBACK. static_cast is sufficient (and narrower than
+// reinterpret_cast) for converting `const void*` back to a typed object pointer.
+//
+// Example:
+//   DEFINE_ON_UPDATE(my_config, [](int64_t old_val, int64_t new_val) {
+//       LOG(INFO) << "my_config changed from " << old_val << " to " << new_val;
+//   });
+#define DEFINE_ON_UPDATE_IMPL(FIELD_NAME, CALLBACK)                                  \
+    static auto on_update_callback_##FIELD_NAME = CALLBACK;                          \
+    static RegisterConfUpdateCallback reg_update_callback_##FIELD_NAME(              \
+            #FIELD_NAME, [](const void* old_ptr, const void* new_ptr) {              \
+                on_update_callback_##FIELD_NAME(                                     \
+                        *static_cast<const decltype(FIELD_NAME)*>(old_ptr),          \
+                        *static_cast<const decltype(FIELD_NAME)*>(new_ptr));         \
+            });
+
+// Extra level of indirection: arguments are macro-expanded here before the IMPL macro
+// stringizes (#) and pastes (##) them.
+#define DEFINE_ON_UPDATE(name, callback) DEFINE_ON_UPDATE_IMPL(name, callback)
+
 #define DEFINE_Int16(name, defaultstr) DEFINE_FIELD(int16_t, name, defaultstr, 
false)
 #define DEFINE_Bools(name, defaultstr) DEFINE_FIELD(std::vector<bool>, name, 
defaultstr, false)
 #define DEFINE_Doubles(name, defaultstr) DEFINE_FIELD(std::vector<double>, 
name, defaultstr, false)
@@ -1826,6 +1845,26 @@ public:
     }
 };
 
+// Registry of per-config update hooks that are called when a config field is updated
+// at runtime.
+// Each RegisterConfUpdateCallback instance adds one entry, keyed by config name, to a
+// shared map. The stored function receives the old and new values as untyped pointers;
+// the cast back to the field type is done by the wrapper that DEFINE_ON_UPDATE generates.
+class RegisterConfUpdateCallback {
+public:
+    using CallbackFunc = std::function<void(const void* old_ptr, const void* new_ptr)>;
+
+    // Config name -> update callback. Allocated on first registration.
+    static std::map<std::string, CallbackFunc>* _s_field_update_callback;
+
+    RegisterConfUpdateCallback(const char* fname, const CallbackFunc& callback) {
+        // The map is created lazily by whichever registration runs first.
+        if (!_s_field_update_callback) {
+            _s_field_update_callback = new std::map<std::string, CallbackFunc>();
+        }
+        // If a callback is already registered under this name, it is kept and the new
+        // one is not stored.
+        _s_field_update_callback->emplace(fname, callback);
+    }
+};
+
 // configuration properties load from config file.
 class Properties {
 public:
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp 
b/be/src/io/cache/cached_remote_file_reader.cpp
index bd7e8c83309..ce2cfbcf283 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -35,8 +35,10 @@
 #include <vector>
 
 #include "cloud/cloud_warm_up_manager.h"
+#include "cloud/config.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
+#include "cpp/s3_rate_limiter.h"
 #include "cpp/sync_point.h"
 #include "io/cache/block_file_cache.h"
 #include "io/cache/block_file_cache_factory.h"
@@ -435,6 +437,15 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, 
Slice result, size_t*
         size_t size = empty_end - empty_start + 1;
         std::unique_ptr<char[]> buffer(new char[size]);
 
+        // Apply rate limiting for warmup download tasks (node level)
+        // Rate limiting is applied before remote read to limit both S3 read 
and local cache write
+        if (io_ctx->is_warmup) {
+            auto* rate_limiter = 
ExecEnv::GetInstance()->warmup_download_rate_limiter();
+            if (rate_limiter != nullptr) {
+                rate_limiter->add(size);
+            }
+        }
+
         // Determine read type and execute remote read
         RETURN_IF_ERROR(
                 _execute_remote_read(empty_blocks, empty_start, size, buffer, 
stats, io_ctx));
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f65aae6f62f..e4ff416e639 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -61,6 +61,7 @@ struct RuntimeFilterTimerQueue;
 } // namespace pipeline
 class WorkloadGroupMgr;
 struct WriteCooldownMetaExecutors;
+class S3RateLimiterHolder;
 namespace io {
 class FileCacheFactory;
 class HdfsMgr;
@@ -309,6 +310,7 @@ public:
     io::HdfsMgr* hdfs_mgr() { return _hdfs_mgr; }
     io::PackedFileManager* packed_file_manager() { return 
_packed_file_manager; }
     IndexPolicyMgr* index_policy_mgr() { return _index_policy_mgr; }
+    S3RateLimiterHolder* warmup_download_rate_limiter() { return 
_warmup_download_rate_limiter; }
 
 #ifdef BE_TEST
     void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs> 
tmp_file_dirs) {
@@ -569,6 +571,7 @@ private:
     kerberos::KerberosTicketMgr* _kerberos_ticket_mgr = nullptr;
     io::HdfsMgr* _hdfs_mgr = nullptr;
     io::PackedFileManager* _packed_file_manager = nullptr;
+    S3RateLimiterHolder* _warmup_download_rate_limiter = nullptr;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 4857a9acf43..d8f5ddee5b5 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -43,6 +43,7 @@
 #include "common/kerberos/kerberos_ticket_mgr.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "cpp/s3_rate_limiter.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/fs_file_cache_storage.h"
@@ -141,6 +142,12 @@ namespace doris {
 class PBackendService_Stub;
 class PFunctionService_Stub;
 
+// Warmup download rate limiter metrics (bvar); each metric is named after its variable.
+bvar::LatencyRecorder 
warmup_download_rate_limit_latency("warmup_download_rate_limit_latency");
+bvar::Adder<int64_t> 
warmup_download_rate_limit_ns("warmup_download_rate_limit_ns");
+bvar::Adder<int64_t> warmup_download_rate_limit_exceed_req_num(
+        "warmup_download_rate_limit_exceed_req_num");
+
 static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     bool init_system_metrics = config::enable_system_metrics;
     std::set<std::string> disk_devices;
@@ -428,6 +435,24 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
 
     _index_policy_mgr = new IndexPolicyMgr();
 
+    // Initialize warmup download rate limiter for cloud mode
+    // Always create the rate limiter in cloud mode to support dynamic rate 
limit changes
+    if (config::is_cloud_mode()) {
+        int64_t rate_limit = 
config::file_cache_warmup_download_rate_limit_bytes_per_second;
+        // When rate_limit <= 0, pass 0 to disable rate limiting
+        int64_t rate = rate_limit > 0 ? rate_limit : 0;
+        // max_burst is the same as rate (1 second burst)
+        // limit is 0 which means no total limit
+        // When rate is 0, S3RateLimiter will not throttle (no rate limiting)
+        _warmup_download_rate_limiter = new S3RateLimiterHolder(rate, rate, 0, 
[&](int64_t ns) {
+            if (ns > 0) {
+                warmup_download_rate_limit_latency << ns / 1000;
+                warmup_download_rate_limit_ns << ns;
+                warmup_download_rate_limit_exceed_req_num << 1;
+            }
+        });
+    }
+
     RETURN_IF_ERROR(_spill_stream_mgr->init());
     RETURN_IF_ERROR(_runtime_query_statistics_mgr->start_report_thread());
     _dict_factory = new doris::vectorized::DictionaryFactory();
@@ -940,6 +965,7 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_heap_profiler);
 
     SAFE_DELETE(_index_policy_mgr);
+    SAFE_DELETE(_warmup_download_rate_limiter);
 
     _s_tracking_memory = false;
 
@@ -949,3 +975,23 @@ void ExecEnv::destroy() {
 }
 
 } // namespace doris
+
+namespace doris::config {
+// Runtime-update hook for file_cache_warmup_download_rate_limit_bytes_per_second:
+// when the value actually changes and the limiter exists, reset it to the new rate.
+DEFINE_ON_UPDATE(file_cache_warmup_download_rate_limit_bytes_per_second,
+                 [](int64_t old_val, int64_t new_val) {
+                     auto* limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter();
+                     // Nothing to do without a limiter or when the value is unchanged.
+                     if (limiter == nullptr || new_val == old_val) {
+                         return;
+                     }
+                     // Non-positive settings are mapped to 0 to disable rate limiting.
+                     const int64_t effective_rate = new_val <= 0 ? 0 : new_val;
+                     limiter->reset(effective_rate, effective_rate, 0);
+                     if (effective_rate <= 0) {
+                         LOG(INFO) << "Warmup download rate limiter disabled";
+                         return;
+                     }
+                     LOG(INFO) << "Warmup download rate limiter updated from " << old_val
+                               << " to " << new_val << " bytes/s";
+                 });
+} // namespace doris::config
diff --git a/be/test/common/config_on_update_test.cpp 
b/be/test/common/config_on_update_test.cpp
new file mode 100644
index 00000000000..fcbb1a98d4f
--- /dev/null
+++ b/be/test/common/config_on_update_test.cpp
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "common/config.h"
+#include "common/status.h"
+
+namespace doris {
+using namespace config;
+
+// Test variables to track callback invocations: for each config type, the last
+// (old, new) pair seen by the callback and the number of times it has been called.
+static int64_t g_int64_old_val = 0;
+static int64_t g_int64_new_val = 0;
+static int g_int64_callback_count = 0;
+
+static std::string g_string_old_val;
+static std::string g_string_new_val;
+static int g_string_callback_count = 0;
+
+static bool g_bool_old_val = false;
+static bool g_bool_new_val = false;
+static int g_bool_callback_count = 0;
+
+// Define test configs with DEFINE_ON_UPDATE callbacks. Each callback records its
+// arguments in the tracking variables above and bumps its invocation counter.
+DEFINE_mInt64(cfg_on_update_int64, "100");
+DEFINE_ON_UPDATE(cfg_on_update_int64, [](int64_t old_val, int64_t new_val) {
+    g_int64_old_val = old_val;
+    g_int64_new_val = new_val;
+    g_int64_callback_count++;
+});
+
+DEFINE_mString(cfg_on_update_string, "default");
+// Strings are taken by const reference so the callback does not copy both values
+// on every invocation; they are copied only once, into the tracking variables.
+DEFINE_ON_UPDATE(cfg_on_update_string,
+                 [](const std::string& old_val, const std::string& new_val) {
+                     g_string_old_val = old_val;
+                     g_string_new_val = new_val;
+                     g_string_callback_count++;
+                 });
+
+DEFINE_mBool(cfg_on_update_bool, "false");
+DEFINE_ON_UPDATE(cfg_on_update_bool, [](bool old_val, bool new_val) {
+    g_bool_old_val = old_val;
+    g_bool_new_val = new_val;
+    g_bool_callback_count++;
+});
+
+// Fixture that puts the test configs and the callback-tracking globals back into a
+// known state before every test, so that tests do not depend on what a previous test
+// (or a previous iteration under --gtest_repeat) left behind.
+class ConfigOnUpdateTest : public testing::Test {
+protected:
+    void SetUp() override {
+        // Restore the configs to the defaults the tests assert on. The variables are
+        // assigned directly (instead of going through config::set_config) so that the
+        // update callbacks are not fired by the fixture itself.
+        cfg_on_update_int64 = 100;
+        cfg_on_update_string = "default";
+        cfg_on_update_bool = false;
+
+        // Reset tracking variables before each test
+        g_int64_old_val = 0;
+        g_int64_new_val = 0;
+        g_int64_callback_count = 0;
+
+        g_string_old_val.clear();
+        g_string_new_val.clear();
+        g_string_callback_count = 0;
+
+        g_bool_old_val = false;
+        g_bool_new_val = false;
+        g_bool_callback_count = 0;
+    }
+};
+
+// Verifies that updating an int64 config fires its callback once per update with the
+// previous and the new value.
+TEST_F(ConfigOnUpdateTest, Int64Callback) {
+    // Before any update: default value in place, callback never invoked.
+    EXPECT_EQ(cfg_on_update_int64, 100);
+    EXPECT_EQ(g_int64_callback_count, 0);
+
+    // Each step: the value to set and the (old, new) pair the callback must observe.
+    struct Step {
+        const char* value;
+        int64_t expected_old;
+        int64_t expected_new;
+    };
+    const Step steps[] = {{"200", 100, 200}, {"300", 200, 300}};
+
+    int expected_calls = 0;
+    for (const Step& step : steps) {
+        Status status = config::set_config("cfg_on_update_int64", step.value);
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(cfg_on_update_int64, step.expected_new);
+
+        // The callback ran exactly once more and saw the expected values.
+        ++expected_calls;
+        EXPECT_EQ(g_int64_callback_count, expected_calls);
+        EXPECT_EQ(g_int64_old_val, step.expected_old);
+        EXPECT_EQ(g_int64_new_val, step.expected_new);
+    }
+}
+
+// Verifies that updating a string config fires its callback with the previous and the
+// new string.
+TEST_F(ConfigOnUpdateTest, StringCallback) {
+    // Before any update: default value in place, callback never invoked.
+    EXPECT_EQ(cfg_on_update_string, "default");
+    EXPECT_EQ(g_string_callback_count, 0);
+
+    // Apply a single update through the runtime config API.
+    const std::string updated_value = "new_value";
+    Status status = config::set_config("cfg_on_update_string", updated_value);
+    EXPECT_TRUE(status.ok());
+    EXPECT_EQ(cfg_on_update_string, "new_value");
+
+    // Exactly one callback, carrying the previous and the updated string.
+    EXPECT_EQ(g_string_callback_count, 1);
+    EXPECT_EQ(g_string_old_val, "default");
+    EXPECT_EQ(g_string_new_val, "new_value");
+}
+
+// Verifies that toggling a bool config fires its callback once per update with the
+// previous and the new value.
+TEST_F(ConfigOnUpdateTest, BoolCallback) {
+    // Before any update: default value in place, callback never invoked.
+    EXPECT_EQ(cfg_on_update_bool, false);
+    EXPECT_EQ(g_bool_callback_count, 0);
+
+    // Each step: the value to set and the (old, new) pair the callback must observe.
+    struct Step {
+        const char* value;
+        bool expected_old;
+        bool expected_new;
+    };
+    const Step steps[] = {{"true", false, true}, {"false", true, false}};
+
+    int expected_calls = 0;
+    for (const Step& step : steps) {
+        Status status = config::set_config("cfg_on_update_bool", step.value);
+        EXPECT_TRUE(status.ok());
+        EXPECT_EQ(cfg_on_update_bool, step.expected_new);
+
+        // The callback ran exactly once more and saw the expected values.
+        ++expected_calls;
+        EXPECT_EQ(g_bool_callback_count, expected_calls);
+        EXPECT_EQ(g_bool_old_val, step.expected_old);
+        EXPECT_EQ(g_bool_new_val, step.expected_new);
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to