This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new c8753e32 Fix some issues with transport (#3206)
c8753e32 is described below
commit c8753e32fb48caf452a15905076ef9a840ac8e5d
Author: Bright Chen <[email protected]>
AuthorDate: Thu Jan 29 10:13:08 2026 +0800
Fix some issues with transport (#3206)
1. The return value of CreateTransport should be std::unique_ptr.
2. Delete BAIDU_REGISTER_ERRNO in transport_factory.h.
3. Optimize some code formatting.
---
src/brpc/input_messenger.cpp | 3 ++-
src/brpc/rdma_transport.cpp | 18 +++++++++---------
src/brpc/rdma_transport.h | 2 +-
src/brpc/socket.cpp | 1 -
src/brpc/socket.h | 4 ++--
src/brpc/socket_mode.h | 8 ++++----
src/brpc/tcp_transport.cpp | 19 ++++++++++++-------
src/brpc/tcp_transport.h | 6 +++---
src/brpc/transport.h | 2 +-
src/brpc/transport_factory.cpp | 5 +++--
src/brpc/transport_factory.h | 12 +++---------
11 files changed, 40 insertions(+), 40 deletions(-)
diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index 925c8776..c249cca2 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -377,7 +377,8 @@ void InputMessenger::OnNewMessages(Socket* m) {
}
}
- if (messenger->ProcessNewMessage(m, nr, read_eof, received_us,
base_realtime, last_msg) < 0) {
+ if (messenger->ProcessNewMessage(m, nr, read_eof, received_us,
+ base_realtime, last_msg) < 0) {
return;
}
}
diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp
index d980c5a0..8fe88c6b 100644
--- a/src/brpc/rdma_transport.cpp
+++ b/src/brpc/rdma_transport.cpp
@@ -35,8 +35,8 @@ void RdmaTransport::Init(Socket *socket, const SocketOptions
&options) {
if (!_rdma_ep) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to create RdmaEndpoint";
- socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
- berror(saved_errno));
+ socket->SetFailed(
+ saved_errno, "Fail to create RdmaEndpoint: %s",
berror(saved_errno));
}
_rdma_state = RDMA_UNKNOWN;
} else {
@@ -95,8 +95,7 @@ ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf,
size_t ndata) {
int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
bool pollin, const timespec duetime) {
if (_rdma_state == RDMA_ON) {
- const int expected_val = _epollout_butex
- ->load(butil::memory_order_acquire);
+ const int expected_val =
_epollout_butex->load(butil::memory_order_acquire);
CHECK(_rdma_ep != NULL);
if (!_rdma_ep->IsWritable()) {
g_vars->nwaitepollout << 1;
@@ -105,9 +104,9 @@ int RdmaTransport::WaitEpollOut(butil::atomic<int>
*_epollout_butex,
const int saved_errno = errno;
PLOG(WARNING) << "Fail to wait rdma window of " << _socket;
_socket->SetFailed(saved_errno,
- "Fail to wait rdma window of %s: %s",
- _socket->description().c_str(),
- berror(saved_errno));
+ "Fail to wait rdma window of %s: %s",
+ _socket->description().c_str(),
+ berror(saved_errno));
}
if (_socket->Failed()) {
// NOTE:
@@ -140,7 +139,8 @@ void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
}
}
-void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int*
num_bthread_created, bool last_msg) {
+void RdmaTransport::QueueMessage(InputMessageClosure& input_msg,
+ int* num_bthread_created, bool last_msg) {
if (last_msg && !rdma::FLAGS_rdma_use_polling) {
return;
}
@@ -234,5 +234,5 @@ bool RdmaTransport::OptionsAvailableOverRdma(const
ServerOptions* opt) {
}
return true;
}
-}
+} // namespace brpc
#endif
\ No newline at end of file
diff --git a/src/brpc/rdma_transport.h b/src/brpc/rdma_transport.h
index 7e62edff..65ae88f7 100644
--- a/src/brpc/rdma_transport.h
+++ b/src/brpc/rdma_transport.h
@@ -60,6 +60,6 @@ private:
RdmaState _rdma_state;
std::shared_ptr<TcpTransport> _tcp_transport;
};
-}
+} // namespace brpc
#endif // BRPC_WITH_RDMA
#endif //BRPC_RDMA_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 9b14d430..b132f2ac 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -474,7 +474,6 @@ Socket::Socket(Forbidden f)
, _ssl_state(SSL_UNKNOWN)
, _ssl_session(NULL)
, _socket_mode(SOCKET_MODE_TCP)
- , _transport(nullptr)
, _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
, _controller_released_socket(false)
, _overcrowded(false)
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index c2f751e3..816fccdf 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -926,8 +926,8 @@ private:
std::shared_ptr<SocketSSLContext> _ssl_ctx;
// Should use SOCKET_MODE_RDMA or SOCKET_MODE_TCP or Other, default is
SOCKET_MODE_TCP Transport
- SocketMode _socket_mode{SOCKET_MODE_TCP};
- std::shared_ptr<Transport> _transport;
+ SocketMode _socket_mode;
+ std::unique_ptr<Transport> _transport;
// Pass from controller, for progressive reading.
ConnectionType _connection_type_for_progressive_read;
diff --git a/src/brpc/socket_mode.h b/src/brpc/socket_mode.h
index 8bce0189..b5d42be4 100644
--- a/src/brpc/socket_mode.h
+++ b/src/brpc/socket_mode.h
@@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef BRPC_COMMON_H
-#define BRPC_COMMON_H
+#ifndef BRPC_SOCKET_MODE_H
+#define BRPC_SOCKET_MODE_H
namespace brpc {
enum SocketMode {
SOCKET_MODE_TCP = 0,
SOCKET_MODE_RDMA = 1
};
-}
-#endif //BRPC_COMMON_H
\ No newline at end of file
+} // namespace brpc
+#endif //BRPC_SOCKET_MODE_H
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.cpp b/src/brpc/tcp_transport.cpp
index 49c6f68d..37db7a89 100644
--- a/src/brpc/tcp_transport.cpp
+++ b/src/brpc/tcp_transport.cpp
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#include "tcp_transport.h"
+#include "brpc/tcp_transport.h"
+
namespace brpc {
DECLARE_bool(usercode_in_coroutine);
DECLARE_bool(usercode_in_pthread);
@@ -49,14 +50,15 @@ ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf,
size_t ndata) {
return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf,
ndata);
}
-int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool
pollin, const timespec duetime) {
+int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex,
+ bool pollin, timespec duetime) {
g_vars->nwaitepollout << 1;
const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
if (rc < 0 && errno != ETIMEDOUT) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to wait epollout of " << _socket;
_socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
- _socket->description().c_str(),
berror(saved_errno));
+ _socket->description().c_str(),
berror(saved_errno));
return 1;
}
return 0;
@@ -71,7 +73,8 @@ void TcpTransport::ProcessEvent(bthread_attr_t attr) {
OnEdge(_socket);
}
}
-void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int*
num_bthread_created, bool last_msg) {
+void TcpTransport::QueueMessage(InputMessageClosure& input_msg,
+ int* num_bthread_created, bool) {
InputMessageBase* to_run_msg = input_msg.release();
if (!to_run_msg) {
return;
@@ -79,7 +82,9 @@ void TcpTransport::QueueMessage(InputMessageClosure&
input_msg, int* num_bthread
// Create bthread for last_msg. The bthread is not scheduled
// until bthread_flush() is called (in the worse case).
bthread_t th;
- bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
+ bthread_attr_t tmp =
+ (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL) |
+ BTHREAD_NOSIGNAL;
tmp.keytable_pool = _socket->keytable_pool();
tmp.tag = bthread_self_tag();
bthread_attr_set_name(&tmp, "ProcessInputMessage");
@@ -90,5 +95,5 @@ void TcpTransport::QueueMessage(InputMessageClosure&
input_msg, int* num_bthread
ProcessInputMessage(to_run_msg);
}
}
-void TcpTransport::Debug(std::ostream &os) {}
-}
\ No newline at end of file
+
+} // namespace brpc
\ No newline at end of file
diff --git a/src/brpc/tcp_transport.h b/src/brpc/tcp_transport.h
index b8c6b5e6..8a06a85d 100644
--- a/src/brpc/tcp_transport.h
+++ b/src/brpc/tcp_transport.h
@@ -31,11 +31,11 @@ public:
std::shared_ptr<AppConnect> Connect() override;
int CutFromIOBuf(butil::IOBuf* buf) override;
ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
- int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const
timespec duetime) override;
+ int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin,
timespec duetime) override;
void ProcessEvent(bthread_attr_t attr) override;
void QueueMessage(InputMessageClosure& input_msg, int*
num_bthread_created, bool last_msg) override;
- void Debug(std::ostream &os) override;
+ void Debug(std::ostream &os) override {}
};
-}
+} // namespace brpc
#endif //BRPC_TCP_TRANSPORT_H
\ No newline at end of file
diff --git a/src/brpc/transport.h b/src/brpc/transport.h
index ca898508..a2cb868b 100644
--- a/src/brpc/transport.h
+++ b/src/brpc/transport.h
@@ -46,7 +46,7 @@ public:
virtual std::shared_ptr<AppConnect> Connect() = 0;
virtual int CutFromIOBuf(butil::IOBuf* buf) = 0;
virtual ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) = 0;
- virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin,
const timespec duetime) = 0;
+ virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin,
timespec duetime) = 0;
virtual void ProcessEvent(bthread_attr_t attr) = 0;
virtual void QueueMessage(InputMessageClosure& input_msg, int*
num_bthread_created, bool last_msg) = 0;
virtual void Debug(std::ostream &os) = 0;
diff --git a/src/brpc/transport_factory.cpp b/src/brpc/transport_factory.cpp
index b29a5e6d..b689e2ed 100644
--- a/src/brpc/transport_factory.cpp
+++ b/src/brpc/transport_factory.cpp
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#include "transport_factory.h"
+#include "brpc/transport_factory.h"
#include "brpc/tcp_transport.h"
#include "brpc/rdma_transport.h"
+
namespace brpc {
int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot,
const void* _options) {
if (mode == SOCKET_MODE_TCP) {
@@ -34,7 +35,7 @@ int TransportFactory::ContextInitOrDie(SocketMode mode, bool
serverOrNot, const
}
}
-std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
+std::unique_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
if (mode == SOCKET_MODE_TCP) {
return std::unique_ptr<TcpTransport>(new TcpTransport());
}
diff --git a/src/brpc/transport_factory.h b/src/brpc/transport_factory.h
index bdbf4c2b..d933a130 100644
--- a/src/brpc/transport_factory.h
+++ b/src/brpc/transport_factory.h
@@ -18,23 +18,17 @@
#ifndef BRPC_TRANSPORT_FACTORY_H
#define BRPC_TRANSPORT_FACTORY_H
-#include "brpc/errno.pb.h"
#include "brpc/socket_mode.h"
#include "brpc/transport.h"
-#if BRPC_WITH_RDMA
-BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error");
-BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA");
-#endif
-
namespace brpc {
// TransportFactory to create transport instance with socket_mode {TCP, RDMA}
class TransportFactory {
public:
static int ContextInitOrDie(SocketMode mode, bool serverOrNot, const void*
_options);
- // create transport instance with socket mode
- static std::shared_ptr<Transport> CreateTransport(SocketMode mode);
+ // Create transport instance with socket mode.
+ static std::unique_ptr<Transport> CreateTransport(SocketMode mode);
};
-}
+} // namespace brpc
#endif //BRPC_TRANSPORT_FACTORY_H
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]