This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4783fc09011 [feature](meta-service) Support querying and adjusting rpc
qps limit on meta service (#42413)
4783fc09011 is described below
commit 4783fc0901131287983620719dcaef1517fd1e43
Author: Siyang Tang <[email protected]>
AuthorDate: Wed Nov 13 18:34:53 2024 +0800
[feature](meta-service) Support querying and adjusting rpc qps limit on
meta service (#42413)
## Proposed changes
Usage
1. adjust limit
```
curl http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?${params}
```
| Entry | Description |
| ----------- | ----------- |
| param | uint64 qps_limit |
|behavior | set qps_limit global default value |
|example|```curl
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000```|
| Entry | Description |
| ----------- | ----------- |
| param | uint64 qps_limit, string rpc_name |
|behavior | set RPC specific qps_limit |
|example|```curl
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&rpc_name=get_cluster```|
| Entry | Description |
| ----------- | ----------- |
| param | uint64 qps_limit, string rpc_name, string instance_id |
|behavior | set instance qps_limit for specific RPC |
|example|```curl
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&rpc_name=get_cluster&instance_id="doris-0"```|
| Entry | Description |
| ----------- | ----------- |
| param | uint64 qps_limit, string instance_id |
|behavior | set global qps_limit for specific instance |
|example|```curl
http://ms_ip:ms_port/MetaService/http/v1/adjust_rate_limit?qps_limit=5000000&instance_id="doris-0"```|
2. query limit
| Entry | Description |
| ----------- | ----------- |
| param | none |
|behavior | query qps limit for all RPC interface |
|example|```curl
http://ms_ip:ms_port/MetaService/http/v1/list_rate_limit```|
---
cloud/src/meta-service/meta_service_http.cpp | 121 ++++++++++++-
cloud/src/rate-limiter/rate_limiter.cpp | 115 ++++++++++--
cloud/src/rate-limiter/rate_limiter.h | 76 +++++++-
cloud/test/meta_service_http_test.cpp | 90 ++++++++++
cloud/test/rate_limiter_test.cpp | 250 +++++++++++++++++++++++----
5 files changed, 605 insertions(+), 47 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_http.cpp
b/cloud/src/meta-service/meta_service_http.cpp
index 95907376dd2..2f7536e9989 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -22,6 +22,7 @@
#include <brpc/uri.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
#include <google/protobuf/message.h>
#include <google/protobuf/service.h>
#include <google/protobuf/util/json_util.h>
@@ -30,8 +31,14 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <functional>
#include <memory>
#include <optional>
+#include <string>
+#include <string_view>
#include <type_traits>
#include <variant>
#include <vector>
@@ -42,6 +49,7 @@
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "meta_service.h"
+#include "rate-limiter/rate_limiter.h"
namespace doris::cloud {
@@ -333,6 +341,113 @@ static HttpResponse process_alter_iam(MetaServiceImpl*
service, brpc::Controller
return http_json_reply(resp.status());
}
+/**
+ * @brief HTTP handler that adjusts the QPS limit kept by the meta service rate limiter.
+ *
+ * Query parameters:
+ *   - qps_limit   (required) non-negative int64, the new limit
+ *   - rpc_name    (optional) restrict the change to one RPC
+ *   - instance_id (optional) restrict the change to one instance
+ *
+ * @param service meta service whose rate limiter is adjusted
+ * @param cntl    brpc controller carrying the HTTP request
+ * @return OK on success; INVALID_ARGUMENT when `qps_limit` is missing, not an int64,
+ *         negative, or when the rate limiter rejects the request
+ */
+static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
+    const auto& uri = cntl->http_request().uri();
+    auto qps_limit_str = std::string {http_query(uri, "qps_limit")};
+    auto rpc_name = std::string {http_query(uri, "rpc_name")};
+    auto instance_id = std::string {http_query(uri, "instance_id")};
+
+    // Parses and validates `qps_limit`, then lets `cb` apply it at the selected scope.
+    auto process_set_qps_limit = [&](std::function<bool(int64_t)> cb) -> HttpResponse {
+        DCHECK(!qps_limit_str.empty());
+        int64_t qps_limit = -1;
+        try {
+            qps_limit = std::stoll(qps_limit_str);
+        } catch (const std::exception& ex) {
+            return http_json_reply(
+                    MetaServiceCode::INVALID_ARGUMENT,
+                    fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what()));
+        }
+        if (qps_limit < 0) {
+            return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                                   "`qps_limit` should not be less than 0");
+        }
+        if (cb(qps_limit)) {
+            return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit");
+        }
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("failed to adjust rate limit for qps_limit={}, "
+                                           "rpc_name={}, instance_id={}, plz ensure correct "
+                                           "rpc/instance name",
+                                           qps_limit_str, rpc_name, instance_id));
+    };
+
+    auto set_global_qps_limit = [process_set_qps_limit, service]() {
+        return process_set_qps_limit([service](int64_t qps_limit) {
+            return service->rate_limiter()->set_rate_limit(qps_limit);
+        });
+    };
+
+    auto set_rpc_qps_limit = [&]() {
+        return process_set_qps_limit([&](int64_t qps_limit) {
+            return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name);
+        });
+    };
+
+    auto set_instance_qps_limit = [&]() {
+        return process_set_qps_limit([&](int64_t qps_limit) {
+            return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id);
+        });
+    };
+
+    auto set_instance_rpc_qps_limit = [&]() {
+        return process_set_qps_limit([&](int64_t qps_limit) {
+            return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id);
+        });
+    };
+
+    auto process_invalid_arguments = [&]() -> HttpResponse {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("invalid argument: qps_limit(required)={}, "
+                                           "rpc_name(optional)={}, instance_id(optional)={}",
+                                           qps_limit_str, rpc_name, instance_id));
+    };
+
+    // The three params give 2^3 combinations; the 4 without `qps_limit` are illegal.
+    // Each combination is encoded as a 3-bit level and dispatched through `processors`:
+    //   bit 0 -> qps_limit present, bit 1 -> rpc_name present, bit 2 -> instance_id present.
+    std::array<std::function<HttpResponse()>, 8> processors;
+    std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments));
+    processors[0b001] = std::move(set_global_qps_limit);
+    processors[0b011] = std::move(set_rpc_qps_limit);
+    processors[0b101] = std::move(set_instance_qps_limit);
+    processors[0b111] = std::move(set_instance_rpc_qps_limit);
+
+    // A bit is set when the parameter is PRESENT (non-empty), matching the table above.
+    const auto level = static_cast<uint8_t>((qps_limit_str.empty() ? 0 : 0b001) |
+                                            (rpc_name.empty() ? 0 : 0b010) |
+                                            (instance_id.empty() ? 0 : 0b100));
+
+    DCHECK_LT(level, 8);
+
+    return processors[level]();
+}
+
+/**
+ * @brief HTTP handler that lists the QPS limit of every RPC limiter.
+ *
+ * The reply body is a JSON object keyed by rpc name; each entry holds the RPC-level
+ * limit ("RPC qps limit", rendered as a string) and an object mapping instance id to
+ * its limit ("instance specific qps limit").
+ *
+ * @param service meta service whose rate limiter is queried
+ * @param cntl    brpc controller of the request (not read here)
+ * @return OK with the pretty-printed JSON document as message
+ */
+static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
+    auto rate_limiter = service->rate_limiter();
+    rapidjson::Document d;
+    // AddMember is only valid on object values; a default-constructed Document is null.
+    d.SetObject();
+    auto& allocator = d.GetAllocator();
+    auto get_qps_limit = [&d, &allocator](std::string_view rpc_name,
+                                          std::shared_ptr<RpcRateLimiter> rpc_limiter) {
+        rapidjson::Value node(rapidjson::kObjectType);
+        rapidjson::Value sub(rapidjson::kObjectType);
+        auto get_qps_token_limit = [&](std::string_view instance_id,
+                                       std::shared_ptr<RpcRateLimiter::QpsToken> qps_token) {
+            // Copy the key into the document so it does not depend on the limiter's storage.
+            rapidjson::Value key(instance_id.data(),
+                                 static_cast<rapidjson::SizeType>(instance_id.size()), allocator);
+            sub.AddMember(key, qps_token->max_qps_limit(), allocator);
+        };
+        rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit));
+
+        // `max_qps_limit` is a local string that dies when this lambda returns, while the
+        // document is serialized later; the value must therefore be copied, not referenced.
+        auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit());
+        rapidjson::Value limit(max_qps_limit.data(),
+                               static_cast<rapidjson::SizeType>(max_qps_limit.size()), allocator);
+        node.AddMember("RPC qps limit", limit, allocator);
+        node.AddMember("instance specific qps limit", sub, allocator);
+
+        rapidjson::Value name(rpc_name.data(), static_cast<rapidjson::SizeType>(rpc_name.size()),
+                              allocator);
+        d.AddMember(name, node, allocator);
+    };
+    rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit));
+
+    rapidjson::StringBuffer sb;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
+    d.Accept(writer);
+    return http_json_reply(MetaServiceCode::OK, sb.GetString());
+}
+
static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller*
ctrl) {
auto& uri = ctrl->http_request().uri();
std::string_view key = http_query(uri, "key");
@@ -615,13 +730,17 @@ void
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"abort_tablet_job", process_abort_tablet_job},
{"alter_ram_user", process_alter_ram_user},
{"alter_iam", process_alter_iam},
+ {"adjust_rate_limit", process_adjust_rate_limit},
+ {"list_rate_limit", process_query_rate_limit},
{"v1/abort_txn", process_abort_txn},
{"v1/abort_tablet_job", process_abort_tablet_job},
{"v1/alter_ram_user", process_alter_ram_user},
{"v1/alter_iam", process_alter_iam},
+ {"v1/adjust_rate_limit", process_adjust_rate_limit},
+ {"v1/list_rate_limit", process_query_rate_limit},
};
- auto cntl = static_cast<brpc::Controller*>(controller);
+ auto* cntl = static_cast<brpc::Controller*>(controller);
brpc::ClosureGuard closure_guard(done);
// Prepare input request info
diff --git a/cloud/src/rate-limiter/rate_limiter.cpp
b/cloud/src/rate-limiter/rate_limiter.cpp
index 8988ff0560b..1d7d0a10ac8 100644
--- a/cloud/src/rate-limiter/rate_limiter.cpp
+++ b/cloud/src/rate-limiter/rate_limiter.cpp
@@ -17,11 +17,16 @@
#include "rate_limiter.h"
+#include <bthread/mutex.h>
#include <butil/strings/string_split.h>
+#include <algorithm>
#include <chrono>
+#include <cstdint>
#include <memory>
#include <mutex>
+#include <ranges>
+#include <shared_mutex>
#include "common/bvars.h"
#include "common/config.h"
@@ -29,10 +34,10 @@
namespace doris::cloud {
-void RateLimiter::init(google::protobuf::Service* service) {
- std::map<std::string, int64_t> rpc_name_to_max_qps_limit;
+std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const
std::string& list_str) {
+ std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit;
std::vector<std::string> max_qps_limit_list;
- butil::SplitString(config::specific_max_qps_limit, ';',
&max_qps_limit_list);
+ butil::SplitString(list_str, ';', &max_qps_limit_list);
for (const auto& v : max_qps_limit_list) {
auto p = v.find(':');
if (p != std::string::npos && p != (v.size() - 1)) {
@@ -41,29 +46,42 @@ void RateLimiter::init(google::protobuf::Service* service) {
int64_t max_qps_limit = std::stoll(v.substr(p + 1));
if (max_qps_limit > 0) {
rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit;
- LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: "
<< max_qps_limit;
}
} catch (...) {
- LOG(WARNING) << "failed to set max_qps_limit to rpc: " <<
rpc_name
+ LOG(WARNING) << "failed to parse max_qps_limit to rpc: " <<
rpc_name
<< " config: " << v;
}
}
}
+ return rpc_name_to_max_qps_limit;
+}
+
+template <typename Callable>
+void for_each_rpc_name(google::protobuf::Service* service, Callable cb) {
auto method_size = service->GetDescriptor()->method_count();
for (auto i = 0; i < method_size; ++i) {
std::string rpc_name = service->GetDescriptor()->method(i)->name();
- int64_t max_qps_limit = config::default_max_qps_limit;
+ cb(rpc_name);
+ }
+}
- auto it = rpc_name_to_max_qps_limit.find(rpc_name);
- if (it != rpc_name_to_max_qps_limit.end()) {
+void RateLimiter::init(google::protobuf::Service* service) {
+ auto rpc_name_to_specific_limit =
parse_specific_qps_limit(config::specific_max_qps_limit);
+ std::unique_lock write_lock(mutex_);
+ for_each_rpc_name(service, [&](const std::string& rpc_name) {
+ auto it = rpc_name_to_specific_limit.find(rpc_name);
+ int64_t max_qps_limit = config::default_max_qps_limit;
+ if (it != rpc_name_to_specific_limit.end()) {
max_qps_limit = it->second;
}
limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name,
max_qps_limit);
+ });
+ for (const auto& [k, _] : rpc_name_to_specific_limit) {
+ rpc_with_specific_limit_.insert(k);
}
}
std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const
std::string& rpc_name) {
- // no need to be locked, because it is only modified during initialization
auto it = limiters_.find(rpc_name);
if (it == limiters_.end()) {
return nullptr;
@@ -71,6 +89,49 @@ std::shared_ptr<RpcRateLimiter>
RateLimiter::get_rpc_rate_limiter(const std::str
return it->second;
}
+// Applies `qps_limit` to every RPC limiter that has no RPC-specific limit recorded in
+// rpc_with_specific_limit_. Always returns true.
+bool RateLimiter::set_rate_limit(int64_t qps_limit) {
+    std::lock_guard lock(mutex_);
+    for (const auto& [name, limiter] : limiters_) {
+        if (rpc_with_specific_limit_.contains(name)) {
+            continue;
+        }
+        limiter->set_max_qps_limit(qps_limit);
+    }
+    return true;
+}
+
+// Sets the limit of the single RPC `rpc_name` and records that this RPC now carries a
+// specific limit. Returns false when no limiter is registered under `rpc_name`.
+bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name) {
+    auto found = limiters_.find(rpc_name);
+    if (found == limiters_.end()) {
+        return false;
+    }
+    auto target = found->second;
+    std::lock_guard lock(mutex_);
+    target->set_max_qps_limit(qps_limit);
+    rpc_with_specific_limit_.insert(rpc_name);
+    return true;
+}
+
+// Sets the limit of `instance_id` within the RPC `rpc_name`. Returns false when no
+// limiter is registered under `rpc_name`, otherwise the result of the RPC limiter.
+bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name,
+                                 const std::string& instance_id) {
+    auto found = limiters_.find(rpc_name);
+    if (found == limiters_.end()) {
+        return false;
+    }
+    auto target = found->second;
+    return target->set_max_qps_limit(qps_limit, instance_id);
+}
+
+// Sets the limit of `instance_id` in every RPC limiter.
+// Returns true only if every limiter accepted the new limit.
+bool RateLimiter::set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id) {
+    // Visit every limiter even if one reports failure, so a single failure cannot leave
+    // the remaining RPCs un-updated; the result still tells whether all of them succeeded.
+    bool all_ok = true;
+    for (const auto& [_, limiter] : limiters_) {
+        all_ok = limiter->set_max_qps_limit(qps_limit, instance_id) && all_ok;
+    }
+    return all_ok;
+}
+
+// Invokes `cb(rpc_name, limiter)` once for every registered RPC limiter.
+void RateLimiter::for_each_rpc_limiter(
+        std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb) {
+    auto it = limiters_.begin();
+    const auto last = limiters_.end();
+    while (it != last) {
+        cb(it->first, it->second);
+        ++it;
+    }
+}
+
bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
std::function<int()>& get_bvar_qps) {
if (!config::use_detailed_metrics || instance_id.empty()) {
@@ -93,6 +154,35 @@ bool RpcRateLimiter::get_qps_token(const std::string&
instance_id,
return qps_token->get_token(get_bvar_qps);
}
+// Updates the RPC-level limit and pushes it to every instance token that has no
+// instance-specific limit recorded in instance_with_specific_limit_.
+void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) {
+    std::lock_guard<bthread::Mutex> guard(mutex_);
+    max_qps_limit_ = max_qps_limit;
+    for (auto& [instance, token] : qps_limiter_) {
+        if (instance_with_specific_limit_.contains(instance)) {
+            continue;
+        }
+        token->set_max_qps_limit(max_qps_limit);
+    }
+}
+
+// Sets the limit of `instance_id` for this RPC, creating its token when none exists yet,
+// and records the instance as carrying a specific limit. Always returns true.
+bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const std::string& instance_id) {
+    std::lock_guard<bthread::Mutex> guard(mutex_);
+    auto found = qps_limiter_.find(instance_id);
+    if (found != qps_limiter_.end()) {
+        found->second->set_max_qps_limit(max_qps_limit);
+    } else {
+        qps_limiter_[instance_id] = std::make_shared<QpsToken>(max_qps_limit);
+    }
+    instance_with_specific_limit_.insert(instance_id);
+    return true;
+}
+
+// Invokes `cb(instance_id, token)` once for every instance token of this RPC.
+// `cb` runs while mutex_ is held, so it must not call back into methods of this
+// limiter that take the same mutex.
+void RpcRateLimiter::for_each_qps_token(
+        std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb) {
+    // set_max_qps_limit(limit, instance_id) inserts into qps_limiter_ under mutex_;
+    // iterating without the lock would race with such an insertion (and its rehash).
+    std::lock_guard<bthread::Mutex> l(mutex_);
+    for (const auto& [instance_id, qps_token] : qps_limiter_) {
+        cb(instance_id, qps_token);
+    }
+}
+
bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
using namespace std::chrono;
auto now = steady_clock::now();
@@ -110,4 +200,9 @@ bool
RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
return current_qps_ < max_qps_limit_;
}
-} // namespace doris::cloud
\ No newline at end of file
+// Replaces the token's limit while holding the token's own mutex.
+void RpcRateLimiter::QpsToken::set_max_qps_limit(int64_t max_qps_limit) {
+    std::lock_guard guard(mutex_);
+    max_qps_limit_ = max_qps_limit;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/rate-limiter/rate_limiter.h
b/cloud/src/rate-limiter/rate_limiter.h
index df441656aa4..e557d0d8a10 100644
--- a/cloud/src/rate-limiter/rate_limiter.h
+++ b/cloud/src/rate-limiter/rate_limiter.h
@@ -19,10 +19,13 @@
#include <brpc/server.h>
#include <bthread/mutex.h>
+#include <google/protobuf/service.h>
#include <cstdint>
#include <memory>
+#include <shared_mutex>
#include <string>
+#include <string_view>
#include <unordered_map>
#include "common/config.h"
@@ -35,12 +38,54 @@ class RateLimiter {
public:
RateLimiter() = default;
~RateLimiter() = default;
+
void init(google::protobuf::Service* service);
+
std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string&
rpc_name);
+ /**
+ * @brief for each rpc limiter, apply callback
+ *
+ * @param cb callback function with params rpc name and rate limiter
+ */
+ void for_each_rpc_limiter(
+ std::function<void(std::string_view,
std::shared_ptr<RpcRateLimiter>)> cb);
+
+    /**
+     * @brief set global default rate limit, will not influence rpc and instance specific qps limit setting
+     *
+     * @return true if set successfully
+     */
+ bool set_rate_limit(int64_t qps_limit);
+
+    /**
+     * @brief set rpc level rate limit, will not influence instance specific qps limit setting
+     *
+     * @return true if set successfully
+     */
+ bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name);
+
+    /**
+     * @brief set instance level rate limit for specific rpc
+     *
+     * @return true if set successfully
+     */
+ bool set_rate_limit(int64_t qps_limit, const std::string& rpc_name,
+ const std::string& instance_id);
+
+    /**
+     * @brief set instance level rate limit globally, will influence settings for the same instance of specific rpc
+     *
+     * @return true if set successfully
+     */
+ bool set_instance_rate_limit(int64_t qps_limit, const std::string&
instance_id);
+
private:
// rpc_name -> RpcRateLimiter
std::unordered_map<std::string, std::shared_ptr<RpcRateLimiter>> limiters_;
+ // rpc names which specific limit have been set
+ std::unordered_set<std::string> rpc_with_specific_limit_;
+ bthread::Mutex mutex_;
};
class RpcRateLimiter {
@@ -58,15 +103,34 @@ public:
*/
bool get_qps_token(const std::string& instance_id, std::function<int()>&
get_bvar_qps);
- // Todo: Recycle outdated instance_id
+ std::string_view rpc_name() const { return rpc_name_; }
+
+ int64_t max_qps_limit() const { return max_qps_limit_; }
+
+    /**
+     * @brief set max qps limit for this limiter
+     *
+     * Instances that have an instance specific limit keep their own limit.
+     */
+ void set_max_qps_limit(int64_t max_qps_limit);
+
+    /**
+     * @brief set max qps limit for specific instance within this limiter
+     *
+     * @return true if set successfully
+     */
+ bool set_max_qps_limit(int64_t max_qps_limit, const std::string& instance);
-private:
class QpsToken {
public:
QpsToken(const int64_t max_qps_limit) : max_qps_limit_(max_qps_limit)
{}
bool get_token(std::function<int()>& get_bvar_qps);
+ void set_max_qps_limit(int64_t max_qps_limit);
+
+ int64_t max_qps_limit() const { return max_qps_limit_; }
+
private:
bthread::Mutex mutex_;
std::chrono::steady_clock::time_point last_update_time_;
@@ -75,12 +139,18 @@ private:
int64_t max_qps_limit_;
};
+ void for_each_qps_token(std::function<void(std::string_view,
std::shared_ptr<QpsToken>)> cb);
+
+ // Todo: Recycle outdated instance_id
+
private:
bthread::Mutex mutex_;
// instance_id -> QpsToken
std::unordered_map<std::string, std::shared_ptr<QpsToken>> qps_limiter_;
+ // instance ids which specific limit have been set
+ std::unordered_set<std::string> instance_with_specific_limit_;
std::string rpc_name_;
int64_t max_qps_limit_;
};
-} // namespace doris::cloud
\ No newline at end of file
+} // namespace doris::cloud
diff --git a/cloud/test/meta_service_http_test.cpp
b/cloud/test/meta_service_http_test.cpp
index d1b8fd66943..4360efeb442 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -1535,4 +1535,94 @@ TEST(MetaServiceHttpTest,
get_obj_store_info_response_sk) {
ms->get_obj_store_info(&cntl, &req1, &res1, nullptr);
}
+TEST(MetaServiceHttpTest, AdjustRateLimit) {
+ HttpContext ctx;
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit", "qps_limit=10000");
+ ASSERT_EQ(status_code, 200);
+ }
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit",
"qps_limit=10000&rpc_name=get_cluster");
+ ASSERT_EQ(status_code, 200);
+ }
+ {
+ auto [status_code, content] = ctx.query<std::string>(
+ "adjust_rate_limit",
+
"qps_limit=10000&rpc_name=get_cluster&instance_id=test_instance");
+ ASSERT_EQ(status_code, 200);
+ }
+ {
+ auto [status_code, content] = ctx.query<std::string>(
+ "adjust_rate_limit",
"qps_limit=10000&instance_id=test_instance");
+ ASSERT_EQ(status_code, 200);
+ }
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit",
"qps_limit=invalid");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "param `qps_limit` is not a legal int64 type:";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] =
ctx.query<std::string>("adjust_rate_limit", "qps_limit=-1");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "qps_limit` should not be less than 0";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit",
"rpc_name=get_cluster");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "invalid argument:";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit",
"instance_id=test_instance");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "invalid argument:";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] = ctx.query<std::string>(
+ "adjust_rate_limit",
"rpc_name=get_cluster&instance_id=test_instance");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "invalid argument:";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] =
ctx.query<std::string>("adjust_rate_limit", "");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "invalid argument:";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit",
"qps_limit=1000&rpc_name=invalid");
+ ASSERT_EQ(status_code, 400);
+ std::string msg = "failed to adjust rate limit for qps_limit";
+ ASSERT_NE(content.find(msg), std::string::npos);
+ }
+ {
+ auto [status_code, content] =
+ ctx.query<std::string>("adjust_rate_limit",
"qps_limit=1000&instance_id=invalid");
+ ASSERT_EQ(status_code, 200);
+ }
+ {
+ auto [status_code, content] = ctx.query<std::string>(
+ "adjust_rate_limit",
"qps_limit=1000&rpc_name=get_cluster&instance_id=invalid");
+ ASSERT_EQ(status_code, 200);
+ }
+}
+
+TEST(MetaServiceHttpTest, QueryRateLimit) {
+ HttpContext ctx;
+ {
+ auto [status_code, content] =
ctx.query<std::string>("list_rate_limit", "");
+ ASSERT_EQ(status_code, 200);
+ }
+}
+
} // namespace doris::cloud
diff --git a/cloud/test/rate_limiter_test.cpp b/cloud/test/rate_limiter_test.cpp
index 2a10451a69f..cab7b01774c 100644
--- a/cloud/test/rate_limiter_test.cpp
+++ b/cloud/test/rate_limiter_test.cpp
@@ -17,8 +17,11 @@
#include "rate-limiter/rate_limiter.h"
+#include <gen_cpp/cloud.pb.h>
#include <gtest/gtest.h>
+#include <cstddef>
+
#include "common/config.h"
#include "common/util.h"
#include "meta-service/keys.h"
@@ -26,6 +29,7 @@
#include "meta-service/meta_service.h"
#include "meta-service/txn_kv_error.h"
#include "mock_resource_manager.h"
+#include "resource-manager/resource_manager.h"
int main(int argc, char** argv) {
doris::cloud::config::init(nullptr, true);
@@ -35,25 +39,50 @@ int main(int argc, char** argv) {
using namespace doris::cloud;
+const std::string mock_instance_0 = "mock_instance_0";
+const std::string mock_instance_1 = "mock_instance_1";
+const std::string mock_cluster_0 = "mock_cluster_0";
+const std::string mock_cluster_1 = "mock_cluster_1";
+const std::string mock_cluster_id_0 = "mock_cluster_id_0";
+const std::string mock_cluster_id_1 = "mock_cluster_id_1";
+const std::string mock_cloud_unique_id_0 = "mock_cloud_unique_id_0";
+const std::string mock_cloud_unique_id_1 = "mock_cloud_unique_id_1";
+
+class MockMultiInstanceRsMgr : public MockResourceManager {
+public:
+ using MockResourceManager::MockResourceManager;
+
+ std::string get_node(const std::string& cloud_unique_id,
+ std::vector<NodeInfo>* nodes) override {
+ if (cloud_unique_id == mock_cloud_unique_id_0) {
+ nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_0,
mock_cluster_0,
+ mock_cluster_id_0);
+ } else if (cloud_unique_id == mock_cloud_unique_id_1) {
+ nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_1,
mock_cluster_1,
+ mock_cluster_id_1);
+ }
+ return {};
+ };
+};
+
std::unique_ptr<MetaServiceProxy> get_meta_service() {
auto txn_kv =
std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
[&] { ASSERT_NE(txn_kv.get(), nullptr); }();
- auto rs = std::make_shared<MockResourceManager>(txn_kv);
+ auto rs = std::make_shared<MockMultiInstanceRsMgr>(txn_kv);
auto rl = std::make_shared<RateLimiter>();
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl);
return std::make_unique<MetaServiceProxy>(std::move(meta_service));
}
-TEST(RateLimiterTest, RateLimitGetClusterTest) {
- auto meta_service = get_meta_service();
+void mock_add_cluster(MetaServiceProxy& meta_service, std::string instance_id)
{
// add cluster first
- InstanceKeyInfo key_info {mock_instance};
+ InstanceKeyInfo key_info {instance_id};
std::string key;
std::string val;
instance_key(key_info, &key);
InstanceInfoPB instance;
- instance.set_instance_id(mock_instance);
+ instance.set_instance_id(instance_id);
ClusterPB c1;
c1.set_cluster_name(mock_cluster_name);
c1.set_cluster_id(mock_cluster_id);
@@ -63,50 +92,205 @@ TEST(RateLimiterTest, RateLimitGetClusterTest) {
std::unique_ptr<Transaction> txn;
std::string get_val;
- ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(meta_service.txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
- auto get_cluster = [&](MetaServiceCode code) {
- GetClusterRequest req;
- req.set_cloud_unique_id("test_cloud_unique_id");
- req.set_cluster_id(mock_cluster_id);
- req.set_cluster_name(mock_cluster_name);
- brpc::Controller cntl;
- GetClusterResponse res;
-
meta_service->get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
- &res, nullptr);
-
- ASSERT_EQ(res.status().code(), code);
- };
+void mock_get_cluster(MetaServiceProxy& meta_service, const std::string&
cloud_uid,
+ MetaServiceCode code) {
+ GetClusterRequest req;
+ req.set_cloud_unique_id(cloud_uid);
+ req.set_cluster_id(mock_cluster_id);
+ req.set_cluster_name(mock_cluster_name);
+ brpc::Controller cntl;
+ GetClusterResponse res;
+
meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+
+ ASSERT_EQ(res.status().code(), code);
+}
+
+template <typename Rpc>
+void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const
std::string& cloud_uid,
+ MetaServiceCode expected, size_t times) {
std::vector<std::thread> threads;
- for (int i = 0; i < 20; ++i) {
- threads.emplace_back(get_cluster, MetaServiceCode::OK);
+ for (size_t i = 0; i < times; ++i) {
+ threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected);
});
}
for (auto& t : threads) {
t.join();
}
- threads.clear();
+}
+
+TEST(RateLimiterTest, RateLimitGetClusterTest) {
+ auto meta_service = get_meta_service();
+ mock_add_cluster(*meta_service, mock_instance_0);
+
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 20);
std::this_thread::sleep_for(std::chrono::seconds(1));
meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
- ->qps_limiter_[mock_instance]
+ ->qps_limiter_[mock_instance_0]
->max_qps_limit_ = 1;
- threads.emplace_back(get_cluster, MetaServiceCode::MAX_QPS_LIMIT);
- for (auto& t : threads) {
- t.join();
- }
- threads.clear();
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 1);
std::this_thread::sleep_for(std::chrono::seconds(1));
meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
- ->qps_limiter_[mock_instance]
+ ->qps_limiter_[mock_instance_0]
->max_qps_limit_ = 10000;
- threads.emplace_back(get_cluster, MetaServiceCode::OK);
- for (auto& t : threads) {
- t.join();
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 1);
+}
+
+TEST(RateLimiterTest, AdjustLimitInfluenceTest) {
+ auto meta_service = get_meta_service();
+ mock_add_cluster(*meta_service, mock_instance_0);
+ mock_add_cluster(*meta_service, mock_instance_1);
+
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 1);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1,
mock_instance_1));
+ ASSERT_TRUE(
+ meta_service->rate_limiter()->set_rate_limit(100,
"get_cluster", mock_instance_0));
+
+ auto limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_0)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 100);
+ limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_1)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 1);
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
+ auto limit =
+
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+ ASSERT_EQ(limit, 1);
+ limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+ ASSERT_EQ(limit, 5000000);
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(5000,
"get_cluster"));
+ auto limit =
+
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+ ASSERT_EQ(limit, 1);
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1000));
+ limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+ ASSERT_EQ(limit, 5000);
+ limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+ ASSERT_EQ(limit, 1000);
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(3000,
"commit_txn"));
+ auto limit =
+
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
+ ASSERT_EQ(limit, 3000);
+ limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+ ASSERT_EQ(limit, 5000);
+ }
+ {
+ auto limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_0)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 100);
+ limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_1)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 1);
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(200,
mock_instance_1));
+ auto limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_0)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 100);
+ limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_1)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 200);
}
- threads.clear();
-}
\ No newline at end of file
+}
+
+TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
+ auto meta_service = get_meta_service();
+ mock_add_cluster(*meta_service, mock_instance_0);
+ mock_add_cluster(*meta_service, mock_instance_1);
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 20);
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::OK, 20);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000,
"get_cluster"));
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 20);
+ auto limit =
+
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+ ASSERT_EQ(limit, 10000);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1,
"get_cluster"));
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 1);
+ auto limit =
+
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
+ ASSERT_EQ(limit, 1);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000,
"get_cluster",
+
mock_instance_0));
+ ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000,
"get_cluster",
+
mock_instance_1));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::OK, 20);
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::OK, 20);
+ auto limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_0)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 10000);
+ limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_1)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 10000);
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ {
+ ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1,
mock_instance_0));
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_0,
+ MetaServiceCode::MAX_QPS_LIMIT, 1);
+ mock_parallel_rpc(mock_get_cluster, meta_service.get(),
mock_cloud_unique_id_1,
+ MetaServiceCode::OK, 20);
+ auto limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_0)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 1);
+ limit = meta_service->rate_limiter()
+ ->get_rpc_rate_limiter("get_cluster")
+ ->qps_limiter_.at(mock_instance_1)
+ ->max_qps_limit();
+ ASSERT_EQ(limit, 10000);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]