This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new c8753e32 Fix some issues with transport (#3206)
c8753e32 is described below

commit c8753e32fb48caf452a15905076ef9a840ac8e5d
Author: Bright Chen <[email protected]>
AuthorDate: Thu Jan 29 10:13:08 2026 +0800

    Fix some issues with transport (#3206)
    
    1. The return value of CreateTransport should be std::unique_ptr.
    2. Delete BAIDU_REGISTER_ERRNO in transport_factory.h.
    3. Optimize some code formatting.
---
 src/brpc/input_messenger.cpp   |  3 ++-
 src/brpc/rdma_transport.cpp    | 18 +++++++++---------
 src/brpc/rdma_transport.h      |  2 +-
 src/brpc/socket.cpp            |  1 -
 src/brpc/socket.h              |  4 ++--
 src/brpc/socket_mode.h         |  8 ++++----
 src/brpc/tcp_transport.cpp     | 19 ++++++++++++-------
 src/brpc/tcp_transport.h       |  6 +++---
 src/brpc/transport.h           |  2 +-
 src/brpc/transport_factory.cpp |  5 +++--
 src/brpc/transport_factory.h   | 12 +++---------
 11 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index 925c8776..c249cca2 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -377,7 +377,8 @@ void InputMessenger::OnNewMessages(Socket* m) {
             }
         }
 
-        if (messenger->ProcessNewMessage(m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
+        if (messenger->ProcessNewMessage(m, nr, read_eof, received_us,
+                                         base_realtime, last_msg) < 0) {
             return;
         } 
     }
diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp
index d980c5a0..8fe88c6b 100644
--- a/src/brpc/rdma_transport.cpp
+++ b/src/brpc/rdma_transport.cpp
@@ -35,8 +35,8 @@ void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
         if (!_rdma_ep) {
             const int saved_errno = errno;
             PLOG(ERROR) << "Fail to create RdmaEndpoint";
-            socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
-                                                  berror(saved_errno));
+            socket->SetFailed(
+                saved_errno, "Fail to create RdmaEndpoint: %s", berror(saved_errno));
         }
         _rdma_state = RDMA_UNKNOWN;
     } else {
@@ -95,8 +95,7 @@ ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
 int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
                                     bool pollin, const timespec duetime) {
     if (_rdma_state == RDMA_ON) {
-        const int expected_val = _epollout_butex
-                ->load(butil::memory_order_acquire);
+        const int expected_val = _epollout_butex->load(butil::memory_order_acquire);
         CHECK(_rdma_ep != NULL);
         if (!_rdma_ep->IsWritable()) {
             g_vars->nwaitepollout << 1;
@@ -105,9 +104,9 @@ int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
                     const int saved_errno = errno;
                     PLOG(WARNING) << "Fail to wait rdma window of " << _socket;
                     _socket->SetFailed(saved_errno,
-                                "Fail to wait rdma window of %s: %s",
-                                _socket->description().c_str(),
-                                berror(saved_errno));
+                                       "Fail to wait rdma window of %s: %s",
+                                       _socket->description().c_str(),
+                                       berror(saved_errno));
                 }
                 if (_socket->Failed()) {
                     // NOTE:
@@ -140,7 +139,8 @@ void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
     }
 }
 
-void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) {
+void RdmaTransport::QueueMessage(InputMessageClosure& input_msg,
+                                 int* num_bthread_created, bool last_msg) {
     if (last_msg && !rdma::FLAGS_rdma_use_polling) {
         return;
     }
@@ -234,5 +234,5 @@ bool RdmaTransport::OptionsAvailableOverRdma(const ServerOptions* opt) {
     }
     return true;
 }
-}
+} // namespace brpc
 #endif
\ No newline at end of file
diff --git a/src/brpc/rdma_transport.h b/src/brpc/rdma_transport.h
index 7e62edff..65ae88f7 100644
--- a/src/brpc/rdma_transport.h
+++ b/src/brpc/rdma_transport.h
@@ -60,6 +60,6 @@ private:
     RdmaState _rdma_state;
     std::shared_ptr<TcpTransport>  _tcp_transport;
 };
-}
+} // namespace brpc
 #endif // BRPC_WITH_RDMA
 #endif //BRPC_RDMA_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 9b14d430..b132f2ac 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -474,7 +474,6 @@ Socket::Socket(Forbidden f)
     , _ssl_state(SSL_UNKNOWN)
     , _ssl_session(NULL)
     , _socket_mode(SOCKET_MODE_TCP)
-    , _transport(nullptr)
     , _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
     , _controller_released_socket(false)
     , _overcrowded(false)
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index c2f751e3..816fccdf 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -926,8 +926,8 @@ private:
     std::shared_ptr<SocketSSLContext> _ssl_ctx;
 
     // Should use SOCKET_MODE_RDMA or SOCKET_MODE_TCP or Other, default is SOCKET_MODE_TCP Transport
-    SocketMode _socket_mode{SOCKET_MODE_TCP};
-    std::shared_ptr<Transport> _transport;
+    SocketMode _socket_mode;
+    std::unique_ptr<Transport> _transport;
 
     // Pass from controller, for progressive reading.
     ConnectionType _connection_type_for_progressive_read;
diff --git a/src/brpc/socket_mode.h b/src/brpc/socket_mode.h
index 8bce0189..b5d42be4 100644
--- a/src/brpc/socket_mode.h
+++ b/src/brpc/socket_mode.h
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef BRPC_COMMON_H
-#define BRPC_COMMON_H
+#ifndef BRPC_SOCKET_MODE_H
+#define BRPC_SOCKET_MODE_H
 namespace brpc {
 enum SocketMode {
     SOCKET_MODE_TCP = 0,
     SOCKET_MODE_RDMA = 1
 };
-}
-#endif //BRPC_COMMON_H
\ No newline at end of file
+} // namespace brpc
+#endif //BRPC_SOCKET_MODE_H
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.cpp b/src/brpc/tcp_transport.cpp
index 49c6f68d..37db7a89 100644
--- a/src/brpc/tcp_transport.cpp
+++ b/src/brpc/tcp_transport.cpp
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "tcp_transport.h"
+#include "brpc/tcp_transport.h"
+
 namespace brpc {
 DECLARE_bool(usercode_in_coroutine);
 DECLARE_bool(usercode_in_pthread);
@@ -49,14 +50,15 @@ ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
     return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf, ndata);
 }
 
-int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) {
+int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex,
+                               bool pollin, timespec duetime) {
     g_vars->nwaitepollout << 1;
     const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
     if (rc < 0 && errno != ETIMEDOUT) {
         const int saved_errno = errno;
         PLOG(WARNING) << "Fail to wait epollout of " << _socket;
         _socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
-                                         _socket->description().c_str(), berror(saved_errno));
+                           _socket->description().c_str(), berror(saved_errno));
         return 1;
     }
     return 0;
@@ -71,7 +73,8 @@ void TcpTransport::ProcessEvent(bthread_attr_t attr) {
         OnEdge(_socket);
     }
 }
-void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) {
+void TcpTransport::QueueMessage(InputMessageClosure& input_msg,
+                                int* num_bthread_created, bool) {
     InputMessageBase* to_run_msg = input_msg.release();
     if (!to_run_msg) {
         return;
@@ -79,7 +82,9 @@ void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread
     // Create bthread for last_msg. The bthread is not scheduled
     // until bthread_flush() is called (in the worse case).
     bthread_t th;
-    bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+    bthread_attr_t tmp =
+        (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) |
+        BTHREAD_NOSIGNAL;
     tmp.keytable_pool = _socket->keytable_pool();
     tmp.tag = bthread_self_tag();
     bthread_attr_set_name(&tmp, "ProcessInputMessage");
@@ -90,5 +95,5 @@ void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread
         ProcessInputMessage(to_run_msg);
     }
 }
-void TcpTransport::Debug(std::ostream &os) {}
-}
\ No newline at end of file
+
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.h b/src/brpc/tcp_transport.h
index b8c6b5e6..8a06a85d 100644
--- a/src/brpc/tcp_transport.h
+++ b/src/brpc/tcp_transport.h
@@ -31,11 +31,11 @@ public:
     std::shared_ptr<AppConnect> Connect() override;
     int CutFromIOBuf(butil::IOBuf* buf) override;
     ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
-    int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) override;
+    int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, timespec duetime) override;
     void ProcessEvent(bthread_attr_t attr) override;
     void QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) override;
-    void Debug(std::ostream &os) override;
+    void Debug(std::ostream &os) override {}
 };
-}
+} // namespace brpc
 
 #endif //BRPC_TCP_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/transport.h b/src/brpc/transport.h
index ca898508..a2cb868b 100644
--- a/src/brpc/transport.h
+++ b/src/brpc/transport.h
@@ -46,7 +46,7 @@ public:
     virtual std::shared_ptr<AppConnect> Connect() = 0;
     virtual int CutFromIOBuf(butil::IOBuf* buf) = 0;
     virtual ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) = 0;
-    virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) = 0;
+    virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, timespec duetime) = 0;
     virtual void ProcessEvent(bthread_attr_t attr) = 0;
     virtual void QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) = 0;
     virtual void Debug(std::ostream &os) = 0;
diff --git a/src/brpc/transport_factory.cpp b/src/brpc/transport_factory.cpp
index b29a5e6d..b689e2ed 100644
--- a/src/brpc/transport_factory.cpp
+++ b/src/brpc/transport_factory.cpp
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "transport_factory.h"
+#include "brpc/transport_factory.h"
 #include "brpc/tcp_transport.h"
 #include "brpc/rdma_transport.h"
+
 namespace brpc {
 int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* _options) {
     if (mode == SOCKET_MODE_TCP) {
@@ -34,7 +35,7 @@ int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const
     }
 }
 
-std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
+std::unique_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
     if (mode == SOCKET_MODE_TCP) {
         return std::unique_ptr<TcpTransport>(new TcpTransport());
     }
diff --git a/src/brpc/transport_factory.h b/src/brpc/transport_factory.h
index bdbf4c2b..d933a130 100644
--- a/src/brpc/transport_factory.h
+++ b/src/brpc/transport_factory.h
@@ -18,23 +18,17 @@
 #ifndef BRPC_TRANSPORT_FACTORY_H
 #define BRPC_TRANSPORT_FACTORY_H
 
-#include "brpc/errno.pb.h"
 #include "brpc/socket_mode.h"
 #include "brpc/transport.h"
 
-#if BRPC_WITH_RDMA
-BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error");
-BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA");
-#endif
-
 namespace brpc {
 // TransportFactory to create transport instance with socket_mode {TCP, RDMA}
 class TransportFactory {
 public:
     static int ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* _options);
-    // create transport instance with socket mode
-    static std::shared_ptr<Transport> CreateTransport(SocketMode mode);
+    // Create transport instance with socket mode.
+    static std::unique_ptr<Transport> CreateTransport(SocketMode mode);
 };
-}
+} // namespace brpc
 
 #endif //BRPC_TRANSPORT_FACTORY_H
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to