This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c6b4367bcb5 [feature](Cloud) Introduce S3 rate limiter to recycler
(#34339)
c6b4367bcb5 is described below
commit c6b4367bcb5d586242a9c2e9e7aa2689f089243a
Author: AlexYue <[email protected]>
AuthorDate: Sat May 4 09:36:37 2024 +0800
[feature](Cloud) Introduce S3 rate limiter to recycler (#34339)
---
cloud/src/common/config.h | 9 ++
cloud/src/common/sync_point.h | 4 +-
cloud/src/rate-limiter/CMakeLists.txt | 1 +
cloud/src/rate-limiter/s3_rate_limiter.cpp | 114 ++++++++++++++++++++++++
cloud/src/rate-limiter/s3_rate_limiter.h | 94 ++++++++++++++++++++
cloud/src/recycler/recycler_service.cpp | 29 +++++++
cloud/src/recycler/s3_accessor.cpp | 135 ++++++++++++++++++++++-------
cloud/src/recycler/s3_accessor.h | 4 +
cloud/test/CMakeLists.txt | 4 +
cloud/test/s3_rate_limiter_test.cpp | 78 +++++++++++++++++
10 files changed, 438 insertions(+), 34 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 73b8580c1f6..805cc712c40 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -160,6 +160,15 @@ CONF_Int32(txn_store_retry_base_intervals_ms, "500");
// Whether to retry the txn conflict errors that returns by the underlying txn
store.
CONF_Bool(enable_retry_txn_conflict, "true");
+CONF_mBool(enable_s3_rate_limiter, "false");
+CONF_mInt64(s3_get_bucket_tokens, "1000000000000000000");
+CONF_mInt64(s3_get_token_per_second, "1000000000000000000");
+CONF_mInt64(s3_get_token_limit, "0");
+
+CONF_mInt64(s3_put_bucket_tokens, "1000000000000000000");
+CONF_mInt64(s3_put_token_per_second, "1000000000000000000");
+CONF_mInt64(s3_put_token_limit, "0");
+
// The secondary package name of the MetaService.
CONF_String(secondary_package_name, "");
diff --git a/cloud/src/common/sync_point.h b/cloud/src/common/sync_point.h
index b81ce31a5c1..30ecd3e0887 100644
--- a/cloud/src/common/sync_point.h
+++ b/cloud/src/common/sync_point.h
@@ -172,14 +172,14 @@ static_assert(ret_val_ptr != nullptr, "ret_val_ptr cannot
be nullptr");\
TEST_SYNC_POINT_CALLBACK(sync_point_name, ret_val_ptr); \
{ \
bool pred = false; \
- TEST_SYNC_POINT_CALLBACK(sync_point_name"::pred", &pred); \
+ TEST_SYNC_POINT_CALLBACK(std::string(sync_point_name) + "::pred", &pred); \
if (pred) return *ret_val_ptr; \
}
# define TEST_SYNC_POINT_RETURN_WITH_VOID(sync_point_name) \
{ \
bool pred = false; \
- TEST_SYNC_POINT_CALLBACK(sync_point_name"::pred", &pred); \
+ TEST_SYNC_POINT_CALLBACK(std::string(sync_point_name) + "::pred", &pred); \
if (pred) return; \
}
diff --git a/cloud/src/rate-limiter/CMakeLists.txt
b/cloud/src/rate-limiter/CMakeLists.txt
index 631c90d656a..6d4a5e0a54c 100644
--- a/cloud/src/rate-limiter/CMakeLists.txt
+++ b/cloud/src/rate-limiter/CMakeLists.txt
@@ -9,4 +9,5 @@ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lfdb_c
-L${THIRDPARTY_DIR
add_library(RateLimiter
rate_limiter.cpp
+ s3_rate_limiter.cpp
)
diff --git a/cloud/src/rate-limiter/s3_rate_limiter.cpp
b/cloud/src/rate-limiter/s3_rate_limiter.cpp
new file mode 100644
index 00000000000..92c13e232bc
--- /dev/null
+++ b/cloud/src/rate-limiter/s3_rate_limiter.cpp
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_rate_limiter.h"
+
+#include <fmt/format.h>
+
+#include <chrono>
+#include <thread>
+
+namespace doris::cloud {
+// 10^6: the number of microseconds in one second.
+static constexpr auto MS = 1000000UL;
+
+// Refills the token bucket for the time elapsed since the previous call, charges
+// `amount` tokens against it and records `now` as the new reference time.
+//
+// Returns {total amount requested so far, tokens remaining after this request}.
+// A negative token balance means the caller has overdrawn the bucket and must wait.
+std::pair<size_t, double> S3RateLimiter::_update_remain_token(
+        std::chrono::system_clock::time_point now, size_t amount) {
+    // Values obtained under lock to be checked after release
+    size_t count_value;
+    double tokens_value;
+    {
+        std::lock_guard<SpinLock> lock(_mutex);
+        if (_max_speed) {
+            // Convert through duration<double> so the result is in seconds whatever the
+            // tick period of the clock is. Dividing the raw tick count by 10^6 is only
+            // correct for a microsecond clock and over-refills on a nanosecond clock.
+            // The wall clock may step backwards; never take tokens away because of that.
+            const double delta_seconds =
+                    std::max(0.0, std::chrono::duration<double>(now - _prev_ms).count());
+            _remain_tokens = std::min<double>(
+                    _remain_tokens + _max_speed * delta_seconds - amount, _max_burst);
+        }
+        _count += amount;
+        count_value = _count;
+        tokens_value = _remain_tokens;
+        _prev_ms = now;
+    }
+    return {count_value, tokens_value};
+}
+
+// Requests `amount` tokens.
+//
+// Returns -1 when the cumulative amount exceeds the configured limit, otherwise the
+// time this call slept, in microseconds (0 when no throttling was necessary).
+int64_t S3RateLimiter::add(size_t amount) {
+    // Values obtained under lock to be checked after release.
+    // _update_remain_token() takes a system_clock time point, so sample that clock
+    // directly; high_resolution_clock is not guaranteed to be the same clock type.
+    auto [count_value, tokens_value] =
+            _update_remain_token(std::chrono::system_clock::now(), amount);
+
+    if (_limit && count_value > _limit) {
+        // The ClickHouse throttler this is modelled on throws here; we report -1 instead.
+        return -1;
+    }
+
+    // Wait unless there is positive amount of remain_tokens - throttling
+    int64_t sleep_time_us = 0;
+    if (_max_speed && tokens_value < 0) {
+        // The deficit divided by the refill rate is seconds; MS scales it to microseconds.
+        sleep_time_us = static_cast<int64_t>(-tokens_value / _max_speed * MS);
+        std::this_thread::sleep_for(std::chrono::microseconds(sleep_time_us));
+    }
+
+    return sleep_time_us;
+}
+
+// Creates the wrapped limiter and a bvar counter named "<type>_rate_limit_ms".
+S3RateLimiterHolder::S3RateLimiterHolder(S3RateLimitType type, size_t max_speed,
+                                         size_t max_burst, size_t limit)
+        : rate_limiter(std::make_unique<S3RateLimiter>(max_speed, max_burst, limit)),
+          rate_limit_bvar(fmt::format("{}_rate_limit_ms", to_string(type))) {}
+
+// Forwards to the current limiter under a shared lock and adds any positive sleep
+// time to the bvar counter. Returns whatever the limiter returned.
+int64_t S3RateLimiterHolder::add(size_t amount) {
+    const int64_t slept = [&] {
+        std::shared_lock guard {rate_limiter_rw_lock};
+        return rate_limiter->add(amount);
+    }();
+    if (slept > 0) {
+        rate_limit_bvar << slept;
+    }
+    return slept;
+}
+
+// Replaces the wrapped limiter with a freshly constructed one while holding the
+// exclusive lock. Always returns 0.
+int S3RateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
+    std::unique_lock exclusive {rate_limiter_rw_lock};
+    rate_limiter = std::make_unique<S3RateLimiter>(max_speed, max_burst, limit);
+    return 0;
+}
+
+// Textual name of a limiter type: "get", "put", or the numeric value for anything else.
+std::string to_string(S3RateLimitType type) {
+    if (type == S3RateLimitType::GET) {
+        return "get";
+    }
+    if (type == S3RateLimitType::PUT) {
+        return "put";
+    }
+    return std::to_string(static_cast<size_t>(type));
+}
+
+// Parses "get" / "put" into the matching type; every other string yields UNKNOWN.
+S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
+    if (value == "get") {
+        return S3RateLimitType::GET;
+    }
+    return value == "put" ? S3RateLimitType::PUT : S3RateLimitType::UNKNOWN;
+}
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/rate-limiter/s3_rate_limiter.h
b/cloud/src/rate-limiter/s3_rate_limiter.h
new file mode 100644
index 00000000000..9e30df0da1f
--- /dev/null
+++ b/cloud/src/rate-limiter/s3_rate_limiter.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bvar/bvar.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <shared_mutex>
+
+namespace doris::cloud {
+
+enum class S3RateLimitType : int { // Kind of S3 request a rate limiter is applied to.
+ GET = 0, // Spelled "get" in to_string(); the value is also used as an array index by the accessor.
+ PUT, // Spelled "put" in to_string().
+ UNKNOWN, // Returned by string_to_s3_rate_limit_type() for any other string.
+};
+
+extern std::string to_string(S3RateLimitType type);
+extern S3RateLimitType string_to_s3_rate_limit_type(std::string_view value);
+
+// Minimal busy-waiting lock built on std::atomic_flag. It provides lock()/unlock()
+// so it can be used with std::lock_guard.
+class SpinLock {
+public:
+    SpinLock() = default;
+
+    // Spins until the flag is observed clear. Acquire ordering makes the previous
+    // owner's writes visible to the new owner.
+    void lock() {
+        while (_flag.test_and_set(std::memory_order_acquire)) {
+            // Spin until we acquire the lock
+        }
+    }
+
+    // clear() is a pure store, for which acquire/acq_rel orderings are not permitted;
+    // release is the correct ordering to publish the critical section.
+    void unlock() { _flag.clear(std::memory_order_release); }
+
+private:
+    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
+};
+
+// Token-bucket rate limiter with an optional cap on the total amount requested.
+class S3RateLimiter {
+public:
+    static constexpr size_t default_burst_seconds = 1;
+
+    // max_speed: refill rate in tokens per second; 0 disables throttling.
+    // max_burst: bucket capacity in tokens; the bucket starts full.
+    // limit:     cap on the cumulative amount passed to add(); 0 means no cap.
+    S3RateLimiter(size_t max_speed, size_t max_burst, size_t limit)
+            : _max_speed(max_speed),
+              _max_burst(max_burst),
+              _limit(limit),
+              _remain_tokens(static_cast<double>(max_burst)) {}
+
+    // Uses `amount` tokens and sleeps if the bucket is overdrawn.
+    // Returns -1 (nothing is thrown) once the cumulative amount exceeds `limit`,
+    // otherwise the duration of the sleep in microseconds (0 when it did not sleep).
+    int64_t add(size_t amount);
+
+private:
+    std::pair<size_t, double> _update_remain_token(std::chrono::system_clock::time_point now,
+                                                   size_t amount);
+    size_t _count {0};           // cumulative amount requested through add()
+    const size_t _max_speed {0}; // in tokens per second. which indicates the QPS
+    const size_t _max_burst {0}; // in tokens. which indicates the token bucket size
+    const uint64_t _limit {0};   // 0 - not limited.
+    SpinLock _mutex;
+    // Amount of remain_tokens available in token bucket. Updated in `add` method.
+    double _remain_tokens {0};
+    std::chrono::system_clock::time_point _prev_ms; // Time point of the previous `add` call.
+};
+
+class S3RateLimiterHolder { // Owns an S3RateLimiter that can be swapped out via reset().
+public:
+ S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t
max_burst, size_t limit);
+
+ int64_t add(size_t amount); // Forwards to the current limiter under a shared lock.
+
+ int reset(size_t max_speed, size_t max_burst, size_t limit); // Installs a new limiter under an exclusive lock.
+
+private:
+ std::shared_mutex rate_limiter_rw_lock; // shared in add(), exclusive in reset()
+ std::unique_ptr<S3RateLimiter> rate_limiter; // replaced wholesale by reset()
+ bvar::Adder<uint64_t> rate_limit_bvar; // accumulates the positive sleep durations returned by add()
+};
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/recycler_service.cpp
b/cloud/src/recycler/recycler_service.cpp
index 5b0b3e8078c..657797d1c81 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -28,6 +28,7 @@
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/txn_kv_error.h"
+#include "rate-limiter/s3_rate_limiter.h"
#include "recycler/checker.h"
#include "recycler/meta_checker.h"
#include "recycler/recycler.h"
@@ -399,6 +400,34 @@ void
RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
return;
}
+    // Handles "adjust_rate_limiter": rebuilds the GET or PUT S3 rate limiter from the
+    // `type`, `speed`, `burst` and `limit` query parameters. All four are required.
+    if (unresolved_path == "adjust_rate_limiter") {
+        auto type_string = uri.GetQuery("type");
+        auto speed = uri.GetQuery("speed");
+        auto burst = uri.GetQuery("burst");
+        auto limit = uri.GetQuery("limit");
+        // A query key that is not present yields a null handle, so test for that
+        // before dereferencing.
+        auto missing = [](const auto& value) { return !value || value->empty(); };
+        auto reject = [&]() {
+            msg = "argument not suitable";
+            response_body = msg;
+            status_code = 400;
+        };
+        if (missing(type_string) || missing(speed) || missing(burst) || missing(limit) ||
+            (*type_string != "get" && *type_string != "put")) {
+            reject();
+            return;
+        }
+        size_t max_speed = 0;
+        size_t max_burst = 0;
+        size_t max_limit = 0;
+        try {
+            max_speed = std::stoul(*speed);
+            max_burst = std::stoul(*burst);
+            max_limit = std::stoul(*limit);
+        } catch (...) {
+            // std::stoul throws on non-numeric or out-of-range text; answer with
+            // 400 instead of letting the exception escape the handler.
+            reject();
+            return;
+        }
+        if (0 != reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed,
+                                       max_burst, max_limit)) {
+            msg = "adjust failed";
+            response_body = msg;
+            status_code = 400;
+            return;
+        }
+
+        status_code = 200;
+        response_body = msg;
+        return;
+    }
+
status_code = 404;
msg = "not found";
response_body = msg;
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index d1ebfe62a1d..3f5d78b1c8f 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -34,11 +34,42 @@
#include <execution>
#include <utility>
+#include "common/config.h"
#include "common/logging.h"
#include "common/sync_point.h"
+#include "rate-limiter/s3_rate_limiter.h"
#include "recycler/obj_store_accessor.h"
namespace doris::cloud {
+
+struct AccessorRateLimiter { // Holds one S3RateLimiterHolder per request type.
+public:
+ ~AccessorRateLimiter() = default;
+ static AccessorRateLimiter& instance(); // Accessor for the single function-local static instance.
+ S3RateLimiterHolder* rate_limiter(S3RateLimitType type); // Only GET and PUT are accepted.
+
+private:
+ AccessorRateLimiter(); // Private: instances are obtained through instance().
+ std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters; // Indexed by the numeric value of S3RateLimitType.
+};
+
+// Builds the non-retryable INTERNAL_FAILURE error reported when the limiter rejects a request.
+[[maybe_unused]] static Aws::Client::AWSError<Aws::S3::S3Errors> s3_error_factory() {
+    using S3Error = Aws::Client::AWSError<Aws::S3::S3Errors>;
+    return S3Error(Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false);
+}
+
+// Runs `callback` after charging one token to the limiter for `type`.
+// When rate limiting is disabled the callback runs unconditionally; when the limiter
+// reports a negative value the callback is skipped and an error outcome is returned.
+template <typename Func>
+auto do_s3_rate_limit(S3RateLimitType type, Func callback) -> decltype(callback()) {
+    using Outcome = decltype(callback());
+    if (config::enable_s3_rate_limiter) {
+        auto* holder = AccessorRateLimiter::instance().rate_limiter(type);
+        if (holder->add(1) < 0) {
+            return Outcome(s3_error_factory());
+        }
+    }
+    return callback();
+}
+
#ifndef UNIT_TEST
#define HELP_MACRO(ret, req, point_name)
#else
@@ -50,15 +81,50 @@ namespace doris::cloud {
return p; \
}(); \
return ret; \
- } while (false)
+ } while (false);
#endif
-#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, req) \
- [&]() mutable { \
- [[maybe_unused]] decltype((expr)) t; \
- HELP_MACRO(t, req, point_name); \
- return (expr); \
+#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name, type) \
+ [&]() -> decltype(auto) { \
+ using T = decltype((expr)); \
+ [[maybe_unused]] T t; \
+ HELP_MACRO(t, request, point_name) \
+ return do_s3_rate_limit(type, [&]() { return (expr); }); \
}()
+AccessorRateLimiter::AccessorRateLimiter() // Seeds both holders from the s3_get_* / s3_put_* config values.
+ : _rate_limiters {std::make_unique<S3RateLimiterHolder>( // slot 0: GET
+ S3RateLimitType::GET,
config::s3_get_token_per_second,
+ config::s3_get_bucket_tokens,
config::s3_get_token_limit),
+ std::make_unique<S3RateLimiterHolder>( // slot 1: PUT
+ S3RateLimitType::PUT,
config::s3_put_token_per_second,
+ config::s3_put_bucket_tokens,
config::s3_put_token_limit)} {}
+
+// Returns the holder for `type`; aborts via CHECK for anything other than GET or PUT.
+S3RateLimiterHolder* AccessorRateLimiter::rate_limiter(S3RateLimitType type) {
+    CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
+    const auto slot = static_cast<size_t>(type);
+    return _rate_limiters[slot].get();
+}
+
+// Returns the function-local static instance, constructed on first use.
+AccessorRateLimiter& AccessorRateLimiter::instance() {
+    static AccessorRateLimiter singleton;
+    return singleton;
+}
+
+// Rebuilds the limiter for `type`. Any argument passed as 0 is replaced by the
+// corresponding s3_get_* / s3_put_* config value. Returns -1 for UNKNOWN, otherwise
+// the result of S3RateLimiterHolder::reset().
+int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst,
+                          size_t limit) {
+    if (type == S3RateLimitType::UNKNOWN) {
+        return -1;
+    }
+    const bool is_get = (type == S3RateLimitType::GET);
+    if (max_speed == 0) {
+        max_speed = is_get ? config::s3_get_token_per_second : config::s3_put_token_per_second;
+    }
+    if (max_burst == 0) {
+        max_burst = is_get ? config::s3_get_bucket_tokens : config::s3_put_bucket_tokens;
+    }
+    if (limit == 0) {
+        limit = is_get ? config::s3_get_token_limit : config::s3_put_token_limit;
+    }
+    auto* holder = AccessorRateLimiter::instance().rate_limiter(type);
+    return holder->reset(max_speed, max_burst, limit);
+}
+
class S3Environment {
public:
S3Environment() { Aws::InitAPI(aws_options_); }
@@ -107,8 +173,9 @@ int S3Accessor::delete_objects_by_prefix(const std::string&
relative_path) {
delete_request.SetBucket(conf_.bucket);
bool is_truncated = false;
do {
- auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
-
"s3_client::list_objects_v2", request);
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->ListObjectsV2(request), std::ref(request).get(),
+ "s3_client::list_objects_v2", S3RateLimitType::GET);
if (!outcome.IsSuccess()) {
LOG_WARNING("failed to list objects")
.tag("endpoint", conf_.endpoint)
@@ -136,9 +203,9 @@ int S3Accessor::delete_objects_by_prefix(const std::string&
relative_path) {
Aws::S3::Model::Delete del;
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
- auto delete_outcome =
-
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
- "s3_client::delete_objects",
delete_request);
+ auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->DeleteObjects(delete_request),
std::ref(delete_request).get(),
+ "s3_client::delete_objects", S3RateLimitType::PUT);
if (!delete_outcome.IsSuccess()) {
LOG_WARNING("failed to delete objects")
.tag("endpoint", conf_.endpoint)
@@ -198,9 +265,9 @@ int S3Accessor::delete_objects(const
std::vector<std::string>& relative_paths) {
}
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
- auto delete_outcome =
-
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
- "s3_client::delete_objects",
delete_request);
+ auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->DeleteObjects(delete_request),
std::ref(delete_request).get(),
+ "s3_client::delete_objects", S3RateLimitType::PUT);
if (!delete_outcome.IsSuccess()) {
LOG_WARNING("failed to delete objects")
.tag("endpoint", conf_.endpoint)
@@ -231,8 +298,9 @@ int S3Accessor::delete_object(const std::string&
relative_path) {
Aws::S3::Model::DeleteObjectRequest request;
auto key = get_key(relative_path);
request.WithBucket(conf_.bucket).WithKey(key);
- auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request),
- "s3_client::delete_object",
request);
+ auto outcome =
+ SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request),
std::ref(request).get(),
+ "s3_client::delete_object",
S3RateLimitType::PUT);
if (!outcome.IsSuccess()) {
LOG_WARNING("failed to delete object")
.tag("endpoint", conf_.endpoint)
@@ -253,8 +321,9 @@ int S3Accessor::put_object(const std::string&
relative_path, const std::string&
auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
*input << content;
request.SetBody(input);
- auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
- "s3_client::put_object",
request);
+ auto outcome =
+ SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
std::ref(request).get(),
+ "s3_client::put_object",
S3RateLimitType::PUT);
if (!outcome.IsSuccess()) {
LOG_WARNING("failed to put object")
.tag("endpoint", conf_.endpoint)
@@ -274,8 +343,9 @@ int S3Accessor::list(const std::string& relative_path,
std::vector<ObjectMeta>*
bool is_truncated = false;
do {
- auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
-
"s3_client::list_objects_v2", request);
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->ListObjectsV2(request), std::ref(request).get(),
+ "s3_client::list_objects_v2", S3RateLimitType::GET);
;
if (!outcome.IsSuccess()) {
LOG_WARNING("failed to list objects")
@@ -301,9 +371,9 @@ int S3Accessor::exist(const std::string& relative_path) {
Aws::S3::Model::HeadObjectRequest request;
auto key = get_key(relative_path);
request.WithBucket(conf_.bucket).WithKey(key);
- auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
- "s3_client::head_object",
request);
- ;
+ auto outcome =
+ SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
std::ref(request).get(),
+ "s3_client::head_object",
S3RateLimitType::GET);
if (outcome.IsSuccess()) {
return 0;
} else if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
@@ -326,9 +396,9 @@ int S3Accessor::delete_expired_objects(const std::string&
relative_path, int64_t
bool is_truncated = false;
do {
- auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
-
"s3_client::list_objects_v2", request);
- ;
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->ListObjectsV2(request), std::ref(request).get(),
+ "s3_client::list_objects_v2", S3RateLimitType::GET);
if (!outcome.IsSuccess()) {
LOG_WARNING("failed to list objects")
.tag("endpoint", conf_.endpoint)
@@ -379,9 +449,9 @@ int S3Accessor::get_bucket_lifecycle(int64_t*
expiration_days) {
Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
request.SetBucket(conf_.bucket);
- auto outcome =
-
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketLifecycleConfiguration(request),
-
"s3_client::get_bucket_lifecycle_configuration", request);
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->GetBucketLifecycleConfiguration(request),
std::ref(request).get(),
+ "s3_client::get_bucket_lifecycle_configuration",
S3RateLimitType::GET);
bool has_lifecycle = false;
if (outcome.IsSuccess()) {
const auto& rules = outcome.GetResult().GetRules();
@@ -414,8 +484,9 @@ int S3Accessor::get_bucket_lifecycle(int64_t*
expiration_days) {
int S3Accessor::check_bucket_versioning() {
Aws::S3::Model::GetBucketVersioningRequest request;
request.SetBucket(conf_.bucket);
- auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketVersioning(request),
-
"s3_client::get_bucket_versioning", request);
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->GetBucketVersioning(request), std::ref(request).get(),
+ "s3_client::get_bucket_versioning", S3RateLimitType::GET);
if (outcome.IsSuccess()) {
const auto& versioning_configuration = outcome.GetResult().GetStatus();
@@ -452,6 +523,6 @@ int GcsAccessor::delete_objects(const
std::vector<std::string>& relative_paths)
}
return ret;
}
-
+#undef SYNC_POINT_HOOK_RETURN_VALUE
#undef HELP_MACRO
} // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 1025ceab52e..1a44b0971cb 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -27,6 +27,10 @@ class S3Client;
namespace doris::cloud {
+enum class S3RateLimitType;
+extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed,
size_t max_burst,
+ size_t limit);
+
struct S3Conf {
std::string ak;
std::string sk;
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 331e85fab2e..94b84aa4ba0 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -31,6 +31,8 @@ add_executable(meta_server_test meta_server_test.cpp)
add_executable(rate_limiter_test rate_limiter_test.cpp)
+add_executable(s3_rate_limiter_test s3_rate_limiter_test.cpp)
+
add_executable(encryption_test encryption_test.cpp)
add_executable(metric_test metric_test.cpp)
@@ -64,6 +66,8 @@ target_link_libraries(meta_server_test ${TEST_LINK_LIBS})
target_link_libraries(rate_limiter_test ${TEST_LINK_LIBS})
+target_link_libraries(s3_rate_limiter_test ${TEST_LINK_LIBS})
+
target_link_libraries(encryption_test ${TEST_LINK_LIBS})
target_link_libraries(metric_test ${TEST_LINK_LIBS})
diff --git a/cloud/test/s3_rate_limiter_test.cpp
b/cloud/test/s3_rate_limiter_test.cpp
new file mode 100644
index 00000000000..fc20500f682
--- /dev/null
+++ b/cloud/test/s3_rate_limiter_test.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <rate-limiter/s3_rate_limiter.h>
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "common/configbase.h"
+#include "common/logging.h"
+
+using namespace doris::cloud;
+
+// Test entry point: loads the config file, initialises glog, then runs all gtest cases.
+int main(int argc, char** argv) {
+    const char* conf_path = "doris_cloud.conf";
+    const bool config_ok = doris::cloud::config::init(conf_path, true);
+    if (!config_ok) {
+        std::cerr << "failed to init config file, conf=" << conf_path << std::endl;
+        return -1;
+    }
+    const bool glog_ok = doris::cloud::init_glog("util");
+    if (!glog_ok) {
+        std::cerr << "failed to init glog" << std::endl;
+        return -1;
+    }
+
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
+
+// Fires 20 single-token requests at a limiter with speed 1, burst 5 and limit 10.
+// Requests beyond the cumulative limit of 10 are rejected; of the 10 accepted ones,
+// those that leave the bucket non-negative return 0 and the rest have to sleep.
+TEST(S3RateLimiterTest, normal) {
+    auto rate_limiter = S3RateLimiter(1, 5, 10);
+    // Initialise explicitly: a default-constructed std::atomic holds an indeterminate
+    // value before C++20, which would make the assertions below meaningless.
+    std::atomic_int64_t failed {0};
+    std::atomic_int64_t succ {0};
+    std::atomic_int64_t sleep_thread_num {0};
+    std::atomic_int64_t sleep {0};
+    auto request_thread = [&]() {
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        auto ms = rate_limiter.add(1);
+        if (ms < 0) {
+            failed++;
+        } else if (ms == 0) {
+            succ++;
+        } else {
+            sleep += ms;
+            sleep_thread_num++;
+        }
+    };
+    {
+        std::vector<std::thread> threads;
+        for (size_t i = 0; i < 20; i++) {
+            threads.emplace_back(request_thread);
+        }
+        for (auto& t : threads) {
+            if (t.joinable()) {
+                t.join();
+            }
+        }
+    }
+    ASSERT_EQ(failed, 10);
+    ASSERT_EQ(succ, 6);
+    ASSERT_EQ(sleep_thread_num, 4);
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]