This is an automated email from the ASF dual-hosted git repository.

jiashunzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new bf88565c support dynamic update method concurrency (#2923)
bf88565c is described below

commit bf88565ca987379a72d45c771b1ca1d7af746d66
Author: Yang,Liming <liming.y...@139.com>
AuthorDate: Sun Mar 30 23:12:49 2025 +0800

    support dynamic update method concurrency (#2923)
---
 src/brpc/adaptive_max_concurrency.cpp            | 10 ++++++++++
 src/brpc/adaptive_max_concurrency.h              |  4 ++++
 src/brpc/concurrency_limiter.h                   |  3 +++
 src/brpc/policy/auto_concurrency_limiter.cpp     |  4 ++++
 src/brpc/policy/auto_concurrency_limiter.h       |  2 ++
 src/brpc/policy/constant_concurrency_limiter.cpp |  6 ++++++
 src/brpc/policy/constant_concurrency_limiter.h   |  2 ++
 src/brpc/policy/timeout_concurrency_limiter.cpp  |  5 +++++
 src/brpc/policy/timeout_concurrency_limiter.h    |  2 ++
 src/brpc/server.cpp                              |  9 +--------
 10 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/src/brpc/adaptive_max_concurrency.cpp 
b/src/brpc/adaptive_max_concurrency.cpp
index ae11ceff..3b6443b6 100644
--- a/src/brpc/adaptive_max_concurrency.cpp
+++ b/src/brpc/adaptive_max_concurrency.cpp
@@ -21,6 +21,7 @@
 #include "butil/logging.h"
 #include "butil/strings/string_number_conversions.h"
 #include "brpc/adaptive_max_concurrency.h"
+#include "brpc/concurrency_limiter.h"
 
 namespace brpc {
 
@@ -72,6 +73,9 @@ void AdaptiveMaxConcurrency::operator=(const 
butil::StringPiece& value) {
         value.CopyToString(&_value);
         _max_concurrency = -1;
     }
+    if (_cl) {
+        _cl->ResetMaxConcurrency(*this);
+    }
 }
 
 void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
@@ -82,12 +86,18 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) 
{
         _value = butil::string_printf("%d", max_concurrency);
         _max_concurrency = max_concurrency;
     }
+    if (_cl) {
+        _cl->ResetMaxConcurrency(*this);
+    }
 }
 
 void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
     _value = "timeout";
     _max_concurrency = -1;
     _timeout_conf = value;
+    if (_cl) {
+        _cl->ResetMaxConcurrency(*this);
+    }
 }
 
 const std::string& AdaptiveMaxConcurrency::type() const {
diff --git a/src/brpc/adaptive_max_concurrency.h 
b/src/brpc/adaptive_max_concurrency.h
index 6bdad1ef..5bea0ec4 100644
--- a/src/brpc/adaptive_max_concurrency.h
+++ b/src/brpc/adaptive_max_concurrency.h
@@ -32,6 +32,7 @@ struct TimeoutConcurrencyConf {
     int max_concurrency;
 };
 
+class ConcurrencyLimiter;
 class AdaptiveMaxConcurrency{
 public:
     explicit AdaptiveMaxConcurrency();
@@ -68,11 +69,14 @@ public:
     static const std::string& UNLIMITED();
     static const std::string& CONSTANT();
 
+    void SetConcurrencyLimiter(ConcurrencyLimiter* cl) { _cl = cl; }
+
 private:
     std::string _value;
     int _max_concurrency;
     TimeoutConcurrencyConf
         _timeout_conf;  // TODO std::varient for different type
+    ConcurrencyLimiter* _cl{nullptr};
 };
 
 inline std::ostream& operator<<(std::ostream& os, const 
AdaptiveMaxConcurrency& amc) {
diff --git a/src/brpc/concurrency_limiter.h b/src/brpc/concurrency_limiter.h
index 083e2cf9..351dd0de 100644
--- a/src/brpc/concurrency_limiter.h
+++ b/src/brpc/concurrency_limiter.h
@@ -47,6 +47,9 @@ public:
     // The return value is only for logging.
     virtual int MaxConcurrency() = 0;
 
+    // Reset max_concurrency
+    virtual int ResetMaxConcurrency(const AdaptiveMaxConcurrency& amc) = 0;
+
     // Create an instance from the amc
     // Caller is responsible for delete the instance after usage.
     virtual ConcurrencyLimiter* New(const AdaptiveMaxConcurrency& amc) const = 
0;
diff --git a/src/brpc/policy/auto_concurrency_limiter.cpp 
b/src/brpc/policy/auto_concurrency_limiter.cpp
index d1d52d6d..dd5a02ec 100644
--- a/src/brpc/policy/auto_concurrency_limiter.cpp
+++ b/src/brpc/policy/auto_concurrency_limiter.cpp
@@ -134,6 +134,10 @@ int AutoConcurrencyLimiter::MaxConcurrency() {
     return _max_concurrency;
 }
 
+int AutoConcurrencyLimiter::ResetMaxConcurrency(const AdaptiveMaxConcurrency&) 
{
+    return -1;
+}
+
 int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
     int64_t reset_start_us = sampling_time_us + 
         (FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 + 
diff --git a/src/brpc/policy/auto_concurrency_limiter.h 
b/src/brpc/policy/auto_concurrency_limiter.h
index 6cf5e10c..d221f73b 100644
--- a/src/brpc/policy/auto_concurrency_limiter.h
+++ b/src/brpc/policy/auto_concurrency_limiter.h
@@ -35,6 +35,8 @@ public:
 
     int MaxConcurrency() override;
 
+    int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
+
     AutoConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const override;
 
 private:
diff --git a/src/brpc/policy/constant_concurrency_limiter.cpp 
b/src/brpc/policy/constant_concurrency_limiter.cpp
index be5f071c..7d73e2ec 100644
--- a/src/brpc/policy/constant_concurrency_limiter.cpp
+++ b/src/brpc/policy/constant_concurrency_limiter.cpp
@@ -35,6 +35,12 @@ int ConstantConcurrencyLimiter::MaxConcurrency() {
     return _max_concurrency.load(butil::memory_order_relaxed);
 }
 
+int ConstantConcurrencyLimiter::ResetMaxConcurrency(
+    const AdaptiveMaxConcurrency& amc) {
+    _max_concurrency.store(static_cast<int>(amc), butil::memory_order_relaxed);
+    return 0;
+}
+
 ConstantConcurrencyLimiter*
 ConstantConcurrencyLimiter::New(const AdaptiveMaxConcurrency& amc) const {
     CHECK_EQ(amc.type(), AdaptiveMaxConcurrency::CONSTANT());
diff --git a/src/brpc/policy/constant_concurrency_limiter.h 
b/src/brpc/policy/constant_concurrency_limiter.h
index f58a6286..9bae9393 100644
--- a/src/brpc/policy/constant_concurrency_limiter.h
+++ b/src/brpc/policy/constant_concurrency_limiter.h
@@ -33,6 +33,8 @@ public:
 
     int MaxConcurrency() override;
 
+    int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
+
     ConstantConcurrencyLimiter* New(const AdaptiveMaxConcurrency&) const 
override;
 
 private:
diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp 
b/src/brpc/policy/timeout_concurrency_limiter.cpp
index 98c1a200..b2582eb1 100644
--- a/src/brpc/policy/timeout_concurrency_limiter.cpp
+++ b/src/brpc/policy/timeout_concurrency_limiter.cpp
@@ -117,6 +117,11 @@ int TimeoutConcurrencyLimiter::MaxConcurrency() {
     return FLAGS_timeout_cl_max_concurrency;
 }
 
+int TimeoutConcurrencyLimiter::ResetMaxConcurrency(
+    const AdaptiveMaxConcurrency &) {
+    return -1;
+}
+
 bool TimeoutConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
                                           int64_t sampling_time_us) {
     std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
diff --git a/src/brpc/policy/timeout_concurrency_limiter.h 
b/src/brpc/policy/timeout_concurrency_limiter.h
index 3f0485ee..f7e4dde6 100644
--- a/src/brpc/policy/timeout_concurrency_limiter.h
+++ b/src/brpc/policy/timeout_concurrency_limiter.h
@@ -34,6 +34,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
 
     int MaxConcurrency() override;
 
+    int ResetMaxConcurrency(const AdaptiveMaxConcurrency&) override;
+
     TimeoutConcurrencyLimiter* New(
         const AdaptiveMaxConcurrency&) const override;
 
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 2da703ef..d27c73ec 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -1095,6 +1095,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
                 return -1;
             }
             it->second.status->SetConcurrencyLimiter(cl);
+            it->second.max_concurrency.SetConcurrencyLimiter(cl);
         }
     }
     if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
@@ -2221,10 +2222,6 @@ int Server::ResetMaxConcurrency(int max_concurrency) {
 }
 
 AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
-    if (IsRunning()) {
-        LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server 
started";
-        return g_default_max_concurrency_of_method;
-    }
     if (mp->status == NULL) {
         LOG(ERROR) << "method=" << mp->method->full_name()
                    << " does not support max_concurrency";
@@ -2235,10 +2232,6 @@ AdaptiveMaxConcurrency& 
Server::MaxConcurrencyOf(MethodProperty* mp) {
 }
 
 int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
-    if (IsRunning()) {
-        LOG(WARNING) << "MaxConcurrencyOf is only allowed before Server 
started";
-        return g_default_max_concurrency_of_method;
-    }
     if (mp == NULL || mp->status == NULL) {
         return 0;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to