This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 02879c89 Add Transport to support more communication protocol
extensions (#3199)
02879c89 is described below
commit 02879c89d962f1b6363979437515131b4457ffb8
Author: Chuang Zhang <[email protected]>
AuthorDate: Tue Jan 27 10:59:00 2026 +0800
Add Transport to support more communication protocol extensions (#3199)
* Add the transport layer to support communication protocols of different
device vendors.
* Refine the SocketMode name style and clean up some unused code
* Refine Transport Debug method param and RdmaTransport WaitEpollOut code
* Format the code; remove indentation for top-level classes and variables in
the new files
* review code
---------
Co-authored-by: wenjiecn <[email protected]>
---
example/rdma_performance/client.cpp | 2 +-
example/rdma_performance/server.cpp | 2 +-
src/brpc/acceptor.cpp | 16 +-
src/brpc/acceptor.h | 5 +-
src/brpc/channel.cpp | 38 +----
src/brpc/channel.h | 7 +-
src/brpc/details/naming_service_thread.cpp | 2 +-
src/brpc/details/naming_service_thread.h | 3 +-
src/brpc/input_message_base.h | 1 +
src/brpc/input_messenger.cpp | 69 ++-------
src/brpc/input_messenger.h | 37 ++---
src/brpc/rdma/rdma_endpoint.cpp | 51 ++++---
src/brpc/rdma_transport.cpp | 238 +++++++++++++++++++++++++++++
src/brpc/rdma_transport.h | 65 ++++++++
src/brpc/server.cpp | 42 +----
src/brpc/server.h | 7 +-
src/brpc/socket.cpp | 160 ++++---------------
src/brpc/socket.h | 34 +++--
src/brpc/socket_map.h | 16 +-
src/brpc/socket_mode.h | 26 ++++
src/brpc/tcp_transport.cpp | 94 ++++++++++++
src/brpc/tcp_transport.h | 41 +++++
src/brpc/transport.h | 66 ++++++++
src/brpc/transport_factory.cpp | 51 +++++++
src/brpc/transport_factory.h | 40 +++++
25 files changed, 775 insertions(+), 338 deletions(-)
diff --git a/example/rdma_performance/client.cpp
b/example/rdma_performance/client.cpp
index 57d0c06c..a7ed2c99 100644
--- a/example/rdma_performance/client.cpp
+++ b/example/rdma_performance/client.cpp
@@ -102,7 +102,7 @@ public:
int Init() {
brpc::ChannelOptions options;
- options.use_rdma = FLAGS_use_rdma;
+ options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA :
brpc::SOCKET_MODE_TCP;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_rpc_timeout_ms;
diff --git a/example/rdma_performance/server.cpp
b/example/rdma_performance/server.cpp
index d3d00057..2e93e1ee 100644
--- a/example/rdma_performance/server.cpp
+++ b/example/rdma_performance/server.cpp
@@ -76,7 +76,7 @@ int main(int argc, char* argv[]) {
g_last_time.store(0, butil::memory_order_relaxed);
brpc::ServerOptions options;
- options.use_rdma = FLAGS_use_rdma;
+ options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA :
brpc::SOCKET_MODE_TCP;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp
index fd6564c9..f9c22a68 100644
--- a/src/brpc/acceptor.cpp
+++ b/src/brpc/acceptor.cpp
@@ -21,8 +21,8 @@
#include "butil/fd_guard.h" // fd_guard
#include "butil/fd_utility.h" // make_close_on_exec
#include "butil/time.h" // gettimeofday_us
-#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/acceptor.h"
+#include "brpc/transport_factory.h"
namespace brpc {
@@ -40,7 +40,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
, _empty_cond(&_map_mutex)
, _force_ssl(false)
, _ssl_ctx(NULL)
- , _use_rdma(false)
+ , _socket_mode(SOCKET_MODE_TCP)
, _bthread_tag(BTHREAD_TAG_DEFAULT) {
}
@@ -282,18 +282,10 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket*
acception) {
options.fd = in_fd;
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
options.user = acception->user();
+ options.need_on_edge_trigger = true;
options.force_ssl = am->_force_ssl;
options.initial_ssl_ctx = am->_ssl_ctx;
-#if BRPC_WITH_RDMA
- if (am->_use_rdma) {
- options.on_edge_triggered_events =
rdma::RdmaEndpoint::OnNewDataFromTcp;
- } else {
-#else
- {
-#endif
- options.on_edge_triggered_events = InputMessenger::OnNewMessages;
- }
- options.use_rdma = am->_use_rdma;
+ options.socket_mode = am->_socket_mode;
options.bthread_tag = am->_bthread_tag;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h
index 69f632aa..77942bec 100644
--- a/src/brpc/acceptor.h
+++ b/src/brpc/acceptor.h
@@ -22,6 +22,7 @@
#include "butil/synchronization/condition_variable.h"
#include "butil/containers/flat_map.h"
#include "brpc/input_messenger.h"
+#include "brpc/socket_mode.h"
namespace brpc {
@@ -110,8 +111,8 @@ private:
bool _force_ssl;
std::shared_ptr<SocketSSLContext> _ssl_ctx;
- // Whether to use rdma or not
- bool _use_rdma;
+ // Choose to use a certain socket: 0 TCP, 1 RDMA
+ SocketMode _socket_mode;
// Acceptor belongs to this tag
bthread_tag_t _bthread_tag;
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index a130f613..86124c25 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -37,6 +37,7 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
+#include "brpc/transport_factory.h"
namespace brpc {
@@ -60,7 +61,7 @@ ChannelOptions::ChannelOptions()
, connection_type(CONNECTION_TYPE_UNKNOWN)
, succeed_without_server(true)
, log_succeed_without_server(true)
- , use_rdma(false)
+ , socket_mode(SOCKET_MODE_TCP)
, auth(NULL)
, backup_request_policy(NULL)
, retry_policy(NULL)
@@ -130,7 +131,7 @@ static ChannelSignature ComputeChannelSignature(const
ChannelOptions& opt) {
} else {
// All disabled ChannelSSLOptions are the same
}
- if (opt.use_rdma) {
+ if (opt.socket_mode == SOCKET_MODE_RDMA) {
buf.append("|rdma");
}
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
@@ -173,20 +174,6 @@ Channel::~Channel() {
}
}
-#if BRPC_WITH_RDMA
-static bool OptionsAvailableForRdma(const ChannelOptions* opt) {
- if (opt->has_ssl_options()) {
- LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
- return false;
- }
- if (!rdma::SupportedByRdma(opt->protocol.name())) {
- LOG(WARNING) << "Cannot use " << opt->protocol.name()
- << " over RDMA";
- return false;
- }
- return true;
-}
-#endif
int Channel::InitChannelOptions(const ChannelOptions* options) {
if (options) { // Override default options if user provided one.
@@ -201,19 +188,10 @@ int Channel::InitChannelOptions(const ChannelOptions*
options) {
_options.hc_option.health_check_path = FLAGS_health_check_path;
_options.hc_option.health_check_timeout_ms =
FLAGS_health_check_timeout_ms;
}
- if (_options.use_rdma) {
-#if BRPC_WITH_RDMA
- if (!OptionsAvailableForRdma(&_options)) {
- return -1;
- }
- rdma::GlobalRdmaInitializeOrDie();
- if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
- return -1;
- }
-#else
- LOG(WARNING) << "Cannot use rdma since brpc does not compile with
rdma";
+ auto ret = TransportFactory::ContextInitOrDie(_options.socket_mode, false,
&_options);
+ if (ret != 0) {
+ LOG(ERROR) << "Fail to initialize transport context for channel, ret="
<< ret;
return -1;
-#endif
}
_serialize_request = protocol->serialize_request;
@@ -388,7 +366,7 @@ int Channel::InitSingle(const butil::EndPoint&
server_addr_and_port,
SocketOptions opt;
opt.local_side = client_endpoint;
opt.initial_ssl_ctx = ssl_ctx;
- opt.use_rdma = _options.use_rdma;
+ opt.socket_mode = _options.socket_mode;
opt.hc_option = _options.hc_option;
opt.device_name = _options.device_name;
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
@@ -436,7 +414,7 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
- ns_opt.socket_option.use_rdma = _options.use_rdma;
+ ns_opt.socket_option.socket_mode = _options.socket_mode;
ns_opt.channel_signature = ComputeChannelSignature(_options);
ns_opt.socket_option.hc_option = _options.hc_option;
ns_opt.socket_option.local_side = client_endpoint;
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index 0f349ac6..7c257c05 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -37,6 +37,7 @@
#include "brpc/backup_request_policy.h"
#include "brpc/naming_service_filter.h"
#include "brpc/health_check_option.h"
+#include "brpc/socket_mode.h"
namespace brpc {
@@ -105,9 +106,9 @@ struct ChannelOptions {
const ChannelSSLOptions& ssl_options() const { return *_ssl_options; }
ChannelSSLOptions* mutable_ssl_options();
- // Let this channel use rdma rather than tcp.
- // Default: false
- bool use_rdma;
+ // Let this channel Choose to use a certain socket: 0 SOCKET_MODE_TCP, 1
SOCKET_MODE_RDMA.
+ // Default: SOCKET_MODE_TCP
+ SocketMode socket_mode;
// Turn on authentication for this channel if `auth' is not NULL.
// Note `auth' will not be deleted by channel and must remain valid when
diff --git a/src/brpc/details/naming_service_thread.cpp
b/src/brpc/details/naming_service_thread.cpp
index f882b225..7eb005e8 100644
--- a/src/brpc/details/naming_service_thread.cpp
+++ b/src/brpc/details/naming_service_thread.cpp
@@ -125,7 +125,7 @@ void NamingServiceThread::Actions::ResetServers(
// Socket. SocketMapKey may be passed through AddWatcher. Make
sure
// to pick those Sockets with the right settings during
OnAddedServers
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
- CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
+ CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
_owner->_options.socket_option));
_added_sockets.push_back(tagged_id);
}
diff --git a/src/brpc/details/naming_service_thread.h
b/src/brpc/details/naming_service_thread.h
index 9acb8f29..f01fbea6 100644
--- a/src/brpc/details/naming_service_thread.h
+++ b/src/brpc/details/naming_service_thread.h
@@ -27,6 +27,7 @@
#include "brpc/naming_service.h" // NamingService
#include "brpc/naming_service_filter.h" // NamingServiceFilter
#include "brpc/socket_map.h"
+#include "brpc/socket_mode.h"
namespace brpc {
@@ -45,7 +46,7 @@ struct GetNamingServiceThreadOptions {
GetNamingServiceThreadOptions()
: succeed_without_server(false)
, log_succeed_without_server(true) {
- socket_option.use_rdma = false;
+ socket_option.socket_mode = SOCKET_MODE_TCP;
}
bool succeed_without_server;
diff --git a/src/brpc/input_message_base.h b/src/brpc/input_message_base.h
index 86b25785..b117eb99 100644
--- a/src/brpc/input_message_base.h
+++ b/src/brpc/input_message_base.h
@@ -55,6 +55,7 @@ private:
friend class InputMessenger;
friend void* ProcessInputMessage(void*);
friend class Stream;
+friend class Transport;
int64_t _received_us;
int64_t _base_real_us;
SocketUniquePtr _socket;
diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index 1b8a86f2..925c8776 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -29,7 +29,7 @@
#include "brpc/protocol.h" // ListProtocols
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/input_messenger.h"
-
+#include "brpc/transport_factory.h"
namespace brpc {
@@ -112,8 +112,7 @@ ParseResult InputMessenger::CutInputMessage(
// The length of `data' must be PROTO_DUMMY_LEN + 1 to
store extra ending char '\0'
char data[PROTO_DUMMY_LEN + 1];
m->_read_buf.copy_to_cstr(data, PROTO_DUMMY_LEN);
- if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0 &&
- m->_rdma_state == Socket::RDMA_OFF) {
+ if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0) {
// To avoid timeout when client uses RDMA but server
uses TCP
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
@@ -191,46 +190,13 @@ struct RunLastMessage {
}
};
-static void QueueMessage(InputMessageBase* to_run_msg,
- int* num_bthread_created,
- bthread_keytable_pool_t* keytable_pool) {
- if (!to_run_msg) {
- return;
- }
-
-#if BRPC_WITH_RDMA
- if (rdma::FLAGS_rdma_disable_bthread) {
- ProcessInputMessage(to_run_msg);
- return;
- }
-#endif
- // Create bthread for last_msg. The bthread is not scheduled
- // until bthread_flush() is called (in the worse case).
-
- // TODO(gejun): Join threads.
- bthread_t th;
- bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
- BTHREAD_ATTR_PTHREAD :
- BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
- tmp.keytable_pool = keytable_pool;
- tmp.tag = bthread_self_tag();
- bthread_attr_set_name(&tmp, "ProcessInputMessage");
-
- if (!FLAGS_usercode_in_coroutine && bthread_start_background(
- &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
- ++*num_bthread_created;
- } else {
- ProcessInputMessage(to_run_msg);
- }
-}
-
-InputMessenger::InputMessageClosure::~InputMessageClosure() noexcept(false) {
+InputMessageClosure::~InputMessageClosure() noexcept(false) {
if (_msg) {
ProcessInputMessage(_msg);
}
}
-void InputMessenger::InputMessageClosure::reset(InputMessageBase* m) {
+void InputMessageClosure::reset(InputMessageBase* m) {
if (_msg) {
ProcessInputMessage(_msg);
}
@@ -303,7 +269,7 @@ int InputMessenger::ProcessNewMessage(
// This unique_ptr prevents msg to be lost before transfering
// ownership to last_msg
DestroyingPtr<InputMessageBase> msg(pr.message());
- QueueMessage(last_msg.release(), &num_bthread_created,
m->_keytable_pool);
+ m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
if (_handlers[index].process == NULL) {
LOG(ERROR) << "process of index=" << index << " is NULL";
continue;
@@ -336,22 +302,19 @@ int InputMessenger::ProcessNewMessage(
// Transfer ownership to last_msg
last_msg.reset(msg.release());
} else {
- QueueMessage(msg.release(), &num_bthread_created,
- m->_keytable_pool);
+ last_msg.reset(msg.release());
+ m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
bthread_flush();
num_bthread_created = 0;
}
}
-#if BRPC_WITH_RDMA
// In RDMA polling mode, all messages must be executed in a new bthread and
// not in the bthread where the polling bthread is located, because the
// method for processing messages may call synchronization primitives,
// causing the polling bthread to be scheduled out.
- if (rdma::FLAGS_rdma_use_polling) {
- QueueMessage(last_msg.release(), &num_bthread_created,
- m->_keytable_pool);
+ if (m->_socket_mode == SOCKET_MODE_RDMA) {
+ m->_transport->QueueMessage(last_msg, &num_bthread_created, true);
}
-#endif
if (num_bthread_created) {
bthread_flush();
}
@@ -414,8 +377,7 @@ void InputMessenger::OnNewMessages(Socket* m) {
}
}
- if (m->_rdma_state == Socket::RDMA_OFF && messenger->ProcessNewMessage(
- m, nr, read_eof, received_us, base_realtime, last_msg) <
0) {
+ if (messenger->ProcessNewMessage(m, nr, read_eof, received_us,
base_realtime, last_msg) < 0) {
return;
}
}
@@ -533,16 +495,7 @@ int InputMessenger::Create(const butil::EndPoint&
remote_side,
int InputMessenger::Create(SocketOptions options, SocketId* id) {
options.user = this;
-#if BRPC_WITH_RDMA
- if (options.use_rdma) {
- options.on_edge_triggered_events =
rdma::RdmaEndpoint::OnNewDataFromTcp;
- options.app_connect = std::make_shared<rdma::RdmaConnect>();
- } else {
-#else
- {
-#endif
- options.on_edge_triggered_events = OnNewMessages;
- }
+ options.need_on_edge_trigger = true;
// Enable keepalive by options or Gflag.
// Priority: options > Gflag.
if (options.keepalive_options || FLAGS_socket_keepalive) {
diff --git a/src/brpc/input_messenger.h b/src/brpc/input_messenger.h
index 1c191a87..8482c3f3 100644
--- a/src/brpc/input_messenger.h
+++ b/src/brpc/input_messenger.h
@@ -29,7 +29,7 @@ namespace brpc {
namespace rdma {
class RdmaEndpoint;
}
-
+class TcpTransport;
struct InputMessageHandler {
// The callback to cut a message from `source'.
// Returned message will be passed to process_request or process_response
@@ -70,9 +70,28 @@ struct InputMessageHandler {
const char* name;
};
+class InputMessageClosure {
+public:
+ InputMessageClosure() : _msg(NULL) { }
+ ~InputMessageClosure() noexcept(false);
+
+ InputMessageBase* release() {
+ InputMessageBase* m = _msg;
+ _msg = NULL;
+ return m;
+ }
+
+ void reset(InputMessageBase* m);
+
+private:
+ InputMessageBase* _msg;
+};
+
// Process messages from connections.
// `Message' corresponds to a client's request or a server's response.
class InputMessenger : public SocketUser {
+friend class Socket;
+friend class TcpTransport;
friend class rdma::RdmaEndpoint;
public:
explicit InputMessenger(size_t capacity = 128);
@@ -111,22 +130,6 @@ protected:
static void OnNewMessages(Socket* m);
private:
- class InputMessageClosure {
- public:
- InputMessageClosure() : _msg(NULL) { }
- ~InputMessageClosure() noexcept(false);
-
- InputMessageBase* release() {
- InputMessageBase* m = _msg;
- _msg = NULL;
- return m;
- }
-
- void reset(InputMessageBase* m);
-
- private:
- InputMessageBase* _msg;
- };
// Find a valid scissor from `handlers' to cut off `header' and `payload'
// from m->read_buf, save index of the scissor into `index'.
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 616ef332..3cc2107f 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -30,6 +30,7 @@
#include "brpc/rdma/block_pool.h"
#include "brpc/rdma/rdma_helper.h"
#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma_transport.h"
DECLARE_int32(task_group_ntags);
@@ -239,14 +240,15 @@ void RdmaEndpoint::Reset() {
void RdmaConnect::StartConnect(const Socket* socket,
void (*done)(int err, void* data),
void* data) {
- CHECK(socket->_rdma_ep != NULL);
+ auto* rdma_transport =
static_cast<RdmaTransport*>(socket->_transport.get());
+ CHECK(rdma_transport->_rdma_ep != NULL);
SocketUniquePtr s;
if (Socket::Address(socket->id(), &s) != 0) {
return;
}
if (!IsRdmaAvailable()) {
- socket->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
done(0, data);
return;
}
@@ -256,7 +258,7 @@ void RdmaConnect::StartConnect(const Socket* socket,
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "RdmaProcessHandshakeAtClient");
if (bthread_start_background(&tid, &attr,
- RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0)
{
+ RdmaEndpoint::ProcessHandshakeAtClient,
rdma_transport->_rdma_ep) < 0) {
LOG(FATAL) << "Fail to start handshake bthread";
Run();
} else {
@@ -299,7 +301,8 @@ static void TryReadOnTcpDuringRdmaEst(Socket* s) {
}
void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
- RdmaEndpoint* ep = m->_rdma_ep;
+ auto* rdma_transport = static_cast<RdmaTransport*>(m->_transport.get());
+ RdmaEndpoint* ep = rdma_transport->GetRdmaEp();
CHECK(ep != NULL);
int progress = Socket::PROGRESS_INIT;
@@ -308,7 +311,7 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
if (!m->CreatedByConnect()) {
if (!IsRdmaAvailable()) {
ep->_state = FALLBACK_TCP;
- m->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
continue;
}
bthread_t tid;
@@ -433,9 +436,10 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
// First initialize CQ and QP resources
ep->_state = C_ALLOC_QPCQ;
+ auto* rdma_transport = static_cast<RdmaTransport*>(s->_transport.get());
if (ep->AllocateResources() < 0) {
LOG(WARNING) << "Fallback to tcp:" << s->description();
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
ep->_state = FALLBACK_TCP;
return NULL;
}
@@ -514,7 +518,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
if (!HelloNegotiationValid(remote_msg)) {
LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:"
<< s->description();
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
} else {
ep->_remote_recv_block_size = remote_msg.block_size;
ep->_local_window_capacity =
@@ -530,16 +534,16 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
ep->_state = C_BRINGUP_QP;
if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) <
0) {
LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" <<
s->description();
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
} else {
- s->_rdma_state = Socket::RDMA_ON;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_ON;
}
}
// Send ACK message to server
ep->_state = C_ACK_SEND;
uint32_t flags = 0;
- if (s->_rdma_state != Socket::RDMA_OFF) {
+ if (rdma_transport->_rdma_state != RdmaTransport::RDMA_OFF) {
flags |= ACK_MSG_RDMA_OK;
}
uint32_t* tmp = (uint32_t*)data; // avoid GCC warning on strict-aliasing
@@ -553,7 +557,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
return NULL;
}
- if (s->_rdma_state == Socket::RDMA_ON) {
+ if (rdma_transport->_rdma_state == RdmaTransport::RDMA_ON) {
ep->_state = ESTABLISHED;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
<< "Client handshake ends (use rdma) on " << s->description();
@@ -586,7 +590,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_state = FAILED;
return NULL;
}
-
+ auto* rdma_transport = static_cast<RdmaTransport*>(s->_transport.get());
if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
LOG_IF(INFO, FLAGS_rdma_trace_verbose) << "It seems that the "
<< "client does not use RDMA, fallback to TCP:"
@@ -594,7 +598,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
// we need to copy data read back to _socket->_read_buf
s->_read_buf.append(data, MAGIC_STR_LEN);
ep->_state = FALLBACK_TCP;
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
ep->TryReadOnTcp();
return NULL;
}
@@ -626,7 +630,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
if (!HelloNegotiationValid(remote_msg)) {
LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:"
<< s->description();
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
} else {
ep->_remote_recv_block_size = remote_msg.block_size;
ep->_local_window_capacity =
@@ -643,13 +647,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
if (ep->AllocateResources() < 0) {
LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
<< s->description();
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
} else {
ep->_state = S_BRINGUP_QP;
if (ep->BringUpQp(remote_msg.lid, remote_msg.gid,
remote_msg.qp_num) < 0) {
LOG(WARNING) << "Fail to bringup QP, fallback to tcp:"
<< s->description();
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
}
}
}
@@ -658,7 +662,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_state = S_HELLO_SEND;
HelloMessage local_msg;
local_msg.msg_len = g_rdma_hello_msg_len;
- if (s->_rdma_state == Socket::RDMA_OFF) {
+ if (rdma_transport->_rdma_state == RdmaTransport::RDMA_OFF) {
local_msg.impl_ver = 0;
local_msg.hello_ver = 0;
} else {
@@ -702,7 +706,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
uint32_t* tmp = (uint32_t*)data; // avoid GCC warning on strict-aliasing
uint32_t flags = butil::NetToHost32(*tmp);
if (flags & ACK_MSG_RDMA_OK) {
- if (s->_rdma_state == Socket::RDMA_OFF) {
+ if (rdma_transport->_rdma_state == RdmaTransport::RDMA_OFF) {
LOG(WARNING) << "Fail to parse Hello Message length from client:"
<< s->description();
s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
@@ -710,13 +714,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_state = FAILED;
return NULL;
} else {
- s->_rdma_state = Socket::RDMA_ON;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_ON;
ep->_state = ESTABLISHED;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
<< "Server handshake ends (use rdma) on " << s->description();
}
} else {
- s->_rdma_state = Socket::RDMA_OFF;
+ rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
ep->_state = FALLBACK_TCP;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
<< "Server handshake ends (use tcp) on " << s->description();
@@ -1455,7 +1459,8 @@ void RdmaEndpoint::PollCq(Socket* m) {
if (Socket::Address(ep->_socket->id(), &s) < 0) {
return;
}
- CHECK(ep == s->_rdma_ep);
+ auto* rdma_transport = static_cast<RdmaTransport*>(s->_transport.get());
+ CHECK(ep == rdma_transport->_rdma_ep);
bool send = false;
ibv_cq* cq = ep->_resource->recv_cq;
@@ -1472,7 +1477,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
int progress = Socket::PROGRESS_INIT;
bool notified = false;
- InputMessenger::InputMessageClosure last_msg;
+ InputMessageClosure last_msg;
ibv_wc wc[FLAGS_rdma_cqe_poll_once];
while (true) {
int cnt = ibv_poll_cq(cq, FLAGS_rdma_cqe_poll_once, wc);
diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp
new file mode 100644
index 00000000..d980c5a0
--- /dev/null
+++ b/src/brpc/rdma_transport.cpp
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include "brpc/rdma_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma/rdma_helper.h"
+
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector *g_vars;
+
+void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
+ CHECK(_rdma_ep == NULL);
+ if (options.socket_mode == SOCKET_MODE_RDMA) {
+ _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket);
+ if (!_rdma_ep) {
+ const int saved_errno = errno;
+ PLOG(ERROR) << "Fail to create RdmaEndpoint";
+ socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+ berror(saved_errno));
+ }
+ _rdma_state = RDMA_UNKNOWN;
+ } else {
+ _rdma_state = RDMA_OFF;
+ socket->_socket_mode = SOCKET_MODE_TCP;
+ }
+ _socket = socket;
+ _default_connect = options.app_connect;
+ _on_edge_trigger = options.on_edge_triggered_events;
+ if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+ _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
+ }
+ _tcp_transport = std::make_shared<TcpTransport>();
+ _tcp_transport->Init(socket, options);
+}
+
+void RdmaTransport::Release() {
+ if (_rdma_ep) {
+ delete _rdma_ep;
+ _rdma_ep = NULL;
+ _rdma_state = RDMA_UNKNOWN;
+ }
+}
+
+int RdmaTransport::Reset(int32_t expected_nref) {
+ if (_rdma_ep) {
+ _rdma_ep->Reset();
+ _rdma_state = RDMA_UNKNOWN;
+ }
+ return 0;
+}
+
+std::shared_ptr<AppConnect> RdmaTransport::Connect() {
+ if (_default_connect == nullptr) {
+ return std::make_shared<rdma::RdmaConnect>();
+ }
+ return _default_connect;
+}
+
+int RdmaTransport::CutFromIOBuf(butil::IOBuf *buf) {
+ if (_rdma_ep && _rdma_state != RDMA_OFF) {
+ butil::IOBuf *data_arr[1] = {buf};
+ return _rdma_ep->CutFromIOBufList(data_arr, 1);
+ } else {
+ return _tcp_transport->CutFromIOBuf(buf);
+ }
+}
+
+ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
+ if (_rdma_ep && _rdma_state != RDMA_OFF) {
+ return _rdma_ep->CutFromIOBufList(buf, ndata);
+ }
+ return _tcp_transport->CutFromIOBufList(buf, ndata);
+}
+
+int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
+ bool pollin, const timespec duetime) {
+ if (_rdma_state == RDMA_ON) {
+ const int expected_val = _epollout_butex
+ ->load(butil::memory_order_acquire);
+ CHECK(_rdma_ep != NULL);
+ if (!_rdma_ep->IsWritable()) {
+ g_vars->nwaitepollout << 1;
+ if (bthread::butex_wait(_epollout_butex, expected_val, &duetime) <
0) {
+ if (errno != EAGAIN && errno != ETIMEDOUT) {
+ const int saved_errno = errno;
+ PLOG(WARNING) << "Fail to wait rdma window of " << _socket;
+ _socket->SetFailed(saved_errno,
+ "Fail to wait rdma window of %s: %s",
+ _socket->description().c_str(),
+ berror(saved_errno));
+ }
+ if (_socket->Failed()) {
+ // NOTE:
+ // Different from TCP, we cannot find the RDMA channel
+ // failed by writing to it. Thus we must check if it
+ // is already failed here.
+ return 1;
+ }
+ }
+ }
+ } else {
+ return _tcp_transport->WaitEpollOut(_epollout_butex, pollin, duetime);
+ }
+ return 0;
+}
+
+void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
+ bthread_t tid;
+ if (FLAGS_usercode_in_coroutine) {
+ OnEdge(_socket);
+ } else if (rdma::FLAGS_rdma_edisp_unsched == false) {
+ auto rc = bthread_start_background(&tid, &attr, OnEdge, _socket);
+ if (rc != 0) {
+ LOG(FATAL) << "Fail to start ProcessEvent";
+ OnEdge(_socket);
+ }
+ } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+ LOG(FATAL) << "Fail to start ProcessEvent";
+ OnEdge(_socket);
+ }
+}
+
+void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int*
num_bthread_created, bool last_msg) {
+ if (last_msg && !rdma::FLAGS_rdma_use_polling) {
+ return;
+ }
+ InputMessageBase* to_run_msg = input_msg.release();
+ if (!to_run_msg) {
+ return;
+ }
+
+ if (rdma::FLAGS_rdma_disable_bthread) {
+ ProcessInputMessage(to_run_msg);
+ return;
+ }
+ // Create bthread for last_msg. The bthread is not scheduled
+ // until bthread_flush() is called (in the worse case).
+
+ // TODO(gejun): Join threads.
+ bthread_t th;
+ bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
+ BTHREAD_ATTR_PTHREAD :
+
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+ tmp.keytable_pool = _socket->keytable_pool();
+ tmp.tag = bthread_self_tag();
+ bthread_attr_set_name(&tmp, "ProcessInputMessage");
+
+ if (!FLAGS_usercode_in_coroutine && bthread_start_background(
+ &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
+ ++*num_bthread_created;
+ } else {
+ ProcessInputMessage(to_run_msg);
+ }
+}
+
+void RdmaTransport::Debug(std::ostream &os) {
+ if (_rdma_state == RDMA_ON && _rdma_ep) {
+ _rdma_ep->DebugInfo(os);
+ }
+}
+
+int RdmaTransport::ContextInitOrDie(bool serverOrNot, const void* _options) {
+ if (serverOrNot) {
+ if (!OptionsAvailableOverRdma(static_cast<const ServerOptions
*>(_options))) {
+ return -1;
+ }
+ rdma::GlobalRdmaInitializeOrDie();
+ if (!rdma::InitPollingModeWithTag(static_cast<const ServerOptions
*>(_options)->bthread_tag)) {
+ return -1;
+ }
+ } else {
+ if (!OptionsAvailableForRdma(static_cast<const ChannelOptions
*>(_options))) {
+ return -1;
+ }
+ rdma::GlobalRdmaInitializeOrDie();
+ if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
+ return -1;
+ }
+ return 0;
+ }
+
+ return 0;
+}
+
+bool RdmaTransport::OptionsAvailableForRdma(const ChannelOptions* opt) {
+ if (opt->has_ssl_options()) {
+ LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
+ return false;
+ }
+ if (!rdma::SupportedByRdma(opt->protocol.name())) {
+ LOG(WARNING) << "Cannot use " << opt->protocol.name()
+ << " over RDMA";
+ return false;
+ }
+ return true;
+}
+
+bool RdmaTransport::OptionsAvailableOverRdma(const ServerOptions* opt) {
+ if (opt->rtmp_service) {
+ LOG(WARNING) << "RTMP is not supported by RDMA";
+ return false;
+ }
+ if (opt->has_ssl_options()) {
+ LOG(WARNING) << "SSL is not supported by RDMA";
+ return false;
+ }
+ if (opt->nshead_service) {
+ LOG(WARNING) << "NSHEAD is not supported by RDMA";
+ return false;
+ }
+ if (opt->mongo_service_adaptor) {
+ LOG(WARNING) << "MONGO is not supported by RDMA";
+ return false;
+ }
+ return true;
+}
+}
+#endif
\ No newline at end of file
diff --git a/src/brpc/rdma_transport.h b/src/brpc/rdma_transport.h
new file mode 100644
index 00000000..7e62edff
--- /dev/null
+++ b/src/brpc/rdma_transport.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_RDMA_TRANSPORT_H
+#define BRPC_RDMA_TRANSPORT_H
+
+#if BRPC_WITH_RDMA
+#include "brpc/socket.h"
+#include "brpc/channel.h"
+#include "brpc/transport.h"
+
+namespace brpc {
+// Transport implementation created by TransportFactory::CreateTransport()
+// for SOCKET_MODE_RDMA sockets.
+class RdmaTransport : public Transport {
+    friend class TransportFactory;
+    friend class rdma::RdmaEndpoint;
+    friend class rdma::RdmaConnect;
+public:
+    // Transport interface.
+    void Init(Socket* socket, const SocketOptions& options) override;
+    void Release() override;
+    int Reset(int32_t expected_nref) override;
+    std::shared_ptr<AppConnect> Connect() override;
+    int CutFromIOBuf(butil::IOBuf* buf) override;
+    ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
+    int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) override;
+    void ProcessEvent(bthread_attr_t attr) override;
+    void QueueMessage(InputMessageClosure& inputMsg, int* num_bthread_created, bool last_msg) override;
+    // Writes the endpoint's debug information when the state is RDMA_ON.
+    void Debug(std::ostream &os) override;
+
+    // Returns the RDMA endpoint. CHECKs that the endpoint is not NULL.
+    rdma::RdmaEndpoint* GetRdmaEp() {
+        CHECK(_rdma_ep != NULL);
+        return _rdma_ep;
+    }
+
+    // Validates the server (serverOrNot == true, `_options` is a
+    // ServerOptions) or channel (`_options` is a ChannelOptions) options and
+    // initializes the global RDMA context. Returns 0 on success, -1 otherwise.
+    static int ContextInitOrDie(bool serverOrNot, const void* _options);
+private:
+    static bool OptionsAvailableForRdma(const ChannelOptions* opt);
+    static bool OptionsAvailableOverRdma(const ServerOptions* opt);
+private:
+    // The on/off state of RDMA
+    enum RdmaState {
+        RDMA_ON,
+        RDMA_OFF,
+        RDMA_UNKNOWN
+    };
+    // The RdmaEndpoint
+    rdma::RdmaEndpoint* _rdma_ep = NULL;
+    // Should use RDMA or not.
+    // Given a default so that Debug() never reads an indeterminate value
+    // when it runs before Init(); RDMA_OFF is the value the Socket
+    // constructor used for this field before it moved here.
+    RdmaState _rdma_state = RDMA_OFF;
+    std::shared_ptr<TcpTransport> _tcp_transport;
+};
+}
+#endif // BRPC_WITH_RDMA
+#endif //BRPC_RDMA_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 8e2368bc..9470220d 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -81,6 +81,7 @@
#include "brpc/details/tcmalloc_extension.h"
#include "brpc/rdma/rdma_helper.h"
#include "brpc/baidu_master_service.h"
+#include "brpc/transport_factory.h"
inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
const char old_fill = os.fill();
@@ -146,7 +147,7 @@ ServerOptions::ServerOptions()
, internal_port(-1)
, has_builtin_services(true)
, force_ssl(false)
- , use_rdma(false)
+ , socket_mode(SOCKET_MODE_TCP)
, baidu_master_service(NULL)
, http_master_service(NULL)
, health_reporter(NULL)
@@ -772,27 +773,6 @@ bool Server::CreateConcurrencyLimiter(const
AdaptiveMaxConcurrency& amc,
return true;
}
-#if BRPC_WITH_RDMA
-static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
- if (opt->rtmp_service) {
- LOG(WARNING) << "RTMP is not supported by RDMA";
- return false;
- }
- if (opt->has_ssl_options()) {
- LOG(WARNING) << "SSL is not supported by RDMA";
- return false;
- }
- if (opt->nshead_service) {
- LOG(WARNING) << "NSHEAD is not supported by RDMA";
- return false;
- }
- if (opt->mongo_service_adaptor) {
- LOG(WARNING) << "MONGO is not supported by RDMA";
- return false;
- }
- return true;
-}
-#endif
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
static bool g_default_ignore_eovercrowded(false);
@@ -889,20 +869,10 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
<< FLAGS_task_group_ntags << ")";
return -1;
}
-
- if (_options.use_rdma) {
-#if BRPC_WITH_RDMA
- if (!OptionsAvailableOverRdma(&_options)) {
- return -1;
- }
- rdma::GlobalRdmaInitializeOrDie();
- if (!rdma::InitPollingModeWithTag(_options.bthread_tag)) {
- return -1;
- }
-#else
- LOG(WARNING) << "Cannot use rdma since brpc does not compile with
rdma";
+ int ret = TransportFactory::ContextInitOrDie(_options.socket_mode, true,
&_options);
+ if (ret != 0) {
+ LOG(ERROR) << "Fail to initialize transport context for server, ret="
<< ret;
return -1;
-#endif
}
if (_options.http_master_service) {
@@ -1170,7 +1140,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
LOG(ERROR) << "Fail to build acceptor";
return -1;
}
- _am->_use_rdma = _options.use_rdma;
+ _am->_socket_mode = _options.socket_mode;
_am->_bthread_tag = _options.bthread_tag;
}
// Set `_status' to RUNNING before accepting connections
diff --git a/src/brpc/server.h b/src/brpc/server.h
index c262375c..9f69a834 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -45,6 +45,7 @@
#include "brpc/concurrency_limiter.h"
#include "brpc/baidu_master_service.h"
#include "brpc/rpc_pb_message_factory.h"
+#include "brpc/socket_mode.h"
namespace brpc {
@@ -223,9 +224,9 @@ struct ServerOptions {
// Force ssl for all connections of the port to Start().
bool force_ssl;
- // Whether the server uses rdma or not
- // Default: false
- bool use_rdma;
+ // The transport used by the server's sockets: TCP, RDMA, or any other mode defined in SocketMode.
+ // Default: SOCKET_MODE_TCP
+ SocketMode socket_mode;
// [CAUTION] This option is for implementing specialized baidu-std proxies,
// most users don't need it. Don't change this option unless you fully
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index e431acef..9b14d430 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -50,8 +50,7 @@
#include "brpc/policy/rtmp_protocol.h" // FIXME
#include "brpc/periodic_task.h"
#include "brpc/details/health_check.h"
-#include "brpc/rdma/rdma_endpoint.h"
-#include "brpc/rdma/rdma_helper.h"
+#include "brpc/transport_factory.h"
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
@@ -456,6 +455,7 @@ Socket::Socket(Forbidden f)
, _tos(0)
, _reset_fd_real_us(-1)
, _on_edge_triggered_events(NULL)
+ , _need_on_edge_trigger(false)
, _user(NULL)
, _conn(NULL)
, _preferred_index(-1)
@@ -473,8 +473,8 @@ Socket::Socket(Forbidden f)
, _auth_context(NULL)
, _ssl_state(SSL_UNKNOWN)
, _ssl_session(NULL)
- , _rdma_ep(NULL)
- , _rdma_state(RDMA_OFF)
+ , _socket_mode(SOCKET_MODE_TCP)
+ , _transport(nullptr)
, _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
, _controller_released_socket(false)
, _overcrowded(false)
@@ -601,7 +601,7 @@ int Socket::ResetFileDescriptor(int fd) {
SetSocketOptions(fd);
- if (_on_edge_triggered_events) {
+ if (_transport->HasOnEdgeTrigger()) {
if (_io_event.AddConsumer(fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
@@ -721,6 +721,11 @@ int Socket::OnCreated(const SocketOptions& options) {
auto guard = butil::MakeScopeGuard([this] {
_io_event.Reset();
});
+ // Build the transport that matches the requested socket mode.
+ _socket_mode = options.socket_mode;
+ _transport = TransportFactory::CreateTransport(options.socket_mode);
+ CHECK(NULL != _transport);
+ _transport->Init(this, options);
g_vars->nsocket << 1;
CHECK(NULL == _shared_part.load(butil::memory_order_relaxed));
@@ -731,9 +736,10 @@ int Socket::OnCreated(const SocketOptions& options) {
_local_side = options.local_side;
_device_name = options.device_name;
_on_edge_triggered_events = options.on_edge_triggered_events;
+ _need_on_edge_trigger = options.need_on_edge_trigger;
_user = options.user;
_conn = options.conn;
- _app_connect = options.app_connect;
+ _app_connect = _transport->Connect();
_preferred_index = -1;
_hc_count = 0;
CHECK(_read_buf.empty());
@@ -757,22 +763,6 @@ int Socket::OnCreated(const SocketOptions& options) {
_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
_ssl_session = NULL;
_ssl_ctx = options.initial_ssl_ctx;
-#if BRPC_WITH_RDMA
- CHECK(_rdma_ep == NULL);
- if (options.use_rdma) {
- _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this);
- if (!_rdma_ep) {
- const int saved_errno = errno;
- PLOG(ERROR) << "Fail to create RdmaEndpoint";
- SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
- berror(saved_errno));
- return -1;
- }
- _rdma_state = RDMA_UNKNOWN;
- } else {
- _rdma_state = RDMA_OFF;
- }
-#endif
_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
_controller_released_socket.store(false, butil::memory_order_relaxed);
_overcrowded = false;
@@ -852,7 +842,7 @@ void Socket::BeforeRecycled() {
};
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
- if (_on_edge_triggered_events != NULL) {
+ if (_transport->HasOnEdgeTrigger()) {
_io_event.RemoveConsumer(prev_fd);
}
close(prev_fd);
@@ -860,15 +850,7 @@ void Socket::BeforeRecycled() {
g_vars->channel_conn << -1;
}
}
-
-#if BRPC_WITH_RDMA
- if (_rdma_ep) {
- delete _rdma_ep;
- _rdma_ep = NULL;
- _rdma_state = RDMA_UNKNOWN;
- }
-#endif
-
+ _transport->Release();
reset_parsing_context(NULL);
_read_buf.clear();
@@ -1013,7 +995,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
// It's safe to close previous fd (provided expected_nref is correct).
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
- if (_on_edge_triggered_events != NULL) {
+ if (_transport->HasOnEdgeTrigger()) {
_io_event.RemoveConsumer(prev_fd);
}
close(prev_fd);
@@ -1021,13 +1003,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
g_vars->channel_conn << -1;
}
}
-
-#if BRPC_WITH_RDMA
- if (_rdma_ep) {
- _rdma_ep->Reset();
- _rdma_state = RDMA_UNKNOWN;
- }
-#endif
+ _transport->Reset(expected_nref);
_local_side = butil::EndPoint();
if (_ssl_session) {
@@ -1181,13 +1157,6 @@ int Socket::Status(SocketId id, int32_t* nref) {
return -1;
}
-void* Socket::ProcessEvent(void* arg) {
- // the enclosed Socket is valid and free to access inside this function.
- SocketUniquePtr s(static_cast<Socket*>(arg));
- s->_on_edge_triggered_events(s.get());
- return NULL;
-}
-
// Check if there're new requests appended.
// If yes, point old_head to reversed new requests and return false;
// If no:
@@ -1771,16 +1740,7 @@ int Socket::StartWrite(WriteRequest* req, const
WriteOptions& opt) {
butil::IOBuf* data_arr[1] = { &req->data };
nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
-#if BRPC_WITH_RDMA
- if (_rdma_ep && _rdma_state != RDMA_OFF) {
- butil::IOBuf* data_arr[1] = { &req->data };
- nw = _rdma_ep->CutFromIOBufList(data_arr, 1);
- } else {
-#else
- {
-#endif
- nw = req->data.cut_into_file_descriptor(fd());
- }
+ nw = _transport->CutFromIOBuf(&req->data);
}
if (nw < 0) {
// RTMP may return EOVERCROWDED
@@ -1882,45 +1842,11 @@ void* Socket::KeepWrite(void* void_arg) {
// which may turn on _overcrowded to stop pending requests from
// growing infinitely.
const timespec duetime =
- butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
-#if BRPC_WITH_RDMA
- if (s->_rdma_state == RDMA_ON) {
- const int expected_val = s->_epollout_butex
- ->load(butil::memory_order_acquire);
- CHECK(s->_rdma_ep != NULL);
- if (!s->_rdma_ep->IsWritable()) {
- g_vars->nwaitepollout << 1;
- if (bthread::butex_wait(s->_epollout_butex,
- expected_val, &duetime) < 0) {
- if (errno != EAGAIN && errno != ETIMEDOUT) {
- const int saved_errno = errno;
- PLOG(WARNING) << "Fail to wait rdma window of " <<
*s;
- s->SetFailed(saved_errno, "Fail to wait rdma
window of %s: %s",
- s->description().c_str(),
berror(saved_errno));
- }
- if (s->Failed()) {
- // NOTE:
- // Different from TCP, we cannot find the RDMA
channel
- // failed by writing to it. Thus we must check if
it
- // is already failed here.
- break;
- }
- }
- }
- } else {
-#else
- {
-#endif
- g_vars->nwaitepollout << 1;
- bool pollin = (s->_on_edge_triggered_events != NULL);
- const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
- if (rc < 0 && errno != ETIMEDOUT) {
- const int saved_errno = errno;
- PLOG(WARNING) << "Fail to wait epollout of " << *s;
- s->SetFailed(saved_errno, "Fail to wait epollout of %s:
%s",
- s->description().c_str(), berror(saved_errno));
- break;
- }
+
butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
+ bool pollin = s->_transport->HasOnEdgeTrigger();
+ int ret = s->_transport->WaitEpollOut(s->_epollout_butex, pollin,
duetime);
+ if (ret == 1) {
+ break;
}
}
if (NULL == cur_tail) {
@@ -1960,13 +1886,7 @@ ssize_t Socket::DoWrite(WriteRequest* req) {
if (_conn) {
return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
} else {
-#if BRPC_WITH_RDMA
- if (_rdma_ep && _rdma_state != RDMA_OFF) {
- return _rdma_ep->CutFromIOBufList(data_list, ndata);
- }
-#endif
- return butil::IOBuf::cut_multiple_into_file_descriptor(
- fd(), data_list, ndata);
+ return _transport->CutFromIOBufList(data_list, ndata);
}
}
@@ -2155,7 +2075,6 @@ ssize_t Socket::DoRead(size_t size_hint) {
errno = ESSL;
return -1;
}
- CHECK(_rdma_state == RDMA_OFF);
return _read_buf.append_from_file_descriptor(fd(), size_hint);
}
@@ -2257,7 +2176,7 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
if (Address(id, &s) < 0) {
return -1;
}
- if (NULL == s->_on_edge_triggered_events) {
+ if (!s->_transport->HasOnEdgeTrigger()) {
// Callback can be NULL when receiving error epoll events
// (Added into epoll by `WaitConnected')
return 0;
@@ -2283,28 +2202,15 @@ int Socket::OnInputEvent(void* user_data, uint32_t
events,
// is just 1500~1700/s
g_vars->neventthread << 1;
- bthread_t tid;
// transfer ownership as well, don't use s anymore!
Socket* const p = s.release();
bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
attr.tag = bthread_self_tag();
- bthread_attr_set_name(&attr, "ProcessEvent");
- if (FLAGS_usercode_in_coroutine) {
- ProcessEvent(p);
-#if BRPC_WITH_RDMA
- } else if (rdma::FLAGS_rdma_edisp_unsched) {
- auto rc = bthread_start_background(&tid, &attr, ProcessEvent, p);
- if (rc != 0) {
- LOG(FATAL) << "Fail to start ProcessEvent";
- ProcessEvent(p);
- }
-#endif
- } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
- LOG(FATAL) << "Fail to start ProcessEvent";
- ProcessEvent(p);
- }
+ // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
+ attr.flags = attr.flags & (~BTHREAD_GLOBAL_PRIORITY);
+ p->_transport->ProcessEvent(attr);
}
return 0;
}
@@ -2606,11 +2512,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\n}";
}
#endif
-#if BRPC_WITH_RDMA
- if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) {
- ptr->_rdma_ep->DebugInfo(os);
- }
-#endif
+ ptr->_transport->Debug(os);
{ os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); }
}
@@ -2833,10 +2735,11 @@ int Socket::GetPooledSocket(SocketUniquePtr*
pooled_socket) {
opt.local_side = butil::EndPoint(local_side().ip, 0);
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
+ opt.need_on_edge_trigger = _need_on_edge_trigger;
opt.initial_ssl_ctx = _ssl_ctx;
opt.keytable_pool = _keytable_pool;
opt.app_connect = _app_connect;
- opt.use_rdma = (_rdma_ep) ? true : false;
+ opt.socket_mode = _socket_mode;
socket_pool = new SocketPool(opt);
SocketPool* expected = NULL;
if (!main_sp->socket_pool.compare_exchange_strong(
@@ -2935,10 +2838,11 @@ int Socket::GetShortSocket(SocketUniquePtr*
short_socket) {
opt.local_side = butil::EndPoint(local_side().ip, 0);
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
+ opt.need_on_edge_trigger = _need_on_edge_trigger;
opt.initial_ssl_ctx = _ssl_ctx;
opt.keytable_pool = _keytable_pool;
opt.app_connect = _app_connect;
- opt.use_rdma = (_rdma_ep) ? true : false;
+ opt.socket_mode = _socket_mode;
if (get_client_side_messenger()->Create(opt, &id) != 0 ||
Address(id, short_socket) != 0) {
return -1;
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index a3e23230..c2f751e3 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -42,6 +42,7 @@
#include "brpc/event_dispatcher.h"
#include "brpc/versioned_ref_with_id.h"
#include "brpc/health_check_option.h"
+#include "brpc/socket_mode.h"
namespace brpc {
namespace policy {
@@ -61,6 +62,7 @@ class Socket;
class AuthContext;
class EventDispatcher;
class Stream;
+class Transport;
// A special closure for processing the about-to-recycle socket. Socket does
// not delete SocketUser, if you want, `delete this' at the end of
@@ -268,11 +270,20 @@ struct SocketOptions {
// until new data arrives. The callback will not be called from more than
// one thread at any time.
void (*on_edge_triggered_events)(Socket*){NULL};
+ // Indicates that this socket requires an edge-triggered event handler even
+ // if `on_edge_triggered_events` is left as NULL by the caller. When this
+ // flag is true and `on_edge_triggered_events` is NULL, the underlying
+ // transport-specific implementation (e.g. a transport subclass) is allowed
+ // to install a suitable default `on_edge_triggered_events` callback on
+ // behalf of the user. Typical usage is by transports/protocols that rely
+ // on edge-triggered I/O semantics but want the framework to provide the
+ // actual event handler.
+ bool need_on_edge_trigger{false};
int health_check_interval_s{-1};
// Only accept ssl connection.
bool force_ssl{false};
std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
- bool use_rdma{false};
+ SocketMode socket_mode{SOCKET_MODE_TCP};
bthread_keytable_pool_t* keytable_pool{NULL};
SocketConnection* conn{NULL};
std::shared_ptr<AppConnect> app_connect;
@@ -313,6 +324,10 @@ friend class policy::H2GlobalStreamCreator;
friend class VersionedRefWithId<Socket>;
friend class IOEvent<Socket>;
friend void DereferenceSocket(Socket*);
+friend class Transport;
+friend class TcpTransport;
+friend class RdmaTransport;
+friend class TransportFactory;
class SharedPart;
struct WriteRequest;
@@ -650,13 +665,6 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(Socket);
- // The on/off state of RDMA
- enum RdmaState {
- RDMA_ON,
- RDMA_OFF,
- RDMA_UNKNOWN
- };
-
int ConductError(bthread_id_t);
int StartWrite(WriteRequest*, const WriteOptions&);
@@ -732,7 +740,6 @@ private:
// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
- static void* ProcessEvent(void*);
static void* KeepWrite(void*);
@@ -839,7 +846,7 @@ private:
// of EventDispatcher::AddConsumer (event_dispatcher.h)
// carefully before implementing the callback.
void (*_on_edge_triggered_events)(Socket*);
-
+ bool _need_on_edge_trigger;
// A set of callbacks to monitor important events of this socket.
// Initialized by SocketOptions.user
SocketUser* _user;
@@ -918,10 +925,9 @@ private:
SSL* _ssl_session; // owner
std::shared_ptr<SocketSSLContext> _ssl_ctx;
- // The RdmaEndpoint
- rdma::RdmaEndpoint* _rdma_ep;
- // Should use RDMA or not
- RdmaState _rdma_state;
+ // Transport mode of this socket (SOCKET_MODE_TCP, SOCKET_MODE_RDMA, ...).
+ // Default: SOCKET_MODE_TCP
+ SocketMode _socket_mode{SOCKET_MODE_TCP};
+ std::shared_ptr<Transport> _transport;
// Pass from controller, for progressive reading.
ConnectionType _connection_type_for_progressive_read;
diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h
index 7cf08804..b1922bf8 100644
--- a/src/brpc/socket_map.h
+++ b/src/brpc/socket_map.h
@@ -84,12 +84,12 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id,
inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
- bool use_rdma,
+ SocketMode socket_mode,
const HealthCheckOption& hc_option) {
SocketOptions opt;
opt.remote_side = key.peer.addr;
opt.initial_ssl_ctx = ssl_ctx;
- opt.use_rdma = use_rdma;
+ opt.socket_mode = socket_mode;
opt.hc_option = hc_option;
return SocketMapInsert(key, id, opt);
}
@@ -97,13 +97,13 @@ inline int SocketMapInsert(const SocketMapKey& key,
SocketId* id,
inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
HealthCheckOption hc_option;
- return SocketMapInsert(key, id, ssl_ctx, false, hc_option);
+ return SocketMapInsert(key, id, ssl_ctx, SOCKET_MODE_TCP, hc_option);
}
inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) {
std::shared_ptr<SocketSSLContext> empty_ptr;
HealthCheckOption hc_option;
- return SocketMapInsert(key, id, empty_ptr, false, hc_option);
+ return SocketMapInsert(key, id, empty_ptr, SOCKET_MODE_TCP, hc_option);
}
// Find the SocketId associated with `key'.
@@ -164,12 +164,12 @@ public:
int Init(const SocketMapOptions&);
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx,
- bool use_rdma,
+ SocketMode socket_mode,
const HealthCheckOption& hc_option) {
SocketOptions opt;
opt.remote_side = key.peer.addr;
opt.initial_ssl_ctx = ssl_ctx;
- opt.use_rdma = use_rdma;
+ opt.socket_mode = socket_mode;
opt.hc_option = hc_option;
return Insert(key, id, opt);
}
@@ -177,12 +177,12 @@ public:
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
HealthCheckOption hc_option;
- return Insert(key, id, ssl_ctx, false, hc_option);
+ return Insert(key, id, ssl_ctx, SOCKET_MODE_TCP, hc_option);
}
int Insert(const SocketMapKey& key, SocketId* id) {
std::shared_ptr<SocketSSLContext> empty_ptr;
HealthCheckOption hc_option;
- return Insert(key, id, empty_ptr, false, hc_option);
+ return Insert(key, id, empty_ptr, SOCKET_MODE_TCP, hc_option);
}
int Insert(const SocketMapKey& key, SocketId* id, SocketOptions& opt);
diff --git a/src/brpc/socket_mode.h b/src/brpc/socket_mode.h
new file mode 100644
index 00000000..8bce0189
--- /dev/null
+++ b/src/brpc/socket_mode.h
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The guard is named after this file (socket_mode.h) so it cannot collide
+// with a guard of another header.
+#ifndef BRPC_SOCKET_MODE_H
+#define BRPC_SOCKET_MODE_H
+namespace brpc {
+// Selects which Transport implementation backs a Socket.
+enum SocketMode {
+    SOCKET_MODE_TCP = 0,
+    SOCKET_MODE_RDMA = 1
+};
+} // namespace brpc
+#endif // BRPC_SOCKET_MODE_H
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.cpp b/src/brpc/tcp_transport.cpp
new file mode 100644
index 00000000..49c6f68d
--- /dev/null
+++ b/src/brpc/tcp_transport.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "tcp_transport.h"
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector* g_vars;
+
+// Binds this transport to `socket` and copies the connect/edge-trigger
+// settings out of `options`. When the caller supplied no edge-trigger
+// callback but set `need_on_edge_trigger`, InputMessenger::OnNewMessages is
+// installed instead.
+void TcpTransport::Init(Socket* socket, const SocketOptions& options) {
+    _socket = socket;
+    _default_connect = options.app_connect;
+    const bool install_default = options.need_on_edge_trigger &&
+                                 options.on_edge_triggered_events == NULL;
+    if (install_default) {
+        _on_edge_trigger = InputMessenger::OnNewMessages;
+    } else {
+        _on_edge_trigger = options.on_edge_triggered_events;
+    }
+}
+
+// Intentionally empty: the TCP transport does nothing on release.
+void TcpTransport::Release() {
+}
+
+// The TCP transport has nothing to reset; `expected_nref` is ignored and
+// the call always returns 0.
+int TcpTransport::Reset(int32_t expected_nref) {
+    (void)expected_nref;
+    return 0;
+}
+
+// Cuts data from `buf` into the socket's file descriptor and returns the
+// result of IOBuf::cut_into_file_descriptor().
+int TcpTransport::CutFromIOBuf(butil::IOBuf* buf) {
+    const int fd = _socket->fd();
+    return buf->cut_into_file_descriptor(fd);
+}
+
+// Returns the AppConnect that was taken from SocketOptions::app_connect in
+// Init(); may be empty.
+std::shared_ptr<AppConnect> TcpTransport::Connect() {
+    std::shared_ptr<AppConnect> app_connect = _default_connect;
+    return app_connect;
+}
+
+// Cuts data from the `ndata` buffers in `buf` into the socket's file
+// descriptor and returns the result of
+// IOBuf::cut_multiple_into_file_descriptor().
+ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
+    const int fd = _socket->fd();
+    return butil::IOBuf::cut_multiple_into_file_descriptor(fd, buf, ndata);
+}
+
+// Waits through Socket::WaitEpollOut() on the socket's fd until `duetime`.
+// `_epollout_butex` is not used by the TCP implementation.
+// Returns 0 when the wait succeeded or timed out (ETIMEDOUT), and 1 after
+// the socket has been marked failed because of any other error; the write
+// loop in Socket::KeepWrite stops when it sees 1.
+int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) {
+    g_vars->nwaitepollout << 1;
+    const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
+    if (rc < 0 && errno != ETIMEDOUT) {
+        // Capture errno before any call below can overwrite it.
+        const int saved_errno = errno;
+        // Stream the Socket object rather than the pointer so that the log
+        // shows the socket description instead of a raw address.
+        PLOG(WARNING) << "Fail to wait epollout of " << *_socket;
+        _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
+                           _socket->description().c_str(), berror(saved_errno));
+        return 1;
+    }
+    return 0;
+}
+
+// Runs the edge-trigger handler for the socket. In coroutine mode the
+// handler runs inline; otherwise it is started in an urgent bthread, and
+// runs inline as a fallback when the bthread cannot be started.
+// The Socket reference passed to OnEdge is consumed by OnEdge.
+void TcpTransport::ProcessEvent(bthread_attr_t attr) {
+    // Keep the bthread name that Socket::OnInputEvent assigned before this
+    // logic moved into the transport.
+    bthread_attr_set_name(&attr, "ProcessEvent");
+    bthread_t tid;
+    if (FLAGS_usercode_in_coroutine) {
+        OnEdge(_socket);
+    } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+        LOG(FATAL) << "Fail to start ProcessEvent";
+        OnEdge(_socket);
+    }
+}
+// Takes the message out of `input_msg` and processes it, in a background
+// bthread when possible. `*num_bthread_created` is incremented for every
+// bthread that was started. `last_msg` is not used by this implementation.
+void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) {
+    InputMessageBase* const to_run_msg = input_msg.release();
+    if (to_run_msg == NULL) {
+        return;
+    }
+    // Create bthread for last_msg. The bthread is not scheduled
+    // until bthread_flush() is called (in the worst case).
+    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+    tmp.keytable_pool = _socket->keytable_pool();
+    tmp.tag = bthread_self_tag();
+    bthread_attr_set_name(&tmp, "ProcessInputMessage");
+
+    bool spawned = false;
+    if (!FLAGS_usercode_in_coroutine) {
+        bthread_t th;
+        spawned = (bthread_start_background(
+                       &th, &tmp, ProcessInputMessage, to_run_msg) == 0);
+    }
+    if (spawned) {
+        ++*num_bthread_created;
+    } else {
+        // Coroutine mode, or the bthread could not be started: run inline.
+        ProcessInputMessage(to_run_msg);
+    }
+}
+// The TCP transport has no extra debug information; `os` is left untouched.
+void TcpTransport::Debug(std::ostream &os) {
+    (void)os;
+}
+}
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.h b/src/brpc/tcp_transport.h
new file mode 100644
index 00000000..b8c6b5e6
--- /dev/null
+++ b/src/brpc/tcp_transport.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TCP_TRANSPORT_H
+#define BRPC_TCP_TRANSPORT_H
+
+#include "brpc/transport.h"
+#include "brpc/socket.h"
+
+namespace brpc {
+// Transport implementation created by TransportFactory::CreateTransport()
+// for SOCKET_MODE_TCP sockets; reads and writes go through the socket's fd.
+class TcpTransport : public Transport {
+ friend class TransportFactory;
+public:
+ // Stores `socket` and the connect/edge-trigger settings from `options`.
+ void Init(Socket* socket, const SocketOptions& options) override;
+ // No-op for TCP.
+ void Release() override;
+ // No-op for TCP; always returns 0.
+ int Reset(int32_t expected_nref) override;
+ // Returns the AppConnect stored by Init().
+ std::shared_ptr<AppConnect> Connect() override;
+ // Cuts `buf` into the socket's fd.
+ int CutFromIOBuf(butil::IOBuf* buf) override;
+ // Cuts the `ndata` buffers in `buf` into the socket's fd.
+ ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
+ // Returns 0 on success or timeout, 1 after marking the socket failed.
+ int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const
timespec duetime) override;
+ // Runs the edge-trigger handler inline or in an urgent bthread.
+ void ProcessEvent(bthread_attr_t attr) override;
+ // Processes the message in a background bthread when possible.
+ void QueueMessage(InputMessageClosure& input_msg, int*
num_bthread_created, bool last_msg) override;
+ // Writes nothing for TCP.
+ void Debug(std::ostream &os) override;
+};
+}
+
+#endif //BRPC_TCP_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/transport.h b/src/brpc/transport.h
new file mode 100644
index 00000000..ca898508
--- /dev/null
+++ b/src/brpc/transport.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_H
+#define BRPC_TRANSPORT_H
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "server.h"
+
+namespace brpc {
+using OnEdgeTrigger = std::function<void (Socket*)>;
+// Interface between a Socket and the mechanism that carries its data.
+// Socket creates one instance per connection through TransportFactory and
+// delegates writing, waiting for writability, event processing and message
+// queueing to it.
+class Transport {
+    friend class TransportFactory;
+public:
+    // bthread entry point. `arg` is a Socket* whose reference is consumed
+    // here; the transport's edge-trigger callback is invoked on it.
+    static void* OnEdge(void* arg) {
+        // the enclosed Socket is valid and free to access inside this function.
+        SocketUniquePtr s(static_cast<Socket*>(arg));
+        const OnEdgeTrigger on_edge_trigger = s->_transport->GetOnEdgeTrigger();
+        on_edge_trigger(s.get());
+        return NULL;
+    }
+
+    // bthread entry point. `void_arg` is an InputMessageBase* whose
+    // `_process` callback is invoked with the message itself.
+    static void* ProcessInputMessage(void* void_arg) {
+        InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
+        msg->_process(msg);
+        return NULL;
+    }
+    virtual ~Transport() = default;
+    virtual void Init(Socket* socket, const SocketOptions& options) = 0;
+    virtual void Release() = 0;
+    virtual int Reset(int32_t expected_nref) = 0;
+    virtual std::shared_ptr<AppConnect> Connect() = 0;
+    virtual int CutFromIOBuf(butil::IOBuf* buf) = 0;
+    virtual ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) = 0;
+    virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) = 0;
+    virtual void ProcessEvent(bthread_attr_t attr) = 0;
+    virtual void QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) = 0;
+    virtual void Debug(std::ostream &os) = 0;
+
+    // True when an edge-trigger callback is installed.
+    bool HasOnEdgeTrigger() const {
+        // std::function converts to bool explicitly; this avoids comparing
+        // it against the NULL macro.
+        return static_cast<bool>(_on_edge_trigger);
+    }
+    // Returns a copy of the edge-trigger callback (empty when none is set).
+    OnEdgeTrigger GetOnEdgeTrigger() const {
+        return _on_edge_trigger;
+    }
+protected:
+    // Owning socket, assigned by Init(). Starts as NULL so that it is never
+    // read as an indeterminate pointer.
+    Socket* _socket = NULL;
+    std::shared_ptr<AppConnect> _default_connect;
+    OnEdgeTrigger _on_edge_trigger;
+};
+}
+#endif //BRPC_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/transport_factory.cpp b/src/brpc/transport_factory.cpp
new file mode 100644
index 00000000..b29a5e6d
--- /dev/null
+++ b/src/brpc/transport_factory.cpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transport_factory.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma_transport.h"
+namespace brpc {
+// Performs the process-wide initialization required by `mode`.
+//   serverOrNot: true when `_options` is a ServerOptions, false when it is
+//                a ChannelOptions (forwarded to the RDMA transport).
+// Returns 0 on success and a non-zero value otherwise; callers compare the
+// result against 0.
+int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* _options) {
+    if (mode == SOCKET_MODE_TCP) {
+        // TCP needs no global context.
+        return 0;
+    }
+    if (mode == SOCKET_MODE_RDMA) {
+#if BRPC_WITH_RDMA
+        return RdmaTransport::ContextInitOrDie(serverOrNot, _options);
+#else
+        // RDMA is a known mode that this build cannot serve; say so
+        // explicitly instead of reporting an unknown transport type.
+        LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
+        return 1;
+#endif
+    }
+    LOG(ERROR) << "unknown transport type " << mode;
+    return 1;
+}
+
+// Creates a new transport object matching the requested socket mode.
+//   mode - which transport implementation to instantiate.
+// Returns a TcpTransport for SOCKET_MODE_TCP and an RdmaTransport for
+// SOCKET_MODE_RDMA (the latter only when compiled with BRPC_WITH_RDMA).
+// For any other mode an error is logged and nullptr is returned, so callers
+// must check the result before using it.
+std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
+    if (mode == SOCKET_MODE_TCP) {
+        return std::shared_ptr<Transport>(new TcpTransport());
+    }
+#if BRPC_WITH_RDMA
+    if (mode == SOCKET_MODE_RDMA) {
+        return std::shared_ptr<Transport>(new RdmaTransport());
+    }
+#endif
+    // Include the offending value in the log, as ContextInitOrDie does, so a
+    // misconfigured socket mode can be identified from the message alone.
+    LOG(ERROR) << "socket_mode set error: unknown transport type " << mode;
+    return nullptr;
+}
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/transport_factory.h b/src/brpc/transport_factory.h
new file mode 100644
index 00000000..bdbf4c2b
--- /dev/null
+++ b/src/brpc/transport_factory.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_FACTORY_H
+#define BRPC_TRANSPORT_FACTORY_H
+
+#include "brpc/errno.pb.h"
+#include "brpc/socket_mode.h"
+#include "brpc/transport.h"
+
+// Presumably attaches human-readable descriptions to the RDMA error codes;
+// only compiled in when the build enables BRPC_WITH_RDMA.
+// NOTE(review): because this lives in a header, the registrations are expanded
+// in every translation unit that includes this file -- confirm that
+// BAIDU_REGISTER_ERRNO tolerates repeated registration of the same code, or
+// move these two lines into a single .cpp.
+#if BRPC_WITH_RDMA
+BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error");
+BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA");
+#endif
+
+namespace brpc {
+// TransportFactory selects and builds the Transport implementation that
+// corresponds to a SocketMode (TCP, or RDMA when BRPC_WITH_RDMA is enabled).
+// It exposes static functions only.
+class TransportFactory {
+public:
+ // Initializes the context needed by `mode`. Returns 0 for TCP, forwards
+ // `serverOrNot` and `_options` to RdmaTransport::ContextInitOrDie for RDMA
+ // (only in builds with BRPC_WITH_RDMA), and returns 1 after logging an
+ // error for any other mode.
+ static int ContextInitOrDie(SocketMode mode, bool serverOrNot, const void*
_options);
+ // Creates a transport instance for the given socket mode. Returns nullptr
+ // (after logging an error) when the mode is not supported by this build.
+ static std::shared_ptr<Transport> CreateTransport(SocketMode mode);
+};
+}
+
+#endif //BRPC_TRANSPORT_FACTORY_H
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]