This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new e1a88638 send response之后,request/response对象析构之前执行一些自定义逻辑 (#2328)
e1a88638 is described below

commit e1a88638fb5028262396d0e6d687e66c5abd4da2
Author: youcheng huang <[email protected]>
AuthorDate: Wed Oct 25 15:01:45 2023 +0800

    send response之后,request/response对象析构之前执行一些自定义逻辑 (#2328)
    
    * call_after_rpc_resp
    
    * fix function name & add examples & add ut
    
    * fix mistake
    
    * fix compile error
    
    * fix compile error
    
    * update ut
    
    * complete ut
    
    * update ut
    
    * modify function name
    
    ---------
    
    Co-authored-by: yuncheng <[email protected]>
---
 example/asynchronous_echo_c++/server.cpp | 20 +++++++++++++++-
 example/echo_c++/server.cpp              | 19 +++++++++++++++
 example/http_c++/http_server.cpp         | 21 ++++++++++++++++-
 src/brpc/controller.cpp                  |  8 +++++++
 src/brpc/controller.h                    | 11 +++++++++
 src/brpc/policy/baidu_rpc_protocol.cpp   |  8 +++++--
 src/brpc/policy/http_rpc_protocol.cpp    |  6 ++++-
 test/brpc_channel_unittest.cpp           | 40 +++++++++++++++++++++++++++++---
 8 files changed, 125 insertions(+), 8 deletions(-)

diff --git a/example/asynchronous_echo_c++/server.cpp 
b/example/asynchronous_echo_c++/server.cpp
index d95d9dfe..8c7ced67 100644
--- a/example/asynchronous_echo_c++/server.cpp
+++ b/example/asynchronous_echo_c++/server.cpp
@@ -39,10 +39,15 @@ public:
         // This object helps you to call done->Run() in RAII style. If you need
         // to process the request asynchronously, pass done_guard.release().
         brpc::ClosureGuard done_guard(done);
-        
+
         brpc::Controller* cntl =
             static_cast<brpc::Controller*>(cntl_base);
 
+        // optional: set a callback function which is called after response is 
sent
+        // and before cntl/req/res is destructed.
+        cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
+            std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3));
+
         // The purpose of following logs is to help you to understand
         // how clients interact with servers more intuitively. You should 
         // remove these logs in performance-sensitive servers.
@@ -64,6 +69,19 @@ public:
             cntl->response_attachment().append("bar");
         }
     }
+
+    // optional
+    static void CallAfterRpc(brpc::Controller* cntl,
+                        const google::protobuf::Message* req,
+                        const google::protobuf::Message* res) {
+        // at this time res is already sent to client, but cntl/req/res is not 
destructed
+        std::string req_str;
+        std::string res_str;
+        json2pb::ProtoMessageToJson(*req, &req_str, NULL);
+        json2pb::ProtoMessageToJson(*res, &res_str, NULL);
+        LOG(INFO) << "req:" << req_str
+                    << " res:" << res_str;
+    }
 };
 
 int main(int argc, char* argv[]) {
diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp
index 08b3e9d6..d1f06051 100644
--- a/example/echo_c++/server.cpp
+++ b/example/echo_c++/server.cpp
@@ -20,6 +20,7 @@
 #include <gflags/gflags.h>
 #include <butil/logging.h>
 #include <brpc/server.h>
+#include <json2pb/pb_to_json.h>
 #include "echo.pb.h"
 
 DEFINE_bool(echo_attachment, true, "Echo attachment as well");
@@ -48,6 +49,11 @@ public:
         brpc::Controller* cntl =
             static_cast<brpc::Controller*>(cntl_base);
 
+        // optional: set a callback function which is called after response is 
sent
+        // and before cntl/req/res is destructed.
+        cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
+            std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3));
+
         // The purpose of following logs is to help you to understand
         // how clients interact with servers more intuitively. You should 
         // remove these logs in performance-sensitive servers.
@@ -70,6 +76,19 @@ public:
             cntl->response_attachment().append(cntl->request_attachment());
         }
     }
+
+    // optional
+    static void CallAfterRpc(brpc::Controller* cntl,
+                        const google::protobuf::Message* req,
+                        const google::protobuf::Message* res) {
+        // at this time res is already sent to client, but cntl/req/res is not 
destructed
+        std::string req_str;
+        std::string res_str;
+        json2pb::ProtoMessageToJson(*req, &req_str, NULL);
+        json2pb::ProtoMessageToJson(*res, &res_str, NULL);
+        LOG(INFO) << "req:" << req_str
+                    << " res:" << res_str;
+    }
 };
 }  // namespace example
 
diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp
index 9a1595b9..202f55d1 100644
--- a/example/http_c++/http_server.cpp
+++ b/example/http_c++/http_server.cpp
@@ -45,9 +45,15 @@ public:
         // This object helps you to call done->Run() in RAII style. If you need
         // to process the request asynchronously, pass done_guard.release().
         brpc::ClosureGuard done_guard(done);
-        
+
         brpc::Controller* cntl =
             static_cast<brpc::Controller*>(cntl_base);
+
+        // optional: set a callback function which is called after response is 
sent
+        // and before cntl/req/res is destructed.
+        cntl->set_after_rpc_resp_fn(std::bind(&HttpServiceImpl::CallAfterRpc,
+            std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3));
+
         // Fill response.
         cntl->http_response().set_content_type("text/plain");
         butil::IOBufBuilder os;
@@ -59,6 +65,19 @@ public:
         os << "\nbody: " << cntl->request_attachment() << '\n';
         os.move_to(cntl->response_attachment());
     }
+
+    // optional
+    static void CallAfterRpc(brpc::Controller* cntl,
+                        const google::protobuf::Message* req,
+                        const google::protobuf::Message* res) {
+        // at this time res is already sent to client, but cntl/req/res is not 
destructed
+        std::string req_str;
+        std::string res_str;
+        json2pb::ProtoMessageToJson(*req, &req_str, NULL);
+        json2pb::ProtoMessageToJson(*res, &res_str, NULL);
+        LOG(INFO) << "req:" << req_str
+                    << " res:" << res_str;
+    }
 };
 
 // Service with dynamic path.
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 5392f16c..47dafb6f 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -228,6 +228,7 @@ void Controller::ResetNonPods() {
     }
     delete _remote_stream_settings;
     _thrift_method_name.clear();
+    _after_rpc_resp_fn = nullptr;
 
     CHECK(_unfinished_call == NULL);
 }
@@ -1500,6 +1501,13 @@ int Controller::GetSockOption(int level, int optname, 
void* optval, socklen_t* o
     }
 }
 
+void Controller::CallAfterRpcResp(const google::protobuf::Message* req, const 
google::protobuf::Message* res) {
+    if (_after_rpc_resp_fn) {
+        _after_rpc_resp_fn(this, req, res);
+        _after_rpc_resp_fn = nullptr;
+    }
+}
+
 #if defined(OS_MACOSX)
 typedef sig_t SignalHandler;
 #else
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index a9498818..708ff8c6 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -22,6 +22,7 @@
 // To brpc developers: This is a header included by user, don't depend
 // on internal structures, use opaque pointers instead.
 
+#include <functional>                          // std::function
 #include <gflags/gflags.h>                     // Users often need gflags
 #include <string>
 #include "butil/intrusive_ptr.hpp"             // butil::intrusive_ptr
@@ -579,6 +580,14 @@ public:
     // -1 means no deadline.
     int64_t deadline_us() const { return _deadline_us; }
 
+    using AfterRpcRespFnType = std::function<void(Controller* cntl,
+                                               const 
google::protobuf::Message* req,
+                                               const 
google::protobuf::Message* res)>;
+
+    void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn = 
fn; }
+
+    void CallAfterRpcResp(const google::protobuf::Message* req, const 
google::protobuf::Message* res);
+
 private:
     struct CompletionInfo {
         CallId id;           // call_id of the corresponding request
@@ -834,6 +843,8 @@ private:
     std::string _thrift_method_name;
 
     uint32_t _auth_flags;
+
+    AfterRpcRespFnType _after_rpc_resp_fn;
 };
 
 // Advises the RPC system that the caller desires that the RPC call be
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp 
b/src/brpc/policy/baidu_rpc_protocol.cpp
index ef1ab7d4..7fafa218 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -149,10 +149,14 @@ void SendRpcResponse(int64_t correlation_id,
         span->set_start_send_us(butil::cpuwide_time_us());
     }
     Socket* sock = accessor.get_sending_socket();
-    std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
-    ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
+
     std::unique_ptr<const google::protobuf::Message> recycle_req(req);
     std::unique_ptr<const google::protobuf::Message> recycle_res(res);
+
+    std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
+    ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
+
+    ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp, 
req, res));
     
     StreamId response_stream_id = accessor.response_stream();
 
diff --git a/src/brpc/policy/http_rpc_protocol.cpp 
b/src/brpc/policy/http_rpc_protocol.cpp
index 9120ba58..979e3861 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -736,7 +736,11 @@ private:
 class HttpResponseSenderAsDone : public google::protobuf::Closure {
 public:
     HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
-    void Run() override { delete this; }
+    void Run() override {
+        _sender._cntl->CallAfterRpcResp(_sender._req.get(), 
_sender._res.get());
+        delete this;
+    }
+
 private:
     HttpResponseSender _sender;
 };
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 6d7d374b..d43a0f4b 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -40,7 +40,11 @@
 #include "brpc/selective_channel.h"
 #include "brpc/socket_map.h"
 #include "brpc/controller.h"
+#if BAZEL_TEST
+#include "test/echo.pb.h"
+#else
 #include "echo.pb.h"
+#endif   // BAZEL_TEST
 #include "brpc/options.pb.h"
 
 namespace brpc {
@@ -129,6 +133,22 @@ static bool VerifyMyRequest(const brpc::InputMessageBase* 
msg_base) {
     return true;
 }
 
+class CallAfterRpcObject {
+public:
+    explicit CallAfterRpcObject() {}
+
+    ~CallAfterRpcObject() {
+        EXPECT_EQ(str, "CallAfterRpcRespTest");
+    }
+
+    void Append(const std::string& s) {
+        str.append(s);
+    }
+
+private:
+    std::string str;
+};
+
 class MyEchoService : public ::test::EchoService {
     void Echo(google::protobuf::RpcController* cntl_base,
               const ::test::EchoRequest* req,
@@ -136,6 +156,9 @@ class MyEchoService : public ::test::EchoService {
               google::protobuf::Closure* done) {
         brpc::Controller* cntl =
             static_cast<brpc::Controller*>(cntl_base);
+        std::shared_ptr<CallAfterRpcObject> str_test(new CallAfterRpcObject());
+        cntl->set_after_rpc_resp_fn(std::bind(&MyEchoService::CallAfterRpc, 
str_test,
+            std::placeholders::_1, std::placeholders::_2, 
std::placeholders::_3));
         brpc::ClosureGuard done_guard(done);
         if (req->server_fail()) {
             cntl->SetFailed(req->server_fail(), "Server fail1");
@@ -157,6 +180,17 @@ class MyEchoService : public ::test::EchoService {
         }
         res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
     }
+    static void CallAfterRpc(std::shared_ptr<CallAfterRpcObject> str,
+                        brpc::Controller* cntl,
+                        const google::protobuf::Message* req,
+                        const google::protobuf::Message* res) {
+        const test::EchoRequest* request = static_cast<const 
test::EchoRequest*>(req);
+        const test::EchoResponse* response = static_cast<const 
test::EchoResponse*>(res);
+        str->Append("CallAfterRpcRespTest");
+        EXPECT_TRUE(nullptr != cntl);
+        EXPECT_TRUE(nullptr != request);
+        EXPECT_TRUE(nullptr != response);
+    }
 };
 
 pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;
@@ -247,7 +281,7 @@ protected:
             const brpc::Server*,
             brpc::MethodStatus*, int64_t>(
                 &brpc::policy::SendRpcResponse,
-                meta.correlation_id(), cntl, NULL, res,
+                meta.correlation_id(), cntl, req, res,
                 &ts->_dummy, NULL, -1);
         ts->_svc.CallMethod(method, cntl, req, res, done);
     }
@@ -1564,7 +1598,7 @@ protected:
         }
         StopAndJoin();
     }
-    
+
     void RPCThread(brpc::ChannelBase* channel, bool async) {
         brpc::Controller cntl;
         test::EchoRequest req;
@@ -1583,7 +1617,7 @@ protected:
             test::EchoResponse res;
             req.set_message(__FUNCTION__);
             CallMethod(channel, &cntl, &req, &res, async);
-            
+
             ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
             ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
             cntl.Reset();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to