This is an automated email from the ASF dual-hosted git repository. jiashunzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new bf88565c support dynamic update method concurrency (#2923) bf88565c is described below commit bf88565ca987379a72d45c771b1ca1d7af746d66 Author: Yang,Liming <liming.y...@139.com> AuthorDate: Sun Mar 30 23:12:49 2025 +0800 support dynamic update method concurrency (#2923) --- src/brpc/adaptive_max_concurrency.cpp | 10 ++++++++++ src/brpc/adaptive_max_concurrency.h | 4 ++++ src/brpc/concurrency_limiter.h | 3 +++ src/brpc/policy/auto_concurrency_limiter.cpp | 4 ++++ src/brpc/policy/auto_concurrency_limiter.h | 2 ++ src/brpc/policy/constant_concurrency_limiter.cpp | 6 ++++++ src/brpc/policy/constant_concurrency_limiter.h | 2 ++ src/brpc/policy/timeout_concurrency_limiter.cpp | 5 +++++ src/brpc/policy/timeout_concurrency_limiter.h | 2 ++ src/brpc/server.cpp | 9 +-------- 10 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/brpc/adaptive_max_concurrency.cpp b/src/brpc/adaptive_max_concurrency.cpp index ae11ceff..3b6443b6 100644 --- a/src/brpc/adaptive_max_concurrency.cpp +++ b/src/brpc/adaptive_max_concurrency.cpp @@ -21,6 +21,7 @@ #include "butil/logging.h" #include "butil/strings/string_number_conversions.h" #include "brpc/adaptive_max_concurrency.h" +#include "brpc/concurrency_limiter.h" namespace brpc { @@ -72,6 +73,9 @@ void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& value) { value.CopyToString(&_value); _max_concurrency = -1; } + if (_cl) { + _cl->ResetMaxConcurrency(*this); + } } void AdaptiveMaxConcurrency::operator=(int max_concurrency) { @@ -82,12 +86,18 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) { _value = butil::string_printf("%d", max_concurrency); _max_concurrency = max_concurrency; } + if (_cl) { + _cl->ResetMaxConcurrency(*this); + } } void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) { _value = "timeout"; _max_concurrency = -1; _timeout_conf = value; + if (_cl) { + _cl->ResetMaxConcurrency(*this); + } } 
const std::string& AdaptiveMaxConcurrency::type() const { diff --git a/src/brpc/adaptive_max_concurrency.h b/src/brpc/adaptive_max_concurrency.h index 6bdad1ef..5bea0ec4 100644 --- a/src/brpc/adaptive_max_concurrency.h +++ b/src/brpc/adaptive_max_concurrency.h @@ -32,6 +32,7 @@ struct TimeoutConcurrencyConf { int max_concurrency; }; +class ConcurrencyLimiter; class AdaptiveMaxConcurrency{ public: explicit AdaptiveMaxConcurrency(); @@ -68,11 +69,14 @@ public: static const std::string& UNLIMITED(); static const std::string& CONSTANT(); + void SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl = cl; } + private: std::string _value; int _max_concurrency; TimeoutConcurrencyConf _timeout_conf; // TODO std::varient for different type + ConcurrencyLimiter* _cl{nullptr}; }; inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) { diff --git a/src/brpc/concurrency_limiter.h b/src/brpc/concurrency_limiter.h index 083e2cf9..351dd0de 100644 --- a/src/brpc/concurrency_limiter.h +++ b/src/brpc/concurrency_limiter.h @@ -47,6 +47,9 @@ public: // The return value is only for logging. virtual int MaxConcurrency() = 0; + // Reset max_concurrency + virtual int ResetMaxConcurrency(const AdaptiveMaxConcurrency& amc) = 0; + // Create an instance from the amc // Caller is responsible for delete the instance after usage. 
virtual ConcurrencyLimiter* New(const AdaptiveMaxConcurrency& amc) const = 0; diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp b/src/brpc/policy/auto_concurrency_limiter.cpp index d1d52d6d..dd5a02ec 100644 --- a/src/brpc/policy/auto_concurrency_limiter.cpp +++ b/src/brpc/policy/auto_concurrency_limiter.cpp @@ -134,6 +134,10 @@ int AutoConcurrencyLimiter::MaxConcurrency() { return _max_concurrency; } +int AutoConcurrencyLimiter::ResetMaxConcurrency(const AdaptiveMaxConcurrency&) { + return -1; +} + int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) { int64_t reset_start_us = sampling_time_us + (FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 + diff --git a/src/brpc/policy/auto_concurrency_limiter.h b/src/brpc/policy/auto_concurrency_limiter.h index 6cf5e10c..d221f73b 100644 --- a/src/brpc/policy/auto_concurrency_limiter.h +++ b/src/brpc/policy/auto_concurrency_limiter.h @@ -35,6 +35,8 @@ public: int MaxConcurrency() override; + int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override; + AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override; private: diff --git a/src/brpc/policy/constant_concurrency_limiter.cpp b/src/brpc/policy/constant_concurrency_limiter.cpp index be5f071c..7d73e2ec 100644 --- a/src/brpc/policy/constant_concurrency_limiter.cpp +++ b/src/brpc/policy/constant_concurrency_limiter.cpp @@ -35,6 +35,12 @@ int ConstantConcurrencyLimiter::MaxConcurrency() { return _max_concurrency.load(butil::memory_order_relaxed); } +int ConstantConcurrencyLimiter::ResetMaxConcurrency( + const AdaptiveMaxConcurrency& amc) { + _max_concurrency.store(static_cast<int>(amc), butil::memory_order_relaxed); + return 0; +} + ConstantConcurrencyLimiter* ConstantConcurrencyLimiter::New(const AdaptiveMaxConcurrency& amc) const { CHECK_EQ(amc.type(), AdaptiveMaxConcurrency::CONSTANT()); diff --git a/src/brpc/policy/constant_concurrency_limiter.h b/src/brpc/policy/constant_concurrency_limiter.h index f58a6286..9bae9393 
100644 --- a/src/brpc/policy/constant_concurrency_limiter.h +++ b/src/brpc/policy/constant_concurrency_limiter.h @@ -33,6 +33,8 @@ public: int MaxConcurrency() override; + int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override; + ConstantConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override; private: diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp b/src/brpc/policy/timeout_concurrency_limiter.cpp index 98c1a200..b2582eb1 100644 --- a/src/brpc/policy/timeout_concurrency_limiter.cpp +++ b/src/brpc/policy/timeout_concurrency_limiter.cpp @@ -117,6 +117,11 @@ int TimeoutConcurrencyLimiter::MaxConcurrency() { return FLAGS_timeout_cl_max_concurrency; } +int TimeoutConcurrencyLimiter::ResetMaxConcurrency( + const AdaptiveMaxConcurrency &) { + return -1; +} + bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us) { std::unique_lock<butil::Mutex> lock_guard(_sw_mutex); diff --git a/src/brpc/policy/timeout_concurrency_limiter.h b/src/brpc/policy/timeout_concurrency_limiter.h index 3f0485ee..f7e4dde6 100644 --- a/src/brpc/policy/timeout_concurrency_limiter.h +++ b/src/brpc/policy/timeout_concurrency_limiter.h @@ -34,6 +34,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter { int MaxConcurrency() override; + int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override; + TimeoutConcurrencyLimiter* New( const AdaptiveMaxConcurrency&) const override; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 2da703ef..d27c73ec 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -1095,6 +1095,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, return -1; } it->second.status->SetConcurrencyLimiter(cl); + it->second.max_concurrency.SetConcurrencyLimiter(cl); } } if (0 != SetServiceMaxConcurrency(_options.nshead_service)) { @@ -2221,10 +2222,6 @@ int Server::ResetMaxConcurrency(int max_concurrency) { } AdaptiveMaxConcurrency& 
Server::MaxConcurrencyOf(MethodProperty* mp) { - if (IsRunning()) { - LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started"; - return g_default_max_concurrency_of_method; - } if (mp->status == NULL) { LOG(ERROR) << "method=" << mp->method->full_name() << " does not support max_concurrency"; @@ -2235,10 +2232,6 @@ AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) { } int Server::MaxConcurrencyOf(const MethodProperty* mp) const { - if (IsRunning()) { - LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server started"; - return g_default_max_concurrency_of_method; - } if (mp == NULL || mp->status == NULL) { return 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org For additional commands, e-mail: dev-help@brpc.apache.org