This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 48996cfc Support rpc protobuf message factory interface (#2718)
48996cfc is described below

commit 48996cfc901248508cf6f2ac486ddde25d0981f6
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Aug 26 10:27:21 2024 +0800

    Support rpc protobuf message factory interface (#2718)
    
    * Support rpc protobuf message factory interface
    
    * Update cn/server.md
---
 docs/cn/server.md                      | 28 ++++++++++
 src/brpc/policy/baidu_rpc_protocol.cpp | 99 +++++++++++++++++++++++-----------
 src/brpc/rpc_pb_message_factory.cpp    | 51 ++++++++++++++++++
 src/brpc/rpc_pb_message_factory.h      | 54 +++++++++++++++++++
 src/brpc/server.cpp                    |  6 ++-
 src/brpc/server.h                      | 10 ++++
 test/brpc_channel_unittest.cpp         | 27 +++++-----
 test/brpc_server_unittest.cpp          | 61 +++++++++++++++++++++
 8 files changed, 289 insertions(+), 47 deletions(-)

diff --git a/docs/cn/server.md b/docs/cn/server.md
index 070adf97..5e3fc42a 100644
--- a/docs/cn/server.md
+++ b/docs/cn/server.md
@@ -1013,6 +1013,34 @@ public:
         ...
 ```
 
+## RPC Protobuf message factory
+
+Server默认使用`DefaultRpcPBMessageFactory`。它是一个简单的工厂类,通过`new`来创建请求/响应message和`delete`来销毁请求/响应message。
+
+如果用户希望自定义创建销毁机制,可以实现`RpcPBMessages`(请求/响应message的封装)和`RpcPBMessageFactory`(工厂类),并通过`ServerOptions.rpc_pb_message_factory`。
+
+接口如下:
+
+```c++
+// Inherit this class to customize rpc protobuf messages,
+// include request and response.
+class RpcPBMessages {
+public:
+    virtual ~RpcPBMessages() = default;
+    virtual google::protobuf::Message* Request() = 0;
+    virtual google::protobuf::Message* Response() = 0;
+};
+
+// Factory to manage `RpcPBMessages'.
+class RpcPBMessageFactory {
+public:
+    virtual ~RpcPBMessageFactory() = default;
+    virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
+                               const ::google::protobuf::MethodDescriptor& 
method) = 0;
+    virtual void Return(RpcPBMessages* protobuf_message) = 0;
+};
+```
+
 # FAQ
 
 ### Q: Fail to write into fd=1865 SocketId=8905@10.208.245.43:54742@8230: Got 
EOF是什么意思
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp 
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 6ce76467..0fb439a8 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -24,6 +24,7 @@
 #include "butil/time.h"
 #include "butil/iobuf.h"                         // butil::IOBuf
 #include "butil/raw_pack.h"                      // RawPacker RawUnpacker
+#include "butil/memory/scope_guard.h"
 #include "brpc/controller.h"                    // Controller
 #include "brpc/socket.h"                        // Socket
 #include "brpc/server.h"                        // Server
@@ -31,6 +32,7 @@
 #include "brpc/compress.h"                      // ParseFromCompressedData
 #include "brpc/stream_impl.h"
 #include "brpc/rpc_dump.h"                      // SampledRequest
+#include "brpc/rpc_pb_message_factory.h"
 #include "brpc/policy/baidu_rpc_meta.pb.h"      // RpcRequestMeta
 #include "brpc/policy/baidu_rpc_protocol.h"
 #include "brpc/policy/most_common_message.h"
@@ -157,11 +159,34 @@ static bool SerializeResponse(const 
google::protobuf::Message& res,
     return true;
 }
 
+namespace {
+struct BaiduProxyPBMessages : public RpcPBMessages {
+    static BaiduProxyPBMessages* Get() {
+        return butil::get_object<BaiduProxyPBMessages>();
+    }
+
+    static void Return(BaiduProxyPBMessages* messages) {
+        messages->Clear();
+        butil::return_object(messages);
+    }
+
+    void Clear() {
+        request.Clear();
+        response.Clear();
+    }
+
+    ::google::protobuf::Message* Request() override { return &request; }
+    ::google::protobuf::Message* Response() override { return &response; }
+
+    SerializedRequest request;
+    SerializedResponse response;
+};
+}
+
 // Used by UT, can't be static.
 void SendRpcResponse(int64_t correlation_id,
-                     Controller* cntl, 
-                     const google::protobuf::Message* req,
-                     const google::protobuf::Message* res,
+                     Controller* cntl,
+                     RpcPBMessages* messages,
                      const Server* server,
                      MethodStatus* method_status,
                      int64_t received_us) {
@@ -172,13 +197,24 @@ void SendRpcResponse(int64_t correlation_id,
     }
     Socket* sock = accessor.get_sending_socket();
 
-    std::unique_ptr<const google::protobuf::Message> recycle_req(req);
-    std::unique_ptr<const google::protobuf::Message> recycle_res(res);
-
     std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
     ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
 
-    ClosureGuard guard(brpc::NewCallback(cntl, &Controller::CallAfterRpcResp, 
req, res));
+    auto messages_guard = butil::MakeScopeGuard([server, messages] {
+        if (NULL == messages) {
+            return;
+        }
+        if (NULL != server->options().baidu_master_service) {
+            
BaiduProxyPBMessages::Return(static_cast<BaiduProxyPBMessages*>(messages));
+        } else {
+            server->options().rpc_pb_message_factory->Return(messages);
+        }
+    });
+
+    const google::protobuf::Message* req = NULL == messages ? NULL : 
messages->Request();
+    const google::protobuf::Message* res = NULL == messages ? NULL : 
messages->Response();
+    ClosureGuard guard(brpc::NewCallback(
+        cntl, &Controller::CallAfterRpcResp, req, res));
     
     StreamId response_stream_id = accessor.response_stream();
 
@@ -375,8 +411,7 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
         return;
     }
 
-    std::unique_ptr<google::protobuf::Message> req;
-    std::unique_ptr<google::protobuf::Message> res;
+    RpcPBMessages* messages = NULL;
 
     ServerPrivateAccessor server_accessor(server);
     ControllerPrivateAccessor accessor(cntl.get());
@@ -496,13 +531,10 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
                 span->ResetServerSpanName(sampled_request->meta.method_name());
             }
 
-            auto serialized_request = (SerializedRequest*)
-                svc->GetRequestPrototype(NULL).New();
-            req.reset(serialized_request);
-            res.reset(svc->GetResponsePrototype(NULL).New());
-
-            msg->payload.cutn(&serialized_request->serialized_data(),
-                              req_size - meta.attachment_size());
+            messages = BaiduProxyPBMessages::Get();
+            msg->payload.cutn(
+                &((SerializedRequest*)messages->Request())->serialized_data(),
+                req_size - meta.attachment_size());
             if (!msg->payload.empty()) {
                 cntl->request_attachment().swap(msg->payload);
             }
@@ -568,26 +600,25 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
             }
 
             auto req_cmp_type = 
static_cast<CompressType>(meta.compress_type());
-            req.reset(svc->GetRequestPrototype(method).New());
-            if (!ParseFromCompressedData(req_buf, req.get(), req_cmp_type)) {
+            messages = server->options().rpc_pb_message_factory->Get(*svc, 
*method);
+            if (!ParseFromCompressedData(req_buf, messages->Request(), 
req_cmp_type)) {
                 cntl->SetFailed(EREQUEST, "Fail to parse request message, "
                                           "CompressType=%s, request_size=%d",
                                 CompressTypeToCStr(req_cmp_type), req_size);
+                server->options().rpc_pb_message_factory->Return(messages);
                 break;
             }
-
-            res.reset(svc->GetResponsePrototype(method).New());
             req_buf.clear();
         }
 
         // `socket' will be held until response has been sent
         google::protobuf::Closure* done = ::brpc::NewCallback<
-            int64_t, Controller*, const google::protobuf::Message*,
-            const google::protobuf::Message*, const Server*,
-            MethodStatus*, int64_t>(
-                &SendRpcResponse, meta.correlation_id(), cntl.get(), 
-                req.get(), res.get(), server,
-                method_status, msg->received_us());
+            int64_t, Controller*, RpcPBMessages*,
+            const Server*, MethodStatus*, int64_t>(&SendRpcResponse,
+                                                   meta.correlation_id(),
+                                                   cntl.get(), messages,
+                                                   server, method_status,
+                                                   msg->received_us());
 
         // optional, just release resource ASAP
         msg.reset();
@@ -598,24 +629,28 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
         }
         if (!FLAGS_usercode_in_pthread) {
             return svc->CallMethod(method, cntl.release(), 
-                                   req.release(), res.release(), done);
+                                   messages->Request(),
+                                   messages->Response(), done);
         }
         if (BeginRunningUserCode()) {
             svc->CallMethod(method, cntl.release(), 
-                            req.release(), res.release(), done);
+                            messages->Request(),
+                            messages->Response(), done);
             return EndRunningUserCodeInPlace();
         } else {
             return EndRunningCallMethodInPool(
                 svc, method, cntl.release(),
-                req.release(), res.release(), done);
+                messages->Request(),
+                messages->Response(), done);
         }
     } while (false);
     
     // `cntl', `req' and `res' will be deleted inside `SendRpcResponse'
     // `socket' will be held until response has been sent
-    SendRpcResponse(meta.correlation_id(), cntl.release(), 
-                    req.release(), res.release(), server,
-                    method_status, msg->received_us());
+    SendRpcResponse(meta.correlation_id(),
+                    cntl.release(), messages,
+                    server, method_status,
+                    msg->received_us());
 }
 
 bool VerifyRpcRequest(const InputMessageBase* msg_base) {
diff --git a/src/brpc/rpc_pb_message_factory.cpp 
b/src/brpc/rpc_pb_message_factory.cpp
new file mode 100644
index 00000000..828a289d
--- /dev/null
+++ b/src/brpc/rpc_pb_message_factory.cpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/rpc_pb_message_factory.h"
+#include "butil/object_pool.h"
+
+namespace brpc {
+
+struct DefaultRpcPBMessages : public RpcPBMessages {
+    DefaultRpcPBMessages() : request(NULL), response(NULL) {}
+    ::google::protobuf::Message* Request() override { return request; }
+    ::google::protobuf::Message* Response() override { return response; }
+
+    ::google::protobuf::Message* request;
+    ::google::protobuf::Message* response;
+};
+
+
+RpcPBMessages* DefaultRpcPBMessageFactory::Get(
+        const ::google::protobuf::Service& service,
+        const ::google::protobuf::MethodDescriptor& method) {
+    auto messages = butil::get_object<DefaultRpcPBMessages>();
+    messages->request = service.GetRequestPrototype(&method).New();
+    messages->response = service.GetResponsePrototype(&method).New();
+    return messages;
+}
+
+void DefaultRpcPBMessageFactory::Return(RpcPBMessages* messages) {
+    auto default_messages = static_cast<DefaultRpcPBMessages*>(messages);
+    delete default_messages->request;
+    delete default_messages->response;
+    default_messages->request = NULL;
+    default_messages->response = NULL;
+    butil::return_object(default_messages);
+}
+
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/rpc_pb_message_factory.h 
b/src/brpc/rpc_pb_message_factory.h
new file mode 100644
index 00000000..0da0ff2a
--- /dev/null
+++ b/src/brpc/rpc_pb_message_factory.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_RPC_PB_MESSAGE_FACTORY_H
+#define BRPC_RPC_PB_MESSAGE_FACTORY_H
+
+#include <google/protobuf/service.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/message.h>
+
+namespace brpc {
+
+// Inherit this class to customize rpc protobuf messages,
+// include request and response.
+class RpcPBMessages {
+public:
+    virtual ~RpcPBMessages() = default;
+    virtual google::protobuf::Message* Request() = 0;
+    virtual google::protobuf::Message* Response() = 0;
+};
+
+// Factory to manage `RpcPBMessages'.
+class RpcPBMessageFactory {
+public:
+    virtual ~RpcPBMessageFactory() = default;
+    virtual RpcPBMessages* Get(const ::google::protobuf::Service& service,
+                               const ::google::protobuf::MethodDescriptor& 
method) = 0;
+    virtual void Return(RpcPBMessages* protobuf_message) = 0;
+};
+
+class DefaultRpcPBMessageFactory : public RpcPBMessageFactory {
+public:
+    RpcPBMessages* Get(const ::google::protobuf::Service& service,
+                       const ::google::protobuf::MethodDescriptor& method) 
override;
+    void Return(RpcPBMessages* messages) override;
+};
+
+} // namespace brpc
+
+#endif //BRPC_RPC_PB_MESSAGE_FACTORY_H
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 00cdec79..740873f1 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -151,7 +151,8 @@ ServerOptions::ServerOptions()
     , health_reporter(NULL)
     , rtmp_service(NULL)
     , redis_service(NULL)
-    , bthread_tag(BTHREAD_TAG_INVALID) {
+    , bthread_tag(BTHREAD_TAG_INVALID)
+    , rpc_pb_message_factory(new DefaultRpcPBMessageFactory()) {
     if (s_ncore > 0) {
         num_threads = s_ncore + 1;
     }
@@ -449,6 +450,9 @@ Server::~Server() {
     delete _options.http_master_service;
     _options.http_master_service = NULL;
 
+    delete _options.rpc_pb_message_factory;
+    _options.rpc_pb_message_factory = NULL;
+
     delete _am;
     _am = NULL;
     delete _internal_am;
diff --git a/src/brpc/server.h b/src/brpc/server.h
index fdcba68f..d65e13f0 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -44,6 +44,7 @@
 #include "brpc/interceptor.h"
 #include "brpc/concurrency_limiter.h"
 #include "brpc/baidu_master_service.h"
+#include "brpc/rpc_pb_message_factory.h"
 
 namespace brpc {
 
@@ -277,6 +278,15 @@ struct ServerOptions {
     // Default: BTHREAD_TAG_DEFAULT
     bthread_tag_t bthread_tag;
 
+    // [CAUTION] This option is for implementing specialized rpc protobuf
+    // message factory, most users don't need it. Don't change this option
+    // unless you fully understand the description below.
+    // If this option is set, all baidu-std rpc request message and response
+    // message will be created by this factory.
+    //
+    // Owned by Server and deleted in server's destructor.
+    RpcPBMessageFactory* rpc_pb_message_factory;
+
 private:
     // SSLOptions is large and not often used, allocate it on heap to
     // prevent ServerOptions from being bloated in most cases.
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index d43a0f4b..6c189f18 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -53,10 +53,11 @@ DECLARE_int32(max_connection_pool_size);
 class Server;
 class MethodStatus;
 namespace policy {
-void SendRpcResponse(int64_t correlation_id, Controller* cntl, 
-                     const google::protobuf::Message* req,
-                     const google::protobuf::Message* res,
-                     const Server* server_raw, MethodStatus *, int64_t);
+void SendRpcResponse(int64_t correlation_id,
+                     Controller* cntl,
+                     RpcPBMessages* messages,
+                     const Server* server_raw,
+                     MethodStatus *, int64_t);
 } // policy
 } // brpc
 
@@ -255,8 +256,10 @@ protected:
         ASSERT_EQ(ts->_svc.descriptor()->full_name(), req_meta.service_name());
         const google::protobuf::MethodDescriptor* method =
             ts->_svc.descriptor()->FindMethodByName(req_meta.method_name());
-        google::protobuf::Message* req =
-              ts->_svc.GetRequestPrototype(method).New();
+        brpc::RpcPBMessages* messages =
+            ts->_dummy.options().rpc_pb_message_factory->Get(ts->_svc, 
*method);
+        google::protobuf::Message* req = messages->Request();
+        google::protobuf::Message* res = messages->Response();
         if (meta.attachment_size() != 0) {
             butil::IOBuf req_buf;
             msg->payload.cutn(&req_buf, msg->payload.size() - 
meta.attachment_size());
@@ -271,18 +274,14 @@ protected:
         cntl->_current_call.sending_sock.reset(ptr.release());
         cntl->_server = &ts->_dummy;
 
-        google::protobuf::Message* res =
-              ts->_svc.GetResponsePrototype(method).New();
         google::protobuf::Closure* done =
               brpc::NewCallback<
             int64_t, brpc::Controller*,
-            const google::protobuf::Message*,
-            const google::protobuf::Message*,
+            brpc::RpcPBMessages*,
             const brpc::Server*,
-            brpc::MethodStatus*, int64_t>(
-                &brpc::policy::SendRpcResponse,
-                meta.correlation_id(), cntl, req, res,
-                &ts->_dummy, NULL, -1);
+            brpc::MethodStatus*, int64_t>(&brpc::policy::SendRpcResponse,
+                                          meta.correlation_id(), cntl,
+                                          messages, &ts->_dummy, NULL, -1);
         ts->_svc.CallMethod(method, cntl, req, res, done);
     }
 
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index cc98f111..5f06887a 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -29,6 +29,7 @@
 #include "butil/fd_guard.h"
 #include "butil/files/scoped_file.h"
 #include "brpc/socket.h"
+#include "butil/object_pool.h"
 #include "brpc/builtin/version_service.h"
 #include "brpc/builtin/health_service.h"
 #include "brpc/builtin/list_service.h"
@@ -1767,4 +1768,64 @@ TEST_F(ServerTest, generic_call) {
     ASSERT_EQ(0, server.Join());
 }
 
+struct DefaultRpcPBMessages : public brpc::RpcPBMessages {
+    DefaultRpcPBMessages() : request(NULL), response(NULL) {}
+    ::google::protobuf::Message* Request() override { return request; }
+    ::google::protobuf::Message* Response() override { return response; }
+
+    ::google::protobuf::Message* request;
+    ::google::protobuf::Message* response;
+};
+
+class TestRpcPBMessageFactory : public brpc::RpcPBMessageFactory {
+public:
+    brpc::RpcPBMessages* Get(const google::protobuf::Service& service,
+                             const google::protobuf::MethodDescriptor& method) 
override {
+        auto messages = butil::get_object<DefaultRpcPBMessages>();
+        auto request = butil::get_object<test::EchoRequest>();
+        auto response = butil::get_object<test::EchoResponse>();
+        request->clear_message();
+        response->clear_message();
+        messages->request = request;
+        messages->response = response;
+        return messages;
+    }
+
+    void Return(brpc::RpcPBMessages* messages) override {
+        auto test_messages = static_cast<DefaultRpcPBMessages*>(messages);
+        
butil::return_object(static_cast<test::EchoRequest*>(test_messages->request));
+        
butil::return_object(static_cast<test::EchoResponse*>(test_messages->response));
+        test_messages->request = NULL;
+        test_messages->response = NULL;
+        butil::return_object(test_messages);
+    }
+};
+
+TEST_F(ServerTest, rpc_pb_message_factory) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+    brpc::Server server;
+    EchoServiceImpl service;
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    brpc::ServerOptions opt;
+    opt.rpc_pb_message_factory = new TestRpcPBMessageFactory;
+    ASSERT_EQ(0, server.Start(ep, &opt));
+
+    brpc::Channel chan;
+    brpc::ChannelOptions copt;
+    copt.protocol = "baidu_std";
+    ASSERT_EQ(0, chan.Init(ep, &copt));
+    brpc::Controller cntl;
+    test::EchoRequest req;
+    test::EchoResponse res;
+    req.set_message(EXP_REQUEST);
+    test::EchoService_Stub stub(&chan);
+    stub.Echo(&cntl, &req, &res, NULL);
+    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    ASSERT_EQ(EXP_RESPONSE, res.message());
+
+    ASSERT_EQ(0, server.Stop(0));
+    ASSERT_EQ(0, server.Join());
+}
+
 } //namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to