This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new bb284cf1 Support backup request policy (#2734)
bb284cf1 is described below

commit bb284cf1a2b1e1de7517a748be7240b272b131bc
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Thu Sep 26 10:46:01 2024 +0800

    Support backup request policy (#2734)
    
    * Support backup request policy
    
    * Support Controller::set_backup_request_policy
    
    * Pass Controller to GetBackupRequestMs and update cn/client.md
    
    * Feedback call info
    
    * Avoid to block the timer thread in HandleSocketFailed
---
 docs/cn/client.md                |  33 ++++++++--
 src/brpc/backup_request_policy.h |  43 +++++++++++++
 src/brpc/channel.cpp             |   6 +-
 src/brpc/channel.h               |  14 ++++-
 src/brpc/controller.cpp          |  28 +++++++++
 src/brpc/controller.h            |  15 +++--
 test/brpc_channel_unittest.cpp   | 128 ++++++++++++++++++++++++++++++++++++++-
 7 files changed, 254 insertions(+), 13 deletions(-)

diff --git a/docs/cn/client.md b/docs/cn/client.md
index 27f1fa70..0cf3dc75 100755
--- a/docs/cn/client.md
+++ b/docs/cn/client.md
@@ -584,10 +584,6 @@ r34717后Controller.has_backup_request()获知是否发送过backup_request。
 
 如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果你需要在一定时间后发送另一个请求,使用backup request。
 
-工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。
-
-ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启),Controller.set_backup_request_ms()可修改某次RPC的值。
-
 ### 没到超时
 
 超时后RPC会尽快结束。
@@ -708,6 +704,35 @@ options.retry_policy = &g_my_retry_policy;
 - [brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(固定时间间隔退策略)和[brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(随机时间间隔退策略)继承了[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h),使用框架默认的DoRetry。
 - 在pthread中进行重试退避(实际上通过bthread_usleep实现)会阻塞pthread,所以默认不会在pthread上进行重试退避。
 
+### backup request
+
+工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。
+
+ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启)。Controller.set_backup_request_ms()可修改某次RPC的值。
+
+用户可以通过继承[brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h)自定义策略(backup_request_ms和熔断backup request)。比如根据延时调节backup_request_ms或者根据错误率熔断部分backup request。
+
+ChannelOptions.backup_request_policy同样影响该Channel上所有RPC。Controller.set_backup_request_policy()可修改某次RPC的策略。backup_request_policy优先级高于backup_request_ms。
+
+brpc::BackupRequestPolicy接口如下:
+
+```c++
+class BackupRequestPolicy {
+public:
+    virtual ~BackupRequestPolicy() = default;
+
+    // Return the time in milliseconds in which another request
+    // will be sent if RPC does not finish.
+    virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;
+
+    // Return true if the backup request should be sent.
+    virtual bool DoBackup(const Controller* controller) const = 0;
+    
+    // Called when an RPC ends; users can collect call information here to adjust the policy.
+    virtual void OnRPCEnd(const Controller* controller) = 0;
+};
+```
+
 ### 重试应当保守
 
 
由于成本的限制,大部分线上server的冗余度是有限的,主要是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。默认的重试是比较安全的:
 
只要连接不断RPC就不会重试,一般不会产生大量的重试请求。用户可以通过RetryPolicy定制重试策略,但也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。
diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h
new file mode 100644
index 00000000..ea254f1d
--- /dev/null
+++ b/src/brpc/backup_request_policy.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_BACKUP_REQUEST_POLICY_H
+#define BRPC_BACKUP_REQUEST_POLICY_H
+
+#include "brpc/controller.h"
+
+namespace brpc {
+
+// Customizes when (GetBackupRequestMs) and whether (DoBackup) a backup request is sent.
+class BackupRequestPolicy {
+public:
+    virtual ~BackupRequestPolicy() = default;
+
+    // Return the time in milliseconds in which another request
+    // will be sent if RPC does not finish. Takes priority over backup_request_ms.
+    virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0;
+
+    // Consulted when the backup delay expires: return true to send the backup request.
+    virtual bool DoBackup(const Controller* controller) const = 0;
+
+    // Called when an RPC ends; users can collect call information here to adjust the policy.
+    virtual void OnRPCEnd(const Controller* controller) = 0;
+};
+
+}
+
+#endif // BRPC_BACKUP_REQUEST_POLICY_H
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index 5fc66096..c15611ea 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -55,6 +55,7 @@ ChannelOptions::ChannelOptions()
     , log_succeed_without_server(true)
     , use_rdma(false)
     , auth(NULL)
+    , backup_request_policy(NULL)
     , retry_policy(NULL)
     , ns_filter(NULL)
 {}
@@ -495,8 +496,10 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
     // overriding connect_timeout_ms does not make sense, just use the
     // one in ChannelOptions
     cntl->_connect_timeout_ms = _options.connect_timeout_ms;
-    if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) {
+    if (cntl->backup_request_ms() == UNSET_MAGIC_NUM &&
+        NULL == cntl->_backup_request_policy) {
         cntl->set_backup_request_ms(_options.backup_request_ms);
+        cntl->_backup_request_policy = _options.backup_request_policy;
     }
     if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) {
         cntl->set_connection_type(_options.connection_type);
@@ -536,6 +539,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
         // Currently we cannot handle retry and backup request correctly
         cntl->set_max_retry(0);
         cntl->set_backup_request_ms(-1);
+        cntl->_backup_request_policy = NULL;
     }
 
     if (cntl->backup_request_ms() >= 0 &&
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index 2ed91a64..ef3c15e7 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -34,6 +34,7 @@
 #include "brpc/controller.h"                // brpc::Controller
 #include "brpc/details/profiler_linker.h"
 #include "brpc/retry_policy.h"
+#include "brpc/backup_request_policy.h"
 #include "brpc/naming_service_filter.h"
 
 namespace brpc {
@@ -55,11 +56,12 @@ struct ChannelOptions {
     int32_t timeout_ms;
 
     // Send another request if RPC does not finish after so many milliseconds.
-    // Overridable by Controller.set_backup_request_ms().
+    // Overridable by Controller.set_backup_request_ms() or
+    // Controller.set_backup_request_policy().
     // The request will be sent to a different server by best effort.
     // If timeout_ms is set and backup_request_ms >= timeout_ms, backup request
     // will never be sent.
-    // backup request does NOT imply server-side cancelation.
+    // backup request does NOT imply server-side cancellation.
     // Default: -1 (disabled)
     // Maximum: 0x7fffffff (roughly 30 days)
     int32_t backup_request_ms;
@@ -112,6 +114,14 @@ struct ChannelOptions {
     // Default: NULL
     const Authenticator* auth;
 
+    // Customize the backup request time and whether to send backup request.
+    // Priority: `backup_request_policy' > `backup_request_ms'.
+    // Overridable by Controller.set_backup_request_ms() or
+    // Controller.set_backup_request_policy().
+    // This object is NOT owned by channel and should remain valid while the channel is in use.
+    // Default: NULL
+    BackupRequestPolicy* backup_request_policy;
+
     // Customize the error code that should be retried. The interface is
     // defined in src/brpc/retry_policy.h
     // This object is NOT owned by channel and should remain valid when
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 98e25ae2..afebb3c2 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -258,6 +258,7 @@ void Controller::ResetPods() {
     _connection_type = CONNECTION_TYPE_UNKNOWN;
     _timeout_ms = UNSET_MAGIC_NUM;
     _backup_request_ms = UNSET_MAGIC_NUM;
+    _backup_request_policy = NULL;
     _connect_timeout_ms = UNSET_MAGIC_NUM;
     _real_timeout_ms = UNSET_MAGIC_NUM;
     _deadline_us = -1;
@@ -344,6 +345,16 @@ void Controller::set_backup_request_ms(int64_t timeout_ms) {
     }
 }
 
+// Effective backup delay: the policy (if set) takes priority over _backup_request_ms.
+int64_t Controller::backup_request_ms() const {
+    int64_t timeout_ms = NULL != _backup_request_policy ?
+        _backup_request_policy->GetBackupRequestMs(this) : _backup_request_ms;
+    if (timeout_ms > 0x7fffffff) {  // 64-bit local: not a tautology like `int > INT_MAX'.
+        timeout_ms = 0x7fffffff;
+        LOG(WARNING) << "backup_request_ms is limited to 0x7fffffff (roughly 24 days)";
+    }
+    return timeout_ms;
+}
+
 void Controller::set_max_retry(int max_retry) {
     if (max_retry > MAX_RETRY_COUNT) {
         LOG(WARNING) << "Retry count can't be larger than "
@@ -606,6 +617,13 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
         goto END_OF_RPC;
     }
     if (_error_code == EBACKUPREQUEST) {
+        if (NULL != _backup_request_policy && 
!_backup_request_policy->DoBackup(this)) {
+            // No need to do backup request.
+            _error_code = saved_error;
+            CHECK_EQ(0, bthread_id_unlock(info.id));
+            return;
+        }
+
         // Reset timeout if needed
         int rc = 0;
         if (timeout_ms() >= 0) {
@@ -969,6 +987,14 @@ void Controller::EndRPC(const CompletionInfo& info) {
         CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid));
     }
 }
+
+void Controller::OnRPCEnd(int64_t end_time_us) {
+    _end_time_us = end_time_us;
+    // Feed the finished call back to the backup request policy, if any.
+    BackupRequestPolicy* const policy = _backup_request_policy;
+    if (policy != NULL) { policy->OnRPCEnd(this); }
+}
+
 void Controller::RunDoneInBackupThread(void* arg) {
     static_cast<Controller*>(arg)->DoneInBackupThread();
 }
@@ -1313,6 +1339,7 @@ CallId Controller::call_id() {
 void Controller::SaveClientSettings(ClientSettings* s) const {
     s->timeout_ms = _timeout_ms;
     s->backup_request_ms = _backup_request_ms;
+    s->backup_request_policy = _backup_request_policy;
     s->max_retry = _max_retry;
     s->tos = _tos;
     s->connection_type = _connection_type;
@@ -1325,6 +1352,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const {
 void Controller::ApplyClientSettings(const ClientSettings& s) {
     set_timeout_ms(s.timeout_ms);
     set_backup_request_ms(s.backup_request_ms);
+    set_backup_request_policy(s.backup_request_policy);
     set_max_retry(s.max_retry);
     set_type_of_service(s.tos);
     set_connection_type(s.connection_type);
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 5b2132b4..9b3c0201 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -71,6 +71,7 @@ class RPCSender;
 class StreamSettings;
 class MongoContext;
 class RetryPolicy;
+class BackupRequestPolicy;
 class InputMessageBase;
 class ThriftStub;
 namespace policy {
@@ -180,7 +181,10 @@ public:
     // Set/get the delay to send backup request in milliseconds. Use
     // ChannelOptions.backup_request_ms on unset.
     void set_backup_request_ms(int64_t timeout_ms);
-    int64_t backup_request_ms() const { return _backup_request_ms; }
+    void set_backup_request_policy(BackupRequestPolicy* policy) {
+        _backup_request_policy = policy;
+    }
+    int64_t backup_request_ms() const;
 
     // Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset.
     // <=0 means no retry.
@@ -670,7 +674,8 @@ private:
     struct ClientSettings {
         int32_t timeout_ms;
         int32_t backup_request_ms;
-        int max_retry;                      
+        BackupRequestPolicy* backup_request_policy;
+        int max_retry;
         int32_t tos;
         ConnectionType connection_type;         
         CompressType request_compress_type;
@@ -737,9 +742,7 @@ private:
         _end_time_us = begin_time_us;
     }
 
-    void OnRPCEnd(int64_t end_time_us) {
-        _end_time_us = end_time_us;
-    }
+    void OnRPCEnd(int64_t end_time_us);
 
     static void RunDoneInBackupThread(void*);
     void DoneInBackupThread();
@@ -800,6 +803,8 @@ private:
     int32_t _timeout_ms;
     int32_t _connect_timeout_ms;
     int32_t _backup_request_ms;
+    // Priority: `_backup_request_policy' > `_backup_request_ms'.
+    BackupRequestPolicy* _backup_request_policy;
     // If this rpc call has retry/backup request,this var save the real timeout for current call
     int64_t _real_timeout_ms;
     // Deadline of this RPC (since the Epoch in microseconds).
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 6c189f18..8814b0bc 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -309,7 +309,8 @@ protected:
                       bool single_server,
                       bool short_connection,
                       const brpc::Authenticator* auth = NULL,
-                      std::string connection_group = std::string()) {
+                      std::string connection_group = std::string(),
+                      bool use_backup_request_policy = false) {
         brpc::ChannelOptions opt;
         if (short_connection) {
             opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
@@ -317,6 +318,9 @@ protected:
         opt.auth = auth;
         opt.max_retry = 0;
         opt.connection_group = connection_group;
+        if (use_backup_request_policy) {
+            opt.backup_request_policy = &_backup_request_policy;
+        }
         if (single_server) {
             EXPECT_EQ(0, channel->Init(_ep, &opt)); 
         } else {                                                 
@@ -1917,6 +1921,107 @@ protected:
         StopAndJoin();
     }
 
+    // Server sleeps 50ms: past the 10ms backup delay, within the 100ms timeout.
+    void TestBackupRequest(bool single_server, bool async, bool short_connection) {
+        std::cout << " *** single=" << single_server
+                  << " async=" << async
+                  << " short=" << short_connection << std::endl;
+
+        ASSERT_EQ(0, StartAccept(_ep));
+        brpc::Channel channel;
+        SetUpChannel(&channel, single_server, short_connection);
+
+        const int RETRY_NUM = 1;
+        test::EchoRequest req;
+        test::EchoResponse res;
+        brpc::Controller cntl;
+        req.set_message(__FUNCTION__);
+
+        cntl.set_max_retry(RETRY_NUM);
+        cntl.set_backup_request_ms(10);  // 10ms
+        cntl.set_timeout_ms(100);  // 100ms
+        req.set_sleep_us(50000); // 50ms
+        CallMethod(&channel, &cntl, &req, &res, async);
+        ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+        ASSERT_TRUE(cntl.has_backup_request());
+        ASSERT_EQ(RETRY_NUM, cntl.retried_count());
+        bthread_usleep(70000);  // wait for the sleep task to finish
+
+        if (short_connection) {
+            // Sleep to let `_messenger' detect `Socket' being `SetFailed'
+            const int64_t start_time = butil::gettimeofday_us();
+            while (_messenger.ConnectionCount() != 0) {
+                // ASSERT (not EXPECT): a non-fatal failure would keep this loop spinning.
+                ASSERT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
+                bthread_usleep(1000);
+            }
+        } else {
+            EXPECT_GE(1ul, _messenger.ConnectionCount());
+        }
+        StopAndJoin();
+    }
+
+    class BackupRequestPolicyImpl : public brpc::BackupRequestPolicy {
+    public:
+        // Always ask for the backup request 10ms after the first attempt.
+        int32_t GetBackupRequestMs(const brpc::Controller*) const override {
+            return 10;
+        }
+
+        // Send the backup request only while `backup' is true.
+        bool DoBackup(const brpc::Controller*) const override { return backup; }
+
+        // This test policy does not use per-call feedback.
+        void OnRPCEnd(const brpc::Controller*) override {}
+
+        // Toggled by TestBackupRequestPolicy between iterations.
+        bool backup{true};
+    };
+
+    // Iteration 0: the policy allows the backup; iteration 1: DoBackup() vetoes it.
+    void TestBackupRequestPolicy(bool single_server, bool async,
+                                 bool short_connection) {
+        ASSERT_EQ(0, StartAccept(_ep));
+        for (int i = 0; i < 2; ++i) {
+            const bool backup = (i == 0);
+            std::cout << " *** single=" << single_server
+                      << " async=" << async
+                      << " short=" << short_connection
+                      << " backup=" << backup
+                      << std::endl;
+
+            brpc::Channel channel;
+            // Install _backup_request_policy (10ms delay) through ChannelOptions.
+            SetUpChannel(&channel, single_server, short_connection, NULL, "", true);
+
+            const int RETRY_NUM = 1;
+            test::EchoRequest req;
+            test::EchoResponse res;
+            brpc::Controller cntl;
+            req.set_message(__FUNCTION__);
+
+            _backup_request_policy.backup = backup;
+            cntl.set_max_retry(RETRY_NUM);
+            cntl.set_timeout_ms(100);  // 100ms
+            req.set_sleep_us(50000); // 50ms
+            CallMethod(&channel, &cntl, &req, &res, async);
+            ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+            ASSERT_EQ(backup, cntl.has_backup_request());
+            ASSERT_EQ(backup ? RETRY_NUM : 0, cntl.retried_count());
+            bthread_usleep(70000);  // wait for the sleep task to finish
+
+            if (short_connection) {
+                // Sleep to let `_messenger' detect `Socket' being `SetFailed'
+                const int64_t start_time = butil::gettimeofday_us();
+                while (_messenger.ConnectionCount() != 0) {
+                    ASSERT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/);
+                    bthread_usleep(1000);
+                }
+            } else {
+                ASSERT_GE(1ul, _messenger.ConnectionCount());
+            }
+        }
+
+        StopAndJoin();
+    }
+
     butil::EndPoint _ep;
     butil::TempFile _server_list;                                        
     std::string _naming_url;
@@ -1929,6 +2034,7 @@ protected:
     bool _close_fd_once;
     
     MyEchoService _svc;
+    BackupRequestPolicyImpl _backup_request_policy;
 };
 
 class MyShared : public brpc::SharedObject {
@@ -2596,6 +2702,26 @@ TEST_F(ChannelTest, retry_backoff) {
     }
 }
 
+TEST_F(ChannelTest, backup_request) {
+    // All 8 {single, async, short} combinations; short_connection varies fastest.
+    for (int mask = 0; mask < 8; ++mask) {
+        const bool single_server = (mask & 4) != 0;
+        const bool async = (mask & 2) != 0;
+        const bool short_connection = (mask & 1) != 0;
+        TestBackupRequest(single_server, async, short_connection);
+    }
+}
+
+TEST_F(ChannelTest, backup_request_policy) {
+    const bool kFlags[] = {false, true};  // Exhaust single/async/short combinations.
+    for (bool single_server : kFlags) {
+        for (bool async : kFlags) {
+            for (bool short_connection : kFlags)
+                TestBackupRequestPolicy(single_server, async, short_connection);
+        }
+    }
+}
+
 TEST_F(ChannelTest, multiple_threads_single_channel) {
     srand(time(NULL));
     ASSERT_EQ(0, StartAccept(_ep));


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to