This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 59a68dd6 Fix max concurrency of thrift protocol and nshead protocol (#2613)
59a68dd6 is described below
commit 59a68dd65e250c96a81b9d162018797bdbf3e9be
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jun 3 16:50:09 2024 +0800
Fix max concurrency of thrift protocol and nshead protocol (#2613)
---
src/brpc/nshead_service.h | 13 +++++++------
src/brpc/server.cpp | 47 ++++++++++++++++++++++++++++++++++++++---------
src/brpc/server.h | 21 +++++++++++++++++++++
src/brpc/thrift_service.h | 6 ++++--
4 files changed, 70 insertions(+), 17 deletions(-)
diff --git a/src/brpc/nshead_service.h b/src/brpc/nshead_service.h
index c90c7873..49ff9d79 100644
--- a/src/brpc/nshead_service.h
+++ b/src/brpc/nshead_service.h
@@ -22,7 +22,7 @@
#include "brpc/controller.h" // Controller
#include "brpc/nshead_message.h" // NsheadMessage
#include "brpc/describable.h"
-
+#include "brpc/adaptive_max_concurrency.h"
namespace brpc {
@@ -40,7 +40,7 @@ public:
explicit NsheadClosure(void* additional_space);
// [Required] Call this to send response back to the client.
- void Run();
+ void Run() override;
// [Optional] Set the full method name. If unset, use name of the service.
void SetMethodName(const std::string& full_method_name);
@@ -59,7 +59,7 @@ private:
friend void policy::ProcessNsheadRequest(InputMessageBase* msg_base);
friend class DeleteNsheadClosure;
// Only callable by Run().
- ~NsheadClosure();
+ ~NsheadClosure() override;
const Server* _server;
int64_t _received_us;
@@ -84,8 +84,8 @@ struct NsheadServiceOptions {
class NsheadService : public Describable {
public:
NsheadService();
- NsheadService(const NsheadServiceOptions&);
- virtual ~NsheadService();
+ explicit NsheadService(const NsheadServiceOptions&);
+ ~NsheadService() override;
// Implement this method to handle nshead requests. Notice that this
// method can be called with a failed Controller(something wrong with the
@@ -104,7 +104,7 @@ public:
NsheadClosure* done) = 0;
// Put descriptions into the stream.
- void Describe(std::ostream &os, const DescribeOptions&) const;
+ void Describe(std::ostream &os, const DescribeOptions&) const override;
private:
DISALLOW_COPY_AND_ASSIGN(NsheadService);
@@ -118,6 +118,7 @@ private:
// Tracking status of non NsheadPbService
MethodStatus* _status;
+ AdaptiveMaxConcurrency _max_concurrency;
size_t _additional_space;
std::string _cached_name;
};
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 93228289..51fb1d16 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -738,8 +738,8 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port);
}
-static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
- ConcurrencyLimiter** out) {
+bool Server::CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
+ ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
@@ -1055,6 +1055,15 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
it->second.status->SetConcurrencyLimiter(cl);
}
}
+ if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
+ return -1;
+ }
+#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
+ if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
+ return -1;
+ }
+#endif
+
// Create listening ports
if (port_range.min_port > port_range.max_port) {
@@ -2216,13 +2225,33 @@ int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
- MethodProperty* mp = _method_map.seek(full_method_name);
- if (mp == NULL) {
- LOG(ERROR) << "Fail to find method=" << full_method_name;
- _failed_to_set_max_concurrency_of_method = true;
- return g_default_max_concurrency_of_method;
- }
- return MaxConcurrencyOf(mp);
+ do {
+ if (full_method_name == butil::class_name_str<NsheadService>()) {
+ if (NULL == options().nshead_service) {
+ break;
+ }
+ return options().nshead_service->_max_concurrency;
+ }
+#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
+ if (full_method_name == butil::class_name_str<ThriftService>()) {
+ if (NULL == options().thrift_service) {
+ break;
+ }
+ return options().thrift_service->_max_concurrency;
+ }
+#endif
+
+ MethodProperty* mp = _method_map.seek(full_method_name);
+ if (mp == NULL) {
+ break;
+ }
+ return MaxConcurrencyOf(mp);
+
+ } while (false);
+
+ LOG(ERROR) << "Fail to find method=" << full_method_name;
+ _failed_to_set_max_concurrency_of_method = true;
+ return g_default_max_concurrency_of_method;
}
int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
diff --git a/src/brpc/server.h b/src/brpc/server.h
index 4843d0d0..5bc518ef 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -42,6 +42,7 @@
#include "brpc/http2.h"
#include "brpc/redis.h"
#include "brpc/interceptor.h"
+#include "brpc/concurrency_limiter.h"
namespace brpc {
@@ -674,6 +675,26 @@ friend class Controller;
AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;
+ static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
+ ConcurrencyLimiter** out);
+
+ template <typename T>
+ int SetServiceMaxConcurrency(T* service) {
+ if (NULL != service) {
+ const AdaptiveMaxConcurrency* amc = &service->_max_concurrency;
+ if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
+ amc = &_options.method_max_concurrency;
+ }
+ ConcurrencyLimiter* cl = NULL;
+ if (!CreateConcurrencyLimiter(*amc, &cl)) {
+ LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
+ return -1;
+ }
+ service->_status->SetConcurrencyLimiter(cl);
+ }
+ return 0;
+ }
+
DISALLOW_COPY_AND_ASSIGN(Server);
// Put frequently-accessed data pool at first.
diff --git a/src/brpc/thrift_service.h b/src/brpc/thrift_service.h
index c3d341e0..bd4ca44a 100644
--- a/src/brpc/thrift_service.h
+++ b/src/brpc/thrift_service.h
@@ -22,6 +22,7 @@
#include "brpc/controller.h" // Controller
#include "brpc/thrift_message.h" // ThriftFramedMessage
#include "brpc/describable.h"
+#include "brpc/adaptive_max_concurrency.h"
namespace brpc {
@@ -38,7 +39,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base);
class ThriftService : public Describable {
public:
ThriftService();
- virtual ~ThriftService();
+ ~ThriftService() override;
// Implement this method to handle thrift_binary requests.
// Parameters:
@@ -53,7 +54,7 @@ public:
::google::protobuf::Closure* done) = 0;
// Put descriptions into the stream.
- void Describe(std::ostream &os, const DescribeOptions&) const;
+ void Describe(std::ostream &os, const DescribeOptions&) const override;
private:
DISALLOW_COPY_AND_ASSIGN(ThriftService);
@@ -66,6 +67,7 @@ private:
void Expose(const butil::StringPiece& prefix);
MethodStatus* _status;
+ AdaptiveMaxConcurrency _max_concurrency;
};
} // namespace brpc
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]