This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 56946469 Support user interceptor of server (#2137)
56946469 is described below
commit 569464694b6d87e7ccf60c7641c11caf5390d695
Author: Bright Chen <[email protected]>
AuthorDate: Wed Apr 26 15:45:32 2023 +0800
Support user interceptor of server (#2137)
* Support user interceptor of server
* Optimize interceptor unittest
* Optimize interceptor
* interceptor return quickly
---
src/brpc/interceptor.h | 42 +++++++
src/brpc/policy/baidu_rpc_protocol.cpp | 6 +
src/brpc/policy/http_rpc_protocol.cpp | 3 +
src/brpc/policy/hulu_pbrpc_protocol.cpp | 5 +
src/brpc/policy/nshead_protocol.cpp | 3 +
src/brpc/policy/sofa_pbrpc_protocol.cpp | 5 +
src/brpc/policy/thrift_protocol.cpp | 4 +
src/brpc/server.cpp | 25 ++++
src/brpc/server.h | 13 +++
test/brpc_interceptor_unittest.cpp | 201 ++++++++++++++++++++++++++++++++
10 files changed, 307 insertions(+)
diff --git a/src/brpc/interceptor.h b/src/brpc/interceptor.h
new file mode 100644
index 00000000..dbd59b39
--- /dev/null
+++ b/src/brpc/interceptor.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_INTERCEPTOR_H
+#define BRPC_INTERCEPTOR_H
+
+#include "brpc/controller.h"
+
+
+namespace brpc {
+
+class Interceptor {
+public:
+ virtual ~Interceptor() = default;
+
+ // Returns true if accept request, reject request otherwise.
+ // When server rejects request, You can fill `error_code'
+ // and `error_txt' which will send to client.
+ virtual bool Accept(const brpc::Controller* controller,
+ int& error_code,
+ std::string& error_txt) const = 0;
+
+};
+
+} // namespace brpc
+
+
+#endif //BRPC_INTERCEPTOR_H
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp
b/src/brpc/policy/baidu_rpc_protocol.cpp
index 73e42f93..ef1ab7d4 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -464,6 +464,12 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
google::protobuf::Service* svc = mp->service;
const google::protobuf::MethodDescriptor* method = mp->method;
accessor.set_method(method);
+
+
+ if (!server->AcceptRequest(cntl.get())) {
+ break;
+ }
+
if (span) {
span->ResetServerSpanName(method->full_name());
}
diff --git a/src/brpc/policy/http_rpc_protocol.cpp
b/src/brpc/policy/http_rpc_protocol.cpp
index bd880960..d3ae2625 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -1431,6 +1431,9 @@ void ProcessHttpRequest(InputMessageBase *msg) {
" -usercode_in_pthread is on");
return;
}
+ if (!server->AcceptRequest(cntl)) {
+ return;
+ }
} else if (security_mode) {
cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
"ServerOptions.internal_port=%d instead if you're in"
diff --git a/src/brpc/policy/hulu_pbrpc_protocol.cpp
b/src/brpc/policy/hulu_pbrpc_protocol.cpp
index 5a31ae72..2b63189e 100644
--- a/src/brpc/policy/hulu_pbrpc_protocol.cpp
+++ b/src/brpc/policy/hulu_pbrpc_protocol.cpp
@@ -469,6 +469,11 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
+
+ if (!server->AcceptRequest(cntl.get())) {
+ break;
+ }
+
if (span) {
span->ResetServerSpanName(method->full_name());
}
diff --git a/src/brpc/policy/nshead_protocol.cpp
b/src/brpc/policy/nshead_protocol.cpp
index cc27df23..e51be361 100644
--- a/src/brpc/policy/nshead_protocol.cpp
+++ b/src/brpc/policy/nshead_protocol.cpp
@@ -317,6 +317,9 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
" -usercode_in_pthread is on");
break;
}
+ if (!server->AcceptRequest(cntl)) {
+ break;
+ }
} while (false);
msg.reset(); // optional, just release resource ASAP
diff --git a/src/brpc/policy/sofa_pbrpc_protocol.cpp
b/src/brpc/policy/sofa_pbrpc_protocol.cpp
index f6ca32c2..7584f79b 100644
--- a/src/brpc/policy/sofa_pbrpc_protocol.cpp
+++ b/src/brpc/policy/sofa_pbrpc_protocol.cpp
@@ -420,6 +420,11 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
+
+ if (!server->AcceptRequest(cntl.get())) {
+ break;
+ }
+
if (span) {
span->ResetServerSpanName(method->full_name());
}
diff --git a/src/brpc/policy/thrift_protocol.cpp
b/src/brpc/policy/thrift_protocol.cpp
index 82b5a789..a746cb0a 100755
--- a/src/brpc/policy/thrift_protocol.cpp
+++ b/src/brpc/policy/thrift_protocol.cpp
@@ -540,6 +540,10 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
" -usercode_in_pthread is on");
}
+ if (!server->AcceptRequest(cntl)) {
+ return;
+ }
+
msg.reset(); // optional, just release resource ASAP
if (span) {
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index b4758ad8..021450fc 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -126,6 +126,8 @@ ServerOptions::ServerOptions()
, mongo_service_adaptor(NULL)
, auth(NULL)
, server_owns_auth(false)
+ , interceptor(NULL)
+ , server_owns_interceptor(false)
, num_threads(8)
, max_concurrency(0)
, session_local_data_factory(NULL)
@@ -450,6 +452,10 @@ Server::~Server() {
delete _options.auth;
_options.auth = NULL;
}
+ if (_options.server_owns_interceptor) {
+ delete _options.interceptor;
+ _options.interceptor = NULL;
+ }
delete _options.redis_service;
_options.redis_service = NULL;
@@ -2174,6 +2180,25 @@ int Server::MaxConcurrencyOf(google::protobuf::Service*
service,
return MaxConcurrencyOf(service->GetDescriptor()->full_name(),
method_name);
}
+bool Server::AcceptRequest(Controller* cntl) const {
+ const Interceptor* interceptor = _options.interceptor;
+ if (!interceptor) {
+ return true;
+ }
+
+ int error_code = 0;
+ std::string error_text;
+ if (cntl &&
+ !interceptor->Accept(cntl, error_code, error_text)) {
+ cntl->SetFailed(error_code,
+ "Reject by Interceptor: %s",
+ error_text.c_str());
+ return false;
+ }
+
+ return true;
+}
+
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
int* al, Server* server) {
diff --git a/src/brpc/server.h b/src/brpc/server.h
index a603a92c..e01670be 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -41,6 +41,7 @@
#include "brpc/adaptive_max_concurrency.h"
#include "brpc/http2.h"
#include "brpc/redis.h"
+#include "brpc/interceptor.h"
namespace brpc {
@@ -91,6 +92,15 @@ struct ServerOptions {
// Default: false
bool server_owns_auth;
+ // Turn on request interception if `interceptor' is not NULL.
+ // Default: NULL
+ const Interceptor* interceptor;
+
+ // false: `interceptor' is not owned by server and must be valid when
server is running.
+ // true: `interceptor' is owned by server and will be deleted when server
is destructed.
+ // Default: false
+ bool server_owns_interceptor;
+
// Number of pthreads that server runs on. Notice that this is just a hint,
// you can't assume that the server uses exactly so many pthreads because
// pthread workers are shared by all servers and channels inside a
@@ -551,6 +561,9 @@ public:
return butil::subtle::NoBarrier_Load(&_concurrency);
};
+ // Returns true if accept request, reject request otherwise.
+ bool AcceptRequest(Controller* cntl) const;
+
private:
friend class StatusService;
friend class ProtobufsService;
diff --git a/test/brpc_interceptor_unittest.cpp
b/test/brpc_interceptor_unittest.cpp
new file mode 100644
index 00000000..b786f103
--- /dev/null
+++ b/test/brpc_interceptor_unittest.cpp
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include "brpc/policy/sofa_pbrpc_protocol.h"
+#include "brpc/channel.h"
+#include "brpc/server.h"
+#include "brpc/interceptor.h"
+#include "brpc/nshead_service.h"
+#include "echo.pb.h"
+
+namespace brpc {
+namespace policy {
+DECLARE_bool(use_http_error_code);
+}
+}
+
+int main(int argc, char* argv[]) {
+ ::testing::InitGoogleTest(&argc, argv);
+ GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+ return RUN_ALL_TESTS();
+}
+
+const int EREJECT = 4000;
+int g_index = 0;
+const int port = 8613;
+const std::string EXP_REQUEST = "hello";
+const std::string EXP_RESPONSE = "world";
+const std::string NSHEAD_EXP_RESPONSE = "error";
+
+class EchoServiceImpl : public ::test::EchoService {
+public:
+ EchoServiceImpl() = default;
+ ~EchoServiceImpl() override = default;
+ void Echo(google::protobuf::RpcController* cntl_base,
+ const ::test::EchoRequest* request,
+ ::test::EchoResponse* response,
+ google::protobuf::Closure* done) override {
+ brpc::ClosureGuard done_guard(done);
+ ASSERT_EQ(EXP_REQUEST, request->message());
+ response->set_message(EXP_RESPONSE);
+ }
+};
+
+// Adapt your own nshead-based protocol to use brpc
+class MyNsheadProtocol : public brpc::NsheadService {
+public:
+ void ProcessNsheadRequest(const brpc::Server&,
+ brpc::Controller* cntl,
+ const brpc::NsheadMessage& request,
+ brpc::NsheadMessage* response,
+ brpc::NsheadClosure* done) {
+ // This object helps you to call done->Run() in RAII style. If you need
+ // to process the request asynchronously, pass done_guard.release().
+ brpc::ClosureGuard done_guard(done);
+
+ response->head = request.head;
+ if (cntl->Failed()) {
+ ASSERT_TRUE(cntl->Failed());
+ ASSERT_EQ(EREJECT, cntl->ErrorCode());
+ response->body.append(NSHEAD_EXP_RESPONSE);
+ return;
+ }
+ response->body.append(EXP_RESPONSE);
+ }
+};
+
+class MyInterceptor : public brpc::Interceptor {
+public:
+ MyInterceptor() = default;
+
+ ~MyInterceptor() override = default;
+
+ bool Accept(const brpc::Controller* controller,
+ int& error_code,
+ std::string& error_txt) const override {
+ if (g_index % 2 == 0) {
+ error_code = EREJECT;
+ error_txt = "reject g_index=0";
+ return false;
+ }
+
+ return true;
+ }
+};
+
+class InterceptorTest : public ::testing::Test {
+public:
+ InterceptorTest() {
+ EXPECT_EQ(0, _server.AddService(&_echo_svc,
+ brpc::SERVER_DOESNT_OWN_SERVICE));
+ brpc::ServerOptions options;
+ options.interceptor = new MyInterceptor;
+ options.nshead_service = new MyNsheadProtocol;
+ options.server_owns_interceptor = true;
+ EXPECT_EQ(0, _server.Start(port, &options));
+ }
+
+ ~InterceptorTest() override = default;
+
+ static void CallMethod(test::EchoService_Stub& stub,
+ ::test::EchoRequest& req,
+ ::test::EchoResponse& res) {
+ for (g_index = 0; g_index < 1000; ++g_index) {
+ brpc::Controller cntl;
+ stub.Echo(&cntl, &req, &res, NULL);
+ if (g_index % 2 == 0) {
+ ASSERT_TRUE(cntl.Failed());
+ ASSERT_EQ(EREJECT, cntl.ErrorCode());
+ } else {
+ ASSERT_FALSE(cntl.Failed());
+ EXPECT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText();
+ }
+ }
+ }
+
+private:
+ brpc::Server _server;
+ EchoServiceImpl _echo_svc;
+};
+
+TEST_F(InterceptorTest, sanity) {
+ ::test::EchoRequest req;
+ ::test::EchoResponse res;
+ req.set_message(EXP_REQUEST);
+
+ // PROTOCOL_BAIDU_STD
+ {
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ ASSERT_EQ(0, channel.Init("localhost", port, &options));
+ test::EchoService_Stub stub(&channel);
+ CallMethod(stub, req, res);
+ }
+
+ // PROTOCOL_HTTP
+ {
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_HTTP;
+ ASSERT_EQ(0, channel.Init("localhost", port, &options));
+ test::EchoService_Stub stub(&channel);
+ // Set the x-bd-error-code header of http response to brpc error code.
+ brpc::policy::FLAGS_use_http_error_code = true;
+ CallMethod(stub, req, res);
+ }
+
+ // PROTOCOL_HULU_PBRPC
+ {
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_HULU_PBRPC;
+ ASSERT_EQ(0, channel.Init("localhost", port, &options));
+ test::EchoService_Stub stub(&channel);
+ CallMethod(stub, req, res);
+ }
+
+ // PROTOCOL_SOFA_PBRPC
+ {
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_SOFA_PBRPC;
+ ASSERT_EQ(0, channel.Init("localhost", port, &options));
+ test::EchoService_Stub stub(&channel);
+ CallMethod(stub, req, res);
+ }
+
+ // PROTOCOL_NSHEAD
+ {
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_NSHEAD;
+ ASSERT_EQ(0, channel.Init("localhost", port, &options));
+ brpc::NsheadMessage request;
+ for (g_index = 0; g_index < 1000; ++g_index) {
+ brpc::Controller cntl;
+ brpc::NsheadMessage response;
+ channel.CallMethod(NULL, &cntl, &request, &response, NULL);
+ if (g_index % 2 == 0) {
+ ASSERT_EQ(NSHEAD_EXP_RESPONSE, response.body.to_string());
+ } else {
+ ASSERT_EQ(EXP_RESPONSE, response.body.to_string());
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]