This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new d92e7cff Support SO_BINDTODEVICE and bind client_host (#3179)
d92e7cff is described below
commit d92e7cff8b8f9b8017494f04732c894fea7c1502
Author: Mao <[email protected]>
AuthorDate: Mon Jan 26 10:57:05 2026 +0800
Support SO_BINDTODEVICE and bind client_host (#3179)
* bind_client_ip
* fix UT & review
* add client_host UT
* updated to support SO_BINDTODEVICE.
* updated to support SO_BINDTODEVICE and bind client_host.
* review
---
src/brpc/channel.cpp | 41 ++++++++++++++++++++++++---
src/brpc/channel.h | 10 +++++++
src/brpc/details/naming_service_thread.cpp | 4 +--
src/brpc/details/naming_service_thread.h | 9 +++---
src/brpc/socket.cpp | 25 +++++++++++++++--
src/brpc/socket.h | 5 ++++
src/brpc/socket_map.cpp | 16 +++--------
src/brpc/socket_map.h | 22 +++++++++++++--
test/brpc_server_unittest.cpp | 45 ++++++++++++++++++++++++++++++
9 files changed, 150 insertions(+), 27 deletions(-)
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index 0fd43d7c..a130f613 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -77,6 +77,8 @@ ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
if (opt.auth == NULL &&
!opt.has_ssl_options() &&
+ opt.client_host.empty() &&
+ opt.device_name.empty() &&
opt.connection_group.empty() &&
opt.hc_option.health_check_path.empty()) {
// Returning zeroized result by default is more intuitive for users.
@@ -94,6 +96,14 @@ static ChannelSignature ComputeChannelSignature(const
ChannelOptions& opt) {
buf.append("|conng=");
buf.append(opt.connection_group);
}
+ if (!opt.client_host.empty()) {
+ buf.append("|clih=");
+ buf.append(opt.client_host);
+ }
+ if (!opt.device_name.empty()) {
+ buf.append("|devn=");
+ buf.append(opt.device_name);
+ }
if (opt.auth) {
buf.append("|auth=");
buf.append((char*)&opt.auth, sizeof(opt.auth));
@@ -362,14 +372,27 @@ int Channel::InitSingle(const butil::EndPoint&
server_addr_and_port,
LOG(ERROR) << "Invalid port=" << port;
return -1;
}
+ butil::EndPoint client_endpoint;
+ if (!_options.client_host.empty() &&
+ butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0
&&
+ butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip)
!= 0) {
+ LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\'';
+ return -1;
+ }
_server_address = server_addr_and_port;
const ChannelSignature sig = ComputeChannelSignature(_options);
std::shared_ptr<SocketSSLContext> ssl_ctx;
if (CreateSocketSSLContext(_options, &ssl_ctx) != 0) {
return -1;
}
+ SocketOptions opt;
+ opt.local_side = client_endpoint;
+ opt.initial_ssl_ctx = ssl_ctx;
+ opt.use_rdma = _options.use_rdma;
+ opt.hc_option = _options.hc_option;
+ opt.device_name = _options.device_name;
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
- &_server_id, ssl_ctx, _options.use_rdma,
_options.hc_option) != 0) {
+ &_server_id, opt) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
@@ -397,6 +420,13 @@ int Channel::Init(const char* ns_url,
_options.mutable_ssl_options()->sni_name = _service_name;
}
}
+ butil::EndPoint client_endpoint;
+ if (!_options.client_host.empty() &&
+ butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0
&&
+ butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip)
!= 0) {
+ LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\'';
+ return -1;
+ }
std::unique_ptr<LoadBalancerWithNaming> lb(new (std::nothrow)
LoadBalancerWithNaming);
if (NULL == lb) {
@@ -406,10 +436,13 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
- ns_opt.use_rdma = _options.use_rdma;
+ ns_opt.socket_option.use_rdma = _options.use_rdma;
ns_opt.channel_signature = ComputeChannelSignature(_options);
- ns_opt.hc_option = _options.hc_option;
- if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
+ ns_opt.socket_option.hc_option = _options.hc_option;
+ ns_opt.socket_option.local_side = client_endpoint;
+ ns_opt.socket_option.device_name = _options.device_name;
+ if (CreateSocketSSLContext(_options,
+ &ns_opt.socket_option.initial_ssl_ctx) != 0) {
return -1;
}
if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) {
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index c970209b..0f349ac6 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -148,6 +148,16 @@ struct ChannelOptions {
// Its priority is higher than FLAGS_health_check_path and
FLAGS_health_check_timeout_ms.
// When it is not set, FLAGS_health_check_path and
FLAGS_health_check_timeout_ms will take effect.
HealthCheckOption hc_option;
+
+ // IP address or host name of the client.
+ // if the client_host is "", the client IP address is determined by the OS.
+ // Default: ""
+ std::string client_host;
+
+ // The device name of the client's network adapter.
+ // if the device_name is "", the flow control is determined by the OS.
+ // Default: ""
+ std::string device_name;
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ChannelOptions from being bloated in most cases.
diff --git a/src/brpc/details/naming_service_thread.cpp
b/src/brpc/details/naming_service_thread.cpp
index 341ca35b..f882b225 100644
--- a/src/brpc/details/naming_service_thread.cpp
+++ b/src/brpc/details/naming_service_thread.cpp
@@ -125,8 +125,8 @@ void NamingServiceThread::Actions::ResetServers(
// Socket. SocketMapKey may be passed through AddWatcher. Make
sure
// to pick those Sockets with the right settings during
OnAddedServers
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
- CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
_owner->_options.ssl_ctx,
- _owner->_options.use_rdma,
_owner->_options.hc_option));
+ CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
+ _owner->_options.socket_option));
_added_sockets.push_back(tagged_id);
}
diff --git a/src/brpc/details/naming_service_thread.h
b/src/brpc/details/naming_service_thread.h
index 1745e5f2..9acb8f29 100644
--- a/src/brpc/details/naming_service_thread.h
+++ b/src/brpc/details/naming_service_thread.h
@@ -44,15 +44,14 @@ public:
struct GetNamingServiceThreadOptions {
GetNamingServiceThreadOptions()
: succeed_without_server(false)
- , log_succeed_without_server(true)
- , use_rdma(false) {}
+ , log_succeed_without_server(true) {
+ socket_option.use_rdma = false;
+}
bool succeed_without_server;
bool log_succeed_without_server;
- bool use_rdma;
- HealthCheckOption hc_option;
ChannelSignature channel_signature;
- std::shared_ptr<SocketSSLContext> ssl_ctx;
+ SocketOptions socket_option;
};
// A dedicated thread to map a name to ServerIds
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 9490650b..e431acef 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -728,7 +728,8 @@ int Socket::OnCreated(const SocketOptions& options) {
_keytable_pool = options.keytable_pool;
_tos = 0;
_remote_side = options.remote_side;
- _local_side = butil::EndPoint();
+ _local_side = options.local_side;
+ _device_name = options.device_name;
_on_edge_triggered_events = options.on_edge_triggered_events;
_user = options.user;
_conn = options.conn;
@@ -1296,7 +1297,25 @@ int Socket::Connect(const timespec* abstime,
CHECK_EQ(0, butil::make_close_on_exec(sockfd));
// We need to do async connect (to manage the timeout by ourselves).
CHECK_EQ(0, butil::make_non_blocking(sockfd));
-
+ if (!_device_name.empty()) {
+ if (setsockopt(sockfd, SOL_SOCKET, SO_BINDTODEVICE,
+ _device_name.c_str(), _device_name.size()) < 0) {
+ PLOG(ERROR) << "Fail to set SO_BINDTODEVICE of fd=" << sockfd
+ << " to device_name=" << _device_name;
+ return -1;
+ }
+ }
+ if (local_side().ip != butil::IP_ANY) {
+ struct sockaddr_storage cli_addr;
+ if (butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) !=
0) {
+ PLOG(ERROR) << "Fail to get client sockaddr";
+ return -1;
+ }
+ if (::bind(sockfd, (struct sockaddr*)&cli_addr, addr_size) != 0) {
+ PLOG(ERROR) << "Fail to bind client socket, errno=" <<
strerror(errno);
+ return -1;
+ }
+ }
const int rc = ::connect(
sockfd, (struct sockaddr*)&serv_addr, addr_size);
if (rc != 0 && errno != EINPROGRESS) {
@@ -2811,6 +2830,7 @@ int Socket::GetPooledSocket(SocketUniquePtr*
pooled_socket) {
if (socket_pool == NULL) {
SocketOptions opt;
opt.remote_side = remote_side();
+ opt.local_side = butil::EndPoint(local_side().ip, 0);
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
@@ -2912,6 +2932,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket)
{
SocketId id;
SocketOptions opt;
opt.remote_side = remote_side();
+ opt.local_side = butil::EndPoint(local_side().ip, 0);
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index 03ad43f8..a3e23230 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -250,6 +250,8 @@ struct SocketOptions {
// user->BeforeRecycle() before recycling.
int fd{-1};
butil::EndPoint remote_side;
+ butil::EndPoint local_side;
+ std::string device_name;
// If `connect_on_create' is true and `fd' is less than 0,
// a client connection will be established to remote_side()
// regarding deadline `connect_abstime' when Socket is being created.
@@ -830,6 +832,9 @@ private:
// Address of self. Initialized in ResetFileDescriptor().
butil::EndPoint _local_side;
+ // The device name of the client's network adapter.
+ std::string _device_name;
+
// Called when edge-triggered events happened on `_fd'. Read comments
// of EventDispatcher::AddConsumer (event_dispatcher.h)
// carefully before implementing the callback.
diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp
index 14bea71d..3984f6b8 100644
--- a/src/brpc/socket_map.cpp
+++ b/src/brpc/socket_map.cpp
@@ -90,11 +90,9 @@ SocketMap* get_or_new_client_side_socket_map() {
}
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
- const std::shared_ptr<SocketSSLContext>& ssl_ctx,
- bool use_rdma,
- const HealthCheckOption& hc_option) {
- return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx,
use_rdma, hc_option);
-}
+ SocketOptions& opt) {
+ return get_or_new_client_side_socket_map()->Insert(key, id, opt);
+}
int SocketMapFind(const SocketMapKey& key, SocketId* id) {
SocketMap* m = get_client_side_socket_map();
@@ -227,9 +225,7 @@ void SocketMap::ShowSocketMapInBvarIfNeed() {
}
int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
- const std::shared_ptr<SocketSSLContext>& ssl_ctx,
- bool use_rdma,
- const HealthCheckOption& hc_option) {
+ SocketOptions& opt) {
ShowSocketMapInBvarIfNeed();
std::unique_lock<butil::Mutex> mu(_mutex);
@@ -249,11 +245,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId*
id,
sc = NULL;
}
SocketId tmp_id;
- SocketOptions opt;
opt.remote_side = key.peer.addr;
- opt.initial_ssl_ctx = ssl_ctx;
- opt.use_rdma = use_rdma;
- opt.hc_option = hc_option;
if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) {
PLOG(FATAL) << "Fail to create socket to " << key.peer;
return -1;
diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h
index b0d542e7..7cf08804 100644
--- a/src/brpc/socket_map.h
+++ b/src/brpc/socket_map.h
@@ -80,9 +80,19 @@ struct SocketMapKeyHasher {
// successfully, SocketMapRemove() MUST be called when the Socket is not
needed.
// Return 0 on success, -1 otherwise.
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
+ SocketOptions& opt);
+
+inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
- const HealthCheckOption& hc_option);
+ const HealthCheckOption& hc_option) {
+ SocketOptions opt;
+ opt.remote_side = key.peer.addr;
+ opt.initial_ssl_ctx = ssl_ctx;
+ opt.use_rdma = use_rdma;
+ opt.hc_option = hc_option;
+ return SocketMapInsert(key, id, opt);
+}
inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
@@ -155,7 +165,14 @@ public:
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
bool use_rdma,
- const HealthCheckOption& hc_option);
+ const HealthCheckOption& hc_option) {
+ SocketOptions opt;
+ opt.remote_side = key.peer.addr;
+ opt.initial_ssl_ctx = ssl_ctx;
+ opt.use_rdma = use_rdma;
+ opt.hc_option = hc_option;
+ return Insert(key, id, opt);
+}
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
@@ -167,6 +184,7 @@ public:
HealthCheckOption hc_option;
return Insert(key, id, empty_ptr, false, hc_option);
}
+ int Insert(const SocketMapKey& key, SocketId* id, SocketOptions& opt);
void Remove(const SocketMapKey& key, SocketId expected_id);
int Find(const SocketMapKey& key, SocketId* id);
diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp
index 4a774fab..8508a798 100644
--- a/test/brpc_server_unittest.cpp
+++ b/test/brpc_server_unittest.cpp
@@ -2070,4 +2070,49 @@ TEST_F(ServerTest, auth) {
ASSERT_EQ(0, server.Join());
}
+void TestClientHost(const butil::EndPoint& ep,
+ brpc::Controller& cntl,
+ int error_code, bool failed,
+ brpc::ChannelOptions& copt) {
+ brpc::Channel chan;
+ copt.max_retry = 0;
+ ASSERT_EQ(0, chan.Init(ep, &copt));
+
+ test::EchoRequest req;
+ test::EchoResponse res;
+ req.set_message(EXP_REQUEST);
+ test::EchoService_Stub stub(&chan);
+ stub.Echo(&cntl, &req, &res, NULL);
+ ASSERT_EQ(cntl.Failed(), failed) << cntl.ErrorText();
+ ASSERT_EQ(cntl.ErrorCode(), error_code);
+}
+
+TEST_F(ServerTest, bind_client_host_and_network_device) {
+ butil::EndPoint ep;
+ ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+ brpc::Server server;
+ EchoServiceImpl service;
+ ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+ brpc::ServerOptions opt;
+ ASSERT_EQ(0, server.Start(ep, &opt));
+
+ brpc::Controller cntl;
+ brpc::ChannelOptions copt;
+ copt.client_host = "localhost";
+ copt.device_name = "lo";
+ std::vector<brpc::ConnectionType> connection_types = {
+ brpc::CONNECTION_TYPE_SINGLE,
+ brpc::CONNECTION_TYPE_POOLED,
+ brpc::CONNECTION_TYPE_SHORT
+ };
+ for (auto connect_type : connection_types) {
+ copt.connection_type = connect_type;
+ TestClientHost(ep, cntl, 0, false, copt);
+ cntl.Reset();
+ }
+
+ ASSERT_EQ(0, server.Stop(0));
+ ASSERT_EQ(0, server.Join());
+}
+
} //namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]