This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 02879c89 Add Transport to support more communication protocol 
extensions (#3199)
02879c89 is described below

commit 02879c89d962f1b6363979437515131b4457ffb8
Author: Chuang Zhang <[email protected]>
AuthorDate: Tue Jan 27 10:59:00 2026 +0800

    Add Transport to support more communication protocol extensions (#3199)
    
    * Add The transport layer to support communication protocols of different 
device vendors.
    
    * Refine the SocketMode name style and clean some unused code
    
    * Refine Transport Debug method param and RdmaTransport WaitEpollOut code
    
    * format the code, remove indentation for top class and variables in new 
file
    
    * review code
    
    ---------
    
    Co-authored-by: wenjiecn <[email protected]>
---
 example/rdma_performance/client.cpp        |   2 +-
 example/rdma_performance/server.cpp        |   2 +-
 src/brpc/acceptor.cpp                      |  16 +-
 src/brpc/acceptor.h                        |   5 +-
 src/brpc/channel.cpp                       |  38 +----
 src/brpc/channel.h                         |   7 +-
 src/brpc/details/naming_service_thread.cpp |   2 +-
 src/brpc/details/naming_service_thread.h   |   3 +-
 src/brpc/input_message_base.h              |   1 +
 src/brpc/input_messenger.cpp               |  69 ++-------
 src/brpc/input_messenger.h                 |  37 ++---
 src/brpc/rdma/rdma_endpoint.cpp            |  51 ++++---
 src/brpc/rdma_transport.cpp                | 238 +++++++++++++++++++++++++++++
 src/brpc/rdma_transport.h                  |  65 ++++++++
 src/brpc/server.cpp                        |  42 +----
 src/brpc/server.h                          |   7 +-
 src/brpc/socket.cpp                        | 160 ++++---------------
 src/brpc/socket.h                          |  34 +++--
 src/brpc/socket_map.h                      |  16 +-
 src/brpc/socket_mode.h                     |  26 ++++
 src/brpc/tcp_transport.cpp                 |  94 ++++++++++++
 src/brpc/tcp_transport.h                   |  41 +++++
 src/brpc/transport.h                       |  66 ++++++++
 src/brpc/transport_factory.cpp             |  51 +++++++
 src/brpc/transport_factory.h               |  40 +++++
 25 files changed, 775 insertions(+), 338 deletions(-)

diff --git a/example/rdma_performance/client.cpp 
b/example/rdma_performance/client.cpp
index 57d0c06c..a7ed2c99 100644
--- a/example/rdma_performance/client.cpp
+++ b/example/rdma_performance/client.cpp
@@ -102,7 +102,7 @@ public:
 
     int Init() {
         brpc::ChannelOptions options;
-        options.use_rdma = FLAGS_use_rdma;
+        options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : 
brpc::SOCKET_MODE_TCP;
         options.protocol = FLAGS_protocol;
         options.connection_type = FLAGS_connection_type;
         options.timeout_ms = FLAGS_rpc_timeout_ms;
diff --git a/example/rdma_performance/server.cpp 
b/example/rdma_performance/server.cpp
index d3d00057..2e93e1ee 100644
--- a/example/rdma_performance/server.cpp
+++ b/example/rdma_performance/server.cpp
@@ -76,7 +76,7 @@ int main(int argc, char* argv[]) {
     g_last_time.store(0, butil::memory_order_relaxed);
 
     brpc::ServerOptions options;
-    options.use_rdma = FLAGS_use_rdma;
+    options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : 
brpc::SOCKET_MODE_TCP;
     if (server.Start(FLAGS_port, &options) != 0) {
         LOG(ERROR) << "Fail to start EchoServer";
         return -1;
diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp
index fd6564c9..f9c22a68 100644
--- a/src/brpc/acceptor.cpp
+++ b/src/brpc/acceptor.cpp
@@ -21,8 +21,8 @@
 #include "butil/fd_guard.h"                 // fd_guard 
 #include "butil/fd_utility.h"               // make_close_on_exec
 #include "butil/time.h"                     // gettimeofday_us
-#include "brpc/rdma/rdma_endpoint.h"
 #include "brpc/acceptor.h"
+#include "brpc/transport_factory.h"
 
 
 namespace brpc {
@@ -40,7 +40,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
     , _empty_cond(&_map_mutex)
     , _force_ssl(false)
     , _ssl_ctx(NULL) 
-    , _use_rdma(false)
+    , _socket_mode(SOCKET_MODE_TCP)
     , _bthread_tag(BTHREAD_TAG_DEFAULT) {
 }
 
@@ -282,18 +282,10 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* 
acception) {
         options.fd = in_fd;
         butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
         options.user = acception->user();
+        options.need_on_edge_trigger = true;
         options.force_ssl = am->_force_ssl;
         options.initial_ssl_ctx = am->_ssl_ctx;
-#if BRPC_WITH_RDMA
-        if (am->_use_rdma) {
-            options.on_edge_triggered_events = 
rdma::RdmaEndpoint::OnNewDataFromTcp;
-        } else {
-#else
-        {
-#endif
-            options.on_edge_triggered_events = InputMessenger::OnNewMessages;
-        }
-        options.use_rdma = am->_use_rdma;
+        options.socket_mode = am->_socket_mode;
         options.bthread_tag = am->_bthread_tag;
         if (Socket::Create(options, &socket_id) != 0) {
             LOG(ERROR) << "Fail to create Socket";
diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h
index 69f632aa..77942bec 100644
--- a/src/brpc/acceptor.h
+++ b/src/brpc/acceptor.h
@@ -22,6 +22,7 @@
 #include "butil/synchronization/condition_variable.h"
 #include "butil/containers/flat_map.h"
 #include "brpc/input_messenger.h"
+#include "brpc/socket_mode.h"
 
 
 namespace brpc {
@@ -110,8 +111,8 @@ private:
     bool _force_ssl;
     std::shared_ptr<SocketSSLContext> _ssl_ctx;
 
-    // Whether to use rdma or not
-    bool _use_rdma;
+    // Choose to use a certain socket: 0 TCP, 1 RDMA
+    SocketMode _socket_mode;
 
     // Acceptor belongs to this tag
     bthread_tag_t _bthread_tag;
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index a130f613..86124c25 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -37,6 +37,7 @@
 #include "brpc/details/usercode_backup_pool.h"       // TooManyUserCode
 #include "brpc/rdma/rdma_helper.h"
 #include "brpc/policy/esp_authenticator.h"
+#include "brpc/transport_factory.h"
 
 namespace brpc {
 
@@ -60,7 +61,7 @@ ChannelOptions::ChannelOptions()
     , connection_type(CONNECTION_TYPE_UNKNOWN)
     , succeed_without_server(true)
     , log_succeed_without_server(true)
-    , use_rdma(false)
+    , socket_mode(SOCKET_MODE_TCP)
     , auth(NULL)
     , backup_request_policy(NULL)
     , retry_policy(NULL)
@@ -130,7 +131,7 @@ static ChannelSignature ComputeChannelSignature(const 
ChannelOptions& opt) {
         } else {
             // All disabled ChannelSSLOptions are the same
         }
-        if (opt.use_rdma) {
+        if (opt.socket_mode == SOCKET_MODE_RDMA) {
             buf.append("|rdma");
         }
         butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
@@ -173,20 +174,6 @@ Channel::~Channel() {
     }
 }
 
-#if BRPC_WITH_RDMA
-static bool OptionsAvailableForRdma(const ChannelOptions* opt) {
-    if (opt->has_ssl_options()) {
-        LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
-        return false;
-    }
-    if (!rdma::SupportedByRdma(opt->protocol.name())) {
-        LOG(WARNING) << "Cannot use " << opt->protocol.name()
-                     << " over RDMA";
-        return false;
-    }
-    return true;
-}
-#endif
 
 int Channel::InitChannelOptions(const ChannelOptions* options) {
     if (options) {  // Override default options if user provided one.
@@ -201,19 +188,10 @@ int Channel::InitChannelOptions(const ChannelOptions* 
options) {
         _options.hc_option.health_check_path = FLAGS_health_check_path;
         _options.hc_option.health_check_timeout_ms = 
FLAGS_health_check_timeout_ms;
     }
-    if (_options.use_rdma) {
-#if BRPC_WITH_RDMA
-        if (!OptionsAvailableForRdma(&_options)) {
-            return -1;
-        }
-        rdma::GlobalRdmaInitializeOrDie();
-        if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
-            return -1;
-        }
-#else
-        LOG(WARNING) << "Cannot use rdma since brpc does not compile with 
rdma";
+    auto ret = TransportFactory::ContextInitOrDie(_options.socket_mode, false, 
&_options);
+    if (ret != 0) {
+        LOG(ERROR) << "Fail to initialize transport context for channel, ret=" 
<< ret;
         return -1;
-#endif
     }
 
     _serialize_request = protocol->serialize_request;
@@ -388,7 +366,7 @@ int Channel::InitSingle(const butil::EndPoint& 
server_addr_and_port,
     SocketOptions opt;
     opt.local_side = client_endpoint;
     opt.initial_ssl_ctx = ssl_ctx;
-    opt.use_rdma = _options.use_rdma;
+    opt.socket_mode = _options.socket_mode;
     opt.hc_option = _options.hc_option;
     opt.device_name = _options.device_name;
     if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
@@ -436,7 +414,7 @@ int Channel::Init(const char* ns_url,
     GetNamingServiceThreadOptions ns_opt;
     ns_opt.succeed_without_server = _options.succeed_without_server;
     ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
-    ns_opt.socket_option.use_rdma = _options.use_rdma;
+    ns_opt.socket_option.socket_mode = _options.socket_mode;
     ns_opt.channel_signature = ComputeChannelSignature(_options);
     ns_opt.socket_option.hc_option =  _options.hc_option;
     ns_opt.socket_option.local_side = client_endpoint;
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index 0f349ac6..7c257c05 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -37,6 +37,7 @@
 #include "brpc/backup_request_policy.h"
 #include "brpc/naming_service_filter.h"
 #include "brpc/health_check_option.h"
+#include "brpc/socket_mode.h"
 
 namespace brpc {
 
@@ -105,9 +106,9 @@ struct ChannelOptions {
     const ChannelSSLOptions& ssl_options() const { return *_ssl_options; }
     ChannelSSLOptions* mutable_ssl_options();
 
-    // Let this channel use rdma rather than tcp.
-    // Default: false
-    bool use_rdma;
+    // Let this channel Choose to use a certain socket: 0 SOCKET_MODE_TCP, 1 
SOCKET_MODE_RDMA.
+    // Default: SOCKET_MODE_TCP
+    SocketMode socket_mode;
 
     // Turn on authentication for this channel if `auth' is not NULL.
     // Note `auth' will not be deleted by channel and must remain valid when
diff --git a/src/brpc/details/naming_service_thread.cpp 
b/src/brpc/details/naming_service_thread.cpp
index f882b225..7eb005e8 100644
--- a/src/brpc/details/naming_service_thread.cpp
+++ b/src/brpc/details/naming_service_thread.cpp
@@ -125,7 +125,7 @@ void NamingServiceThread::Actions::ResetServers(
         //       Socket. SocketMapKey may be passed through AddWatcher. Make 
sure
         //       to pick those Sockets with the right settings during 
OnAddedServers
         const SocketMapKey key(_added[i], _owner->_options.channel_signature);
-        CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, 
+        CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id,
                                     _owner->_options.socket_option));
         _added_sockets.push_back(tagged_id);
     }
diff --git a/src/brpc/details/naming_service_thread.h 
b/src/brpc/details/naming_service_thread.h
index 9acb8f29..f01fbea6 100644
--- a/src/brpc/details/naming_service_thread.h
+++ b/src/brpc/details/naming_service_thread.h
@@ -27,6 +27,7 @@
 #include "brpc/naming_service.h"                // NamingService
 #include "brpc/naming_service_filter.h"         // NamingServiceFilter
 #include "brpc/socket_map.h"
+#include "brpc/socket_mode.h"
 
 namespace brpc {
 
@@ -45,7 +46,7 @@ struct GetNamingServiceThreadOptions {
     GetNamingServiceThreadOptions()
         : succeed_without_server(false)
         , log_succeed_without_server(true) {
-    socket_option.use_rdma = false;
+    socket_option.socket_mode = SOCKET_MODE_TCP;
 }
     
     bool succeed_without_server;
diff --git a/src/brpc/input_message_base.h b/src/brpc/input_message_base.h
index 86b25785..b117eb99 100644
--- a/src/brpc/input_message_base.h
+++ b/src/brpc/input_message_base.h
@@ -55,6 +55,7 @@ private:
 friend class InputMessenger;
 friend void* ProcessInputMessage(void*);
 friend class Stream;
+friend class Transport;
     int64_t _received_us;
     int64_t _base_real_us;
     SocketUniquePtr _socket;
diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index 1b8a86f2..925c8776 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -29,7 +29,7 @@
 #include "brpc/protocol.h"                 // ListProtocols
 #include "brpc/rdma/rdma_endpoint.h"
 #include "brpc/input_messenger.h"
-
+#include "brpc/transport_factory.h"
 
 namespace brpc {
 
@@ -112,8 +112,7 @@ ParseResult InputMessenger::CutInputMessage(
                     // The length of `data' must be PROTO_DUMMY_LEN + 1 to 
store extra ending char '\0'
                     char data[PROTO_DUMMY_LEN + 1];
                     m->_read_buf.copy_to_cstr(data, PROTO_DUMMY_LEN);
-                    if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0 &&
-                        m->_rdma_state == Socket::RDMA_OFF) {
+                    if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0) {
                         // To avoid timeout when client uses RDMA but server 
uses TCP
                         return MakeParseError(PARSE_ERROR_TRY_OTHERS);
                     }
@@ -191,46 +190,13 @@ struct RunLastMessage {
     }
 };
 
-static void QueueMessage(InputMessageBase* to_run_msg,
-                         int* num_bthread_created,
-                         bthread_keytable_pool_t* keytable_pool) {
-    if (!to_run_msg) {
-        return;
-    }
-
-#if BRPC_WITH_RDMA
-    if (rdma::FLAGS_rdma_disable_bthread) {
-        ProcessInputMessage(to_run_msg);
-        return;
-    }
-#endif
-    // Create bthread for last_msg. The bthread is not scheduled
-    // until bthread_flush() is called (in the worse case).
-                
-    // TODO(gejun): Join threads.
-    bthread_t th;
-    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
-                          BTHREAD_ATTR_PTHREAD :
-                          BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
-    tmp.keytable_pool = keytable_pool;
-    tmp.tag = bthread_self_tag();
-    bthread_attr_set_name(&tmp, "ProcessInputMessage");
-    
-    if (!FLAGS_usercode_in_coroutine && bthread_start_background(
-            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
-        ++*num_bthread_created;
-    } else {
-        ProcessInputMessage(to_run_msg);
-    }
-}
-
-InputMessenger::InputMessageClosure::~InputMessageClosure() noexcept(false) {
+InputMessageClosure::~InputMessageClosure() noexcept(false) {
     if (_msg) {
         ProcessInputMessage(_msg);
     }
 }
 
-void InputMessenger::InputMessageClosure::reset(InputMessageBase* m) {
+void InputMessageClosure::reset(InputMessageBase* m) {
     if (_msg) {
         ProcessInputMessage(_msg);
     }
@@ -303,7 +269,7 @@ int InputMessenger::ProcessNewMessage(
         // This unique_ptr prevents msg to be lost before transfering
         // ownership to last_msg
         DestroyingPtr<InputMessageBase> msg(pr.message());
-        QueueMessage(last_msg.release(), &num_bthread_created, 
m->_keytable_pool);
+        m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
         if (_handlers[index].process == NULL) {
             LOG(ERROR) << "process of index=" << index << " is NULL";
             continue;
@@ -336,22 +302,19 @@ int InputMessenger::ProcessNewMessage(
             // Transfer ownership to last_msg
             last_msg.reset(msg.release());
         } else {
-            QueueMessage(msg.release(), &num_bthread_created,
-                                m->_keytable_pool);
+            last_msg.reset(msg.release());
+            m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
             bthread_flush();
             num_bthread_created = 0;
         }
     }
-#if BRPC_WITH_RDMA
     // In RDMA polling mode, all messages must be executed in a new bthread and
     // not in the bthread where the polling bthread is located, because the
     // method for processing messages may call synchronization primitives,
     // causing the polling bthread to be scheduled out.
-    if (rdma::FLAGS_rdma_use_polling) {
-        QueueMessage(last_msg.release(), &num_bthread_created,
-                     m->_keytable_pool);
+    if (m->_socket_mode == SOCKET_MODE_RDMA) {
+        m->_transport->QueueMessage(last_msg, &num_bthread_created, true);
     }
-#endif
     if (num_bthread_created) {
         bthread_flush();
     }
@@ -414,8 +377,7 @@ void InputMessenger::OnNewMessages(Socket* m) {
             }
         }
 
-        if (m->_rdma_state == Socket::RDMA_OFF && messenger->ProcessNewMessage(
-                    m, nr, read_eof, received_us, base_realtime, last_msg) < 
0) {
+        if (messenger->ProcessNewMessage(m, nr, read_eof, received_us, 
base_realtime, last_msg) < 0) {
             return;
         } 
     }
@@ -533,16 +495,7 @@ int InputMessenger::Create(const butil::EndPoint& 
remote_side,
 
 int InputMessenger::Create(SocketOptions options, SocketId* id) {
     options.user = this;
-#if BRPC_WITH_RDMA
-    if (options.use_rdma) {
-        options.on_edge_triggered_events = 
rdma::RdmaEndpoint::OnNewDataFromTcp;
-        options.app_connect = std::make_shared<rdma::RdmaConnect>();
-    } else {
-#else
-    {
-#endif
-        options.on_edge_triggered_events = OnNewMessages;
-    }
+    options.need_on_edge_trigger = true;
     // Enable keepalive by options or Gflag.
     // Priority: options > Gflag.
     if (options.keepalive_options || FLAGS_socket_keepalive) {
diff --git a/src/brpc/input_messenger.h b/src/brpc/input_messenger.h
index 1c191a87..8482c3f3 100644
--- a/src/brpc/input_messenger.h
+++ b/src/brpc/input_messenger.h
@@ -29,7 +29,7 @@ namespace brpc {
 namespace rdma {
 class RdmaEndpoint;
 }
-
+class TcpTransport;
 struct InputMessageHandler {
     // The callback to cut a message from `source'.
     // Returned message will be passed to process_request or process_response
@@ -70,9 +70,28 @@ struct InputMessageHandler {
     const char* name;
 };
 
+class InputMessageClosure {
+public:
+    InputMessageClosure() : _msg(NULL) { }
+    ~InputMessageClosure() noexcept(false);
+
+    InputMessageBase* release() {
+        InputMessageBase* m = _msg;
+        _msg = NULL;
+        return m;
+    }
+
+    void reset(InputMessageBase* m);
+
+private:
+    InputMessageBase* _msg;
+};
+
 // Process messages from connections.
 // `Message' corresponds to a client's request or a server's response.
 class InputMessenger : public SocketUser {
+friend class Socket;
+friend class TcpTransport;
 friend class rdma::RdmaEndpoint;
 public:
     explicit InputMessenger(size_t capacity = 128);
@@ -111,22 +130,6 @@ protected:
     static void OnNewMessages(Socket* m);
     
 private:
-    class InputMessageClosure {
-    public:
-        InputMessageClosure() : _msg(NULL) { }
-        ~InputMessageClosure() noexcept(false);
-
-        InputMessageBase* release() {
-            InputMessageBase* m = _msg;
-            _msg = NULL;
-            return m;
-        }
-
-        void reset(InputMessageBase* m);
-
-    private:
-        InputMessageBase* _msg;
-    };
 
     // Find a valid scissor from `handlers' to cut off `header' and `payload'
     // from m->read_buf, save index of the scissor into `index'.
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 616ef332..3cc2107f 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -30,6 +30,7 @@
 #include "brpc/rdma/block_pool.h"
 #include "brpc/rdma/rdma_helper.h"
 #include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma_transport.h"
 
 DECLARE_int32(task_group_ntags);
 
@@ -239,14 +240,15 @@ void RdmaEndpoint::Reset() {
 void RdmaConnect::StartConnect(const Socket* socket,
                                void (*done)(int err, void* data),
                                void* data) {
-    CHECK(socket->_rdma_ep != NULL);
+    auto* rdma_transport = 
static_cast<RdmaTransport*>(socket->_transport.get());
+    CHECK(rdma_transport->_rdma_ep != NULL);
     SocketUniquePtr s;
     if (Socket::Address(socket->id(), &s) != 0) {
         return;
     }
     if (!IsRdmaAvailable()) {
-        socket->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
-        s->_rdma_state = Socket::RDMA_OFF;
+        rdma_transport->_rdma_ep->_state = RdmaEndpoint::FALLBACK_TCP;
+        rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
         done(0, data);
         return;
     }
@@ -256,7 +258,7 @@ void RdmaConnect::StartConnect(const Socket* socket,
     bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
     bthread_attr_set_name(&attr, "RdmaProcessHandshakeAtClient");
     if (bthread_start_background(&tid, &attr,
-                RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) 
{
+                RdmaEndpoint::ProcessHandshakeAtClient, 
rdma_transport->_rdma_ep) < 0) {
         LOG(FATAL) << "Fail to start handshake bthread";
         Run();
     } else {
@@ -299,7 +301,8 @@ static void TryReadOnTcpDuringRdmaEst(Socket* s) {
 }
 
 void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
-    RdmaEndpoint* ep = m->_rdma_ep;
+    auto* rdma_transport = static_cast<RdmaTransport*>(m->_transport.get());
+    RdmaEndpoint* ep = rdma_transport->GetRdmaEp();
     CHECK(ep != NULL);
 
     int progress = Socket::PROGRESS_INIT;
@@ -308,7 +311,7 @@ void RdmaEndpoint::OnNewDataFromTcp(Socket* m) {
             if (!m->CreatedByConnect()) {
                 if (!IsRdmaAvailable()) {
                     ep->_state = FALLBACK_TCP;
-                    m->_rdma_state = Socket::RDMA_OFF;
+                    rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
                     continue;
                 }
                 bthread_t tid;
@@ -433,9 +436,10 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
 
     // First initialize CQ and QP resources
     ep->_state = C_ALLOC_QPCQ;
+    auto* rdma_transport = static_cast<RdmaTransport*>(s->_transport.get());
     if (ep->AllocateResources() < 0) {
         LOG(WARNING) << "Fallback to tcp:" << s->description();
-        s->_rdma_state = Socket::RDMA_OFF;
+        rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
         ep->_state = FALLBACK_TCP;
         return NULL;
     }
@@ -514,7 +518,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
     if (!HelloNegotiationValid(remote_msg)) {
         LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:"
                      << s->description();
-        s->_rdma_state = Socket::RDMA_OFF;
+        rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
     } else {
         ep->_remote_recv_block_size = remote_msg.block_size;
         ep->_local_window_capacity = 
@@ -530,16 +534,16 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
         ep->_state = C_BRINGUP_QP;
         if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 
0) {
             LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << 
s->description();
-            s->_rdma_state = Socket::RDMA_OFF;
+            rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
         } else {
-            s->_rdma_state = Socket::RDMA_ON;
+            rdma_transport->_rdma_state = RdmaTransport::RDMA_ON;
         }
     }
 
     // Send ACK message to server
     ep->_state = C_ACK_SEND;
     uint32_t flags = 0;
-    if (s->_rdma_state != Socket::RDMA_OFF) {
+    if (rdma_transport->_rdma_state != RdmaTransport::RDMA_OFF) {
         flags |= ACK_MSG_RDMA_OK;
     }
     uint32_t* tmp = (uint32_t*)data;  // avoid GCC warning on strict-aliasing
@@ -553,7 +557,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
         return NULL;
     }
 
-    if (s->_rdma_state == Socket::RDMA_ON) {
+    if (rdma_transport->_rdma_state == RdmaTransport::RDMA_ON) {
         ep->_state = ESTABLISHED;
         LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
             << "Client handshake ends (use rdma) on " << s->description();
@@ -586,7 +590,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
         ep->_state = FAILED;
         return NULL;
     }
-
+    auto* rdma_transport = static_cast<RdmaTransport*>(s->_transport.get());
     if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) {
         LOG_IF(INFO, FLAGS_rdma_trace_verbose) << "It seems that the "
             << "client does not use RDMA, fallback to TCP:"
@@ -594,7 +598,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
         // we need to copy data read back to _socket->_read_buf
         s->_read_buf.append(data, MAGIC_STR_LEN);
         ep->_state = FALLBACK_TCP;
-        s->_rdma_state = Socket::RDMA_OFF;
+        rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
         ep->TryReadOnTcp();
         return NULL;
     }
@@ -626,7 +630,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
     if (!HelloNegotiationValid(remote_msg)) {
         LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:"
                      << s->description();
-        s->_rdma_state = Socket::RDMA_OFF;
+        rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
     } else {
         ep->_remote_recv_block_size = remote_msg.block_size;
         ep->_local_window_capacity = 
@@ -643,13 +647,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
         if (ep->AllocateResources() < 0) {
             LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
                          << s->description();
-            s->_rdma_state = Socket::RDMA_OFF;
+            rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
         } else {
             ep->_state = S_BRINGUP_QP;
             if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, 
remote_msg.qp_num) < 0) {
                 LOG(WARNING) << "Fail to bringup QP, fallback to tcp:"
                              << s->description();
-                s->_rdma_state = Socket::RDMA_OFF;
+                rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
             }
         }
     }
@@ -658,7 +662,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
     ep->_state = S_HELLO_SEND;
     HelloMessage local_msg;
     local_msg.msg_len = g_rdma_hello_msg_len;
-    if (s->_rdma_state == Socket::RDMA_OFF) {
+    if (rdma_transport->_rdma_state == RdmaTransport::RDMA_OFF) {
         local_msg.impl_ver = 0;
         local_msg.hello_ver = 0;
     } else {
@@ -702,7 +706,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
     uint32_t* tmp = (uint32_t*)data;  // avoid GCC warning on strict-aliasing
     uint32_t flags = butil::NetToHost32(*tmp);
     if (flags & ACK_MSG_RDMA_OK) {
-        if (s->_rdma_state == Socket::RDMA_OFF) {
+        if (rdma_transport->_rdma_state == RdmaTransport::RDMA_OFF) {
             LOG(WARNING) << "Fail to parse Hello Message length from client:"
                          << s->description();
             s->SetFailed(EPROTO, "Fail to complete rdma handshake from %s: %s",
@@ -710,13 +714,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
             ep->_state = FAILED;
             return NULL;
         } else {
-            s->_rdma_state = Socket::RDMA_ON;
+            rdma_transport->_rdma_state = RdmaTransport::RDMA_ON;
             ep->_state = ESTABLISHED;
             LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
                 << "Server handshake ends (use rdma) on " << s->description();
         }
     } else {
-        s->_rdma_state = Socket::RDMA_OFF;
+        rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF;
         ep->_state = FALLBACK_TCP;
         LOG_IF(INFO, FLAGS_rdma_trace_verbose) 
             << "Server handshake ends (use tcp) on " << s->description();
@@ -1455,7 +1459,8 @@ void RdmaEndpoint::PollCq(Socket* m) {
     if (Socket::Address(ep->_socket->id(), &s) < 0) {
         return;
     }
-    CHECK(ep == s->_rdma_ep);
+    auto* rdma_transport = static_cast<RdmaTransport*>(s->_transport.get());
+    CHECK(ep == rdma_transport->_rdma_ep);
 
     bool send = false;
     ibv_cq* cq = ep->_resource->recv_cq;
@@ -1472,7 +1477,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
 
     int progress = Socket::PROGRESS_INIT;
     bool notified = false;
-    InputMessenger::InputMessageClosure last_msg;
+    InputMessageClosure last_msg;
     ibv_wc wc[FLAGS_rdma_cqe_poll_once];
     while (true) {
         int cnt = ibv_poll_cq(cq, FLAGS_rdma_cqe_poll_once, wc);
diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp
new file mode 100644
index 00000000..d980c5a0
--- /dev/null
+++ b/src/brpc/rdma_transport.cpp
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if BRPC_WITH_RDMA
+
+#include "brpc/rdma_transport.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma/rdma_endpoint.h"
+#include "brpc/rdma/rdma_helper.h"
+
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector *g_vars;
+
+void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
+    CHECK(_rdma_ep == NULL);
+    if (options.socket_mode == SOCKET_MODE_RDMA) {
+        _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket);
+        if (!_rdma_ep) {
+            const int saved_errno = errno;
+            PLOG(ERROR) << "Fail to create RdmaEndpoint";
+            socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+                                                  berror(saved_errno));
+        }
+        _rdma_state = RDMA_UNKNOWN;
+    } else {
+        _rdma_state = RDMA_OFF;
+        socket->_socket_mode = SOCKET_MODE_TCP;
+    }
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
+    }
+    _tcp_transport = std::make_shared<TcpTransport>();
+    _tcp_transport->Init(socket, options);
+}
+
+void RdmaTransport::Release() {
+    if (_rdma_ep) {
+        delete _rdma_ep;
+        _rdma_ep = NULL;
+        _rdma_state = RDMA_UNKNOWN;
+    }
+}
+
+int RdmaTransport::Reset(int32_t expected_nref) {
+    if (_rdma_ep) {
+        _rdma_ep->Reset();
+        _rdma_state = RDMA_UNKNOWN;
+    }
+    return 0;
+}
+
+std::shared_ptr<AppConnect> RdmaTransport::Connect() {
+    if (_default_connect == nullptr) {
+        return  std::make_shared<rdma::RdmaConnect>();
+    }
+    return _default_connect;
+}
+
+int RdmaTransport::CutFromIOBuf(butil::IOBuf *buf) {
+    if (_rdma_ep && _rdma_state != RDMA_OFF) {
+        butil::IOBuf *data_arr[1] = {buf};
+        return _rdma_ep->CutFromIOBufList(data_arr, 1);
+    } else {
+        return _tcp_transport->CutFromIOBuf(buf);
+    }
+}
+
+ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
+    if (_rdma_ep && _rdma_state != RDMA_OFF) {
+        return _rdma_ep->CutFromIOBufList(buf, ndata);
+    }
+    return _tcp_transport->CutFromIOBufList(buf, ndata);
+}
+
+int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
+                                    bool pollin, const timespec duetime) {
+    if (_rdma_state == RDMA_ON) {
+        const int expected_val = _epollout_butex
+                ->load(butil::memory_order_acquire);
+        CHECK(_rdma_ep != NULL);
+        if (!_rdma_ep->IsWritable()) {
+            g_vars->nwaitepollout << 1;
+            if (bthread::butex_wait(_epollout_butex, expected_val, &duetime) < 
0) {
+                if (errno != EAGAIN && errno != ETIMEDOUT) {
+                    const int saved_errno = errno;
+                    PLOG(WARNING) << "Fail to wait rdma window of " << _socket;
+                    _socket->SetFailed(saved_errno,
+                                "Fail to wait rdma window of %s: %s",
+                                _socket->description().c_str(),
+                                berror(saved_errno));
+                }
+                if (_socket->Failed()) {
+                    // NOTE:
+                    // Different from TCP, we cannot find the RDMA channel
+                    // failed by writing to it. Thus we must check if it
+                    // is already failed here.
+                    return 1;
+                }
+            }
+        }
+    } else {
+        return _tcp_transport->WaitEpollOut(_epollout_butex, pollin, duetime);
+    }
+    return 0;
+}
+
+void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
+    bthread_t tid;
+    if (FLAGS_usercode_in_coroutine) {
+        OnEdge(_socket);
+    } else if (rdma::FLAGS_rdma_edisp_unsched == false) {
+        auto rc = bthread_start_background(&tid, &attr, OnEdge, _socket);
+        if (rc != 0) {
+            LOG(FATAL) << "Fail to start ProcessEvent";
+            OnEdge(_socket);
+        }
+    } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+        LOG(FATAL) << "Fail to start ProcessEvent";
+        OnEdge(_socket);
+    }
+}
+
+void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) {
+    if (last_msg && !rdma::FLAGS_rdma_use_polling) {
+        return;
+    }
+    InputMessageBase* to_run_msg = input_msg.release();
+    if (!to_run_msg) {
+        return;
+    }
+
+    if (rdma::FLAGS_rdma_disable_bthread) {
+        ProcessInputMessage(to_run_msg);
+        return;
+    }
+    // Create bthread for last_msg. The bthread is not scheduled
+    // until bthread_flush() is called (in the worse case).
+
+    // TODO(gejun): Join threads.
+    bthread_t th;
+    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
+                                      BTHREAD_ATTR_PTHREAD :
+                                                                    
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+    tmp.keytable_pool = _socket->keytable_pool();
+    tmp.tag = bthread_self_tag();
+    bthread_attr_set_name(&tmp, "ProcessInputMessage");
+
+    if (!FLAGS_usercode_in_coroutine && bthread_start_background(
+            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
+        ++*num_bthread_created;
+    } else {
+        ProcessInputMessage(to_run_msg);
+    }
+}
+
+void RdmaTransport::Debug(std::ostream &os) {
+    if (_rdma_state == RDMA_ON && _rdma_ep) {
+        _rdma_ep->DebugInfo(os);
+    }
+}
+
+int RdmaTransport::ContextInitOrDie(bool serverOrNot, const void* _options) {
+    if (serverOrNot) {
+        if (!OptionsAvailableOverRdma(static_cast<const ServerOptions 
*>(_options))) {
+            return -1;
+        }
+        rdma::GlobalRdmaInitializeOrDie();
+        if (!rdma::InitPollingModeWithTag(static_cast<const ServerOptions 
*>(_options)->bthread_tag)) {
+            return -1;
+        }
+    } else {
+        if (!OptionsAvailableForRdma(static_cast<const ChannelOptions 
*>(_options))) {
+            return -1;
+        }
+        rdma::GlobalRdmaInitializeOrDie();
+        if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
+            return -1;
+        }
+        return 0;
+    }
+
+    return 0;
+}
+
+bool RdmaTransport::OptionsAvailableForRdma(const ChannelOptions* opt) {
+    if (opt->has_ssl_options()) {
+        LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
+        return false;
+    }
+    if (!rdma::SupportedByRdma(opt->protocol.name())) {
+        LOG(WARNING) << "Cannot use " << opt->protocol.name()
+                     << " over RDMA";
+        return false;
+    }
+    return true;
+}
+
+bool RdmaTransport::OptionsAvailableOverRdma(const ServerOptions* opt) {
+    if (opt->rtmp_service) {
+        LOG(WARNING) << "RTMP is not supported by RDMA";
+        return false;
+    }
+    if (opt->has_ssl_options()) {
+        LOG(WARNING) << "SSL is not supported by RDMA";
+        return false;
+    }
+    if (opt->nshead_service) {
+        LOG(WARNING) << "NSHEAD is not supported by RDMA";
+        return false;
+    }
+    if (opt->mongo_service_adaptor) {
+        LOG(WARNING) << "MONGO is not supported by RDMA";
+        return false;
+    }
+    return true;
+}
+}
+#endif
\ No newline at end of file
diff --git a/src/brpc/rdma_transport.h b/src/brpc/rdma_transport.h
new file mode 100644
index 00000000..7e62edff
--- /dev/null
+++ b/src/brpc/rdma_transport.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_RDMA_TRANSPORT_H
+#define BRPC_RDMA_TRANSPORT_H
+
+#if BRPC_WITH_RDMA
+#include "brpc/socket.h"
+#include "brpc/channel.h"
+#include "brpc/transport.h"
+
+namespace brpc {
+class RdmaTransport : public Transport {
+    friend class TransportFactory;
+    friend class rdma::RdmaEndpoint;
+    friend class rdma::RdmaConnect;
+public:
+    void Init(Socket* socket, const SocketOptions& options) override;
+    void Release() override;
+    int Reset(int32_t expected_nref) override;
+    std::shared_ptr<AppConnect> Connect() override;
+    int CutFromIOBuf(butil::IOBuf* buf) override;
+    ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
+    int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const 
timespec duetime) override;
+    void ProcessEvent(bthread_attr_t attr) override;
+    void QueueMessage(InputMessageClosure& inputMsg, int* num_bthread_created, 
bool last_msg) override;
+    void Debug(std::ostream &os) override;
+    rdma::RdmaEndpoint* GetRdmaEp() {
+        CHECK(_rdma_ep != NULL);
+        return _rdma_ep;
+    }
+    static int ContextInitOrDie(bool serverOrNot, const void* _options);
+private:
+    static bool OptionsAvailableForRdma(const ChannelOptions* opt);
+    static bool OptionsAvailableOverRdma(const ServerOptions* opt);
+private:
+    // The on/off state of RDMA
+    enum RdmaState {
+        RDMA_ON,
+        RDMA_OFF,
+        RDMA_UNKNOWN
+    };
+    // The RdmaEndpoint
+    rdma::RdmaEndpoint* _rdma_ep = NULL;
+    // Should use RDMA or not
+    RdmaState _rdma_state;
+    std::shared_ptr<TcpTransport>  _tcp_transport;
+};
+}
+#endif // BRPC_WITH_RDMA
+#endif //BRPC_RDMA_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 8e2368bc..9470220d 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -81,6 +81,7 @@
 #include "brpc/details/tcmalloc_extension.h"
 #include "brpc/rdma/rdma_helper.h"
 #include "brpc/baidu_master_service.h"
+#include "brpc/transport_factory.h"
 
 inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
     const char old_fill = os.fill();
@@ -146,7 +147,7 @@ ServerOptions::ServerOptions()
     , internal_port(-1)
     , has_builtin_services(true)
     , force_ssl(false)
-    , use_rdma(false)
+    , socket_mode(SOCKET_MODE_TCP)
     , baidu_master_service(NULL)
     , http_master_service(NULL)
     , health_reporter(NULL)
@@ -772,27 +773,6 @@ bool Server::CreateConcurrencyLimiter(const 
AdaptiveMaxConcurrency& amc,
     return true;
 }
 
-#if BRPC_WITH_RDMA
-static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
-    if (opt->rtmp_service) {
-        LOG(WARNING) << "RTMP is not supported by RDMA";
-        return false;
-    }
-    if (opt->has_ssl_options()) {
-        LOG(WARNING) << "SSL is not supported by RDMA";
-        return false;
-    }
-    if (opt->nshead_service) {
-        LOG(WARNING) << "NSHEAD is not supported by RDMA";
-        return false;
-    }
-    if (opt->mongo_service_adaptor) {
-        LOG(WARNING) << "MONGO is not supported by RDMA";
-        return false;
-    }
-    return true;
-}
-#endif
 
 static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
 static bool g_default_ignore_eovercrowded(false);
@@ -889,20 +869,10 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
                    << FLAGS_task_group_ntags << ")";
         return -1;
     }
-
-    if (_options.use_rdma) {
-#if BRPC_WITH_RDMA
-        if (!OptionsAvailableOverRdma(&_options)) {
-            return -1;
-        }
-        rdma::GlobalRdmaInitializeOrDie();
-        if (!rdma::InitPollingModeWithTag(_options.bthread_tag)) {
-            return -1;
-        }
-#else
-        LOG(WARNING) << "Cannot use rdma since brpc does not compile with 
rdma";
+    int ret = TransportFactory::ContextInitOrDie(_options.socket_mode, true, 
&_options);
+    if (ret != 0) {
+        LOG(ERROR) << "Fail to initialize transport context for server, ret=" 
<< ret;
         return -1;
-#endif
     }
 
     if (_options.http_master_service) {
@@ -1170,7 +1140,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
                 LOG(ERROR) << "Fail to build acceptor";
                 return -1;
             }
-            _am->_use_rdma = _options.use_rdma;
+            _am->_socket_mode = _options.socket_mode;
             _am->_bthread_tag = _options.bthread_tag;
         }
         // Set `_status' to RUNNING before accepting connections
diff --git a/src/brpc/server.h b/src/brpc/server.h
index c262375c..9f69a834 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -45,6 +45,7 @@
 #include "brpc/concurrency_limiter.h"
 #include "brpc/baidu_master_service.h"
 #include "brpc/rpc_pb_message_factory.h"
+#include "brpc/socket_mode.h"
 
 namespace brpc {
 
@@ -223,9 +224,9 @@ struct ServerOptions {
     // Force ssl for all connections of the port to Start().
     bool force_ssl;
 
-    // Whether the server uses rdma or not
-    // Default: false
-    bool use_rdma;
+    // the server socket mode uses tcp or rdma or other
+    // Default: SOCKET_MODE_TCP
+    SocketMode socket_mode;
 
     // [CAUTION] This option is for implementing specialized baidu-std proxies,
     // most users don't need it. Don't change this option unless you fully
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index e431acef..9b14d430 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -50,8 +50,7 @@
 #include "brpc/policy/rtmp_protocol.h"  // FIXME
 #include "brpc/periodic_task.h"
 #include "brpc/details/health_check.h"
-#include "brpc/rdma/rdma_endpoint.h"
-#include "brpc/rdma/rdma_helper.h"
+#include "brpc/transport_factory.h"
 #if defined(OS_MACOSX)
 #include <sys/event.h>
 #endif
@@ -456,6 +455,7 @@ Socket::Socket(Forbidden f)
     , _tos(0)
     , _reset_fd_real_us(-1)
     , _on_edge_triggered_events(NULL)
+    , _need_on_edge_trigger(false)
     , _user(NULL)
     , _conn(NULL)
     , _preferred_index(-1)
@@ -473,8 +473,8 @@ Socket::Socket(Forbidden f)
     , _auth_context(NULL)
     , _ssl_state(SSL_UNKNOWN)
     , _ssl_session(NULL)
-    , _rdma_ep(NULL)
-    , _rdma_state(RDMA_OFF)
+    , _socket_mode(SOCKET_MODE_TCP)
+    , _transport(nullptr)
     , _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
     , _controller_released_socket(false)
     , _overcrowded(false)
@@ -601,7 +601,7 @@ int Socket::ResetFileDescriptor(int fd) {
 
     SetSocketOptions(fd);
 
-    if (_on_edge_triggered_events) {
+    if (_transport->HasOnEdgeTrigger()) {
         if (_io_event.AddConsumer(fd) != 0) {
             PLOG(ERROR) << "Fail to add SocketId=" << id() 
                         << " into EventDispatcher";
@@ -721,6 +721,11 @@ int Socket::OnCreated(const SocketOptions& options) {
     auto guard = butil::MakeScopeGuard([this] {
         _io_event.Reset();
     });
+    // start build the transport
+    _socket_mode = options.socket_mode;
+    _transport = TransportFactory::CreateTransport(options.socket_mode);
+    CHECK(NULL != _transport);
+    _transport->Init(this, options);
 
     g_vars->nsocket << 1;
     CHECK(NULL == _shared_part.load(butil::memory_order_relaxed));
@@ -731,9 +736,10 @@ int Socket::OnCreated(const SocketOptions& options) {
     _local_side = options.local_side;
     _device_name = options.device_name;
     _on_edge_triggered_events = options.on_edge_triggered_events;
+    _need_on_edge_trigger = options.need_on_edge_trigger;
     _user = options.user;
     _conn = options.conn;
-    _app_connect = options.app_connect;
+    _app_connect = _transport->Connect();
     _preferred_index = -1;
     _hc_count = 0;
     CHECK(_read_buf.empty());
@@ -757,22 +763,6 @@ int Socket::OnCreated(const SocketOptions& options) {
     _ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
     _ssl_session = NULL;
     _ssl_ctx = options.initial_ssl_ctx;
-#if BRPC_WITH_RDMA
-    CHECK(_rdma_ep == NULL);
-    if (options.use_rdma) {
-        _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(this);
-        if (!_rdma_ep) {
-            const int saved_errno = errno;
-            PLOG(ERROR) << "Fail to create RdmaEndpoint";
-            SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
-                         berror(saved_errno));
-            return -1;
-        }
-        _rdma_state = RDMA_UNKNOWN;
-    } else {
-        _rdma_state = RDMA_OFF;
-    }
-#endif
     _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
     _controller_released_socket.store(false, butil::memory_order_relaxed);
     _overcrowded = false;
@@ -852,7 +842,7 @@ void Socket::BeforeRecycled() {
     };
     const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
     if (ValidFileDescriptor(prev_fd)) {
-        if (_on_edge_triggered_events != NULL) {
+        if (_transport->HasOnEdgeTrigger()) {
             _io_event.RemoveConsumer(prev_fd);
         }
         close(prev_fd);
@@ -860,15 +850,7 @@ void Socket::BeforeRecycled() {
             g_vars->channel_conn << -1;
         }
     }
-
-#if BRPC_WITH_RDMA
-    if (_rdma_ep) {
-        delete _rdma_ep;
-        _rdma_ep = NULL;
-        _rdma_state = RDMA_UNKNOWN;
-    }
-#endif
-
+    _transport->Release();
     reset_parsing_context(NULL);
     _read_buf.clear();
 
@@ -1013,7 +995,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
     // It's safe to close previous fd (provided expected_nref is correct).
     const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
     if (ValidFileDescriptor(prev_fd)) {
-        if (_on_edge_triggered_events != NULL) {
+        if (_transport->HasOnEdgeTrigger()) {
             _io_event.RemoveConsumer(prev_fd);
         }
         close(prev_fd);
@@ -1021,13 +1003,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
             g_vars->channel_conn << -1;
         }
     }
-
-#if BRPC_WITH_RDMA
-    if (_rdma_ep) {
-        _rdma_ep->Reset();
-        _rdma_state = RDMA_UNKNOWN;
-    }
-#endif
+    _transport->Reset(expected_nref);
 
     _local_side = butil::EndPoint();
     if (_ssl_session) {
@@ -1181,13 +1157,6 @@ int Socket::Status(SocketId id, int32_t* nref) {
     return -1;
 }
 
-void* Socket::ProcessEvent(void* arg) {
-    // the enclosed Socket is valid and free to access inside this function.
-    SocketUniquePtr s(static_cast<Socket*>(arg));
-    s->_on_edge_triggered_events(s.get());
-    return NULL;
-}
-
 // Check if there're new requests appended.
 // If yes, point old_head to reversed new requests and return false;
 // If no:
@@ -1771,16 +1740,7 @@ int Socket::StartWrite(WriteRequest* req, const 
WriteOptions& opt) {
         butil::IOBuf* data_arr[1] = { &req->data };
         nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
     } else {
-#if BRPC_WITH_RDMA
-        if (_rdma_ep && _rdma_state != RDMA_OFF) {
-            butil::IOBuf* data_arr[1] = { &req->data };
-            nw = _rdma_ep->CutFromIOBufList(data_arr, 1);
-        } else {
-#else
-        {
-#endif
-            nw = req->data.cut_into_file_descriptor(fd());
-        }
+        nw = _transport->CutFromIOBuf(&req->data);
     }
     if (nw < 0) {
         // RTMP may return EOVERCROWDED
@@ -1882,45 +1842,11 @@ void* Socket::KeepWrite(void* void_arg) {
             // which may turn on _overcrowded to stop pending requests from
             // growing infinitely.
             const timespec duetime =
-                butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
-#if BRPC_WITH_RDMA
-            if (s->_rdma_state == RDMA_ON) {
-                const int expected_val = s->_epollout_butex
-                    ->load(butil::memory_order_acquire);
-                CHECK(s->_rdma_ep != NULL);
-                if (!s->_rdma_ep->IsWritable()) {
-                    g_vars->nwaitepollout << 1;
-                    if (bthread::butex_wait(s->_epollout_butex,
-                            expected_val, &duetime) < 0) {
-                        if (errno != EAGAIN && errno != ETIMEDOUT) {
-                            const int saved_errno = errno;
-                            PLOG(WARNING) << "Fail to wait rdma window of " << 
*s;
-                            s->SetFailed(saved_errno, "Fail to wait rdma 
window of %s: %s",
-                                    s->description().c_str(), 
berror(saved_errno));
-                        }
-                        if (s->Failed()) {
-                            // NOTE:
-                            // Different from TCP, we cannot find the RDMA 
channel
-                            // failed by writing to it. Thus we must check if 
it
-                            // is already failed here.
-                            break;
-                        }
-                    }
-                }
-            } else {
-#else
-            {
-#endif
-                g_vars->nwaitepollout << 1;
-                bool pollin = (s->_on_edge_triggered_events != NULL);
-                const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
-                if (rc < 0 && errno != ETIMEDOUT) {
-                    const int saved_errno = errno;
-                    PLOG(WARNING) << "Fail to wait epollout of " << *s;
-                    s->SetFailed(saved_errno, "Fail to wait epollout of %s: 
%s",
-                             s->description().c_str(), berror(saved_errno));
-                    break;
-                }
+                                
butil::milliseconds_from_now(WAIT_EPOLLOUT_TIMEOUT_MS);
+            bool pollin = s->_transport->HasOnEdgeTrigger();
+            int ret = s->_transport->WaitEpollOut(s->_epollout_butex, pollin, 
duetime);
+            if (ret == 1) {
+                break;
             }
         }
         if (NULL == cur_tail) {
@@ -1960,13 +1886,7 @@ ssize_t Socket::DoWrite(WriteRequest* req) {
         if (_conn) {
             return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
         } else {
-#if BRPC_WITH_RDMA
-            if (_rdma_ep && _rdma_state != RDMA_OFF) {
-                return _rdma_ep->CutFromIOBufList(data_list, ndata);
-            }
-#endif
-            return butil::IOBuf::cut_multiple_into_file_descriptor(
-                fd(), data_list, ndata);
+            return _transport->CutFromIOBufList(data_list, ndata);
         }
     }
 
@@ -2155,7 +2075,6 @@ ssize_t Socket::DoRead(size_t size_hint) {
             errno = ESSL;
             return -1;
         }
-        CHECK(_rdma_state == RDMA_OFF);
         return _read_buf.append_from_file_descriptor(fd(), size_hint);
     }
 
@@ -2257,7 +2176,7 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
     if (Address(id, &s) < 0) {
         return -1;
     }
-    if (NULL == s->_on_edge_triggered_events) {
+    if (!s->_transport->HasOnEdgeTrigger()) {
         // Callback can be NULL when receiving error epoll events
         // (Added into epoll by `WaitConnected')
         return 0;
@@ -2283,28 +2202,15 @@ int Socket::OnInputEvent(void* user_data, uint32_t 
events,
         // is just 1500~1700/s
         g_vars->neventthread << 1;
 
-        bthread_t tid;
         // transfer ownership as well, don't use s anymore!
         Socket* const p = s.release();
 
         bthread_attr_t attr = thread_attr;
         attr.keytable_pool = p->_keytable_pool;
         attr.tag = bthread_self_tag();
-        bthread_attr_set_name(&attr, "ProcessEvent");
-        if (FLAGS_usercode_in_coroutine) {
-            ProcessEvent(p);
-#if BRPC_WITH_RDMA
-        } else if (rdma::FLAGS_rdma_edisp_unsched) {
-            auto rc = bthread_start_background(&tid, &attr, ProcessEvent, p);
-            if (rc != 0) {
-                LOG(FATAL) << "Fail to start ProcessEvent";
-                ProcessEvent(p);
-            }
-#endif
-        } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
-            LOG(FATAL) << "Fail to start ProcessEvent";
-            ProcessEvent(p);
-        }
+        // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
+        attr.flags = attr.flags & (~BTHREAD_GLOBAL_PRIORITY);
+        p->_transport->ProcessEvent(attr);
     }
     return 0;
 }
@@ -2606,11 +2512,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
            << "\n}";
     }
 #endif
-#if BRPC_WITH_RDMA
-    if (ptr->_rdma_state == RDMA_ON && ptr->_rdma_ep) {
-        ptr->_rdma_ep->DebugInfo(os);
-    }
-#endif
+    ptr->_transport->Debug(os);
     { os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); }
 }
 
@@ -2833,10 +2735,11 @@ int Socket::GetPooledSocket(SocketUniquePtr* 
pooled_socket) {
         opt.local_side = butil::EndPoint(local_side().ip, 0);
         opt.user = user();
         opt.on_edge_triggered_events = _on_edge_triggered_events;
+        opt.need_on_edge_trigger = _need_on_edge_trigger;
         opt.initial_ssl_ctx = _ssl_ctx;
         opt.keytable_pool = _keytable_pool;
         opt.app_connect = _app_connect;
-        opt.use_rdma =  (_rdma_ep) ? true : false;
+        opt.socket_mode = _socket_mode;
         socket_pool = new SocketPool(opt);
         SocketPool* expected = NULL;
         if (!main_sp->socket_pool.compare_exchange_strong(
@@ -2935,10 +2838,11 @@ int Socket::GetShortSocket(SocketUniquePtr* 
short_socket) {
     opt.local_side = butil::EndPoint(local_side().ip, 0);
     opt.user = user();
     opt.on_edge_triggered_events = _on_edge_triggered_events;
+    opt.need_on_edge_trigger = _need_on_edge_trigger;
     opt.initial_ssl_ctx = _ssl_ctx;
     opt.keytable_pool = _keytable_pool;
     opt.app_connect = _app_connect;
-    opt.use_rdma =  (_rdma_ep) ? true : false;
+    opt.socket_mode = _socket_mode;
     if (get_client_side_messenger()->Create(opt, &id) != 0 ||
         Address(id, short_socket) != 0) {
         return -1;
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index a3e23230..c2f751e3 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -42,6 +42,7 @@
 #include "brpc/event_dispatcher.h"
 #include "brpc/versioned_ref_with_id.h"
 #include "brpc/health_check_option.h"
+#include "brpc/socket_mode.h"
 
 namespace brpc {
 namespace policy {
@@ -61,6 +62,7 @@ class Socket;
 class AuthContext;
 class EventDispatcher;
 class Stream;
+class Transport;
 
 // A special closure for processing the about-to-recycle socket. Socket does
 // not delete SocketUser, if you want, `delete this' at the end of
@@ -268,11 +270,20 @@ struct SocketOptions {
     // until new data arrives. The callback will not be called from more than
     // one thread at any time.
     void (*on_edge_triggered_events)(Socket*){NULL};
+    // Indicates that this socket requires an edge-triggered event handler even
+    // if `on_edge_triggered_events` is left as NULL by the caller. When this
+    // flag is true and `on_edge_triggered_events` is NULL, the underlying
+    // transport-specific implementation (e.g. a transport subclass) is allowed
+    // to install a suitable default `on_edge_triggered_events` callback on
+    // behalf of the user. Typical usage is by transports/protocols that rely
+    // on edge-triggered I/O semantics but want the framework to provide the
+    // actual event handler.
+    bool need_on_edge_trigger{false};
     int health_check_interval_s{-1};
     // Only accept ssl connection.
     bool force_ssl{false};
     std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
-    bool use_rdma{false};
+    SocketMode socket_mode{SOCKET_MODE_TCP};
     bthread_keytable_pool_t* keytable_pool{NULL};
     SocketConnection* conn{NULL};
     std::shared_ptr<AppConnect> app_connect;
@@ -313,6 +324,10 @@ friend class policy::H2GlobalStreamCreator;
 friend class VersionedRefWithId<Socket>;
 friend class IOEvent<Socket>;
 friend void DereferenceSocket(Socket*);
+friend class Transport;
+friend class TcpTransport;
+friend class RdmaTransport;
+friend class TransportFactory;
     class SharedPart;
     struct WriteRequest;
 
@@ -650,13 +665,6 @@ public:
 private:
     DISALLOW_COPY_AND_ASSIGN(Socket);
 
-    // The on/off state of RDMA
-    enum RdmaState {
-        RDMA_ON,
-        RDMA_OFF,
-        RDMA_UNKNOWN
-    };
-
     int ConductError(bthread_id_t);
     int StartWrite(WriteRequest*, const WriteOptions&);
 
@@ -732,7 +740,6 @@ private:
     // Wait until nref hits `expected_nref' and reset some internal resources.
     int WaitAndReset(int32_t expected_nref);
 
-    static void* ProcessEvent(void*);
 
     static void* KeepWrite(void*);
 
@@ -839,7 +846,7 @@ private:
     // of EventDispatcher::AddConsumer (event_dispatcher.h)
     // carefully before implementing the callback.
     void (*_on_edge_triggered_events)(Socket*);
-
+    bool _need_on_edge_trigger;
     // A set of callbacks to monitor important events of this socket.
     // Initialized by SocketOptions.user
     SocketUser* _user;
@@ -918,10 +925,9 @@ private:
     SSL* _ssl_session;               // owner
     std::shared_ptr<SocketSSLContext> _ssl_ctx;
 
-    // The RdmaEndpoint
-    rdma::RdmaEndpoint* _rdma_ep;
-    // Should use RDMA or not
-    RdmaState _rdma_state;
+    // Should use SOCKET_MODE_RDMA or SOCKET_MODE_TCP or Other, default is 
SOCKET_MODE_TCP Transport
+    SocketMode _socket_mode{SOCKET_MODE_TCP};
+    std::shared_ptr<Transport> _transport;
 
     // Pass from controller, for progressive reading.
     ConnectionType _connection_type_for_progressive_read;
diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h
index 7cf08804..b1922bf8 100644
--- a/src/brpc/socket_map.h
+++ b/src/brpc/socket_map.h
@@ -84,12 +84,12 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id,
 
 inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
                     const std::shared_ptr<SocketSSLContext>& ssl_ctx,
-                    bool use_rdma,
+                    SocketMode socket_mode,
                     const HealthCheckOption& hc_option) {
     SocketOptions opt;
     opt.remote_side = key.peer.addr;
     opt.initial_ssl_ctx = ssl_ctx;
-    opt.use_rdma = use_rdma;
+    opt.socket_mode = socket_mode;
     opt.hc_option = hc_option;
     return SocketMapInsert(key, id, opt);
 }
@@ -97,13 +97,13 @@ inline int SocketMapInsert(const SocketMapKey& key, 
SocketId* id,
 inline int SocketMapInsert(const SocketMapKey& key, SocketId* id,
                     const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
     HealthCheckOption hc_option;
-    return SocketMapInsert(key, id, ssl_ctx, false, hc_option);
+    return SocketMapInsert(key, id, ssl_ctx, SOCKET_MODE_TCP, hc_option);
 }
 
 inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) {
     std::shared_ptr<SocketSSLContext> empty_ptr;
     HealthCheckOption hc_option;
-    return SocketMapInsert(key, id, empty_ptr, false, hc_option);
+    return SocketMapInsert(key, id, empty_ptr, SOCKET_MODE_TCP, hc_option);
 }
 
 // Find the SocketId associated with `key'.
@@ -164,12 +164,12 @@ public:
     int Init(const SocketMapOptions&);
     int Insert(const SocketMapKey& key, SocketId* id,
                const std::shared_ptr<SocketSSLContext>& ssl_ctx,
-               bool use_rdma,
+               SocketMode socket_mode,
                const HealthCheckOption& hc_option) {
         SocketOptions opt;
         opt.remote_side = key.peer.addr;
         opt.initial_ssl_ctx = ssl_ctx;
-        opt.use_rdma = use_rdma;
+        opt.socket_mode = socket_mode;
         opt.hc_option = hc_option;
         return Insert(key, id, opt);
 }
@@ -177,12 +177,12 @@ public:
     int Insert(const SocketMapKey& key, SocketId* id,
                const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
         HealthCheckOption hc_option;
-        return Insert(key, id, ssl_ctx, false, hc_option);   
+        return Insert(key, id, ssl_ctx, SOCKET_MODE_TCP, hc_option);
     }
     int Insert(const SocketMapKey& key, SocketId* id) {
         std::shared_ptr<SocketSSLContext> empty_ptr;
         HealthCheckOption hc_option;
-        return Insert(key, id, empty_ptr, false, hc_option);
+        return Insert(key, id, empty_ptr, SOCKET_MODE_TCP, hc_option);
     }
     int Insert(const SocketMapKey& key, SocketId* id, SocketOptions& opt);
 
diff --git a/src/brpc/socket_mode.h b/src/brpc/socket_mode.h
new file mode 100644
index 00000000..8bce0189
--- /dev/null
+++ b/src/brpc/socket_mode.h
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COMMON_H
+#define BRPC_COMMON_H
+namespace brpc {
+enum SocketMode {
+    SOCKET_MODE_TCP = 0,
+    SOCKET_MODE_RDMA = 1
+};
+}
+#endif //BRPC_COMMON_H
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.cpp b/src/brpc/tcp_transport.cpp
new file mode 100644
index 00000000..49c6f68d
--- /dev/null
+++ b/src/brpc/tcp_transport.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "tcp_transport.h"
+namespace brpc {
+DECLARE_bool(usercode_in_coroutine);
+DECLARE_bool(usercode_in_pthread);
+
+extern SocketVarsCollector* g_vars;
+
+void TcpTransport::Init(Socket* socket, const SocketOptions& options) {
+    _socket = socket;
+    _default_connect = options.app_connect;
+    _on_edge_trigger = options.on_edge_triggered_events;
+    if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
+        _on_edge_trigger = InputMessenger::OnNewMessages;
+    }
+}
+
+void TcpTransport::Release(){}
+
+int TcpTransport::Reset(int32_t expected_nref) {
+    return 0;
+}
+
+int TcpTransport::CutFromIOBuf(butil::IOBuf* buf) {
+    return buf->cut_into_file_descriptor(_socket->fd());
+}
+
+std::shared_ptr<AppConnect> TcpTransport::Connect() {
+    return _default_connect;
+}
+
+ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
+    return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf, 
ndata);
+}
+
+int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool 
pollin, const timespec duetime) {
+    g_vars->nwaitepollout << 1;
+    const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
+    if (rc < 0 && errno != ETIMEDOUT) {
+        const int saved_errno = errno;
+        PLOG(WARNING) << "Fail to wait epollout of " << _socket;
+        _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
+                                         _socket->description().c_str(), 
berror(saved_errno));
+        return 1;
+    }
+    return 0;
+}
+
+void TcpTransport::ProcessEvent(bthread_attr_t attr) {
+    bthread_t tid;
+    if (FLAGS_usercode_in_coroutine) {
+        OnEdge(_socket);
+    } else if (bthread_start_urgent(&tid, &attr, OnEdge, _socket) != 0) {
+        LOG(FATAL) << "Fail to start ProcessEvent";
+        OnEdge(_socket);
+    }
+}
+void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) {
+    InputMessageBase* to_run_msg = input_msg.release();
+    if (!to_run_msg) {
+        return;
+    }
+    // Create bthread for last_msg. The bthread is not scheduled
+    // until bthread_flush() is called (in the worse case).
+    bthread_t th;
+    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : 
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+    tmp.keytable_pool = _socket->keytable_pool();
+    tmp.tag = bthread_self_tag();
+    bthread_attr_set_name(&tmp, "ProcessInputMessage");
+    if (!FLAGS_usercode_in_coroutine && bthread_start_background(
+            &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
+        ++*num_bthread_created;
+    } else {
+        ProcessInputMessage(to_run_msg);
+    }
+}
+void TcpTransport::Debug(std::ostream &os) {}
+}
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.h b/src/brpc/tcp_transport.h
new file mode 100644
index 00000000..b8c6b5e6
--- /dev/null
+++ b/src/brpc/tcp_transport.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TCP_TRANSPORT_H
+#define BRPC_TCP_TRANSPORT_H
+
+#include "brpc/transport.h"
+#include "brpc/socket.h"
+
+namespace brpc {
+class TcpTransport : public Transport {
+    friend class TransportFactory;
+public:
+    void Init(Socket* socket, const SocketOptions& options) override;
+    void Release() override;
+    int Reset(int32_t expected_nref) override;
+    std::shared_ptr<AppConnect> Connect() override;
+    int CutFromIOBuf(butil::IOBuf* buf) override;
+    ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
+    int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const 
timespec duetime) override;
+    void ProcessEvent(bthread_attr_t attr) override;
+    void QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) override;
+    void Debug(std::ostream &os) override;
+};
+}
+
+#endif //BRPC_TCP_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/transport.h b/src/brpc/transport.h
new file mode 100644
index 00000000..ca898508
--- /dev/null
+++ b/src/brpc/transport.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_H
+#define BRPC_TRANSPORT_H
+#include "brpc/input_messenger.h"
+#include "brpc/socket.h"
+#include "server.h"
+
+namespace brpc {
+using OnEdgeTrigger = std::function<void (Socket*)>;
+class Transport {
+    friend class TransportFactory;
+public:
+    static void* OnEdge(void* arg) {
+        // the enclosed Socket is valid and free to access inside this 
function.
+        SocketUniquePtr s(static_cast<Socket*>(arg));
+        const OnEdgeTrigger on_edge_trigger = 
s->_transport->GetOnEdgeTrigger();
+        on_edge_trigger(s.get());
+        return NULL;
+    }
+
+    static void* ProcessInputMessage(void* void_arg) {
+        InputMessageBase* msg = static_cast<InputMessageBase*>(void_arg);
+        msg->_process(msg);
+        return NULL;
+    }
+    virtual ~Transport() = default;
+    virtual void Init(Socket* socket, const SocketOptions& options) = 0;
+    virtual void Release() = 0;
+    virtual int Reset(int32_t expected_nref) = 0;
+    virtual std::shared_ptr<AppConnect> Connect() = 0;
+    virtual int CutFromIOBuf(butil::IOBuf* buf) = 0;
+    virtual ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) = 0;
+    virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, 
const timespec duetime) = 0;
+    virtual void ProcessEvent(bthread_attr_t attr) = 0;
+    virtual void QueueMessage(InputMessageClosure& input_msg, int* 
num_bthread_created, bool last_msg) = 0;
+    virtual void Debug(std::ostream &os) = 0;
+
+    bool HasOnEdgeTrigger() {
+        return _on_edge_trigger != NULL;
+    }
+    OnEdgeTrigger GetOnEdgeTrigger() {
+        return _on_edge_trigger;
+    }
+protected:
+    Socket* _socket;
+    std::shared_ptr<AppConnect> _default_connect;
+    OnEdgeTrigger _on_edge_trigger;
+};
+}
+#endif //BRPC_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/transport_factory.cpp b/src/brpc/transport_factory.cpp
new file mode 100644
index 00000000..b29a5e6d
--- /dev/null
+++ b/src/brpc/transport_factory.cpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transport_factory.h"
+#include "brpc/tcp_transport.h"
+#include "brpc/rdma_transport.h"
+namespace brpc {
+int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, 
const void* _options) {
+    if (mode == SOCKET_MODE_TCP) {
+        return 0;
+    }
+#if BRPC_WITH_RDMA
+    else if (mode == SOCKET_MODE_RDMA) {
+        return RdmaTransport::ContextInitOrDie(serverOrNot, _options);
+    }
+#endif
+    else {
+        LOG(ERROR) << "unknown transport type  " << mode;
+        return 1;
+    }
+}
+
+std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
+    if (mode == SOCKET_MODE_TCP) {
+        return std::unique_ptr<TcpTransport>(new TcpTransport());
+    }
+#if BRPC_WITH_RDMA
+    else if (mode == SOCKET_MODE_RDMA) {
+        return std::unique_ptr<RdmaTransport>(new RdmaTransport());
+    }
+#endif
+    else {
+        LOG(ERROR) << "socket_mode set error";
+        return nullptr;
+    }
+}
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/transport_factory.h b/src/brpc/transport_factory.h
new file mode 100644
index 00000000..bdbf4c2b
--- /dev/null
+++ b/src/brpc/transport_factory.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_TRANSPORT_FACTORY_H
+#define BRPC_TRANSPORT_FACTORY_H
+
+#include "brpc/errno.pb.h"
+#include "brpc/socket_mode.h"
+#include "brpc/transport.h"
+
+#if BRPC_WITH_RDMA
+BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error");
+BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA");
+#endif
+
+namespace brpc {
+// TransportFactory to create transport instance with socket_mode {TCP, RDMA}
+class TransportFactory {
+public:
+    static int ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* 
_options);
+    // create transport instance with socket mode
+    static std::shared_ptr<Transport> CreateTransport(SocketMode mode);
+};
+}
+
+#endif //BRPC_TRANSPORT_FACTORY_H
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to