This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new e1a88638 send response之后,request/response对象析构之前执行一些自定义逻辑 (#2328)
e1a88638 is described below
commit e1a88638fb5028262396d0e6d687e66c5abd4da2
Author: youcheng huang <[email protected]>
AuthorDate: Wed Oct 25 15:01:45 2023 +0800
send response之后,request/response对象析构之前执行一些自定义逻辑 (#2328)
* call_after_rpc_resp
* fix function name & add examples & add ut
* fix mistake
* fix compile error
* fix compile error
* update ut
* complete ut
* update ut
* modify function name
---------
Co-authored-by: yuncheng <[email protected]>
---
example/asynchronous_echo_c++/server.cpp | 20 +++++++++++++++-
example/echo_c++/server.cpp | 19 +++++++++++++++
example/http_c++/http_server.cpp | 21 ++++++++++++++++-
src/brpc/controller.cpp | 8 +++++++
src/brpc/controller.h | 11 +++++++++
src/brpc/policy/baidu_rpc_protocol.cpp | 8 +++++--
src/brpc/policy/http_rpc_protocol.cpp | 6 ++++-
test/brpc_channel_unittest.cpp | 40 +++++++++++++++++++++++++++++---
8 files changed, 125 insertions(+), 8 deletions(-)
diff --git a/example/asynchronous_echo_c++/server.cpp
b/example/asynchronous_echo_c++/server.cpp
index d95d9dfe..8c7ced67 100644
--- a/example/asynchronous_echo_c++/server.cpp
+++ b/example/asynchronous_echo_c++/server.cpp
@@ -39,10 +39,15 @@ public:
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
-
+
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
+ // optional: set a callback function which is called after response is
sent
+ // and before cntl/req/res is destructed.
+ cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
+ std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
+
// The purpose of following logs is to help you to understand
// how clients interact with servers more intuitively. You should
// remove these logs in performance-sensitive servers.
@@ -64,6 +69,19 @@ public:
cntl->response_attachment().append("bar");
}
}
+
+ // optional
+ static void CallAfterRpc(brpc::Controller* cntl,
+ const google::protobuf::Message* req,
+ const google::protobuf::Message* res) {
+ // at this time res is already sent to client, but cntl/req/res is not
destructed
+ std::string req_str;
+ std::string res_str;
+ json2pb::ProtoMessageToJson(*req, &req_str, NULL);
+ json2pb::ProtoMessageToJson(*res, &res_str, NULL);
+ LOG(INFO) << "req:" << req_str
+ << " res:" << res_str;
+ }
};
int main(int argc, char* argv[]) {
diff --git a/example/echo_c++/server.cpp b/example/echo_c++/server.cpp
index 08b3e9d6..d1f06051 100644
--- a/example/echo_c++/server.cpp
+++ b/example/echo_c++/server.cpp
@@ -20,6 +20,7 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
+#include <json2pb/pb_to_json.h>
#include "echo.pb.h"
DEFINE_bool(echo_attachment, true, "Echo attachment as well");
@@ -48,6 +49,11 @@ public:
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
+ // optional: set a callback function which is called after response is
sent
+ // and before cntl/req/res is destructed.
+ cntl->set_after_rpc_resp_fn(std::bind(&EchoServiceImpl::CallAfterRpc,
+ std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
+
// The purpose of following logs is to help you to understand
// how clients interact with servers more intuitively. You should
// remove these logs in performance-sensitive servers.
@@ -70,6 +76,19 @@ public:
cntl->response_attachment().append(cntl->request_attachment());
}
}
+
+ // optional
+ static void CallAfterRpc(brpc::Controller* cntl,
+ const google::protobuf::Message* req,
+ const google::protobuf::Message* res) {
+ // at this time res is already sent to client, but cntl/req/res is not
destructed
+ std::string req_str;
+ std::string res_str;
+ json2pb::ProtoMessageToJson(*req, &req_str, NULL);
+ json2pb::ProtoMessageToJson(*res, &res_str, NULL);
+ LOG(INFO) << "req:" << req_str
+ << " res:" << res_str;
+ }
};
} // namespace example
diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp
index 9a1595b9..202f55d1 100644
--- a/example/http_c++/http_server.cpp
+++ b/example/http_c++/http_server.cpp
@@ -45,9 +45,15 @@ public:
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
-
+
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
+
+ // optional: set a callback function which is called after response is
sent
+ // and before cntl/req/res is destructed.
+ cntl->set_after_rpc_resp_fn(std::bind(&HttpServiceImpl::CallAfterRpc,
+ std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
+
// Fill response.
cntl->http_response().set_content_type("text/plain");
butil::IOBufBuilder os;
@@ -59,6 +65,19 @@ public:
os << "\nbody: " << cntl->request_attachment() << '\n';
os.move_to(cntl->response_attachment());
}
+
+ // optional
+ static void CallAfterRpc(brpc::Controller* cntl,
+ const google::protobuf::Message* req,
+ const google::protobuf::Message* res) {
+ // at this time res is already sent to client, but cntl/req/res is not
destructed
+ std::string req_str;
+ std::string res_str;
+ json2pb::ProtoMessageToJson(*req, &req_str, NULL);
+ json2pb::ProtoMessageToJson(*res, &res_str, NULL);
+ LOG(INFO) << "req:" << req_str
+ << " res:" << res_str;
+ }
};
// Service with dynamic path.
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 5392f16c..47dafb6f 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -228,6 +228,7 @@ void Controller::ResetNonPods() {
}
delete _remote_stream_settings;
_thrift_method_name.clear();
+ _after_rpc_resp_fn = nullptr;
CHECK(_unfinished_call == NULL);
}
@@ -1500,6 +1501,13 @@ int Controller::GetSockOption(int level, int optname,
void* optval, socklen_t* o
}
}
+void Controller::CallAfterRpcResp(const google::protobuf::Message* req, const
google::protobuf::Message* res) {
+ if (_after_rpc_resp_fn) {
+ _after_rpc_resp_fn(this, req, res);
+ _after_rpc_resp_fn = nullptr;
+ }
+}
+
#if defined(OS_MACOSX)
typedef sig_t SignalHandler;
#else
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index a9498818..708ff8c6 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -22,6 +22,7 @@
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
+#include <functional> // std::function
#include <gflags/gflags.h> // Users often need gflags
#include <string>
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
@@ -579,6 +580,14 @@ public:
// -1 means no deadline.
int64_t deadline_us() const { return _deadline_us; }
+ using AfterRpcRespFnType = std::function<void(Controller* cntl,
+ const
google::protobuf::Message* req,
+ const
google::protobuf::Message* res)>;
+
+ void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) { _after_rpc_resp_fn =
fn; }
+
+ void CallAfterRpcResp(const google::protobuf::Message* req, const
google::protobuf::Message* res);
+
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
@@ -834,6 +843,8 @@ private:
std::string _thrift_method_name;
uint32_t _auth_flags;
+
+ AfterRpcRespFnType _after_rpc_resp_fn;
};
// Advises the RPC system that the caller desires that the RPC call be
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index ef1ab7d4..7fafa218 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -149,10 +149,14 @@ void SendRpcResponse(int64_t correlation_id,
span->set_start_send_us(butil::cpuwide_time_us());
}
Socket* sock = accessor.get_sending_socket();
- std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
- ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
+
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
+
+ std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
+ ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
+
+ ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp,
req, res));
StreamId response_stream_id = accessor.response_stream();
diff --git a/src/brpc/policy/http_rpc_protocol.cpp
b/src/brpc/policy/http_rpc_protocol.cpp
index 9120ba58..979e3861 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -736,7 +736,11 @@ private:
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
- void Run() override { delete this; }
+ void Run() override {
+ _sender._cntl->CallAfterRpcResp(_sender._req.get(),
_sender._res.get());
+ delete this;
+ }
+
private:
HttpResponseSender _sender;
};
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 6d7d374b..d43a0f4b 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -40,7 +40,11 @@
#include "brpc/selective_channel.h"
#include "brpc/socket_map.h"
#include "brpc/controller.h"
+#if BAZEL_TEST
+#include "test/echo.pb.h"
+#else
#include "echo.pb.h"
+#endif // BAZEL_TEST
#include "brpc/options.pb.h"
namespace brpc {
@@ -129,6 +133,22 @@ static bool VerifyMyRequest(const brpc::InputMessageBase*
msg_base) {
return true;
}
+class CallAfterRpcObject {
+public:
+ explicit CallAfterRpcObject() {}
+
+ ~CallAfterRpcObject() {
+ EXPECT_EQ(str, "CallAfterRpcRespTest");
+ }
+
+ void Append(const std::string& s) {
+ str.append(s);
+ }
+
+private:
+ std::string str;
+};
+
class MyEchoService : public ::test::EchoService {
void Echo(google::protobuf::RpcController* cntl_base,
const ::test::EchoRequest* req,
@@ -136,6 +156,9 @@ class MyEchoService : public ::test::EchoService {
google::protobuf::Closure* done) {
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
+ std::shared_ptr<CallAfterRpcObject> str_test(new CallAfterRpcObject());
+ cntl->set_after_rpc_resp_fn(std::bind(&MyEchoService::CallAfterRpc,
str_test,
+ std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3));
brpc::ClosureGuard done_guard(done);
if (req->server_fail()) {
cntl->SetFailed(req->server_fail(), "Server fail1");
@@ -157,6 +180,17 @@ class MyEchoService : public ::test::EchoService {
}
res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
}
+ static void CallAfterRpc(std::shared_ptr<CallAfterRpcObject> str,
+ brpc::Controller* cntl,
+ const google::protobuf::Message* req,
+ const google::protobuf::Message* res) {
+ const test::EchoRequest* request = static_cast<const
test::EchoRequest*>(req);
+ const test::EchoResponse* response = static_cast<const
test::EchoResponse*>(res);
+ str->Append("CallAfterRpcRespTest");
+ EXPECT_TRUE(nullptr != cntl);
+ EXPECT_TRUE(nullptr != request);
+ EXPECT_TRUE(nullptr != response);
+ }
};
pthread_once_t register_mock_protocol = PTHREAD_ONCE_INIT;
@@ -247,7 +281,7 @@ protected:
const brpc::Server*,
brpc::MethodStatus*, int64_t>(
&brpc::policy::SendRpcResponse,
- meta.correlation_id(), cntl, NULL, res,
+ meta.correlation_id(), cntl, req, res,
&ts->_dummy, NULL, -1);
ts->_svc.CallMethod(method, cntl, req, res, done);
}
@@ -1564,7 +1598,7 @@ protected:
}
StopAndJoin();
}
-
+
void RPCThread(brpc::ChannelBase* channel, bool async) {
brpc::Controller cntl;
test::EchoRequest req;
@@ -1583,7 +1617,7 @@ protected:
test::EchoResponse res;
req.set_message(__FUNCTION__);
CallMethod(channel, &cntl, &req, &res, async);
-
+
ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
ASSERT_EQ("received " + std::string(__FUNCTION__), res.message());
cntl.Reset();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]