This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 067c41009f6 branch-4.0: [Enhancement](client) Supports dynamically
changing the rate limiter config #59465 (#60118)
067c41009f6 is described below
commit 067c41009f67dff0756ffad319a9cd161ce8312a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 30 16:43:13 2026 +0800
branch-4.0: [Enhancement](client) Supports dynamically changing the rate
limiter config #59465 (#60118)
Cherry-picked from #59465
Co-authored-by: Yixuan Wang <[email protected]>
---
be/src/util/s3_util.cpp | 59 +++++++++++++++++++++++++++++++
be/test/io/client/s3_file_system_test.cpp | 31 ++++++++++++++++
common/cpp/s3_rate_limiter.h | 12 +++++++
3 files changed, 102 insertions(+)
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index e1468ed1bb5..7dacf3afb4f 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -30,9 +30,11 @@
#include <aws/s3/S3Client.h>
#include <aws/sts/STSClient.h>
#include <bvar/reducer.h>
+#include <cpp/s3_rate_limiter.h>
#include <util/string_util.h>
#include <atomic>
+
#ifdef USE_AZURE
#include <azure/core/diagnostics/logger.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
@@ -131,11 +133,66 @@ bvar::Adder<int64_t>
get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_nu
bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
bvar::Adder<int64_t>
put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
+static std::atomic<int64_t> last_s3_get_token_bucket_tokens {0};
+static std::atomic<int64_t> last_s3_get_token_limit {0};
+static std::atomic<int64_t> last_s3_get_token_per_second {0};
+static std::atomic<int64_t> last_s3_put_token_per_second {0};
+static std::atomic<int64_t> last_s3_put_token_bucket_tokens {0};
+static std::atomic<int64_t> last_s3_put_token_limit {0};
+
+static std::atomic<bool> updating_get_limiter {false};
+static std::atomic<bool> updating_put_limiter {false};
+
S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) <<
to_string(type);
return _rate_limiters[static_cast<size_t>(type)].get();
}
+template <S3RateLimitType LimiterType>
+void update_rate_limiter_if_changed(int64_t current_tps, int64_t
current_bucket,
+ int64_t current_limit,
std::atomic<int64_t>& last_tps,
+ std::atomic<int64_t>& last_bucket,
+ std::atomic<int64_t>& last_limit,
+ std::atomic<bool>& updating_flag, const
char* limiter_name) {
+ if (last_tps.load(std::memory_order_relaxed) != current_tps ||
+ last_bucket.load(std::memory_order_relaxed) != current_bucket ||
+ last_limit.load(std::memory_order_relaxed) != current_limit) {
+ bool expected = false;
+ if (!updating_flag.compare_exchange_strong(expected, true,
std::memory_order_acq_rel)) {
+ return;
+ }
+ if (last_tps.load(std::memory_order_acquire) != current_tps ||
+ last_bucket.load(std::memory_order_acquire) != current_bucket ||
+ last_limit.load(std::memory_order_acquire) != current_limit) {
+ int ret =
+ reset_s3_rate_limiter(LimiterType, current_tps,
current_bucket, current_limit);
+
+ if (ret == 0) {
+ last_tps.store(current_tps, std::memory_order_release);
+ last_bucket.store(current_bucket, std::memory_order_release);
+ last_limit.store(current_limit, std::memory_order_release);
+ } else {
+ LOG(WARNING) << "Failed to reset S3 " << limiter_name
+ << " rate limiter, error code: " << ret;
+ }
+ }
+
+ updating_flag.store(false, std::memory_order_release);
+ }
+}
+
+void check_s3_rate_limiter_config_changed() {
+ update_rate_limiter_if_changed<S3RateLimitType::GET>(
+ config::s3_get_token_per_second, config::s3_get_bucket_tokens,
+ config::s3_get_token_limit, last_s3_get_token_per_second,
+ last_s3_get_token_bucket_tokens, last_s3_get_token_limit,
updating_get_limiter, "GET");
+
+ update_rate_limiter_if_changed<S3RateLimitType::PUT>(
+ config::s3_put_token_per_second, config::s3_put_bucket_tokens,
+ config::s3_put_token_limit, last_s3_put_token_per_second,
+ last_s3_put_token_bucket_tokens, last_s3_put_token_limit,
updating_put_limiter, "PUT");
+}
+
int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t
max_burst, size_t limit) {
if (type == S3RateLimitType::UNKNOWN) {
return -1;
@@ -204,6 +261,8 @@ std::shared_ptr<io::ObjStorageClient>
S3ClientFactory::create(const S3ClientConf
return nullptr;
}
+ check_s3_rate_limiter_config_changed();
+
#ifdef BE_TEST
{
std::lock_guard l(_lock);
diff --git a/be/test/io/client/s3_file_system_test.cpp
b/be/test/io/client/s3_file_system_test.cpp
index cc76c276759..d33240485af 100644
--- a/be/test/io/client/s3_file_system_test.cpp
+++ b/be/test/io/client/s3_file_system_test.cpp
@@ -2414,4 +2414,35 @@ TEST_F(S3FileSystemTest,
AzureRateLimiterDeleteDirectoryExceptionHandlingTest) {
ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " <<
status.to_string();
}
+TEST_F(S3FileSystemTest, DynamicUpdateRateLimiterConfig) {
+ // Save original config values
+ int64_t original_get_bucket_tokens = config::s3_get_bucket_tokens;
+ int64_t original_get_token_per_second = config::s3_get_token_per_second;
+ int64_t original_get_token_limit = config::s3_get_token_limit;
+
+ std::cout << "Original GET config: bucket_tokens=" <<
original_get_bucket_tokens
+ << ", token_per_second=" << original_get_token_per_second
+ << ", limit=" << original_get_token_limit << std::endl;
+
+ int64_t new_s3_get_bucket_tokens_val = 50;
+ int64_t new_s3_get_token_per_second_val = 1;
+
+ auto [success1, msg7] = config::set_config(
+ "s3_get_bucket_tokens",
std::to_string(new_s3_get_bucket_tokens_val), false, false);
+ ASSERT_EQ(success1, 0) << "Failed to set s3_get_bucket_tokens: " << msg7;
+ auto [success2, msg8] =
+ config::set_config("s3_get_token_per_second",
+
std::to_string(new_s3_get_token_per_second_val), false, false);
+ ASSERT_EQ(success2, 0) << "Failed to set s3_get_token_per_second: " <<
msg8;
+
+ auto st = create_client();
+ ASSERT_TRUE(st.ok());
+
+ // Verify restoration
+
EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_burst(),
+ new_s3_get_bucket_tokens_val);
+
EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_speed(),
+ new_s3_get_token_per_second_val);
+}
+
} // namespace doris
diff --git a/common/cpp/s3_rate_limiter.h b/common/cpp/s3_rate_limiter.h
index 07ae8006941..357b8f9fcc1 100644
--- a/common/cpp/s3_rate_limiter.h
+++ b/common/cpp/s3_rate_limiter.h
@@ -52,6 +52,12 @@ public:
// Returns duration of sleep in nanoseconds (to distinguish sleeping on
different kinds of S3RateLimiters for metrics)
int64_t add(size_t amount);
+ size_t get_max_speed() const { return _max_speed; }
+
+ size_t get_max_burst() const { return _max_burst; }
+
+ size_t get_limit() const { return _limit; }
+
private:
std::pair<size_t, double> _update_remain_token(long now, size_t amount);
size_t _count {0};
@@ -75,6 +81,12 @@ public:
int reset(size_t max_speed, size_t max_burst, size_t limit);
+ size_t get_max_speed() const { return rate_limiter->get_max_speed(); }
+
+ size_t get_max_burst() const { return rate_limiter->get_max_burst(); }
+
+ size_t get_limit() const { return rate_limiter->get_limit(); }
+
private:
std::shared_mutex rate_limiter_rw_lock;
std::unique_ptr<S3RateLimiter> rate_limiter;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]