This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b4367bcb5 [feature](Cloud) Introduce S3 rate limiter to recycler (#34339)
c6b4367bcb5 is described below

commit c6b4367bcb5d586242a9c2e9e7aa2689f089243a
Author: AlexYue <[email protected]>
AuthorDate: Sat May 4 09:36:37 2024 +0800

    [feature](Cloud) Introduce S3 rate limiter to recycler (#34339)
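
    In short, this commit puts a token-bucket rate limiter in front of the
    recycler's S3 GET- and PUT-style calls, wired in through the existing
    sync-point macro in s3_accessor.cpp, and adds a recycler HTTP endpoint to
    retune the limiter at runtime. A minimal usage sketch based on the
    interfaces introduced below (the constructor values are illustrative, not
    the defaults):

        #include "rate-limiter/s3_rate_limiter.h"

        using namespace doris::cloud;

        // 100 tokens refilled per second, bucket capped at 200 tokens;
        // limit = 0 means no cap on the total number of requests.
        S3RateLimiter limiter(/*max_speed=*/100, /*max_burst=*/200, /*limit=*/0);

        // Each S3 request spends one token. add() sleeps when the bucket is
        // in deficit and returns the sleep duration, or -1 once `limit` is
        // exceeded.
        int64_t slept = limiter.add(1);
        if (slept < 0) {
            // over the request-count limit; s3_accessor maps this to an S3 error
        }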
---
 cloud/src/common/config.h                  |   9 ++
 cloud/src/common/sync_point.h              |   4 +-
 cloud/src/rate-limiter/CMakeLists.txt      |   1 +
 cloud/src/rate-limiter/s3_rate_limiter.cpp | 114 ++++++++++++++++++++++++
 cloud/src/rate-limiter/s3_rate_limiter.h   |  94 ++++++++++++++++++++
 cloud/src/recycler/recycler_service.cpp    |  29 +++++++
 cloud/src/recycler/s3_accessor.cpp         | 135 ++++++++++++++++++++++-------
 cloud/src/recycler/s3_accessor.h           |   4 +
 cloud/test/CMakeLists.txt                  |   4 +
 cloud/test/s3_rate_limiter_test.cpp        |  78 +++++++++++++++++
 10 files changed, 438 insertions(+), 34 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 73b8580c1f6..805cc712c40 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -160,6 +160,15 @@ CONF_Int32(txn_store_retry_base_intervals_ms, "500");
 // Whether to retry the txn conflict errors returned by the underlying txn store.
 CONF_Bool(enable_retry_txn_conflict, "true");
 
+CONF_mBool(enable_s3_rate_limiter, "false");
+CONF_mInt64(s3_get_bucket_tokens, "1000000000000000000");
+CONF_mInt64(s3_get_token_per_second, "1000000000000000000");
+CONF_mInt64(s3_get_token_limit, "0");
+
+CONF_mInt64(s3_put_bucket_tokens, "1000000000000000000");
+CONF_mInt64(s3_put_token_per_second, "1000000000000000000");
+CONF_mInt64(s3_put_token_limit, "0");
+
 // The secondary package name of the MetaService.
 CONF_String(secondary_package_name, "");
 
diff --git a/cloud/src/common/sync_point.h b/cloud/src/common/sync_point.h
index b81ce31a5c1..30ecd3e0887 100644
--- a/cloud/src/common/sync_point.h
+++ b/cloud/src/common/sync_point.h
@@ -172,14 +172,14 @@ static_assert(ret_val_ptr != nullptr, "ret_val_ptr cannot be nullptr");\
 TEST_SYNC_POINT_CALLBACK(sync_point_name, ret_val_ptr); \
 { \
   bool pred = false; \
-  TEST_SYNC_POINT_CALLBACK(sync_point_name"::pred", &pred); \
+  TEST_SYNC_POINT_CALLBACK(std::string(sync_point_name) + "::pred", &pred); \
   if (pred) return *ret_val_ptr; \
 }
 
 # define TEST_SYNC_POINT_RETURN_WITH_VOID(sync_point_name) \
 { \
   bool pred = false; \
-  TEST_SYNC_POINT_CALLBACK(sync_point_name"::pred", &pred); \
+  TEST_SYNC_POINT_CALLBACK(std::string(sync_point_name) + "::pred", &pred); \
   if (pred) return; \
 }
 
diff --git a/cloud/src/rate-limiter/CMakeLists.txt b/cloud/src/rate-limiter/CMakeLists.txt
index 631c90d656a..6d4a5e0a54c 100644
--- a/cloud/src/rate-limiter/CMakeLists.txt
+++ b/cloud/src/rate-limiter/CMakeLists.txt
@@ -9,4 +9,5 @@ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lfdb_c -L${THIRDPARTY_DIR
 
 add_library(RateLimiter
     rate_limiter.cpp
+    s3_rate_limiter.cpp
 )
diff --git a/cloud/src/rate-limiter/s3_rate_limiter.cpp b/cloud/src/rate-limiter/s3_rate_limiter.cpp
new file mode 100644
index 00000000000..92c13e232bc
--- /dev/null
+++ b/cloud/src/rate-limiter/s3_rate_limiter.cpp
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_rate_limiter.h"
+
+#include <fmt/format.h>
+
+#include <chrono>
+#include <thread>
+
+namespace doris::cloud {
+// Just 10^6.
+static constexpr auto MS = 1000000UL;
+
+std::pair<size_t, double> S3RateLimiter::_update_remain_token(
+        std::chrono::system_clock::time_point now, size_t amount) {
+    // Values obtained under lock to be checked after release
+    size_t count_value;
+    double tokens_value;
+    {
+        std::lock_guard<SpinLock> lock(_mutex);
+        if (_max_speed) {
+            double delta_seconds = static_cast<double>((now - _prev_ms).count()) / MS;
+            _remain_tokens = std::min<double>(_remain_tokens + _max_speed * delta_seconds - amount,
+                                              _max_burst);
+        }
+        _count += amount;
+        count_value = _count;
+        tokens_value = _remain_tokens;
+        _prev_ms = now;
+    }
+    return {count_value, tokens_value};
+}
+
+int64_t S3RateLimiter::add(size_t amount) {
+    // Values obtained under lock to be checked after release
+    auto [count_value, tokens_value] =
+            _update_remain_token(std::chrono::high_resolution_clock::now(), amount);
+
+    if (_limit && count_value > _limit) {
+        // ClickHouse (CK) would throw an exception here; this returns -1 instead
+        return -1;
+    }
+
+    // Wait unless there is positive amount of remain_tokens - throttling
+    int64_t sleep_time_ms = 0;
+    if (_max_speed && tokens_value < 0) {
+        sleep_time_ms = static_cast<int64_t>(-tokens_value / _max_speed * MS);
+        std::this_thread::sleep_for(std::chrono::microseconds(sleep_time_ms));
+    }
+
+    return sleep_time_ms;
+}
+
+S3RateLimiterHolder::S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t max_burst,
+                                         size_t limit)
+        : rate_limiter(std::make_unique<S3RateLimiter>(max_speed, max_burst, limit)),
+          rate_limit_bvar(bvar::Adder<uint64_t>(fmt::format("{}_rate_limit_ms", to_string(type)))) {
+}
+
+int64_t S3RateLimiterHolder::add(size_t amount) {
+    int64_t sleep;
+    {
+        std::shared_lock read {rate_limiter_rw_lock};
+        sleep = rate_limiter->add(amount);
+    }
+    if (sleep > 0) {
+        rate_limit_bvar << sleep;
+    }
+    return sleep;
+}
+
+int S3RateLimiterHolder::reset(size_t max_speed, size_t max_burst, size_t limit) {
+    {
+        std::unique_lock write {rate_limiter_rw_lock};
+        rate_limiter = std::make_unique<S3RateLimiter>(max_speed, max_burst, limit);
+    }
+    return 0;
+}
+
+std::string to_string(S3RateLimitType type) {
+    switch (type) {
+    case S3RateLimitType::GET:
+        return "get";
+    case S3RateLimitType::PUT:
+        return "put";
+    default:
+        return std::to_string(static_cast<size_t>(type));
+    }
+}
+
+S3RateLimitType string_to_s3_rate_limit_type(std::string_view value) {
+    if (value == "get") {
+        return S3RateLimitType::GET;
+    } else if (value == "put") {
+        return S3RateLimitType::PUT;
+    }
+    return S3RateLimitType::UNKNOWN;
+}
+} // namespace doris::cloud
\ No newline at end of file
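
The update rule in _update_remain_token is a standard token bucket: tokens
refill at _max_speed per second, the balance is capped at _max_burst, and a
request may drive the balance negative, in which case add() sleeps off the
deficit at the refill rate. A self-contained sketch of the same arithmetic
(simplified: no lock, no request counter; the names here are not from the
patch):

    #include <algorithm>
    #include <cstdio>

    // Refill-and-spend step, mirroring S3RateLimiter::_update_remain_token.
    double spend(double tokens, double max_speed, double max_burst,
                 double elapsed_seconds, double amount) {
        return std::min(tokens + max_speed * elapsed_seconds - amount, max_burst);
    }

    int main() {
        double tokens = 5;                  // bucket starts full: max_burst = 5
        tokens = spend(tokens, 1, 5, 0, 1); // -> 4
        tokens = spend(tokens, 1, 5, 0, 4); // -> 0, still no sleep needed
        tokens = spend(tokens, 1, 5, 0, 2); // -> -2: sleep deficit/speed = 2s
        std::printf("tokens=%.1f\n", tokens);
    }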
diff --git a/cloud/src/rate-limiter/s3_rate_limiter.h b/cloud/src/rate-limiter/s3_rate_limiter.h
new file mode 100644
index 00000000000..9e30df0da1f
--- /dev/null
+++ b/cloud/src/rate-limiter/s3_rate_limiter.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bvar/bvar.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <shared_mutex>
+
+namespace doris::cloud {
+
+enum class S3RateLimitType : int {
+    GET = 0,
+    PUT,
+    UNKNOWN,
+};
+
+extern std::string to_string(S3RateLimitType type);
+extern S3RateLimitType string_to_s3_rate_limit_type(std::string_view value);
+
+class SpinLock {
+public:
+    SpinLock() = default;
+
+    void lock() {
+        while (_flag.test_and_set(std::memory_order_acq_rel)) {
+            // Spin until we acquire the lock
+        }
+    }
+
+    void unlock() { _flag.clear(std::memory_order_release); }
+
+private:
+    std::atomic_flag _flag = ATOMIC_FLAG_INIT;
+};
+
+class S3RateLimiter {
+public:
+    static constexpr size_t default_burst_seconds = 1;
+
+    S3RateLimiter(size_t max_speed, size_t max_burst, size_t limit)
+            : _max_speed(max_speed),
+              _max_burst(max_burst),
+              _limit(limit),
+              _remain_tokens(max_burst) {}
+
+    // Use `amount` remain_tokens; sleeps if required, or returns -1 on limit overflow.
+    // Returns duration of sleep in microseconds (to distinguish sleeping on different kinds of S3RateLimiters for metrics)
+    int64_t add(size_t amount);
+
+private:
+    std::pair<size_t, double> _update_remain_token(std::chrono::system_clock::time_point now,
+                                                   size_t amount);
+    size_t _count {0};
+    const size_t _max_speed {0}; // in tokens per second. which indicates the QPS
+    const size_t _max_burst {0}; // in tokens. which indicates the token bucket size
+    const uint64_t _limit {0};   // 0 - not limited.
+    SpinLock _mutex;
+    // Amount of remain_tokens available in token bucket. Updated in `add` method.
+    double _remain_tokens {0};
+    std::chrono::system_clock::time_point _prev_ms; // Previous `add` call time (in nanoseconds).
+};
+
+class S3RateLimiterHolder {
+public:
+    S3RateLimiterHolder(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit);
+
+    int64_t add(size_t amount);
+
+    int reset(size_t max_speed, size_t max_burst, size_t limit);
+
+private:
+    std::shared_mutex rate_limiter_rw_lock;
+    std::unique_ptr<S3RateLimiter> rate_limiter;
+    bvar::Adder<uint64_t> rate_limit_bvar;
+};
+} // namespace doris::cloud
\ No newline at end of file
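
S3RateLimiterHolder exists so that the limiter can be swapped while in use:
add() takes the shared (read) side of the rw-lock, reset() takes the exclusive
side and installs a fresh S3RateLimiter, and the total throttled time is
accumulated in a per-type bvar counter. Roughly (values illustrative):

    S3RateLimiterHolder holder(S3RateLimitType::GET,
                               /*max_speed=*/1000, /*max_burst=*/2000, /*limit=*/0);
    holder.add(1);              // hot path: shared lock, may sleep inside
    holder.reset(500, 1000, 0); // exclusive lock, replaces the bucket wholesale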
diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp
index 5b0b3e8078c..657797d1c81 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -28,6 +28,7 @@
 #include "common/util.h"
 #include "meta-service/keys.h"
 #include "meta-service/txn_kv_error.h"
+#include "rate-limiter/s3_rate_limiter.h"
 #include "recycler/checker.h"
 #include "recycler/meta_checker.h"
 #include "recycler/recycler.h"
@@ -399,6 +400,34 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
         return;
     }
 
+    if (unresolved_path == "adjust_rate_limiter") {
+        auto type_string = uri.GetQuery("type");
+        auto speed = uri.GetQuery("speed");
+        auto burst = uri.GetQuery("burst");
+        auto limit = uri.GetQuery("limit");
+        if (type_string->empty() || speed->empty() || burst->empty() || limit->empty() ||
+            (*type_string != "get" && *type_string != "put")) {
+            msg = "argument not suitable";
+            response_body = msg;
+            status_code = 400;
+            return;
+        }
+        auto max_speed = speed->empty() ? 0 : std::stoul(*speed);
+        auto max_burst = burst->empty() ? 0 : std::stoul(*burst);
+        auto max_limit = limit->empty() ? 0 : std::stoul(*limit);
+        if (0 != reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed,
+                                       max_burst, max_limit)) {
+            msg = "adjust failed";
+            response_body = msg;
+            status_code = 400;
+            return;
+        }
+
+        status_code = 200;
+        response_body = msg;
+        return;
+    }
+
     status_code = 404;
     msg = "not found";
     response_body = msg;
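
This endpoint lets an operator retune the limiter on a live recycler; the
query parameters map onto reset_s3_rate_limiter, where a value of 0 falls back
to the corresponding config default (see s3_accessor.cpp below). A request
would look something like the following, with the exact URI prefix depending
on how the recycler's HTTP service is exposed in a given deployment:

    /RecyclerService/http/adjust_rate_limiter?type=get&speed=500&burst=1000&limit=0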
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index d1ebfe62a1d..3f5d78b1c8f 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -34,11 +34,42 @@
 #include <execution>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/sync_point.h"
+#include "rate-limiter/s3_rate_limiter.h"
 #include "recycler/obj_store_accessor.h"
 
 namespace doris::cloud {
+
+struct AccessorRateLimiter {
+public:
+    ~AccessorRateLimiter() = default;
+    static AccessorRateLimiter& instance();
+    S3RateLimiterHolder* rate_limiter(S3RateLimitType type);
+
+private:
+    AccessorRateLimiter();
+    std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
+};
+
+[[maybe_unused]] static Aws::Client::AWSError<Aws::S3::S3Errors> s3_error_factory() {
+    return {Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
+}
+
+template <typename Func>
+auto do_s3_rate_limit(S3RateLimitType type, Func callback) -> decltype(callback()) {
+    using T = decltype(callback());
+    if (!config::enable_s3_rate_limiter) {
+        return callback();
+    }
+    auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(type)->add(1);
+    if (sleep_duration < 0) {
+        return T(s3_error_factory());
+    }
+    return callback();
+}
+
 #ifndef UNIT_TEST
 #define HELP_MACRO(ret, req, point_name)
 #else
@@ -50,15 +81,50 @@ namespace doris::cloud {
             return p;                                          \
         }();                                                   \
         return ret;                                            \
-    } while (false)
+    } while (false);
 #endif
-#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, req) \
-    [&]() mutable {                                         \
-        [[maybe_unused]] decltype((expr)) t;                \
-        HELP_MACRO(t, req, point_name);                     \
-        return (expr);                                      \
+#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name, type) \
+    [&]() -> decltype(auto) {                                         \
+        using T = decltype((expr));                                   \
+        [[maybe_unused]] T t;                                         \
+        HELP_MACRO(t, request, point_name)                            \
+        return do_s3_rate_limit(type, [&]() { return (expr); });      \
     }()
 
+AccessorRateLimiter::AccessorRateLimiter()
+        : _rate_limiters {std::make_unique<S3RateLimiterHolder>(
+                                  S3RateLimitType::GET, config::s3_get_token_per_second,
+                                  config::s3_get_bucket_tokens, config::s3_get_token_limit),
+                          std::make_unique<S3RateLimiterHolder>(
+                                  S3RateLimitType::PUT, config::s3_put_token_per_second,
+                                  config::s3_put_bucket_tokens, config::s3_put_token_limit)} {}
+
+S3RateLimiterHolder* AccessorRateLimiter::rate_limiter(S3RateLimitType type) {
+    CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
+    return _rate_limiters[static_cast<size_t>(type)].get();
+}
+
+AccessorRateLimiter& AccessorRateLimiter::instance() {
+    static AccessorRateLimiter instance;
+    return instance;
+}
+
+int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
+    if (type == S3RateLimitType::UNKNOWN) {
+        return -1;
+    }
+    if (type == S3RateLimitType::GET) {
+        max_speed = (max_speed == 0) ? config::s3_get_token_per_second : max_speed;
+        max_burst = (max_burst == 0) ? config::s3_get_bucket_tokens : max_burst;
+        limit = (limit == 0) ? config::s3_get_token_limit : limit;
+    } else {
+        max_speed = (max_speed == 0) ? config::s3_put_token_per_second : max_speed;
+        max_burst = (max_burst == 0) ? config::s3_put_bucket_tokens : max_burst;
+        limit = (limit == 0) ? config::s3_put_token_limit : limit;
+    }
+    return AccessorRateLimiter::instance().rate_limiter(type)->reset(max_speed, max_burst, limit);
+}
+
 class S3Environment {
 public:
     S3Environment() { Aws::InitAPI(aws_options_); }
@@ -107,8 +173,9 @@ int S3Accessor::delete_objects_by_prefix(const std::string& relative_path) {
     delete_request.SetBucket(conf_.bucket);
     bool is_truncated = false;
     do {
-        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
-                                                    "s3_client::list_objects_v2", request);
+        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                s3_client_->ListObjectsV2(request), std::ref(request).get(),
+                "s3_client::list_objects_v2", S3RateLimitType::GET);
         if (!outcome.IsSuccess()) {
             LOG_WARNING("failed to list objects")
                     .tag("endpoint", conf_.endpoint)
@@ -136,9 +203,9 @@ int S3Accessor::delete_objects_by_prefix(const std::string& relative_path) {
             Aws::S3::Model::Delete del;
             del.WithObjects(std::move(objects)).SetQuiet(true);
             delete_request.SetDelete(std::move(del));
-            auto delete_outcome =
-                    SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
-                                                 "s3_client::delete_objects", delete_request);
+            auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                    s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(),
+                    "s3_client::delete_objects", S3RateLimitType::PUT);
             if (!delete_outcome.IsSuccess()) {
                 LOG_WARNING("failed to delete objects")
                         .tag("endpoint", conf_.endpoint)
@@ -198,9 +265,9 @@ int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
         }
         del.WithObjects(std::move(objects)).SetQuiet(true);
         delete_request.SetDelete(std::move(del));
-        auto delete_outcome =
-                SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
-                                             "s3_client::delete_objects", delete_request);
+        auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                s3_client_->DeleteObjects(delete_request), std::ref(delete_request).get(),
+                "s3_client::delete_objects", S3RateLimitType::PUT);
         if (!delete_outcome.IsSuccess()) {
             LOG_WARNING("failed to delete objects")
                     .tag("endpoint", conf_.endpoint)
@@ -231,8 +298,9 @@ int S3Accessor::delete_object(const std::string& relative_path) {
     Aws::S3::Model::DeleteObjectRequest request;
     auto key = get_key(relative_path);
     request.WithBucket(conf_.bucket).WithKey(key);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request),
-                                                "s3_client::delete_object", request);
+    auto outcome =
+            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request), std::ref(request).get(),
+                                         "s3_client::delete_object", S3RateLimitType::PUT);
     if (!outcome.IsSuccess()) {
         LOG_WARNING("failed to delete object")
                 .tag("endpoint", conf_.endpoint)
@@ -253,8 +321,9 @@ int S3Accessor::put_object(const std::string& relative_path, const std::string&
     auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
     *input << content;
     request.SetBody(input);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
-                                                "s3_client::put_object", request);
+    auto outcome =
+            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request), std::ref(request).get(),
+                                         "s3_client::put_object", S3RateLimitType::PUT);
     if (!outcome.IsSuccess()) {
         LOG_WARNING("failed to put object")
                 .tag("endpoint", conf_.endpoint)
@@ -274,8 +343,9 @@ int S3Accessor::list(const std::string& relative_path, std::vector<ObjectMeta>*
 
     bool is_truncated = false;
     do {
-        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
-                                                    "s3_client::list_objects_v2", request);
+        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                s3_client_->ListObjectsV2(request), std::ref(request).get(),
+                "s3_client::list_objects_v2", S3RateLimitType::GET);
         ;
         if (!outcome.IsSuccess()) {
             LOG_WARNING("failed to list objects")
@@ -301,9 +371,9 @@ int S3Accessor::exist(const std::string& relative_path) {
     Aws::S3::Model::HeadObjectRequest request;
     auto key = get_key(relative_path);
     request.WithBucket(conf_.bucket).WithKey(key);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
-                                                "s3_client::head_object", request);
-    ;
+    auto outcome =
+            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request), std::ref(request).get(),
+                                         "s3_client::head_object", S3RateLimitType::GET);
     if (outcome.IsSuccess()) {
         return 0;
     } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
@@ -326,9 +396,9 @@ int S3Accessor::delete_expired_objects(const std::string& relative_path, int64_t
 
     bool is_truncated = false;
     do {
-        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
-                                                    "s3_client::list_objects_v2", request);
-        ;
+        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+                s3_client_->ListObjectsV2(request), std::ref(request).get(),
+                "s3_client::list_objects_v2", S3RateLimitType::GET);
         if (!outcome.IsSuccess()) {
             LOG_WARNING("failed to list objects")
                     .tag("endpoint", conf_.endpoint)
@@ -379,9 +449,9 @@ int S3Accessor::get_bucket_lifecycle(int64_t* expiration_days) {
     Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
     request.SetBucket(conf_.bucket);
 
-    auto outcome =
-            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketLifecycleConfiguration(request),
-                                         "s3_client::get_bucket_lifecycle_configuration", request);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_client_->GetBucketLifecycleConfiguration(request), std::ref(request).get(),
+            "s3_client::get_bucket_lifecycle_configuration", S3RateLimitType::GET);
     bool has_lifecycle = false;
     if (outcome.IsSuccess()) {
         const auto& rules = outcome.GetResult().GetRules();
@@ -414,8 +484,9 @@ int S3Accessor::get_bucket_lifecycle(int64_t* expiration_days) {
 int S3Accessor::check_bucket_versioning() {
     Aws::S3::Model::GetBucketVersioningRequest request;
     request.SetBucket(conf_.bucket);
-    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketVersioning(request),
-                                                "s3_client::get_bucket_versioning", request);
+    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+            s3_client_->GetBucketVersioning(request), std::ref(request).get(),
+            "s3_client::get_bucket_versioning", S3RateLimitType::GET);
 
     if (outcome.IsSuccess()) {
         const auto& versioning_configuration = outcome.GetResult().GetStatus();
@@ -452,6 +523,6 @@ int GcsAccessor::delete_objects(const std::vector<std::string>& relative_paths)
     }
     return ret;
 }
-
+#undef SYNC_POINT_HOOK_RETURN_VALUE
 #undef HELP_MACRO
 } // namespace doris::cloud
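
For reference, outside UNIT_TEST builds (where HELP_MACRO expands to nothing)
a call such as the PutObject one above expands to roughly the following, so
every hooked S3 call is funneled through do_s3_rate_limit before reaching the
AWS SDK:

    [&]() -> decltype(auto) {
        using T = decltype((s3_client_->PutObject(request)));
        [[maybe_unused]] T t;
        return do_s3_rate_limit(S3RateLimitType::PUT,
                                [&]() { return (s3_client_->PutObject(request)); });
    }()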
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 1025ceab52e..1a44b0971cb 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -27,6 +27,10 @@ class S3Client;
 
 namespace doris::cloud {
 
+enum class S3RateLimitType;
+extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst,
+                                 size_t limit);
+
 struct S3Conf {
     std::string ak;
     std::string sk;
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 331e85fab2e..94b84aa4ba0 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -31,6 +31,8 @@ add_executable(meta_server_test meta_server_test.cpp)
 
 add_executable(rate_limiter_test rate_limiter_test.cpp)
 
+add_executable(s3_rate_limiter_test s3_rate_limiter_test.cpp)
+
 add_executable(encryption_test encryption_test.cpp)
 
 add_executable(metric_test metric_test.cpp)
@@ -64,6 +66,8 @@ target_link_libraries(meta_server_test ${TEST_LINK_LIBS})
 
 target_link_libraries(rate_limiter_test ${TEST_LINK_LIBS})
 
+target_link_libraries(s3_rate_limiter_test ${TEST_LINK_LIBS})
+
 target_link_libraries(encryption_test ${TEST_LINK_LIBS})
 
 target_link_libraries(metric_test ${TEST_LINK_LIBS})
diff --git a/cloud/test/s3_rate_limiter_test.cpp b/cloud/test/s3_rate_limiter_test.cpp
new file mode 100644
index 00000000000..fc20500f682
--- /dev/null
+++ b/cloud/test/s3_rate_limiter_test.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <rate-limiter/s3_rate_limiter.h>
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "common/configbase.h"
+#include "common/logging.h"
+
+using namespace doris::cloud;
+
+int main(int argc, char** argv) {
+    auto conf_file = "doris_cloud.conf";
+    if (!doris::cloud::config::init(conf_file, true)) {
+        std::cerr << "failed to init config file, conf=" << conf_file << std::endl;
+        return -1;
+    }
+    if (!doris::cloud::init_glog("util")) {
+        std::cerr << "failed to init glog" << std::endl;
+        return -1;
+    }
+
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
+
+TEST(S3RateLimiterTest, normal) {
+    auto rate_limiter = S3RateLimiter(1, 5, 10);
+    std::atomic_int64_t failed {0};
+    std::atomic_int64_t succ {0};
+    std::atomic_int64_t sleep_thread_num {0};
+    std::atomic_int64_t sleep {0};
+    auto request_thread = [&]() {
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        auto ms = rate_limiter.add(1);
+        if (ms < 0) {
+            failed++;
+        } else if (ms == 0) {
+            succ++;
+        } else {
+            sleep += ms;
+            sleep_thread_num++;
+        }
+    };
+    {
+        std::vector<std::thread> threads;
+        for (size_t i = 0; i < 20; i++) {
+            threads.emplace_back(request_thread);
+        }
+        for (auto& t : threads) {
+            if (t.joinable()) {
+                t.join();
+            }
+        }
+    }
+    ASSERT_EQ(failed, 10);
+    ASSERT_EQ(succ, 6);
+    ASSERT_EQ(sleep_thread_num, 4);
+}
\ No newline at end of file
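
The asserted counts follow from S3RateLimiter(1, 5, 10) and the fact that the
request count and token balance are updated together under one lock: all 20
adds bump _count, so the ten requests that observe _count > 10 return -1
(failed == 10); _prev_ms is default-constructed, so the first add sees an
enormous elapsed time and the refill is capped at the burst size of 5, letting
requests 1 through 6 take the balance from 5 down to 0 with no sleep
(succ == 6); requests 7 through 10 drive it negative and sleep
(sleep_thread_num == 4).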


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
