This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 067c41009f6 branch-4.0: [Enhancement](client) Supports dynamically 
changing the rate limiter config #59465 (#60118)
067c41009f6 is described below

commit 067c41009f67dff0756ffad319a9cd161ce8312a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 30 16:43:13 2026 +0800

    branch-4.0: [Enhancement](client) Supports dynamically changing the rate 
limiter config #59465 (#60118)
    
    Cherry-picked from #59465
    
    Co-authored-by: Yixuan Wang <[email protected]>
---
 be/src/util/s3_util.cpp                   | 59 +++++++++++++++++++++++++++++++
 be/test/io/client/s3_file_system_test.cpp | 31 ++++++++++++++++
 common/cpp/s3_rate_limiter.h              | 12 +++++++
 3 files changed, 102 insertions(+)

diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index e1468ed1bb5..7dacf3afb4f 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -30,9 +30,11 @@
 #include <aws/s3/S3Client.h>
 #include <aws/sts/STSClient.h>
 #include <bvar/reducer.h>
+#include <cpp/s3_rate_limiter.h>
 #include <util/string_util.h>
 
 #include <atomic>
+
 #ifdef USE_AZURE
 #include <azure/core/diagnostics/logger.hpp>
 #include <azure/storage/blobs/blob_container_client.hpp>
@@ -131,11 +133,66 @@ bvar::Adder<int64_t> 
get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_nu
 bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
 bvar::Adder<int64_t> 
put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
 
+// Rate limiter parameters that were most recently applied successfully through
+// reset_s3_rate_limiter(), kept separately for the GET and the PUT limiter. They start at 0,
+// so the first comparison against the configured values reports a change unless the
+// configured values are all 0 as well.
+static std::atomic<int64_t> last_s3_get_token_bucket_tokens {0};
+static std::atomic<int64_t> last_s3_get_token_limit {0};
+static std::atomic<int64_t> last_s3_get_token_per_second {0};
+static std::atomic<int64_t> last_s3_put_token_per_second {0};
+static std::atomic<int64_t> last_s3_put_token_bucket_tokens {0};
+static std::atomic<int64_t> last_s3_put_token_limit {0};
+
+// Set to true while one thread is applying new parameters to the corresponding limiter.
+// A thread that fails the compare-exchange on the flag skips the update instead of waiting.
+static std::atomic<bool> updating_get_limiter {false};
+static std::atomic<bool> updating_put_limiter {false};
+
+// Returns the rate limiter holder stored for `type`. Only GET and PUT are accepted; any
+// other value fails the CHECK. The returned pointer is non-owning: it is obtained with
+// get() from the entry in _rate_limiters.
 S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
     CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << 
to_string(type);
     return _rate_limiters[static_cast<size_t>(type)].get();
 }
 
+// Applies the given parameters to the S3 rate limiter of type `LimiterType` when they differ
+// from the parameters that were last applied successfully.
+//
+// current_tps, current_bucket, current_limit: the parameters to apply.
+// last_tps, last_bucket, last_limit: the last successfully applied parameters; they are
+//     overwritten only when reset_s3_rate_limiter() returns 0.
+// updating_flag: try-lock for this limiter. A caller that cannot acquire it returns at once
+//     without applying anything.
+// limiter_name: only used in the warning logged when the reset fails.
+template <S3RateLimitType LimiterType>
+void update_rate_limiter_if_changed(int64_t current_tps, int64_t current_bucket,
+                                    int64_t current_limit, std::atomic<int64_t>& last_tps,
+                                    std::atomic<int64_t>& last_bucket,
+                                    std::atomic<int64_t>& last_limit,
+                                    std::atomic<bool>& updating_flag, const char* limiter_name) {
+    const auto differs_from_last = [&](std::memory_order order) {
+        return last_tps.load(order) != current_tps || last_bucket.load(order) != current_bucket ||
+               last_limit.load(order) != current_limit;
+    };
+
+    // Cheap pre-check without taking the flag.
+    if (!differs_from_last(std::memory_order_relaxed)) {
+        return;
+    }
+
+    bool expected = false;
+    if (!updating_flag.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
+        // Another thread is already applying an update to this limiter.
+        return;
+    }
+
+    // Clear the flag on every exit path, including an exception thrown while resetting or
+    // logging. A flag left set would make every later call return at the compare-exchange
+    // above, so no further update would ever be applied.
+    struct FlagReleaser {
+        std::atomic<bool>& flag;
+        ~FlagReleaser() { flag.store(false, std::memory_order_release); }
+    } releaser {updating_flag};
+
+    // Re-check while holding the flag: the previous holder may have applied these values.
+    if (!differs_from_last(std::memory_order_acquire)) {
+        return;
+    }
+
+    const int ret = reset_s3_rate_limiter(LimiterType, current_tps, current_bucket, current_limit);
+    if (ret != 0) {
+        // The last_* values are left untouched, so the next call attempts the reset again.
+        LOG(WARNING) << "Failed to reset S3 " << limiter_name
+                     << " rate limiter, error code: " << ret;
+        return;
+    }
+
+    last_tps.store(current_tps, std::memory_order_release);
+    last_bucket.store(current_bucket, std::memory_order_release);
+    last_limit.store(current_limit, std::memory_order_release);
+}
+
+// Compares the configured GET and PUT rate limiter parameters with the ones that were last
+// applied and resets a limiter whose parameters differ.
+void check_s3_rate_limiter_config_changed() {
+    // GET limiter.
+    update_rate_limiter_if_changed<S3RateLimitType::GET>(
+            /*current_tps=*/config::s3_get_token_per_second,
+            /*current_bucket=*/config::s3_get_bucket_tokens,
+            /*current_limit=*/config::s3_get_token_limit,
+            /*last_tps=*/last_s3_get_token_per_second,
+            /*last_bucket=*/last_s3_get_token_bucket_tokens,
+            /*last_limit=*/last_s3_get_token_limit,
+            /*updating_flag=*/updating_get_limiter,
+            /*limiter_name=*/"GET");
+
+    // PUT limiter.
+    update_rate_limiter_if_changed<S3RateLimitType::PUT>(
+            /*current_tps=*/config::s3_put_token_per_second,
+            /*current_bucket=*/config::s3_put_bucket_tokens,
+            /*current_limit=*/config::s3_put_token_limit,
+            /*last_tps=*/last_s3_put_token_per_second,
+            /*last_bucket=*/last_s3_put_token_bucket_tokens,
+            /*last_limit=*/last_s3_put_token_limit,
+            /*updating_flag=*/updating_put_limiter,
+            /*limiter_name=*/"PUT");
+}
+
 int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t 
max_burst, size_t limit) {
     if (type == S3RateLimitType::UNKNOWN) {
         return -1;
@@ -204,6 +261,8 @@ std::shared_ptr<io::ObjStorageClient> 
S3ClientFactory::create(const S3ClientConf
         return nullptr;
     }
 
+    check_s3_rate_limiter_config_changed();
+
 #ifdef BE_TEST
     {
         std::lock_guard l(_lock);
diff --git a/be/test/io/client/s3_file_system_test.cpp 
b/be/test/io/client/s3_file_system_test.cpp
index cc76c276759..d33240485af 100644
--- a/be/test/io/client/s3_file_system_test.cpp
+++ b/be/test/io/client/s3_file_system_test.cpp
@@ -2414,4 +2414,35 @@ TEST_F(S3FileSystemTest, 
AzureRateLimiterDeleteDirectoryExceptionHandlingTest) {
     ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << 
status.to_string();
 }
 
+// Changes the GET rate limiter settings at runtime and checks that the limiter exposed by
+// S3ClientFactory reports the new values after a client has been created.
+TEST_F(S3FileSystemTest, DynamicUpdateRateLimiterConfig) {
+    // Save original config values
+    int64_t original_get_bucket_tokens = config::s3_get_bucket_tokens;
+    int64_t original_get_token_per_second = config::s3_get_token_per_second;
+    int64_t original_get_token_limit = config::s3_get_token_limit;
+
+    std::cout << "Original GET config: bucket_tokens=" << original_get_bucket_tokens
+              << ", token_per_second=" << original_get_token_per_second
+              << ", limit=" << original_get_token_limit << '\n';
+
+    // Put the modified settings back on every exit path (a failing ASSERT_* returns early),
+    // so the very low rate configured below does not leak into tests that run afterwards.
+    struct GetLimiterConfigRestorer {
+        int64_t bucket_tokens;
+        int64_t token_per_second;
+        ~GetLimiterConfigRestorer() {
+            static_cast<void>(config::set_config("s3_get_bucket_tokens",
+                                                 std::to_string(bucket_tokens), false, false));
+            static_cast<void>(config::set_config("s3_get_token_per_second",
+                                                 std::to_string(token_per_second), false, false));
+        }
+    } restore_get_limiter_config {original_get_bucket_tokens, original_get_token_per_second};
+
+    int64_t new_s3_get_bucket_tokens_val = 50;
+    int64_t new_s3_get_token_per_second_val = 1;
+
+    auto [bucket_ret, bucket_msg] = config::set_config(
+            "s3_get_bucket_tokens", std::to_string(new_s3_get_bucket_tokens_val), false, false);
+    ASSERT_EQ(bucket_ret, 0) << "Failed to set s3_get_bucket_tokens: " << bucket_msg;
+    auto [tps_ret, tps_msg] =
+            config::set_config("s3_get_token_per_second",
+                               std::to_string(new_s3_get_token_per_second_val), false, false);
+    ASSERT_EQ(tps_ret, 0) << "Failed to set s3_get_token_per_second: " << tps_msg;
+
+    // Creating a client is what makes the factory look at the changed settings.
+    auto st = create_client();
+    ASSERT_TRUE(st.ok());
+
+    // Verify that the GET limiter now reports the new settings
+    EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_burst(),
+              new_s3_get_bucket_tokens_val);
+    EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_speed(),
+              new_s3_get_token_per_second_val);
+}
+
 } // namespace doris
diff --git a/common/cpp/s3_rate_limiter.h b/common/cpp/s3_rate_limiter.h
index 07ae8006941..357b8f9fcc1 100644
--- a/common/cpp/s3_rate_limiter.h
+++ b/common/cpp/s3_rate_limiter.h
@@ -52,6 +52,12 @@ public:
     // Returns duration of sleep in nanoseconds (to distinguish sleeping on 
different kinds of S3RateLimiters for metrics)
     int64_t add(size_t amount);
 
+    // Read-only accessors returning the current values of _max_speed, _max_burst and _limit.
+    size_t get_max_speed() const { return _max_speed; }
+
+    size_t get_max_burst() const { return _max_burst; }
+
+    size_t get_limit() const { return _limit; }
+
 private:
     std::pair<size_t, double> _update_remain_token(long now, size_t amount);
     size_t _count {0};
@@ -75,6 +81,12 @@ public:
 
     int reset(size_t max_speed, size_t max_burst, size_t limit);
 
+    // Accessors that forward to the S3RateLimiter currently held in rate_limiter.
+    // NOTE(review): these dereference rate_limiter without taking rate_limiter_rw_lock.
+    // If reset() can replace the limiter while another thread calls a getter, this is a
+    // race - confirm against reset() and the callers.
+    size_t get_max_speed() const { return rate_limiter->get_max_speed(); }
+
+    size_t get_max_burst() const { return rate_limiter->get_max_burst(); }
+
+    size_t get_limit() const { return rate_limiter->get_limit(); }
+
 private:
     std::shared_mutex rate_limiter_rw_lock;
     std::unique_ptr<S3RateLimiter> rate_limiter;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to