This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new bb284cf1 Support backup request policy (#2734) bb284cf1 is described below commit bb284cf1a2b1e1de7517a748be7240b272b131bc Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Thu Sep 26 10:46:01 2024 +0800 Support backup request policy (#2734) * Support backup request policy * Support Controller::set_backup_request_policy * Pass Controller to GetBackupRequestMs and update cn/client.md * Feedback call info * Avoid to block the timer thread in HandleSocketFailed --- docs/cn/client.md | 33 ++++++++-- src/brpc/backup_request_policy.h | 43 +++++++++++++ src/brpc/channel.cpp | 6 +- src/brpc/channel.h | 14 ++++- src/brpc/controller.cpp | 28 +++++++++ src/brpc/controller.h | 15 +++-- test/brpc_channel_unittest.cpp | 128 ++++++++++++++++++++++++++++++++++++++- 7 files changed, 254 insertions(+), 13 deletions(-) diff --git a/docs/cn/client.md b/docs/cn/client.md index 27f1fa70..0cf3dc75 100755 --- a/docs/cn/client.md +++ b/docs/cn/client.md @@ -584,10 +584,6 @@ r34717后Controller.has_backup_request()获知是否发送过backup_request。 如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果你需要在一定时间后发送另一个请求,使用backup request。 -工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。 - -ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启),Controller.set_backup_request_ms()可修改某次RPC的值。 - ### 没到超时 超时后RPC会尽快结束。 @@ -708,6 +704,35 @@ options.retry_policy = &g_my_retry_policy; - [brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(固定时间间隔退策略)和[brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(随机时间间隔退策略)继承了[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h),使用框架默认的DoRetry。 - 在pthread中进行重试退避(实际上通过bthread_usleep实现)会阻塞pthread,所以默认不会在pthread上进行重试退避。 +### backup request + +工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。 + +ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启)。Controller.set_backup_request_ms()可修改某次RPC的值。 + +用户可以通过继承[brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h)自定义策略(backup_request_ms和熔断backup request)。 比如根据延时调节backup_request_ms或者根据错误率熔断部分backup request。 + +ChannelOptions.backup_request_policy同样影响该Channel上所有RPC。Controller.set_backup_request_policy()可修改某次RPC的策略。backup_request_policy优先级高于backup_request_ms。 + +brpc::BackupRequestPolicy接口如下: + +```c++ +class BackupRequestPolicy { +public: + virtual ~BackupRequestPolicy() = default; + + // Return the time in milliseconds in which another request + // will be sent if RPC does not finish. + virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0; + + // Return true if the backup request should be sent. + virtual bool DoBackup(const Controller* controller) const = 0; + + // Called when a rpc is end, user can collect call information to adjust policy. + virtual void OnRPCEnd(const Controller* controller) = 0; +}; +``` + ### 重试应当保守 由于成本的限制,大部分线上server的冗余度是有限的,主要是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。默认的重试是比较安全的: 只要连接不断RPC就不会重试,一般不会产生大量的重试请求。用户可以通过RetryPolicy定制重试策略,但也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。 diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h new file mode 100644 index 00000000..ea254f1d --- /dev/null +++ b/src/brpc/backup_request_policy.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef BRPC_BACKUP_REQUEST_POLICY_H +#define BRPC_BACKUP_REQUEST_POLICY_H + +#include "brpc/controller.h" + +namespace brpc { + +class BackupRequestPolicy { +public: + virtual ~BackupRequestPolicy() = default; + + // Return the time in milliseconds in which another request + // will be sent if RPC does not finish. + virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0; + + // Return true if the backup request should be sent. + virtual bool DoBackup(const Controller* controller) const = 0; + + // Called when a rpc is end, user can collect call information to adjust policy. + virtual void OnRPCEnd(const Controller* controller) = 0; +}; + +} + +#endif // BRPC_BACKUP_REQUEST_POLICY_H diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 5fc66096..c15611ea 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -55,6 +55,7 @@ ChannelOptions::ChannelOptions() , log_succeed_without_server(true) , use_rdma(false) , auth(NULL) + , backup_request_policy(NULL) , retry_policy(NULL) , ns_filter(NULL) {} @@ -495,8 +496,10 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, // overriding connect_timeout_ms does not make sense, just use the // one in ChannelOptions cntl->_connect_timeout_ms = _options.connect_timeout_ms; - if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) { + if (cntl->backup_request_ms() == UNSET_MAGIC_NUM && + NULL == cntl->_backup_request_policy) { cntl->set_backup_request_ms(_options.backup_request_ms); + cntl->_backup_request_policy = _options.backup_request_policy; } if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) { cntl->set_connection_type(_options.connection_type); @@ -536,6 +539,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, // Currently we cannot handle retry and backup request correctly cntl->set_max_retry(0); cntl->set_backup_request_ms(-1); + cntl->_backup_request_policy = NULL; } if (cntl->backup_request_ms() >= 0 && diff --git a/src/brpc/channel.h b/src/brpc/channel.h index 2ed91a64..ef3c15e7 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -34,6 +34,7 @@ #include "brpc/controller.h" // brpc::Controller #include "brpc/details/profiler_linker.h" #include "brpc/retry_policy.h" +#include "brpc/backup_request_policy.h" #include "brpc/naming_service_filter.h" namespace brpc { @@ -55,11 +56,12 @@ struct ChannelOptions { int32_t timeout_ms; // Send another request if RPC does not finish after so many milliseconds. - // Overridable by Controller.set_backup_request_ms(). + // Overridable by Controller.set_backup_request_ms() or + // Controller.set_backup_request_policy(). // The request will be sent to a different server by best effort. // If timeout_ms is set and backup_request_ms >= timeout_ms, backup request // will never be sent. - // backup request does NOT imply server-side cancelation. + // backup request does NOT imply server-side cancellation. // Default: -1 (disabled) // Maximum: 0x7fffffff (roughly 30 days) int32_t backup_request_ms; @@ -112,6 +114,14 @@ struct ChannelOptions { // Default: NULL const Authenticator* auth; + // Customize the backup request time and whether to send backup request. + // Priority: `backup_request_policy' > `backup_request_ms'. + // Overridable by Controller.set_backup_request_ms() or + // Controller.set_backup_request_policy(). + // This object is NOT owned by channel and should remain valid when channel is used. + // Default: NULL + BackupRequestPolicy* backup_request_policy; + // Customize the error code that should be retried. The interface is // defined in src/brpc/retry_policy.h // This object is NOT owned by channel and should remain valid when diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 98e25ae2..afebb3c2 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -258,6 +258,7 @@ void Controller::ResetPods() { _connection_type = CONNECTION_TYPE_UNKNOWN; _timeout_ms = UNSET_MAGIC_NUM; _backup_request_ms = UNSET_MAGIC_NUM; + _backup_request_policy = NULL; _connect_timeout_ms = UNSET_MAGIC_NUM; _real_timeout_ms = UNSET_MAGIC_NUM; _deadline_us = -1; @@ -344,6 +345,16 @@ void Controller::set_backup_request_ms(int64_t timeout_ms) { } } +int64_t Controller::backup_request_ms() const { + int timeout_ms = NULL != _backup_request_policy ? + _backup_request_policy->GetBackupRequestMs(this) : _backup_request_ms; + if (timeout_ms > 0x7fffffff) { + timeout_ms = 0x7fffffff; + LOG(WARNING) << "backup_request_ms is limited to 0x7fffffff (roughly 24 days)"; + } + return timeout_ms; +} + void Controller::set_max_retry(int max_retry) { if (max_retry > MAX_RETRY_COUNT) { LOG(WARNING) << "Retry count can't be larger than " @@ -606,6 +617,13 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, goto END_OF_RPC; } if (_error_code == EBACKUPREQUEST) { + if (NULL != _backup_request_policy && !_backup_request_policy->DoBackup(this)) { + // No need to do backup request. + _error_code = saved_error; + CHECK_EQ(0, bthread_id_unlock(info.id)); + return; + } + // Reset timeout if needed int rc = 0; if (timeout_ms() >= 0) { @@ -969,6 +987,14 @@ void Controller::EndRPC(const CompletionInfo& info) { CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid)); } } + +void Controller::OnRPCEnd(int64_t end_time_us) { + _end_time_us = end_time_us; + if (NULL != _backup_request_policy) { + _backup_request_policy->OnRPCEnd(this); + } +} + void Controller::RunDoneInBackupThread(void* arg) { static_cast<Controller*>(arg)->DoneInBackupThread(); } @@ -1313,6 +1339,7 @@ CallId Controller::call_id() { void Controller::SaveClientSettings(ClientSettings* s) const { s->timeout_ms = _timeout_ms; s->backup_request_ms = _backup_request_ms; + s->backup_request_policy = _backup_request_policy; s->max_retry = _max_retry; s->tos = _tos; s->connection_type = _connection_type; @@ -1325,6 +1352,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const { void Controller::ApplyClientSettings(const ClientSettings& s) { set_timeout_ms(s.timeout_ms); set_backup_request_ms(s.backup_request_ms); + set_backup_request_policy(s.backup_request_policy); set_max_retry(s.max_retry); set_type_of_service(s.tos); set_connection_type(s.connection_type); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 5b2132b4..9b3c0201 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -71,6 +71,7 @@ class RPCSender; class StreamSettings; class MongoContext; class RetryPolicy; +class BackupRequestPolicy; class InputMessageBase; class ThriftStub; namespace policy { @@ -180,7 +181,10 @@ public: // Set/get the delay to send backup request in milliseconds. Use // ChannelOptions.backup_request_ms on unset. void set_backup_request_ms(int64_t timeout_ms); - int64_t backup_request_ms() const { return _backup_request_ms; } + void set_backup_request_policy(BackupRequestPolicy* policy) { + _backup_request_policy = policy; + } + int64_t backup_request_ms() const; // Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset. // <=0 means no retry. @@ -670,7 +674,8 @@ private: struct ClientSettings { int32_t timeout_ms; int32_t backup_request_ms; - int max_retry; + BackupRequestPolicy* backup_request_policy; + int max_retry; int32_t tos; ConnectionType connection_type; CompressType request_compress_type; @@ -737,9 +742,7 @@ private: _end_time_us = begin_time_us; } - void OnRPCEnd(int64_t end_time_us) { - _end_time_us = end_time_us; - } + void OnRPCEnd(int64_t end_time_us); static void RunDoneInBackupThread(void*); void DoneInBackupThread(); @@ -800,6 +803,8 @@ private: int32_t _timeout_ms; int32_t _connect_timeout_ms; int32_t _backup_request_ms; + // Priority: `_backup_request_policy' > `_backup_request_ms'. + BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call int64_t _real_timeout_ms; // Deadline of this RPC (since the Epoch in microseconds). diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 6c189f18..8814b0bc 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -309,7 +309,8 @@ protected: bool single_server, bool short_connection, const brpc::Authenticator* auth = NULL, - std::string connection_group = std::string()) { + std::string connection_group = std::string(), + bool use_backup_request_policy = false) { brpc::ChannelOptions opt; if (short_connection) { opt.connection_type = brpc::CONNECTION_TYPE_SHORT; @@ -317,6 +318,9 @@ protected: opt.auth = auth; opt.max_retry = 0; opt.connection_group = connection_group; + if (use_backup_request_policy) { + opt.backup_request_policy = &_backup_request_policy; + } if (single_server) { EXPECT_EQ(0, channel->Init(_ep, &opt)); } else { @@ -1917,6 +1921,107 @@ protected: StopAndJoin(); } + void TestBackupRequest(bool single_server, bool async, + bool short_connection) { + std::cout << " *** single=" << single_server + << " async=" << async + << " short=" << short_connection << std::endl; + + ASSERT_EQ(0, StartAccept(_ep)); + brpc::Channel channel; + SetUpChannel(&channel, single_server, short_connection); + + const int RETRY_NUM = 1; + test::EchoRequest req; + test::EchoResponse res; + brpc::Controller cntl; + req.set_message(__FUNCTION__); + + cntl.set_max_retry(RETRY_NUM); + cntl.set_backup_request_ms(10); // 10ms + cntl.set_timeout_ms(100); // 10ms + req.set_sleep_us(50000); // 100ms + CallMethod(&channel, &cntl, &req, &res, async); + ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); + ASSERT_TRUE(cntl.has_backup_request()); + ASSERT_EQ(RETRY_NUM, cntl.retried_count()); + bthread_usleep(70000); // wait for the sleep task to finish + + if (short_connection) { + // Sleep to let `_messenger' detect `Socket' being `SetFailed' + const int64_t start_time = butil::gettimeofday_us(); + while (_messenger.ConnectionCount() != 0) { + EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); + bthread_usleep(1000); + } + } else { + EXPECT_GE(1ul, _messenger.ConnectionCount()); + } + StopAndJoin(); + } + + class BackupRequestPolicyImpl : public brpc::BackupRequestPolicy { + public: + int32_t GetBackupRequestMs(const brpc::Controller*) const override { + return 10; + } + + // Return true if the backup request should be sent. + bool DoBackup(const brpc::Controller*) const override { + return backup; + } + + void OnRPCEnd(const brpc::Controller*) override {} + + bool backup{true}; + + }; + + void TestBackupRequestPolicy(bool single_server, bool async, + bool short_connection) { + ASSERT_EQ(0, StartAccept(_ep)); + for (int i = 0; i < 2; ++i) { + bool backup = i == 0; + std::cout << " *** single=" << single_server + << " async=" << async + << " short=" << short_connection + << " backup=" << backup + << std::endl; + + brpc::Channel channel; + SetUpChannel(&channel, single_server, short_connection, NULL, "", true); + + const int RETRY_NUM = 1; + test::EchoRequest req; + test::EchoResponse res; + brpc::Controller cntl; + req.set_message(__FUNCTION__); + + _backup_request_policy.backup = backup; + cntl.set_max_retry(RETRY_NUM); + cntl.set_timeout_ms(100); // 100ms + req.set_sleep_us(50000); // 50ms + CallMethod(&channel, &cntl, &req, &res, async); + ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); + ASSERT_EQ(backup, cntl.has_backup_request()); + ASSERT_EQ(backup ? RETRY_NUM : 0, cntl.retried_count()); + bthread_usleep(70000); // wait for the sleep task to finish + + if (short_connection) { + // Sleep to let `_messenger' detect `Socket' being `SetFailed' + const int64_t start_time = butil::gettimeofday_us(); + while (_messenger.ConnectionCount() != 0) { + ASSERT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); + bthread_usleep(1000); + } + } else { + ASSERT_GE(1ul, _messenger.ConnectionCount()); + } + } + + StopAndJoin(); + } + butil::EndPoint _ep; butil::TempFile _server_list; std::string _naming_url; @@ -1929,6 +2034,7 @@ protected: bool _close_fd_once; MyEchoService _svc; + BackupRequestPolicyImpl _backup_request_policy; }; class MyShared : public brpc::SharedObject { @@ -2596,6 +2702,26 @@ TEST_F(ChannelTest, retry_backoff) { } } +TEST_F(ChannelTest, backup_request) { + for (int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int j = 0; j <= 1; ++j) { // Flag Asynchronous + for (int k = 0; k <= 1; ++k) { // Flag ShortConnection + TestBackupRequest(i, j, k); + } + } + } +} + +TEST_F(ChannelTest, backup_request_policy) { + for (int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int j = 0; j <= 1; ++j) { // Flag Asynchronous + for (int k = 0; k <= 1; ++k) { // Flag ShortConnection + TestBackupRequestPolicy(i, j, k); + } + } + } +} + TEST_F(ChannelTest, multiple_threads_single_channel) { srand(time(NULL)); ASSERT_EQ(0, StartAccept(_ep)); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org