This is an automated email from the ASF dual-hosted git repository.
serverglen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 65b753d1 Support fixed and jittered retry backoff policy (#2273)
65b753d1 is described below
commit 65b753d1c8fa485bf551b1db9a065291af9646a0
Author: Bright Chen <[email protected]>
AuthorDate: Fri Jul 14 19:59:32 2023 +0800
Support fixed and jittered retry backoff policy (#2273)
* Support fixed and jittered retry backoff policy
* Enable retry backoff in pthread
* Support CanRetryBackoffInPthread virtual method
* Add backoff strategy to RetryPolicy
* Only pass Controller param
* Add doc of retry backoff
* Opt retry backoff
---
docs/cn/client.md | 77 +++++++++++++++++++++++++++++++
src/brpc/controller.cpp | 71 +++++++++++++++++++----------
src/brpc/controller.h | 2 +-
src/brpc/retry_policy.cpp | 63 +++++++++++++++----------
src/brpc/retry_policy.h | 62 ++++++++++++++++++++++++-
test/brpc_channel_unittest.cpp | 101 ++++++++++++++++++++++++++++++++++++++++-
6 files changed, 326 insertions(+), 50 deletions(-)
diff --git a/docs/cn/client.md b/docs/cn/client.md
index 3d03e73c..ef714ed8 100755
--- a/docs/cn/client.md
+++ b/docs/cn/client.md
@@ -631,6 +631,83 @@ options.retry_policy = &g_my_retry_policy;
* 通过cntl->response()可获得对应RPC的response。
* 对ERPCTIMEDOUT代表的RPC超时总是不重试,即使你继承的RetryPolicy中允许。
+
+### 重试退避
+
+对于一些暂时性的错误,如网络抖动等,等待一小会儿再重试的成功率比立即重试的成功率高,同时可以打散上游重试的时机,减轻服务端压力,避免重试风暴导致服务端出现瞬间流量洪峰。
+
+框架支持两种重试退避策略:固定时间间隔退避策略和随机时间间隔退避策略。
+
+固定时间间隔退避策略需要设置固定时间间隔(毫秒)、无需重试退避的剩余rpc时间阈值(毫秒,当剩余rpc时间小于阈值,则不进行重试退避)、是否允许在pthread进行重试退避。使用方法如下:
+
+```c++
+// 给ChannelOptions.retry_policy赋值就行了。
+//
注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
+brpc::ChannelOptions options;
+int32_t fixed_backoff_time_ms = 100; // 固定时间间隔(毫秒)
+int32_t no_backoff_remaining_rpc_time_ms = 150; // 无需重试退避的剩余rpc时间阈值(毫秒)
+bool retry_backoff_in_pthread = false;
+static brpc::RpcRetryPolicyWithFixedBackoff g_retry_policy_with_fixed_backoff(
+ fixed_backoff_time_ms, no_backoff_remaining_rpc_time_ms,
retry_backoff_in_pthread);
+options.retry_policy = &g_retry_policy_with_fixed_backoff;
+...
+```
+
+随机时间间隔退避策略需要设置最小时间间隔(毫秒)、最大时间间隔(毫秒)、无需重试退避的剩余rpc时间阈值(毫秒,当剩余rpc时间小于阈值,则不进行重试退避)、是否允许在pthread做重试退避。每次策略会随机生成一个在最小时间间隔和最大时间间隔之间的重试退避间隔。使用方法如下:
+
+```c++
+// 给ChannelOptions.retry_policy赋值就行了。
+//
注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
+brpc::ChannelOptions options;
+int32_t min_backoff_time_ms = 100; // 最小时间间隔(毫秒)
+int32_t max_backoff_time_ms = 200; // 最大时间间隔(毫秒)
+int32_t no_backoff_remaining_rpc_time_ms = 150; // 无需重试退避的剩余rpc时间阈值(毫秒)
+bool retry_backoff_in_pthread = false; // 是否允许在pthread做重试退避
+static brpc::RpcRetryPolicyWithJitteredBackoff
g_retry_policy_with_jitter_backoff(
+ min_backoff_time_ms, max_backoff_time_ms,
+ no_backoff_remaining_rpc_time_ms, retry_backoff_in_pthread);
+options.retry_policy = &g_retry_policy_with_jitter_backoff;
+...
+```
+
+用户可以通过继承[brpc::RetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)自定义重试退避策略。比如只需要针对服务端并发数超限的情况进行重试退避,可以这么做:
+
+```c++
+class MyRetryPolicy : public brpc::RetryPolicy {
+public:
+ bool DoRetry(const brpc::Controller* cntl) const {
+ // 同《错误值得重试》一节
+ }
+
+ int32_t GetBackoffTimeMs(const brpc::Controller* cntl) const {
+ if (cntl->ErrorCode() == brpc::ELIMIT) {
+ return 100; // 退避100毫秒
+ }
+ return 0; // 返回0表示不进行重试退避。
+ }
+
+ bool CanRetryBackoffInPthread() const {
+ return true;
+ }
+};
+...
+
+// 给ChannelOptions.retry_policy赋值就行了。
+//
注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
+brpc::ChannelOptions options;
+static MyRetryPolicy g_my_retry_policy;
+options.retry_policy = &g_my_retry_policy;
+...
+```
+
+如果用户希望使用框架默认的DoRetry,只实现自定义的重试退避策略,则可以继承[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)。
+
+一些提示:
+
+- 当策略返回的重试退避时间大于等于剩余的rpc时间或者等于0,框架不会进行重试退避,而是立即进行重试。
+-
[brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(固定时间间隔退避策略)和[brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(随机时间间隔退避策略)继承了[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h),使用框架默认的DoRetry。
+- 在pthread中进行重试退避(实际上通过bthread_usleep实现)会阻塞pthread,所以默认不会在pthread上进行重试退避。
+
### 重试应当保守
由于成本的限制,大部分线上server的冗余度是有限的,主要是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。默认的重试是比较安全的:
只要连接不断RPC就不会重试,一般不会产生大量的重试请求。用户可以通过RetryPolicy定制重试策略,但也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 823ca953..c9935a54 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -48,6 +48,11 @@
// Force linking the .o in UT (which analysis deps by inclusions)
#include "brpc/parallel_channel.h"
#include "brpc/selective_channel.h"
+#include "bthread/task_group.h"
+
+namespace bthread {
+extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
+}
// This is the only place that both client/server must link, so we put
// registrations of errno here.
@@ -627,33 +632,51 @@ void Controller::OnVersionedRPCReturned(const
CompletionInfo& info,
++_current_call.nretry;
add_flag(FLAGS_BACKUP_REQUEST);
return IssueRPC(butil::gettimeofday_us());
- } else if (_retry_policy ? _retry_policy->DoRetry(this)
- : DefaultRetryPolicy()->DoRetry(this)) {
- // The error must come from _current_call because:
- // * we intercepted error from _unfinished_call in
OnVersionedRPCReturned
- // * ERPCTIMEDOUT/ECANCELED are not retrying error by default.
- CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
- if (!SingleServer()) {
- if (_accessed == NULL) {
- _accessed = ExcludedServers::Create(
- std::min(_max_retry, RETRY_AVOIDANCE));
- if (NULL == _accessed) {
- SetFailed(ENOMEM, "Fail to create ExcludedServers");
- goto END_OF_RPC;
+ } else {
+ auto retry_policy = _retry_policy ? _retry_policy :
DefaultRetryPolicy();
+ if (retry_policy->DoRetry(this)) {
+ // The error must come from _current_call because:
+ // * we intercepted error from _unfinished_call in
OnVersionedRPCReturned
+ // * ERPCTIMEDOUT/ECANCELED are not retrying error by default.
+ CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code;
+ if (!SingleServer()) {
+ if (_accessed == NULL) {
+ _accessed = ExcludedServers::Create(
+ std::min(_max_retry, RETRY_AVOIDANCE));
+ if (NULL == _accessed) {
+ SetFailed(ENOMEM, "Fail to create ExcludedServers");
+ goto END_OF_RPC;
+ }
}
+ _accessed->Add(_current_call.peer_id);
}
- _accessed->Add(_current_call.peer_id);
- }
- _current_call.OnComplete(this, _error_code, info.responded, false);
- ++_current_call.nretry;
- // Clear http responses before retrying, otherwise the response may
- // be mixed with older (and undefined) stuff. This is actually not
- // done before r32008.
- if (_http_response) {
- _http_response->Clear();
+ _current_call.OnComplete(this, _error_code, info.responded, false);
+ ++_current_call.nretry;
+ // Clear http responses before retrying, otherwise the response may
+ // be mixed with older (and undefined) stuff. This is actually not
+ // done before r32008.
+ if (_http_response) {
+ _http_response->Clear();
+ }
+ response_attachment().clear();
+
+ // Retry backoff.
+ bthread::TaskGroup* g = bthread::tls_task_group;
+ if (retry_policy->CanRetryBackoffInPthread() ||
+ (g && !g->is_current_pthread_task())) {
+ int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this)
* 1000L;
+ // No need to do retry backoff when the backoff time is not shorter
than the remaining rpc time.
+ if (backoff_time_us > 0 &&
+ backoff_time_us < _deadline_us - butil::gettimeofday_us())
{
+ bthread_usleep(backoff_time_us);
+ }
+
+ } else {
+ LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
+ "skip retry backoff in pthread.";
+ }
+ return IssueRPC(butil::gettimeofday_us());
}
- response_attachment().clear();
- return IssueRPC(butil::gettimeofday_us());
}
END_OF_RPC:
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 3d75ff50..f27dd54f 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -740,7 +740,7 @@ private:
// after CallMethod.
int _max_retry;
const RetryPolicy* _retry_policy;
- // Synchronization object for one RPC call. It remains unchanged even
+ // Synchronization object for one RPC call. It remains unchanged even
// when retry happens. Synchronous RPC will wait on this id.
CallId _correlation_id;
diff --git a/src/brpc/retry_policy.cpp b/src/brpc/retry_policy.cpp
index 4fdd594b..3120b3c8 100644
--- a/src/brpc/retry_policy.cpp
+++ b/src/brpc/retry_policy.cpp
@@ -17,34 +17,30 @@
#include "brpc/retry_policy.h"
+#include "butil/fast_rand.h"
namespace brpc {
-RetryPolicy::~RetryPolicy() {}
-
-class RpcRetryPolicy : public RetryPolicy {
-public:
- bool DoRetry(const Controller* controller) const {
- const int error_code = controller->ErrorCode();
- if (!error_code) {
- return false;
- }
- return (EFAILEDSOCKET == error_code
- || EEOF == error_code
- || EHOSTDOWN == error_code
- || ELOGOFF == error_code
- || ETIMEDOUT == error_code // This is not timeout of RPC.
- || ELIMIT == error_code
- || ENOENT == error_code
- || EPIPE == error_code
- || ECONNREFUSED == error_code
- || ECONNRESET == error_code
- || ENODATA == error_code
- || EOVERCROWDED == error_code
- || EH2RUNOUTSTREAMS == error_code);
+bool RpcRetryPolicy::DoRetry(const Controller* controller) const {
+ const int error_code = controller->ErrorCode();
+ if (!error_code) {
+ return false;
}
-};
+ return (EFAILEDSOCKET == error_code
+ || EEOF == error_code
+ || EHOSTDOWN == error_code
+ || ELOGOFF == error_code
+ || ETIMEDOUT == error_code // This is not timeout of RPC.
+ || ELIMIT == error_code
+ || ENOENT == error_code
+ || EPIPE == error_code
+ || ECONNREFUSED == error_code
+ || ECONNRESET == error_code
+ || ENODATA == error_code
+ || EOVERCROWDED == error_code
+ || EH2RUNOUTSTREAMS == error_code);
+}
// NOTE(gejun): g_default_policy can't be deleted on process's exit because
// client-side may still retry and use the policy at exit
@@ -58,4 +54,25 @@ const RetryPolicy* DefaultRetryPolicy() {
return g_default_policy;
}
+int32_t RpcRetryPolicyWithFixedBackoff::GetBackoffTimeMs(
+ const Controller* controller) const {
+ int64_t remaining_rpc_time_ms =
+ (controller->deadline_us() - butil::gettimeofday_us()) / 1000;
+ if (remaining_rpc_time_ms < _no_backoff_remaining_rpc_time_ms) {
+ return 0;
+ }
+ return _backoff_time_ms;
+}
+
+int32_t RpcRetryPolicyWithJitteredBackoff::GetBackoffTimeMs(
+ const Controller* controller) const {
+ int64_t remaining_rpc_time_ms =
+ (controller->deadline_us() - butil::gettimeofday_us()) / 1000;
+ if (remaining_rpc_time_ms < _no_backoff_remaining_rpc_time_ms) {
+ return 0;
+ }
+ return butil::fast_rand_in(_min_backoff_time_ms,
+ _max_backoff_time_ms);
+}
+
} // namespace brpc
diff --git a/src/brpc/retry_policy.h b/src/brpc/retry_policy.h
index b4852da4..bfe20d83 100644
--- a/src/brpc/retry_policy.h
+++ b/src/brpc/retry_policy.h
@@ -27,7 +27,7 @@ namespace brpc {
// Inherit this class to customize when the RPC should be retried.
class RetryPolicy {
public:
- virtual ~RetryPolicy();
+ virtual ~RetryPolicy() = default;
// Returns true if the RPC represented by `controller' should be retried.
// [Example]
@@ -68,11 +68,71 @@ public:
virtual bool DoRetry(const Controller* controller) const = 0;
// ^
// don't forget the const modifier
+
+ // Returns the backoff time in milliseconds before every retry.
+ virtual int32_t GetBackoffTimeMs(const Controller* controller) const {
return 0; }
+ // ^
+ // don't forget the const
modifier
+
+ // Returns true if retry backoff is allowed in pthread, otherwise returns
false.
+ virtual bool CanRetryBackoffInPthread() const { return false; }
+ // ^
+ // don't forget the const modifier
};
// Get the RetryPolicy used by brpc.
const RetryPolicy* DefaultRetryPolicy();
+class RpcRetryPolicy : public RetryPolicy {
+public:
+ bool DoRetry(const Controller* controller) const override;
+};
+
+class RpcRetryPolicyWithFixedBackoff : public RpcRetryPolicy {
+public:
+ RpcRetryPolicyWithFixedBackoff(int32_t backoff_time_ms,
+ int32_t no_backoff_remaining_rpc_time_ms,
+ bool retry_backoff_in_pthread)
+ : _backoff_time_ms(backoff_time_ms)
+ , _no_backoff_remaining_rpc_time_ms(no_backoff_remaining_rpc_time_ms)
+ , _retry_backoff_in_pthread(retry_backoff_in_pthread) {}
+
+ int32_t GetBackoffTimeMs(const Controller* controller) const override;
+
+ bool CanRetryBackoffInPthread() const override { return
_retry_backoff_in_pthread; }
+
+
+private:
+ int32_t _backoff_time_ms;
+ // If remaining rpc time is less than `_no_backoff_remaining_rpc_time_ms', no
backoff.
+ int32_t _no_backoff_remaining_rpc_time_ms;
+ bool _retry_backoff_in_pthread;
+};
+
+class RpcRetryPolicyWithJitteredBackoff : public RpcRetryPolicy {
+public:
+ RpcRetryPolicyWithJitteredBackoff(int32_t min_backoff_time_ms,
+ int32_t max_backoff_time_ms,
+ int32_t no_backoff_remaining_rpc_time_ms,
+ bool retry_backoff_in_pthread)
+ : _min_backoff_time_ms(min_backoff_time_ms)
+ , _max_backoff_time_ms(max_backoff_time_ms)
+ ,
_no_backoff_remaining_rpc_time_ms(no_backoff_remaining_rpc_time_ms)
+ , _retry_backoff_in_pthread(retry_backoff_in_pthread) {}
+
+ int32_t GetBackoffTimeMs(const Controller* controller) const override;
+
+ bool CanRetryBackoffInPthread() const override { return
_retry_backoff_in_pthread; }
+
+private:
+ // Generate jittered backoff time between [_min_backoff_time_ms,
_max_backoff_time_ms].
+ int32_t _min_backoff_time_ms;
+ int32_t _max_backoff_time_ms;
+ // If remaining rpc time is less than `_no_backoff_remaining_rpc_time_ms', no
backoff.
+ int32_t _no_backoff_remaining_rpc_time_ms;
+ bool _retry_backoff_in_pthread;
+};
+
} // namespace brpc
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 694f3f7f..f5806ccd 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -1786,6 +1786,7 @@ protected:
brpc::Channel channel;
brpc::ChannelOptions opt;
+ opt.timeout_ms = 1000;
if (short_connection) {
opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
}
@@ -1810,6 +1811,81 @@ protected:
StopAndJoin();
}
+ struct TestRetryBackoffInfo {
+ TestRetryBackoffInfo(ChannelTest* channel_test_param,
+ bool async_param,
+ bool short_connection_param,
+ bool fixed_backoff_param)
+ : channel_test(channel_test_param)
+ , async(async_param)
+ , short_connection(short_connection_param)
+ , fixed_backoff(fixed_backoff_param) {}
+
+ ChannelTest* channel_test;
+ int async;
+ int short_connection;
+ int fixed_backoff;
+ };
+
+ static void* TestRetryBackoffBthread(void* void_args) {
+ auto args = static_cast<TestRetryBackoffInfo*>(void_args);
+ args->channel_test->TestRetryBackoff(args->async,
args->short_connection,
+ args->fixed_backoff, false);
+ return NULL;
+ }
+
+ void TestRetryBackoff(bool async, bool short_connection, bool
fixed_backoff,
+ bool retry_backoff_in_pthread) {
+ ASSERT_EQ(0, StartAccept(_ep));
+
+ const int32_t backoff_time_ms = 100;
+ const int32_t no_backoff_remaining_rpc_time_ms = 100;
+ std::unique_ptr<brpc::RetryPolicy> retry_ptr;
+ if (fixed_backoff) {
+ retry_ptr.reset(
+ new brpc::RpcRetryPolicyWithFixedBackoff(backoff_time_ms,
+
no_backoff_remaining_rpc_time_ms,
+
retry_backoff_in_pthread));
+ } else {
+ retry_ptr.reset(
+ new
brpc::RpcRetryPolicyWithJitteredBackoff(backoff_time_ms,
+
backoff_time_ms + 20,
+
no_backoff_remaining_rpc_time_ms,
+
retry_backoff_in_pthread));
+ }
+
+ brpc::Channel channel;
+ brpc::ChannelOptions opt;
+ opt.timeout_ms = 1000;
+ opt.retry_policy = retry_ptr.get();
+ if (short_connection) {
+ opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
+ }
+ butil::TempFile server_list;
+ EXPECT_EQ(0, server_list.save_format(
+ "127.0.0.1:100\n"
+ "127.0.0.1:200\n"
+ "%s", endpoint2str(_ep).c_str()));
+ std::string naming_url = std::string("fIle://")
+ + server_list.fname();
+ EXPECT_EQ(0, channel.Init(naming_url.c_str(), "RR", &opt));
+
+ const int RETRY_NUM = 3;
+ test::EchoRequest req;
+ test::EchoResponse res;
+ brpc::Controller cntl;
+ req.set_message(__FUNCTION__);
+ cntl.set_max_retry(RETRY_NUM);
+ CallMethod(&channel, &cntl, &req, &res, async);
+ if (cntl.retried_count() > 0) {
+ EXPECT_GT(cntl.latency_us(), ((int64_t)backoff_time_ms * 1000) *
cntl.retried_count())
+ << "latency_us=" << cntl.latency_us() << " retried_count=" <<
cntl.retried_count()
+ << " enable_retry_backoff_in_pthread=" <<
retry_backoff_in_pthread;
+ }
+ EXPECT_EQ(0, cntl.ErrorCode()) << async << ", " << short_connection;
+ StopAndJoin();
+ }
+
butil::EndPoint _ep;
butil::TempFile _server_list;
std::string _naming_url;
@@ -1828,7 +1904,7 @@ class MyShared : public brpc::SharedObject {
public:
MyShared() { ++ nctor; }
MyShared(const MyShared&) : brpc::SharedObject() { ++ nctor; }
- ~MyShared() { ++ ndtor; }
+ ~MyShared() override { ++ ndtor; }
static int nctor;
static int ndtor;
@@ -2466,6 +2542,29 @@ TEST_F(ChannelTest, retry_other_servers) {
}
}
+TEST_F(ChannelTest, retry_backoff) {
+ for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
+ for (int k = 0; k <= 1; ++k) { // Flag ShortConnection
+ for (int l = 0; l <= 1; ++l) { // Flag FixedRetryBackoffPolicy or
JitteredRetryBackoffPolicy
+ for (int m = 0; m <= 1; ++m) { // Flag retry backoff in
bthread or pthread
+ if (m % 2 == 0) {
+ bthread_t th;
+ bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+ std::unique_ptr<TestRetryBackoffInfo>
test_retry_backoff(
+ new TestRetryBackoffInfo(this, j, k, l));
+ // Retry backoff in bthread.
+ bthread_start_background(&th, &attr,
TestRetryBackoffBthread, test_retry_backoff.get());
+ bthread_join(th, NULL);
+ } else {
+ // Retry backoff in pthread.
+ TestRetryBackoff(j, k, l, true);
+ }
+ }
+ }
+ }
+ }
+}
+
TEST_F(ChannelTest, multiple_threads_single_channel) {
srand(time(NULL));
ASSERT_EQ(0, StartAccept(_ep));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]