This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 2098dd39 Support user fields of baidu protocol (#2406)
2098dd39 is described below
commit 2098dd3927ac389cdf6a0973b0edf19a477ff540
Author: Bright Chen <[email protected]>
AuthorDate: Wed Dec 20 11:49:31 2023 +0800
Support user fields of baidu protocol (#2406)
* Support user fields of baidu protocol
* Use request_user_fields before RPC
---
src/brpc/controller.cpp | 4 ++++
src/brpc/controller.h | 26 ++++++++++++++++++++++++
src/brpc/policy/baidu_rpc_meta.proto | 3 ++-
src/brpc/policy/baidu_rpc_protocol.cpp | 29 +++++++++++++++++++++++++++
test/brpc_server_unittest.cpp | 36 ++++++++++++++++++++++++++++++++++
5 files changed, 97 insertions(+), 1 deletion(-)
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 47dafb6f..f49a27a9 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -209,6 +209,8 @@ void Controller::ResetNonPods() {
_request_buf.clear();
delete _http_request;
delete _http_response;
+ delete _request_user_fields;
+ delete _response_user_fields;
_request_attachment.clear();
_response_attachment.clear();
if (_wpa) {
@@ -283,6 +285,8 @@ void Controller::ResetPods() {
_idl_result = IDL_VOID_RESULT;
_http_request = NULL;
_http_response = NULL;
+ _request_user_fields = NULL;
+ _response_user_fields = NULL;
_request_stream = INVALID_STREAM_ID;
_response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL;
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 708ff8c6..d3ffb99f 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -105,6 +105,8 @@ enum StopStyle {
const int32_t UNSET_MAGIC_NUM = -123456789;
+typedef butil::FlatMap<std::string, std::string> UserFieldsMap;
+
// A Controller mediates a single method call. The primary purpose of
// the controller is to provide a way to manipulate settings per RPC-call
// and to find out about RPC-level errors.
@@ -255,6 +257,26 @@ public:
return tmp;
}
+ UserFieldsMap* request_user_fields() {
+ if (!_request_user_fields) {
+ _request_user_fields = new UserFieldsMap;
+ _request_user_fields->init(29);
+ }
+ return _request_user_fields;
+ }
+
+ bool has_request_user_fields() const { return _request_user_fields; }
+
+ UserFieldsMap* response_user_fields() {
+ if (!_response_user_fields) {
+ _response_user_fields = new UserFieldsMap;
+ _response_user_fields->init(29);
+ }
+ return _response_user_fields;
+ }
+
+ bool has_response_user_fields() const { return _response_user_fields; }
+
// User attached data or body of http request, which is wired to network
// directly instead of being serialized into protobuf messages.
butil::IOBuf& request_attachment() { return _request_attachment; }
@@ -820,6 +842,10 @@ private:
HttpHeader* _http_request;
HttpHeader* _http_response;
+ // User fields of baidu_std protocol.
+ UserFieldsMap* _request_user_fields;
+ UserFieldsMap* _response_user_fields;
+
std::unique_ptr<KVMap> _session_kv;
// Fields with large size but low access frequency
diff --git a/src/brpc/policy/baidu_rpc_meta.proto
b/src/brpc/policy/baidu_rpc_meta.proto
index dc716540..300564bb 100644
--- a/src/brpc/policy/baidu_rpc_meta.proto
+++ b/src/brpc/policy/baidu_rpc_meta.proto
@@ -31,7 +31,8 @@ message RpcMeta {
optional int32 attachment_size = 5;
optional ChunkInfo chunk_info = 6;
optional bytes authentication_data = 7;
- optional StreamSettings stream_settings = 8;
+ optional StreamSettings stream_settings = 8;
+ map<string, string> user_fields = 9;
}
message RpcRequestMeta {
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index d8342619..b19fbb37 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -223,6 +223,15 @@ void SendRpcResponse(int64_t correlation_id,
}
}
+ if (cntl->has_response_user_fields() &&
+ !cntl->response_user_fields()->empty()) {
+ ::google::protobuf::Map<std::string, std::string>& user_fields
+ = *meta.mutable_user_fields();
+ user_fields.insert(cntl->response_user_fields()->begin(),
+ cntl->response_user_fields()->end());
+
+ }
+
butil::IOBuf res_buf;
SerializeRpcHeaderAndMeta(&res_buf, meta, res_size + attached_size);
if (append_body) {
@@ -380,6 +389,12 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
accessor.set_remote_stream_settings(meta.release_stream_settings());
}
+ if (!meta.user_fields().empty()) {
+ for (const auto& it : meta.user_fields()) {
+ (*cntl->request_user_fields())[it.first] = it.second;
+ }
+ }
+
// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
@@ -595,6 +610,13 @@ void ProcessRpcResponse(InputMessageBase* msg_base) {
accessor.set_remote_stream_settings(
new StreamSettings(meta.stream_settings()));
}
+
+ if (!meta.user_fields().empty()) {
+ for (const auto& it : meta.user_fields()) {
+ (*cntl->response_user_fields())[it.first] = it.second;
+ }
+ }
+
Span* span = accessor.span();
if (span) {
span->set_base_real_us(msg->base_real_us());
@@ -694,6 +716,13 @@ void PackRpcRequest(butil::IOBuf* req_buf,
s->FillSettings(meta.mutable_stream_settings());
}
+ if (cntl->has_request_user_fields() &&
!cntl->request_user_fields()->empty()) {
+ ::google::protobuf::Map<std::string, std::string>& user_fields
+ = *meta.mutable_user_fields();
+ user_fields.insert(cntl->request_user_fields()->begin(),
+ cntl->request_user_fields()->end());
+ }
+
// Don't use res->ByteSize() since it may be compressed
const size_t req_size = request_body.length();
const size_t attached_size = cntl->request_attachment().length();
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index c22b6b53..8a8a76d8 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -97,6 +97,8 @@ bool g_delete = false;
const std::string EXP_REQUEST = "hello";
const std::string EXP_RESPONSE = "world";
const std::string EXP_REQUEST_BASE64 = "aGVsbG8=";
+const std::string EXP_USER_FIELD_KEY = "hello";
+const std::string EXP_USER_FIELD_VALUE = "world";
class EchoServiceImpl : public test::EchoService {
public:
@@ -118,6 +120,13 @@ public:
} else {
LOG(INFO) << "No sleep, protocol=" << cntl->request_protocol();
}
+ if (cntl->has_request_user_fields()) {
+ ASSERT_TRUE(!cntl->request_user_fields()->empty());
+ std::string* val =
cntl->request_user_fields()->seek(EXP_USER_FIELD_KEY);
+ ASSERT_TRUE(val != NULL);
+ ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
+ cntl->response_user_fields()->insert(EXP_USER_FIELD_KEY,
EXP_USER_FIELD_VALUE);
+ }
}
virtual void ComboEcho(google::protobuf::RpcController*,
@@ -1620,4 +1629,31 @@ TEST_F(ServerTest, max_concurrency) {
stub.Echo(&cntl4, &req, NULL, NULL);
ASSERT_FALSE(cntl4.Failed()) << cntl4.ErrorText();
}
+
+TEST_F(ServerTest, user_fields) {
+ const int port = 9200;
+ brpc::Server server;
+ EchoServiceImpl service;
+ ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+ ASSERT_EQ(0, server.Start(port, NULL));
+
+ brpc::Channel channel;
+ ASSERT_EQ(0, channel.Init("0.0.0.0", port, NULL));
+ test::EchoService_Stub stub(&channel);
+
+ brpc::Controller cntl;
+ cntl.request_user_fields()->insert(EXP_USER_FIELD_KEY,
EXP_USER_FIELD_VALUE);
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message("hello");
+ stub.Echo(&cntl, &req, &res, NULL);
+
+ ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+ ASSERT_TRUE(cntl.has_response_user_fields());
+ ASSERT_TRUE(!cntl.response_user_fields()->empty());
+ std::string* val = cntl.response_user_fields()->seek(EXP_USER_FIELD_KEY);
+ ASSERT_TRUE(val != NULL);
+ ASSERT_EQ(*val, EXP_USER_FIELD_VALUE);
+}
+
} //namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]