This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 9961be7d different method can use different TimeoutConcurrencyConf 
(#2112)
9961be7d is described below

commit 9961be7d7eef49e990d299e086c3d6541a8bdb60
Author: Yang,Liming <[email protected]>
AuthorDate: Fri Mar 10 10:18:29 2023 +0800

    different method can use different TimeoutConcurrencyConf (#2112)
---
 docs/cn/timeout_concurrency_limiter.md             |  4 +++-
 src/brpc/adaptive_max_concurrency.cpp              | 10 +++++++++
 src/brpc/adaptive_max_concurrency.h                | 11 ++++++++++
 src/brpc/policy/timeout_concurrency_limiter.cpp    | 20 +++++++++++++-----
 src/brpc/policy/timeout_concurrency_limiter.h      |  3 +++
 test/brpc_timeout_concurrency_limiter_unittest.cpp | 24 ++++++++++++++++++++++
 6 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/docs/cn/timeout_concurrency_limiter.md 
b/docs/cn/timeout_concurrency_limiter.md
index e72b7ee8..7da715a9 100644
--- a/docs/cn/timeout_concurrency_limiter.md
+++ b/docs/cn/timeout_concurrency_limiter.md
@@ -11,13 +11,15 @@
 
在服务正常运营过程中,流量的增减、请求体的大小变化,磁盘的顺序、随机读写,这些都会影响请求的延迟,用户一般情况下不希望请求延迟的波动造成错误,即使会有一些请求的排队造成请求延迟增加,因此,一般用户设置的请求超时时间都会是服务平均延迟的3至4倍。基于请求超时时间的限流是根据统计服务平均延迟和请求设置的超时时间相比较,来估算请求是否能够在设置的超时时间内完成处理,如果能够能完成则接受请求,如果不能完成则拒绝请求。由于统计服务平均延迟和当前请求的实际延迟会有一定的时间差,因此需要设置一个比较宽泛的最大并发度,保证服务不会因为突然的慢请求造成短时间内服务堆积过多的请求。
 
 ## 开启方法
-目前只有method级别支持基于超时的限流。如果要为某个method开启基于超时的限流,只需要将它的最大并发设置为"timeout"即可,如果客户端没有开启FLAGS_baidu_std_protocol_deliver_timeout_ms,可以设置FLAGS_timeout_cl_default_timeout_ms来调整一个默认的请求超时时间,可以设置FLAGS_timeout_cl_max_concurrency来调整最大并发度。
+目前只有method级别支持基于超时的限流。如果要为某个method开启基于超时的限流,只需要将它的最大并发设置为"timeout"即可,如果客户端没有开启FLAGS_baidu_std_protocol_deliver_timeout_ms,可以设置FLAGS_timeout_cl_default_timeout_ms来调整一个默认的请求超时时间,可以设置FLAGS_timeout_cl_max_concurrency来调整最大并发度。也可以通过设置brpc::TimeoutConcurrencyConf为每个method指定不同的配置。
 
 ```c++
 // Set timeout concurrency limiter for all methods
 brpc::ServerOptions options;
 options.method_max_concurrency = "timeout";
+options.method_max_concurrency = brpc::TimeoutConcurrencyConf{1, 100};
 
 // Set timeout concurrency limiter for specific method
 server.MaxConcurrencyOf("example.EchoService.Echo") = "timeout";
+server.MaxConcurrencyOf("example.EchoService.Echo") = 
brpc::TimeoutConcurrencyConf{1, 100};
 ```
diff --git a/src/brpc/adaptive_max_concurrency.cpp 
b/src/brpc/adaptive_max_concurrency.cpp
index 2e90c530..ae11ceff 100644
--- a/src/brpc/adaptive_max_concurrency.cpp
+++ b/src/brpc/adaptive_max_concurrency.cpp
@@ -40,6 +40,10 @@ AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(int 
max_concurrency)
     }
 }
 
+AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(
+    const TimeoutConcurrencyConf& value)
+    : _value("timeout"), _max_concurrency(-1), _timeout_conf(value) {}
+
 inline bool CompareStringPieceWithoutCase(
     const butil::StringPiece& s1, const char* s2) {
     DCHECK(s2 != NULL);
@@ -80,6 +84,12 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
     }
 }
 
+void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
+    _value = "timeout";
+    _max_concurrency = -1;
+    _timeout_conf = value;
+}
+
 const std::string& AdaptiveMaxConcurrency::type() const {
     if (_max_concurrency > 0) {
         return CONSTANT();
diff --git a/src/brpc/adaptive_max_concurrency.h 
b/src/brpc/adaptive_max_concurrency.h
index 46af4141..6bdad1ef 100644
--- a/src/brpc/adaptive_max_concurrency.h
+++ b/src/brpc/adaptive_max_concurrency.h
@@ -26,11 +26,18 @@
 
 namespace brpc {
 
+// timeout concurrency limiter config
+struct TimeoutConcurrencyConf {
+    int64_t timeout_ms;
+    int max_concurrency;
+};
+
 class AdaptiveMaxConcurrency{
 public:
     explicit AdaptiveMaxConcurrency();
     explicit AdaptiveMaxConcurrency(int max_concurrency);
     explicit AdaptiveMaxConcurrency(const butil::StringPiece& value);
+    explicit AdaptiveMaxConcurrency(const TimeoutConcurrencyConf& value);
 
     // Non-trivial destructor to prevent AdaptiveMaxConcurrency from being
     // passed to variadic arguments without explicit type conversion.
@@ -41,11 +48,13 @@ public:
 
     void operator=(int max_concurrency);
     void operator=(const butil::StringPiece& value);
+    void operator=(const TimeoutConcurrencyConf& value);
 
     // 0  for type="unlimited"
     // >0 for type="constant"
     // <0 for type="user-defined"
     operator int() const { return _max_concurrency; }
+    operator TimeoutConcurrencyConf() const { return _timeout_conf; }
 
     // "unlimited" for type="unlimited"
     // "10" "20" "30" for type="constant"
@@ -62,6 +71,8 @@ public:
 private:
     std::string _value;
     int _max_concurrency;
+    TimeoutConcurrencyConf
+        _timeout_conf;  // TODO std::variant for different type
 };
 
 inline std::ostream& operator<<(std::ostream& os, const 
AdaptiveMaxConcurrency& amc) {
diff --git a/src/brpc/policy/timeout_concurrency_limiter.cpp 
b/src/brpc/policy/timeout_concurrency_limiter.cpp
index 27c608e5..98c1a200 100644
--- a/src/brpc/policy/timeout_concurrency_limiter.cpp
+++ b/src/brpc/policy/timeout_concurrency_limiter.cpp
@@ -54,16 +54,26 @@ DEFINE_int32(timeout_cl_max_concurrency, 100,
 
 TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
     : _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
-      _last_sampling_time_us(0) {}
+      _last_sampling_time_us(0),
+      _timeout_ms(FLAGS_timeout_cl_default_timeout_ms),
+      _max_concurrency(FLAGS_timeout_cl_max_concurrency) {}
+
+TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter(
+    const TimeoutConcurrencyConf &conf)
+    : _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
+      _last_sampling_time_us(0),
+      _timeout_ms(conf.timeout_ms),
+      _max_concurrency(conf.max_concurrency) {}
 
 TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
-    const AdaptiveMaxConcurrency &) const {
-    return new (std::nothrow) TimeoutConcurrencyLimiter;
+    const AdaptiveMaxConcurrency &amc) const {
+    return new (std::nothrow)
+        TimeoutConcurrencyLimiter(static_cast<TimeoutConcurrencyConf>(amc));
 }
 
 bool TimeoutConcurrencyLimiter::OnRequested(int current_concurrency,
                                             Controller *cntl) {
-    auto timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
+    auto timeout_ms = _timeout_ms;
     if (cntl != nullptr && cntl->timeout_ms() != UNSET_MAGIC_NUM) {
         timeout_ms = cntl->timeout_ms();
     }
@@ -71,7 +81,7 @@ bool TimeoutConcurrencyLimiter::OnRequested(int 
current_concurrency,
     // timeout, allow currency_concurrency is 1 ensures the average latency can
     // be obtained renew.
     return current_concurrency == 1 ||
-           (current_concurrency <= FLAGS_timeout_cl_max_concurrency &&
+           (current_concurrency <= _max_concurrency &&
             _avg_latency_us < timeout_ms * 1000);
 }
 
diff --git a/src/brpc/policy/timeout_concurrency_limiter.h 
b/src/brpc/policy/timeout_concurrency_limiter.h
index ca045dd6..3f0485ee 100644
--- a/src/brpc/policy/timeout_concurrency_limiter.h
+++ b/src/brpc/policy/timeout_concurrency_limiter.h
@@ -26,6 +26,7 @@ namespace policy {
 class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
    public:
     TimeoutConcurrencyLimiter();
+    explicit TimeoutConcurrencyLimiter(const TimeoutConcurrencyConf& conf);
 
     bool OnRequested(int current_concurrency, Controller* cntl) override;
 
@@ -66,6 +67,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
     BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
     butil::Mutex _sw_mutex;
     SampleWindow _sw;
+    int64_t _timeout_ms;
+    int _max_concurrency;
 };
 
 }  // namespace policy
diff --git a/test/brpc_timeout_concurrency_limiter_unittest.cpp 
b/test/brpc_timeout_concurrency_limiter_unittest.cpp
index c80f6921..11b9e23b 100644
--- a/test/brpc_timeout_concurrency_limiter_unittest.cpp
+++ b/test/brpc_timeout_concurrency_limiter_unittest.cpp
@@ -103,3 +103,27 @@ TEST(TimeoutConcurrencyLimiterTest, OnResponded) {
     ASSERT_EQ(limiter._sw.succ_count, 2);
     ASSERT_EQ(limiter._sw.failed_count, 0);
 }
+
+TEST(TimeoutConcurrencyLimiterTest, AdaptiveMaxConcurrencyTest) {
+    {
+        brpc::AdaptiveMaxConcurrency concurrency(
+            brpc::TimeoutConcurrencyConf{100, 100});
+        ASSERT_EQ(concurrency.type(), "timeout");
+        ASSERT_EQ(concurrency.value(), "timeout");
+    }
+    {
+        brpc::AdaptiveMaxConcurrency concurrency;
+        concurrency = "timeout";
+        ASSERT_EQ(concurrency.type(), "timeout");
+        ASSERT_EQ(concurrency.value(), "timeout");
+    }
+    {
+        brpc::AdaptiveMaxConcurrency concurrency;
+        concurrency = brpc::TimeoutConcurrencyConf{50, 100};
+        ASSERT_EQ(concurrency.type(), "timeout");
+        ASSERT_EQ(concurrency.value(), "timeout");
+        auto time_conf = 
static_cast<brpc::TimeoutConcurrencyConf>(concurrency);
+        ASSERT_EQ(time_conf.timeout_ms, 50);
+        ASSERT_EQ(time_conf.max_concurrency, 100);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to