This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new c7ae57aa Bugfix: SQ overflow (#3145)
c7ae57aa is described below
commit c7ae57aa310b9618911ef75c6361ba80a8e082f2
Author: Bright Chen <[email protected]>
AuthorDate: Mon Dec 22 19:00:47 2025 +0800
Bugfix: SQ overflow (#3145)
* Bugfix: ibv_post_send failure caused by polling the send CQE before the
recv CQE
* Split send and recv comp channel
* Use wr_id to update _sq_window_size
* Send CQ and recv CQ share comp channel
* Add IMM window
* Deallocate polling cq
* Update RDMA documents
---
docs/cn/rdma.md | 48 ++--
docs/en/rdma.md | 43 ++--
src/brpc/rdma/rdma_endpoint.cpp | 525 ++++++++++++++++++++++++----------------
src/brpc/rdma/rdma_endpoint.h | 51 ++--
src/brpc/socket.cpp | 25 +-
test/brpc_rdma_unittest.cpp | 6 +-
6 files changed, 405 insertions(+), 293 deletions(-)
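The core of this fix is the wr_id-based send-window accounting named in the commit message. Below is a minimal standalone sketch of the idea (hypothetical names, not brpc's actual classes): each signaled send WR carries in wr_id the number of WRs it covers, so a single send CQE reclaims a whole batch of SQ slots. The sketch signs every 4th WR, roughly mirroring brpc's every-quarter-window signaling.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of wr_id batching. Unsignaled WRs produce no CQE;
// the next signaled WR's CQE acknowledges them all at once.
struct SqWindow {
    uint32_t window_size;     // free SQ slots, mirrors _sq_window_size
    uint32_t unsignaled = 0;  // WRs posted since the last signaled one

    explicit SqWindow(uint32_t capacity) : window_size(capacity) {}

    // Post one send WR; returns the wr_id to attach (0 means unsignaled).
    uint32_t post_send() {
        assert(window_size > 0);
        --window_size;
        ++unsignaled;
        if (unsignaled == 4) {       // would set IBV_SEND_SIGNALED here
            uint32_t wr_id = unsignaled;
            unsignaled = 0;
            return wr_id;
        }
        return 0;                    // unsignaled WR
    }

    // A send CQE whose wr_id == n acknowledges n batched WRs.
    void on_send_cqe(uint32_t wr_id) { window_size += wr_id; }
};
```

One CQE per batch keeps completion overhead low while still letting the sender account for every posted WR, which is what prevents the SQ overflow this commit fixes.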
diff --git a/docs/cn/rdma.md b/docs/cn/rdma.md
index 29b0b6fb..e7754598 100644
--- a/docs/cn/rdma.md
+++ b/docs/cn/rdma.md
@@ -35,7 +35,7 @@ RDMA与TCP不同,不使用socket接口进行通信。但是在实现上仍然
brpc内部使用RDMA
RC模式,每个RdmaEndpoint对应一个QP。RDMA连接建立依赖于前置TCP建连,TCP建连后双方交换必要参数,如GID、QPN等,再发起RDMA连接并实现数据传输。这个过程我们称为握手(参见RdmaEndpoint)。因为握手需要TCP连接,因此RdmaEndpoint所在的Socket类中,原本的TCP
fd仍然有效。握手过程采用了brpc中已有的AppConnect逻辑。注意,握手用的TCP连接在后续数据传输阶段并不会收发数据,但仍保持为EST状态。一旦TCP连接中断,其上对应的RDMA连接同样会置错。
-RdmaEndpoint数据传输逻辑的第一个重要特性是零拷贝。要发送的所有数据默认都存放在IOBuf的Block中,因此所发送的Block需要等到对端确认接收完成后才可以释放,这些Block的引用被存放于RdmaEndpoint::_sbuf中。而要实现接收零拷贝,则需要确保接受端所预提交的接收缓冲区必须直接在IOBuf的Block里面,被存放于RdmaEndpoint::_rbuf。注意,接收端预提交的每一段Block,有一个固定的大小(recv_block_size)。发送端发送时,一个请求最多只能有这么大,否则接收端则无法成功接收。
+RdmaEndpoint数据传输逻辑的第一个重要特性是零拷贝。要发送的所有数据默认都存放在IOBuf的Block中,因此所发送的Block需要等到发送CQE触发后才可以释放,这些Block的引用被存放于RdmaEndpoint::_sbuf中。而要实现接收零拷贝,则需要确保接收端所预提交的接收缓冲区必须直接在IOBuf的Block里面,被存放于RdmaEndpoint::_rbuf。注意,接收端预提交的每一段Block,有一个固定的大小(recv_block_size)。发送端发送时,一个请求最多只能有这么大,否则接收端则无法成功接收。
RdmaEndpoint数据传输逻辑的第二个重要特性是滑动窗口流控。这一流控机制是为了避免发送端持续在发送,其速度超过了接收端处理的速度。TCP传输中也有类似的逻辑,但是是由内核协议栈来实现的。RdmaEndpoint内实现了这一流控机制,通过接收端显式回复ACK来确认接收端处理完毕。为了减少ACK本身的开销,让ACK以立即数形式返回,可以被附在数据消息里。
@@ -52,26 +52,26 @@ RDMA支持事件驱动和轮询两种模式,默认是事件驱动模式,通
# 参数
可配置参数说明:
-* rdma_trace_verbose: 日志中打印RDMA建连相关信息,默认false
-* rdma_recv_zerocopy: 是否启用接收零拷贝,默认true
-* rdma_zerocopy_min_size: 接收零拷贝最小的msg大小,默认512B
-* rdma_recv_block_type:
为接收数据预准备的block类型,分为三类default(8KB)/large(64KB)/huge(2MB),默认为default
-* rdma_prepared_qp_size: 程序启动预生成的QP的大小,默认128
-* rdma_prepared_qp_cnt: 程序启动预生成的QP的数量,默认1024
-* rdma_max_sge: 允许的最大发送SGList长度,默认为0,即采用硬件所支持的最大长度
-* rdma_sq_size: SQ大小,默认128
-* rdma_rq_size: RQ大小,默认128
-* rdma_cqe_poll_once: 从CQ中一次性poll出的CQE数量,默认32
-* rdma_gid_index: 使用本地GID表中的Index,默认为-1,即选用最大的可用GID Index
-* rdma_port: 使用IB设备的port number,默认为1
-* rdma_device: 使用IB设备的名称,默认为空,即使用第一个active的设备
-* rdma_memory_pool_initial_size_mb: 内存池的初始大小,单位MB,默认1024
-* rdma_memory_pool_increase_size_mb: 内存池每次动态增长的大小,单位MB,默认1024
-* rdma_memory_pool_max_regions: 最大的内存池块数,默认16
-* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4
-* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128
-* rdma_use_polling: 是否使用RDMA的轮询模式,默认false
-* rdma_poller_num: 轮询模式下的poller数目,默认1
-* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false
-* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false
-* rdma_disable_bthread: 禁用bthread,默认是false
+* rdma_trace_verbose: 日志中打印RDMA建连相关信息,默认false。
+* rdma_recv_zerocopy: 是否启用接收零拷贝,默认true。
+* rdma_zerocopy_min_size: 接收零拷贝最小的msg大小,默认512B。
+* rdma_recv_block_type: 为接收数据预准备的block类型,分为三类default(8KB)/large(64KB)/huge(2MB),默认为default。
+* rdma_prepared_qp_size: 程序启动预生成的QP的大小,默认128。
+* rdma_prepared_qp_cnt: 程序启动预生成的QP的数量,默认1024。
+* rdma_max_sge: 允许的最大发送SGList长度,默认为0,即采用硬件所支持的最大长度。
+* rdma_sq_size: SQ大小,默认128。
+* rdma_rq_size: RQ大小,默认128。
+* rdma_cqe_poll_once: 从CQ中一次性poll出的CQE数量,默认32。
+* rdma_gid_index: 使用本地GID表中的Index,默认为-1,即选用最大的可用GID Index。
+* rdma_port: 使用IB设备的port number,默认为1。
+* rdma_device: 使用IB设备的名称,默认为空,即使用第一个active的设备。
+* rdma_memory_pool_initial_size_mb: 内存池的初始大小,单位MB,默认1024。
+* rdma_memory_pool_increase_size_mb: 内存池每次动态增长的大小,单位MB,默认1024。
+* rdma_memory_pool_max_regions: 最大的内存池块数,默认3。
+* rdma_memory_pool_buckets: 内存池中为避免竞争采用的bucket数目,默认为4。
+* rdma_memory_pool_tls_cache_num: 内存池中thread local的缓存block数目,默认为128。
+* rdma_use_polling: 是否使用RDMA的轮询模式,默认false。
+* rdma_poller_num: 轮询模式下的poller数目,默认1。
+* rdma_poller_yield: 轮询模式下的poller是否主动放弃CPU,默认是false。
+* rdma_edisp_unsched: 让事件驱动器不可以被调度,默认是false。
+* rdma_disable_bthread: 禁用bthread,默认是false。
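The sliding-window flow control described in the document above can be sketched as a small standalone model (hypothetical names; brpc's real implementation lives in RdmaEndpoint): the receiver accumulates ACKs and only reports them, piggybacked as immediate data, once more than half the window has accumulated, which keeps ACK overhead low.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of window-based flow control with lazy ACKs.
struct FlowControl {
    uint32_t remote_rq_window;   // credits for the peer's receive queue
    uint32_t pending_acks = 0;   // recvs not yet acknowledged
    uint32_t capacity;

    explicit FlowControl(uint32_t cap)
        : remote_rq_window(cap), capacity(cap) {}

    bool can_send() const { return remote_rq_window > 0; }
    void on_send() { --remote_rq_window; }  // each send consumes a credit

    // Receiver side: ACK lazily, only after half the window accumulates,
    // and return the count to piggyback as immediate data.
    uint32_t on_recv() {
        if (++pending_acks > capacity / 2) {
            uint32_t acks = pending_acks;
            pending_acks = 0;
            return acks;      // would go into wr.imm_data
        }
        return 0;             // no ACK yet
    }

    // Sender side: an incoming immediate value restores credits.
    void on_imm(uint32_t acks) { remote_rq_window += acks; }
};
```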
diff --git a/docs/en/rdma.md b/docs/en/rdma.md
index c0e88ce9..99f1ecd7 100644
--- a/docs/en/rdma.md
+++ b/docs/en/rdma.md
@@ -35,7 +35,7 @@ RDMA does not use socket API like TCP. However, the
brpc::Socket class is still
brpc uses RDMA RC mode. Every RdmaEndpoint has its own QP. Before establishing
RDMA connection, a TCP connection is necessary to exchange some information
such as GID and QPN. We call this procedure handshake. Since handshake needs
TCP connection, the TCP fd in the corresponding Socket is still valid. The
handshake procedure is completed in the AppConnect way in brpc. The TCP
connection will keep in EST state but not be used for data transmission after
RDMA connection is established. Onc [...]
-The first key feature in RdmaEndpoint data transmission is zero copy. All data
which need to transmit is in the Blocks of IOBuf. Thus all the Blocks need to
be released after the remote side completes the receiving. The reference of
these Blocks are stored in RdmaEndpoint::_sbuf. In order to realize receiving
zero copy, the receive side must post receive buffers in Blocks of IOBuf, which
are stored in RdmaEndpoint::_rbuf. Note that all the Blocks posted in the
receive side has a fixed si [...]
+The first key feature in RdmaEndpoint data transmission is zero copy. All
data to be transmitted is stored in the Blocks of IOBuf. Thus all the Blocks
need to be released after the send CQEs are triggered. The references of
these Blocks are stored in RdmaEndpoint::_sbuf. In order to realize receiving
zero copy, the receive side must post receive buffers in Blocks of IOBuf, which
are stored in RdmaEndpoint::_rbuf. Note that all the Blocks posted in the
receive side have a fixed size (recv_blo [...]
The second key feature in RdmaEndpoint data transmission is sliding window
flow control. The flow control is to avoid a fast transmit side overwhelming a
slow receive side. TCP has a similar mechanism in the kernel TCP stack.
RdmaEndpoint implements this mechanism with explicit ACKs from the receive
side. To reduce the overhead of ACKs, the ACK number can be piggybacked in an
ordinary data message as immediate data.
@@ -50,21 +50,26 @@ RDMA is hardware-related. It has some different concepts
such as device, port, G
# Parameters
Configurable parameters:
-* rdma_trace_verbose: to print RDMA connection information in log,default is
false
-* rdma_recv_zerocopy: enable zero copy in receive side,default is true
-* rdma_zerocopy_min_size: the min message size for receive zero copy (in
Byte),default is 512
-* rdma_recv_block_type: the block type used for receiving, can be
default(8KB)/large(64KB)/huge(2MB),default is default
-* rdma_prepared_qp_size: the size of QPs created at the beginning of the
application,default is 128
-* rdma_prepared_qp_cnt: the number of QPs created at the beginning of the
application,default is 1024
-* rdma_max_sge: the max length of sglist, default is 0, which is the max
length allowed by the device
-* rdma_sq_size: the size of SQ,default is 128
-* rdma_rq_size: the size of RQ,default is 128
-* rdma_cqe_poll_once: the number of CQE pooled from CQ once,default is 32
-* rdma_gid_index: the index of local GID table used,default is -1,which is the
maximum GID index
-* rdma_port: the port number used,default is 1
-* rdma_device: the IB device name,default is empty,which is the first active
device
-* rdma_memory_pool_initial_size_mb: the initial region size of RDMA memory
pool (in MB),default is 1024
-* rdma_memory_pool_increase_size_mb: the step increase region size of RDMA
memory pool (in MB),default is 1024
-* rdma_memory_pool_max_regions: the max number of regions in RDMA memory
pool,default is 16
-* rdma_memory_pool_buckets: the number of buckets for avoiding mutex
contention in RDMA memory pool,default is 4
-* rdma_memory_pool_tls_cache_num: the number of thread local cached blocks in
RDMA memory pool,default is 128
+* rdma_trace_verbose: to print RDMA connection information in log, default is false.
+* rdma_recv_zerocopy: enable zero copy in receive side, default is true.
+* rdma_zerocopy_min_size: the min message size for receive zero copy (in Byte), default is 512.
+* rdma_recv_block_type: the block type used for receiving, can be default(8KB)/large(64KB)/huge(2MB), default is default.
+* rdma_prepared_qp_size: the size of QPs created at the beginning of the application, default is 128.
+* rdma_prepared_qp_cnt: the number of QPs created at the beginning of the application, default is 1024.
+* rdma_max_sge: the max length of sglist, default is 0, which is the max length allowed by the device.
+* rdma_sq_size: the size of SQ, default is 128.
+* rdma_rq_size: the size of RQ, default is 128.
+* rdma_cqe_poll_once: the number of CQEs polled from the CQ at a time, default is 32.
+* rdma_gid_index: the index of the local GID table used, default is -1, which is the maximum GID index.
+* rdma_port: the port number used, default is 1.
+* rdma_device: the IB device name, default is empty, which is the first active device.
+* rdma_memory_pool_initial_size_mb: the initial region size of RDMA memory pool (in MB), default is 1024.
+* rdma_memory_pool_increase_size_mb: the step increase region size of RDMA memory pool (in MB), default is 1024.
+* rdma_memory_pool_max_regions: the max number of regions in RDMA memory pool, default is 3.
+* rdma_memory_pool_buckets: the number of buckets for avoiding mutex contention in RDMA memory pool, default is 4.
+* rdma_memory_pool_tls_cache_num: the number of thread-local cached blocks in RDMA memory pool, default is 128.
+* rdma_use_polling: whether to use RDMA polling mode, default is false.
+* rdma_poller_num: the number of pollers in polling mode, default is 1.
+* rdma_poller_yield: whether pollers in polling mode voluntarily yield the CPU, default is false.
+* rdma_edisp_unsched: prevent the event dispatcher from being scheduled, default is false.
+* rdma_disable_bthread: disable bthread, default is false.
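One of the commit's changes is the new IMM window: a few SQ slots (RESERVED_WR_NUM in the code) are reserved for standalone SEND_WITH_IMM ACK messages, so ACKs can never overflow the send queue. A minimal sketch of that accounting (hypothetical model, not brpc's actual class):

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of the IMM window: an ACK-only send consumes a
// reserved slot, and its completion (wr_id == 0 in the commit) returns it.
struct ImmWindow {
    uint32_t imm_window;  // reserved slots for SEND_WITH_IMM-only ACKs

    explicit ImmWindow(uint32_t reserved) : imm_window(reserved) {}

    // An ACK may be posted only while a reserved slot is free.
    bool try_send_imm() {
        if (imm_window == 0) return false;  // defer the ACK instead
        --imm_window;                       // consumed by the post
        return true;
    }

    // The matching send CQE returns the slot.
    void on_imm_cqe() { ++imm_window; }
};
```

Because an ACK is only posted while `imm_window > 0`, the counter can never go negative, matching the invariant the commit documents for `_sq_imm_window_size`.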
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 51767565..616ef332 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -147,24 +147,21 @@ void HelloMessage::Deserialize(void* data) {
qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16));
}
-RdmaResource::RdmaResource()
- : qp(NULL)
- , cq(NULL)
- , comp_channel(NULL)
- , next(NULL) { }
-
RdmaResource::~RdmaResource() {
- if (qp) {
+ if (NULL != qp) {
IbvDestroyQp(qp);
- qp = NULL;
}
- if (cq) {
- IbvDestroyCq(cq);
- cq = NULL;
+ if (NULL != polling_cq) {
+ IbvDestroyCq(polling_cq);
+ }
+ if (NULL != send_cq) {
+ IbvDestroyCq(send_cq);
}
- if (comp_channel) {
+ if (NULL != recv_cq) {
+ IbvDestroyCq(recv_cq);
+ }
+ if (NULL != comp_channel) {
IbvDestroyCompChannel(comp_channel);
- comp_channel = NULL;
}
}
@@ -172,13 +169,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
: _socket(s)
, _state(UNINIT)
, _resource(NULL)
- , _cq_events(0)
+ , _send_cq_events(0)
+ , _recv_cq_events(0)
, _cq_sid(INVALID_SOCKET_ID)
, _sq_size(FLAGS_rdma_sq_size)
, _rq_size(FLAGS_rdma_rq_size)
- , _sbuf()
- , _rbuf()
- , _rbuf_data()
, _remote_recv_block_size(0)
, _accumulated_ack(0)
, _unsolicited(0)
@@ -189,7 +184,9 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
, _rq_received(0)
, _local_window_capacity(0)
, _remote_window_capacity(0)
- , _window_size(0)
+ , _sq_imm_window_size(0)
+ , _remote_rq_window_size(0)
+ , _sq_window_size(0)
, _new_rq_wrs(0)
{
if (_sq_size < MIN_QP_SIZE) {
@@ -215,22 +212,28 @@ RdmaEndpoint::~RdmaEndpoint() {
void RdmaEndpoint::Reset() {
DeallocateResources();
- _cq_events = 0;
- _cq_sid = INVALID_SOCKET_ID;
_state = UNINIT;
+ _resource = NULL;
+ _send_cq_events = 0;
+ _recv_cq_events = 0;
+ _cq_sid = INVALID_SOCKET_ID;
_sbuf.clear();
_rbuf.clear();
_rbuf_data.clear();
+ _remote_recv_block_size = 0;
_accumulated_ack = 0;
_unsolicited = 0;
+ _unsolicited_bytes = 0;
_sq_current = 0;
_sq_unsignaled = 0;
- _local_window_capacity = 0;
- _remote_window_capacity = 0;
- _window_size.store(0, butil::memory_order_relaxed);
- _new_rq_wrs = 0;
_sq_sent = 0;
_rq_received = 0;
+ _local_window_capacity = 0;
+ _remote_window_capacity = 0;
+ _sq_imm_window_size = 0;
+ _remote_rq_window_size.store(0, butil::memory_order_relaxed);
+ _sq_window_size.store(0, butil::memory_order_relaxed);
+ _new_rq_wrs.store(0, butil::memory_order_relaxed);
}
void RdmaConnect::StartConnect(const Socket* socket,
@@ -255,6 +258,7 @@ void RdmaConnect::StartConnect(const Socket* socket,
if (bthread_start_background(&tid, &attr,
RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0)
{
LOG(FATAL) << "Fail to start handshake bthread";
+ Run();
} else {
s.release();
}
@@ -516,8 +520,12 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
ep->_local_window_capacity =
std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM;
ep->_remote_window_capacity =
- std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
- ep->_window_size.store(ep->_local_window_capacity,
butil::memory_order_relaxed);
+ std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM;
+ ep->_sq_imm_window_size = RESERVED_WR_NUM;
+ ep->_remote_rq_window_size.store(
+ ep->_local_window_capacity, butil::memory_order_relaxed);
+ ep->_sq_window_size.store(
+ ep->_local_window_capacity, butil::memory_order_relaxed);
ep->_state = C_BRINGUP_QP;
if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) <
0) {
@@ -548,11 +556,11 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
if (s->_rdma_state == Socket::RDMA_ON) {
ep->_state = ESTABLISHED;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
- << "Handshake ends (use rdma) on " << s->description();
+ << "Client handshake ends (use rdma) on " << s->description();
} else {
ep->_state = FALLBACK_TCP;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
- << "Handshake ends (use tcp) on " << s->description();
+ << "Client handshake ends (use tcp) on " << s->description();
}
errno = 0;
@@ -624,8 +632,12 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_local_window_capacity =
std::min(ep->_sq_size, remote_msg.rq_size) - RESERVED_WR_NUM;
ep->_remote_window_capacity =
- std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
- ep->_window_size.store(ep->_local_window_capacity,
butil::memory_order_relaxed);
+ std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM;
+ ep->_sq_imm_window_size = RESERVED_WR_NUM;
+ ep->_remote_rq_window_size.store(
+ ep->_local_window_capacity, butil::memory_order_relaxed);
+ ep->_sq_window_size.store(
+ ep->_local_window_capacity, butil::memory_order_relaxed);
ep->_state = S_ALLOC_QPCQ;
if (ep->AllocateResources() < 0) {
@@ -701,13 +713,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
s->_rdma_state = Socket::RDMA_ON;
ep->_state = ESTABLISHED;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
- << "Handshake ends (use rdma) on " << s->description();
+ << "Server handshake ends (use rdma) on " << s->description();
}
} else {
s->_rdma_state = Socket::RDMA_OFF;
ep->_state = FALLBACK_TCP;
LOG_IF(INFO, FLAGS_rdma_trace_verbose)
- << "Handshake ends (use tcp) on " << s->description();
+ << "Server handshake ends (use tcp) on " << s->description();
}
ep->TryReadOnTcp();
@@ -720,7 +732,8 @@ bool RdmaEndpoint::IsWritable() const {
return false;
}
- return _window_size.load(butil::memory_order_relaxed) > 0;
+ return _remote_rq_window_size.load(butil::memory_order_relaxed) > 0 &&
+ _sq_window_size.load(butil::memory_order_relaxed) > 0;
}
// RdmaIOBuf inherits from IOBuf to provide a new function.
@@ -790,13 +803,16 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf**
from, size_t ndata) {
size_t total_len = 0;
size_t current = 0;
- uint32_t window = 0;
+ uint32_t remote_rq_window_size =
+ _remote_rq_window_size.load(butil::memory_order_relaxed);
+ uint32_t sq_window_size =
+ _sq_window_size.load(butil::memory_order_relaxed);
ibv_send_wr wr;
int max_sge = GetRdmaMaxSge();
ibv_sge sglist[max_sge];
while (current < ndata) {
- window = _window_size.load(butil::memory_order_relaxed);
- if (window == 0) {
+ if (remote_rq_window_size == 0 || sq_window_size == 0) {
+ // There is no space left in SQ or remote RQ.
if (total_len > 0) {
break;
} else {
@@ -815,7 +831,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from,
size_t ndata) {
size_t sge_index = 0;
while (sge_index < (uint32_t)max_sge &&
this_len < _remote_recv_block_size) {
- if (data->size() == 0) {
+ if (data->empty()) {
// The current IOBuf is empty, find next one
++current;
if (current == ndata) {
@@ -826,8 +842,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from,
size_t ndata) {
}
ssize_t len = data->cut_into_sglist_and_iobuf(
- sglist, &sge_index, to, max_sge,
- _remote_recv_block_size - this_len);
+ sglist, &sge_index, to, max_sge, _remote_recv_block_size -
this_len);
if (len < 0) {
return -1;
}
@@ -845,7 +860,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from,
size_t ndata) {
wr.imm_data = butil::HostToNet32(imm);
// Avoid too much recv completion event to reduce the cpu overhead
bool solicited = false;
- if (window == 1 || current + 1 >= ndata) {
+ if (remote_rq_window_size == 1 || sq_window_size == 1 || current + 1
>= ndata) {
// Only last message in the write queue or last message in the
// current window will be flagged as solicited.
solicited = true;
@@ -878,6 +893,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from,
size_t ndata) {
// Refer to:
//
http://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
wr.send_flags |= IBV_SEND_SIGNALED;
+ wr.wr_id = _sq_unsignaled;
_sq_unsignaled = 0;
}
@@ -886,9 +902,9 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from,
size_t ndata) {
if (err != 0) {
// We use other way to guarantee the Send Queue is not full.
// So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
- << ", window=" << window
- << ", sq_current=" << _sq_current;
+ std::ostringstream oss;
+ DebugInfo(oss, ", ");
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " <<
oss.str();
errno = err;
return -1;
}
@@ -898,18 +914,22 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf**
from, size_t ndata) {
_sq_current = 0;
}
- // Update _window_size. Note that _window_size will never be negative.
+ // Update `_remote_rq_window_size' and `_sq_window_size'. Note that
+ // `_remote_rq_window_size' and `_sq_window_size' will never be
negative.
// Because there is at most one thread can enter this function for each
- // Socket, and the other thread of HandleCompletion can only add this
- // counter.
- _window_size.fetch_sub(1, butil::memory_order_relaxed);
+ // Socket, and the other thread of HandleCompletion can only add these
+ // counters.
+ remote_rq_window_size =
+ _remote_rq_window_size.fetch_sub(1, butil::memory_order_relaxed) -
1;
+ sq_window_size = _sq_window_size.fetch_sub(1,
butil::memory_order_relaxed) - 1;
}
return total_len;
}
int RdmaEndpoint::SendAck(int num) {
- if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) >
_remote_window_capacity / 2) {
+ if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) >
_remote_window_capacity / 2 &&
+ _sq_imm_window_size > 0) {
return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed));
}
return 0;
@@ -924,17 +944,24 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
memset(&wr, 0, sizeof(wr));
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.imm_data = butil::HostToNet32(imm);
- wr.send_flags |= IBV_SEND_SOLICITED;
- wr.send_flags |= IBV_SEND_SIGNALED;
+ wr.send_flags |= IBV_SEND_SOLICITED | IBV_SEND_SIGNALED;
+ wr.wr_id = 0;
ibv_send_wr* bad = NULL;
int err = ibv_post_send(_resource->qp, &wr, &bad);
if (err != 0) {
+ std::ostringstream oss;
+ DebugInfo(oss, ", ");
// We use other way to guarantee the Send Queue is not full.
// So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " <<
oss.str();
return -1;
}
+
+ // `_sq_imm_window_size' will never be negative.
+ // Because IMM can only be sent if
+ // `_sq_imm_window_size` is greater than 0.
+ _sq_imm_window_size -= 1;
return 0;
}
@@ -942,8 +969,30 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
bool zerocopy = FLAGS_rdma_recv_zerocopy;
switch (wc.opcode) {
case IBV_WC_SEND: { // send completion
- // Do nothing
- break;
+ if (0 == wc.wr_id) {
+ _sq_imm_window_size += 1;
+ // If there are any unacknowledged recvs, send an ack.
+ SendAck(0);
+ return 0;
+ }
+ // Update SQ window.
+ uint16_t wnd_to_update = wc.wr_id;
+ for (uint16_t i = 0; i < wnd_to_update; ++i) {
+ _sbuf[_sq_sent++].clear();
+ if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+ _sq_sent = 0;
+ }
+ }
+ butil::subtle::MemoryBarrier();
+
+ _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed);
+ if (_remote_rq_window_size.load(butil::memory_order_relaxed) >=
+ _local_window_capacity / 8) {
+ // Do not wake up writing thread right after polling IBV_WC_SEND.
+ // Otherwise the writing thread may switch to background too
quickly.
+ _socket->WakeAsEpollOut();
+ }
+ return 0;
}
case IBV_WC_RECV: { // recv completion
// Please note that only the first wc.byte_len bytes is valid
@@ -953,32 +1002,21 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
CHECK(_state != FALLBACK_TCP);
if (zerocopy) {
- butil::IOBuf tmp;
- _rbuf[_rq_received].cutn(&tmp, wc.byte_len);
- _socket->_read_buf.append(tmp);
+ _rbuf[_rq_received].cutn(&_socket->_read_buf, wc.byte_len);
} else {
// Copy data when the receive data is really small
_socket->_read_buf.append(_rbuf_data[_rq_received],
wc.byte_len);
}
}
- if (wc.imm_data > 0) {
- // Clear sbuf here because we ignore event wakeup for send
completions
- uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
-
+ if (0 != (wc.wc_flags & IBV_WC_WITH_IMM) && wc.imm_data > 0) {
// Update window
+ uint32_t acks = butil::NetToHost32(wc.imm_data);
uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >=
wnd_thresh
- || acks >= wnd_thresh) {
- // Do not wake up writing thread right after _window_size > 0.
+ uint32_t remote_rq_window_size =
+ _remote_rq_window_size.fetch_add(acks,
butil::memory_order_relaxed);
+ if (_sq_window_size.load(butil::memory_order_relaxed) > 0 &&
+ (remote_rq_window_size >= wnd_thresh || acks >= wnd_thresh)) {
+ // Do not wake up writing thread right after
_remote_rq_window_size > 0.
// Otherwise the writing thread may switch to background too
quickly.
_socket->WakeAsEpollOut();
}
@@ -1050,71 +1088,73 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy)
{
return 0;
}
-static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
- RdmaResource* res = new (std::nothrow) RdmaResource;
- if (!res) {
- return NULL;
- }
+static ibv_qp* AllocateQp(ibv_cq* send_cq, ibv_cq* recv_cq, uint32_t sq_size,
uint32_t rq_size) {
+ ibv_qp_init_attr attr;
+ memset(&attr, 0, sizeof(attr));
+ attr.send_cq = send_cq;
+ attr.recv_cq = recv_cq;
+ attr.cap.max_send_wr = sq_size;
+ attr.cap.max_recv_wr = rq_size;
+ attr.cap.max_send_sge = GetRdmaMaxSge();
+ attr.cap.max_recv_sge = 1;
+ attr.qp_type = IBV_QPT_RC;
+ return IbvCreateQp(GetRdmaPd(), &attr);
+}
+static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
+ std::unique_ptr<RdmaResource> resource(new RdmaResource);
if (!FLAGS_rdma_use_polling) {
- res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
- if (!res->comp_channel) {
+ resource->comp_channel = IbvCreateCompChannel(GetRdmaContext());
+ if (NULL == resource->comp_channel) {
PLOG(WARNING) << "Fail to create comp channel for CQ";
- delete res;
return NULL;
}
- butil::make_close_on_exec(res->comp_channel->fd);
- if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
+ if (butil::make_close_on_exec(resource->comp_channel->fd) < 0) {
+ PLOG(WARNING) << "Fail to set comp channel close-on-exec";
+ return NULL;
+ }
+ if (butil::make_non_blocking(resource->comp_channel->fd) < 0) {
PLOG(WARNING) << "Fail to set comp channel nonblocking";
- delete res;
return NULL;
}
- res->cq = IbvCreateCq(GetRdmaContext(), 2 *
FLAGS_rdma_prepared_qp_size,
- NULL, res->comp_channel, GetRdmaCompVector());
- if (!res->cq) {
- PLOG(WARNING) << "Fail to create CQ";
- delete res;
+ resource->send_cq = IbvCreateCq(GetRdmaContext(),
FLAGS_rdma_prepared_qp_size,
+ NULL, resource->comp_channel,
GetRdmaCompVector());
+ if (NULL == resource->send_cq) {
+ PLOG(WARNING) << "Fail to create send CQ";
return NULL;
}
- } else {
- res->cq = IbvCreateCq(GetRdmaContext(), 2 *
FLAGS_rdma_prepared_qp_size,
- NULL, NULL, 0);
- if (!res->cq) {
- PLOG(WARNING) << "Fail to create CQ";
- delete res;
+
+ resource->recv_cq = IbvCreateCq(GetRdmaContext(),
FLAGS_rdma_prepared_qp_size,
+ NULL, resource->comp_channel,
GetRdmaCompVector());
+ if (NULL == resource->recv_cq) {
+ PLOG(WARNING) << "Fail to create recv CQ";
return NULL;
}
- }
- ibv_qp_init_attr attr;
- memset(&attr, 0, sizeof(attr));
- attr.send_cq = res->cq;
- attr.recv_cq = res->cq;
- // NOTE: Since we hope to reduce send completion events, we set signaled
- // send_wr every 1/4 of the total wnd. The wnd will increase when the ack
- // is received, which means the receive side has already received the data
- // in the corresponding send_wr. However, the ack does not mean the send_wr
- // has been removed from SQ if it is set unsignaled. The reason is that
- // the unsignaled send_wr is removed from SQ only after the CQE of next
- // signaled send_wr is polled. Thus in a rare case, a new send_wr cannot be
- // posted to SQ even in the wnd is not empty. In order to solve this
- // problem, we enlarge the size of SQ to contain redundant 1/4 of the wnd,
- // which is the maximum number of unsignaled send_wrs.
- attr.cap.max_send_wr = sq_size * 5 / 4; /*NOTE*/
- attr.cap.max_recv_wr = rq_size;
- attr.cap.max_send_sge = GetRdmaMaxSge();
- attr.cap.max_recv_sge = 1;
- attr.qp_type = IBV_QPT_RC;
- res->qp = IbvCreateQp(GetRdmaPd(), &attr);
- if (!res->qp) {
- PLOG(WARNING) << "Fail to create QP";
- delete res;
- return NULL;
+ resource->qp = AllocateQp(resource->send_cq, resource->recv_cq,
sq_size, rq_size);
+ if (NULL == resource->qp) {
+ PLOG(WARNING) << "Fail to create QP";
+ return NULL;
+ }
+ } else {
+ resource->polling_cq =
+ IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
NULL, NULL, 0);
+ if (NULL == resource->polling_cq) {
+ PLOG(WARNING) << "Fail to create polling CQ";
+ return NULL;
+ }
+ resource->qp = AllocateQp(resource->polling_cq,
+ resource->polling_cq,
+ sq_size, rq_size);
+ if (NULL == resource->qp) {
+ PLOG(WARNING) << "Fail to create QP";
+ return NULL;
+ }
}
- return res;
+ return resource.release();
}
int RdmaEndpoint::AllocateResources() {
@@ -1143,6 +1183,13 @@ int RdmaEndpoint::AllocateResources() {
}
if (!FLAGS_rdma_use_polling) {
+ if (0 != ReqNotifyCq(true)) {
+ return -1;
+ }
+ if (0 != ReqNotifyCq(false)) {
+ return -1;
+ }
+
SocketOptions options;
options.user = this;
options.keytable_pool = _socket->_keytable_pool;
@@ -1150,13 +1197,6 @@ int RdmaEndpoint::AllocateResources() {
options.on_edge_triggered_events = PollCq;
if (Socket::Create(options, &_cq_sid) < 0) {
PLOG(WARNING) << "Fail to create socket for cq";
- return -1;
- }
-
- int err = ibv_req_notify_cq(_resource->cq, 1);
- if (err != 0) {
- LOG(WARNING) << "Fail to arm CQ comp channel: " << berror(err);
- return -1;
}
} else {
SocketOptions options;
@@ -1263,6 +1303,15 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid,
uint32_t qp_num) {
return 0;
}
+static void DeallocateCq(ibv_cq* cq) {
+ if (NULL == cq) {
+ return;
+ }
+
+ int err = IbvDestroyCq(cq);
+ LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err);
+}
+
void RdmaEndpoint::DeallocateResources() {
if (!_resource) {
return;
@@ -1280,88 +1329,119 @@ void RdmaEndpoint::DeallocateResources() {
move_to_rdma_resource_list = true;
}
}
- int fd = -1;
- if (_resource->comp_channel) {
- fd = _resource->comp_channel->fd;
+
+ if (NULL != _resource->send_cq) {
+ IbvAckCqEvents(_resource->send_cq, _send_cq_events);
+ }
+ if (NULL != _resource->recv_cq) {
+ IbvAckCqEvents(_resource->recv_cq, _recv_cq_events);
}
- int err;
+
+ bool remove_consumer = true;
if (!move_to_rdma_resource_list) {
- if (_resource->qp) {
- err = IbvDestroyQp(_resource->qp);
- if (err != 0) {
- LOG(WARNING) << "Fail to destroy QP: " << berror(err);
- }
+ if (NULL != _resource->qp) {
+ int err = IbvDestroyQp(_resource->qp);
+ LOG_IF(WARNING, 0 != err) << "Fail to destroy QP: " << berror(err);
_resource->qp = NULL;
}
- if (_resource->cq) {
- IbvAckCqEvents(_resource->cq, _cq_events);
- err = IbvDestroyCq(_resource->cq);
- if (err != 0) {
- PLOG(WARNING) << "Fail to destroy CQ: " << berror(err);
- }
- _resource->cq = NULL;
- }
- if (_resource->comp_channel) {
- // destroy comp_channel will destroy this fd
+
+ DeallocateCq(_resource->polling_cq);
+ DeallocateCq(_resource->send_cq);
+ DeallocateCq(_resource->recv_cq);
+
+ if (NULL != _resource->comp_channel) {
+        // Destroying comp_channel also destroys this fd,
+        // so we should remove it from the epoll fd first
- _socket->_io_event.RemoveConsumer(fd);
- fd = -1;
- err = IbvDestroyCompChannel(_resource->comp_channel);
- if (err != 0) {
- LOG(WARNING) << "Fail to destroy CQ channel: " << berror(err);
- }
- _resource->comp_channel = NULL;
+ int fd = _resource->comp_channel->fd;
+ GetGlobalEventDispatcher(fd,
_socket->_io_event.bthread_tag()).RemoveConsumer(fd);
+ remove_consumer = false;
+ int err = IbvDestroyCompChannel(_resource->comp_channel);
+ LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ channel: " <<
berror(err);
+
}
+
+ _resource->polling_cq = NULL;
+ _resource->send_cq = NULL;
+ _resource->recv_cq = NULL;
+ _resource->comp_channel = NULL;
delete _resource;
_resource = NULL;
}
- SocketUniquePtr s;
- if (_cq_sid != INVALID_SOCKET_ID) {
+ if (INVALID_SOCKET_ID != _cq_sid) {
+ SocketUniquePtr s;
if (Socket::Address(_cq_sid, &s) == 0) {
- s->_user = NULL; // do not release user (this RdmaEndpoint)
- if (fd >= 0) {
- _socket->_io_event.RemoveConsumer(fd);
+ if (remove_consumer) {
+ s->_io_event.RemoveConsumer(s->_fd);
}
- s->_fd = -1; // already remove fd from epoll fd
+ s->_user = NULL; // Do not release user (this RdmaEndpoint).
+ s->_fd = -1; // Already remove fd from epoll fd.
s->SetFailed();
}
- _cq_sid = INVALID_SOCKET_ID;
}
if (move_to_rdma_resource_list) {
- if (_resource->cq) {
- IbvAckCqEvents(_resource->cq, _cq_events);
- }
BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex);
_resource->next = g_rdma_resource_list;
g_rdma_resource_list = _resource;
}
-
- _resource = NULL;
}
static const int MAX_CQ_EVENTS = 128;
-int RdmaEndpoint::GetAndAckEvents() {
- int events = 0; void* context = NULL;
- while (1) {
- if (IbvGetCqEvent(_resource->comp_channel, &_resource->cq, &context)
!= 0) {
+int RdmaEndpoint::GetAndAckEvents(SocketUniquePtr& s) {
+ void* context = NULL;
+ ibv_cq* cq = NULL;
+ while (true) {
+ if (IbvGetCqEvent(_resource->comp_channel, &cq, &context) != 0) {
if (errno != EAGAIN) {
+ const int saved_errno = errno;
+ PLOG(ERROR) << "Fail to get cq event from " <<
s->description();
+ s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
+ s->description().c_str(), berror(saved_errno));
return -1;
}
break;
}
- ++events;
+ if (cq == _resource->send_cq) {
+ ++_send_cq_events;
+ } else if (cq == _resource->recv_cq) {
+ ++_recv_cq_events;
+ } else {
+ // Unexpected CQ event that does not belong to
+ // this endpoint's send/recv CQs.
+ LOG(WARNING) << "Unexpected CQ event from cq=" << cq
+ << " of " << s->description();
+ // Acknowledge this single event immediately
+ // to avoid leaking unacknowledged events.
+ IbvAckCqEvents(cq, 1);
+ }
}
- if (events == 0) {
- return 0;
+ if (_send_cq_events >= MAX_CQ_EVENTS) {
+ IbvAckCqEvents(_resource->send_cq, _send_cq_events);
+ _send_cq_events = 0;
+ }
+ if (_recv_cq_events >= MAX_CQ_EVENTS) {
+ IbvAckCqEvents(_resource->recv_cq, _recv_cq_events);
+ _recv_cq_events = 0;
}
- _cq_events += events;
- if (_cq_events >= MAX_CQ_EVENTS) {
- IbvAckCqEvents(_resource->cq, _cq_events);
- _cq_events = 0;
+ return 0;
+}
+
+int RdmaEndpoint::ReqNotifyCq(bool send_cq) {
+ errno = ibv_req_notify_cq(
+ send_cq ? _resource->send_cq : _resource->recv_cq,
+ send_cq ? 0 : 1);
+ if (0 != errno) {
+ const int saved_errno = errno;
+ PLOG(WARNING) << "Fail to arm " << (send_cq ? "send" : "recv")
+ << " CQ comp channel from " << _socket->description();
+ _socket->SetFailed(saved_errno, "Fail to arm %s CQ channel from %s: %s",
+ send_cq ? "send" : "recv", _socket->description().c_str(),
+ berror(saved_errno));
+ return -1;
}
+
return 0;
}
@@ -1377,14 +1457,17 @@ void RdmaEndpoint::PollCq(Socket* m) {
}
CHECK(ep == s->_rdma_ep);
+ bool send = false;
+ ibv_cq* cq = ep->_resource->recv_cq;
+
if (!FLAGS_rdma_use_polling) {
- if (ep->GetAndAckEvents() < 0) {
- const int saved_errno = errno;
- PLOG(ERROR) << "Fail to get cq event: " << s->description();
- s->SetFailed(saved_errno, "Fail to get cq event from %s: %s",
- s->description().c_str(), berror(saved_errno));
+ if (ep->GetAndAckEvents(s) < 0) {
return;
}
+ } else {
+ // Polling is considered as non-send, so no need to change `send'.
+ // Only need to poll polling_cq.
+ cq = ep->_resource->polling_cq;
}
int progress = Socket::PROGRESS_INIT;
@@ -1392,7 +1475,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
InputMessenger::InputMessageClosure last_msg;
ibv_wc wc[FLAGS_rdma_cqe_poll_once];
while (true) {
- int cnt = ibv_poll_cq(ep->_resource->cq, FLAGS_rdma_cqe_poll_once, wc);
+ int cnt = ibv_poll_cq(cq, FLAGS_rdma_cqe_poll_once, wc);
if (cnt < 0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to poll cq: " << s->description();
@@ -1404,18 +1487,24 @@ void RdmaEndpoint::PollCq(Socket* m) {
if (FLAGS_rdma_use_polling) {
return;
}
+
+ if (!send) {
+ // It's send cq's turn.
+ send = true;
+ cq = ep->_resource->send_cq;
+ continue;
+ }
+ // `recv_cq' and `send_cq' have been polled.
if (!notified) {
// Since RDMA only provides one shot event, we have to call the
// notify function every time. Because there is a possibility
// that the event arrives after the poll but before the notify,
// we should re-poll the CQ once after the notify to check if
// there is an available CQE.
- errno = ibv_req_notify_cq(ep->_resource->cq, 1);
- if (errno != 0) {
- const int saved_errno = errno;
- PLOG(WARNING) << "Fail to arm CQ comp channel: " << s->description();
- s->SetFailed(saved_errno, "Fail to arm cq channel from %s: %s",
- s->description().c_str(), berror(saved_errno));
+ if (0 != ep->ReqNotifyCq(true)) {
+ return;
+ }
+ if (0 != ep->ReqNotifyCq(false)) {
return;
}
notified = true;
@@ -1424,11 +1513,14 @@ void RdmaEndpoint::PollCq(Socket* m) {
if (!m->MoreReadEvents(&progress)) {
break;
}
- if (ep->GetAndAckEvents() < 0) {
- s->SetFailed(errno, "Fail to ack CQ event on %s",
- s->description().c_str());
+
+ if (0 != ep->GetAndAckEvents(s)) {
return;
}
+
+ // Restart polling from `recv_cq'.
+ send = false;
+ cq = ep->_resource->recv_cq;
notified = false;
continue;
}
@@ -1437,7 +1529,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
ssize_t bytes = 0;
for (int i = 0; i < cnt; ++i) {
if (s->Failed()) {
- continue;
+ return;
}
if (wc[i].status != IBV_WC_SUCCESS) {
@@ -1453,11 +1545,15 @@ void RdmaEndpoint::PollCq(Socket* m) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to handle RDMA completion: " << s->description();
s->SetFailed(saved_errno, "Fail to handle rdma completion from %s: %s",
- s->description().c_str(), berror(saved_errno));
+ s->description().c_str(), berror(saved_errno));
} else if (nr > 0) {
bytes += nr;
}
}
+ // Send CQE has no messages to process.
+ if (send) {
+ continue;
+ }
// Just call ProcessNewMessage once for all of these CQEs.
// Otherwise it may call too many bthread_flush to affect performance.
@@ -1491,20 +1587,21 @@ std::string RdmaEndpoint::GetStateStr() const {
}
}
-void RdmaEndpoint::DebugInfo(std::ostream& os) const {
- os << "\nrdma_state=ON"
- << "\nhandshake_state=" << GetStateStr()
- << "\nrdma_window_size=" << _window_size.load(butil::memory_order_relaxed)
- << "\nrdma_local_window_capacity=" << _local_window_capacity
- << "\nrdma_remote_window_capacity=" << _remote_window_capacity
- << "\nrdma_sbuf_head=" << _sq_current
- << "\nrdma_sbuf_tail=" << _sq_sent
- << "\nrdma_rbuf_head=" << _rq_received
- << "\nrdma_unacked_rq_wr=" << _new_rq_wrs
- << "\nrdma_received_ack=" << _accumulated_ack
- << "\nrdma_unsolicited_sent=" << _unsolicited
- << "\nrdma_unsignaled_sq_wr=" << _sq_unsignaled
- << "\n";
+void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) const {
+ os << "rdma_state=ON"
+ << connector << "handshake_state=" << GetStateStr()
+ << connector << "rdma_sq_imm_window_size=" << _sq_imm_window_size
+ << connector << "rdma_remote_rq_window_size=" << _remote_rq_window_size.load(butil::memory_order_relaxed)
+ << connector << "rdma_sq_window_size=" << _sq_window_size.load(butil::memory_order_relaxed)
+ << connector << "rdma_local_window_capacity=" << _local_window_capacity
+ << connector << "rdma_remote_window_capacity=" << _remote_window_capacity
+ << connector << "rdma_sbuf_head=" << _sq_current
+ << connector << "rdma_sbuf_tail=" << _sq_sent
+ << connector << "rdma_rbuf_head=" << _rq_received
+ << connector << "rdma_unacked_rq_wr=" << _new_rq_wrs.load(butil::memory_order_relaxed)
+ << connector << "rdma_received_ack=" << _accumulated_ack
+ << connector << "rdma_unsolicited_sent=" << _unsolicited
+ << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled;
}
int RdmaEndpoint::GlobalInitialize() {
@@ -1515,6 +1612,8 @@ int RdmaEndpoint::GlobalInitialize() {
} else if (FLAGS_rdma_recv_block_type == "huge") {
g_rdma_recv_block_size = GetBlockSize(2) - IOBUF_BLOCK_HEADER_LEN;
} else {
+ LOG(ERROR) << "rdma_recv_block_type incorrect "
+ << "(valid value: default/large/huge)";
errno = EINVAL;
return -1;
}
@@ -1558,9 +1657,9 @@ void RdmaEndpoint::GlobalRelease() {
std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag,
- std::function<void(void)> callback,
- std::function<void(void)> init_fn,
- std::function<void(void)> release_fn) {
+ std::function<void()> callback,
+ std::function<void()> init_fn,
+ std::function<void()> release_fn) {
if (!FLAGS_rdma_use_polling) {
return 0;
}
@@ -1642,7 +1741,7 @@ void RdmaEndpoint::PollingModeRelease(bthread_tag_t tag) {
auto& running = group.running;
running.store(false, std::memory_order_relaxed);
for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
- bthread_join(pollers[i].tid, nullptr);
+ bthread_join(pollers[i].tid, NULL);
}
}
@@ -1651,7 +1750,7 @@ void RdmaEndpoint::PollerAddCqSid() {
auto& group = _poller_groups[bthread_self_tag()];
auto& pollers = group.pollers;
auto& poller = pollers[index];
- if (_cq_sid != INVALID_SOCKET_ID) {
+ if (INVALID_SOCKET_ID != _cq_sid) {
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::ADD});
}
}
@@ -1661,7 +1760,7 @@ void RdmaEndpoint::PollerRemoveCqSid() {
auto& group = _poller_groups[bthread_self_tag()];
auto& pollers = group.pollers;
auto& poller = pollers[index];
- if (_cq_sid != INVALID_SOCKET_ID) {
+ if (INVALID_SOCKET_ID != _cq_sid) {
poller.op_queue.Enqueue(CqSidOp{_cq_sid, CqSidOp::REMOVE});
}
}
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..eb4714ef 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -54,26 +54,30 @@ public:
private:
void Run();
- void (*_done)(int, void*);
- void* _data;
+ void (*_done)(int, void*){NULL};
+ void* _data{NULL};
};
struct RdmaResource {
- ibv_qp* qp;
- ibv_cq* cq;
- ibv_comp_channel* comp_channel;
- RdmaResource* next;
- RdmaResource();
+ RdmaResource* next{NULL};
+ ibv_qp* qp{NULL};
+ // For polling mode.
+ ibv_cq* polling_cq{NULL};
+ // For event mode.
+ ibv_cq* send_cq{NULL};
+ ibv_cq* recv_cq{NULL};
+ ibv_comp_channel* comp_channel{NULL};
+ RdmaResource() = default;
~RdmaResource();
DISALLOW_COPY_AND_ASSIGN(RdmaResource);
};
class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser {
friend class RdmaConnect;
-friend class brpc::Socket;
+friend class Socket;
public:
- RdmaEndpoint(Socket* s);
- ~RdmaEndpoint();
+ explicit RdmaEndpoint(Socket* s);
+ ~RdmaEndpoint() override;
// Global initialization
// Return 0 if success, -1 if failed and errno set
@@ -92,7 +96,8 @@ public:
bool IsWritable() const;
// For debug
- void DebugInfo(std::ostream& os) const;
+ void DebugInfo(std::ostream& os,
+ butil::StringPiece connector = "\n") const;
// Callback when there is new epollin event on TCP fd
static void OnNewDataFromTcp(Socket* m);
@@ -195,7 +200,10 @@ private:
int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num);
// Get event from comp channel and ack the events
- int GetAndAckEvents();
+ int GetAndAckEvents(SocketUniquePtr& s);
+
+ // Request completion notification on a send/recv CQ.
+ int ReqNotifyCq(bool send_cq);
// Poll CQ and get the work completion
static void PollCq(Socket* m);
@@ -221,10 +229,11 @@ private:
// rdma resource
RdmaResource* _resource;
- // the number of events requiring ack
- int _cq_events;
+ // The number of events requiring ack.
+ unsigned int _send_cq_events;
+ unsigned int _recv_cq_events;
- // the SocketId which wrap the comp channel of CQ
+ // The SocketId which wrap the comp channel of CQ.
SocketId _cq_sid;
// Capacity of local Send Queue and local Recv Queue
@@ -257,8 +266,12 @@ private:
uint16_t _local_window_capacity;
// The capacity of remote window: min(local RQ, remote SQ)
uint16_t _remote_window_capacity;
+ // The number of IMM WRs we can post to the local Send Queue.
+ uint16_t _sq_imm_window_size;
+ // The number of WRs we can send to remote side.
+ butil::atomic<uint16_t> _remote_rq_window_size;
// The number of WRs we can post to the local Send Queue
- butil::atomic<uint16_t> _window_size;
+ butil::atomic<uint16_t> _sq_window_size;
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
@@ -282,9 +295,9 @@ private:
butil::MPSCQueue<CqSidOp, butil::ObjectPoolAllocator<CqSidOp>> op_queue;
// Callback used for io_uring/spdk etc
std::function<void()> callback;
- // Init and Destory function
- std::function<void(void)> init_fn;
- std::function<void(void)> release_fn;
+ // Init and Destroy function
+ std::function<void()> init_fn;
+ std::function<void()> release_fn;
};
// Poller group
struct BAIDU_CACHELINE_ALIGNMENT PollerGroup {
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index ec530098..9490650b 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -895,7 +895,7 @@ void Socket::BeforeRecycled() {
const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
SocketUniquePtr ptr;
- if (Socket::Address(asid, &ptr) == 0) {
+ if (Address(asid, &ptr) == 0) {
ptr->ReleaseAdditionalReference();
}
}
@@ -1319,7 +1319,7 @@ int Socket::Connect(const timespec* abstime,
SocketOptions options;
options.bthread_tag = _io_event.bthread_tag();
options.user = req;
- if (Socket::Create(options, &connect_id) != 0) {
+ if (Create(options, &connect_id) != 0) {
LOG(FATAL) << "Fail to create Socket";
delete req;
return -1;
@@ -1328,7 +1328,7 @@ int Socket::Connect(const timespec* abstime,
// `connect_id'. We hold an additional reference here to
// ensure `req' to be valid in this scope
SocketUniquePtr s;
- CHECK_EQ(0, Socket::Address(connect_id, &s));
+ CHECK_EQ(0, Address(connect_id, &s));
// Add `sockfd' into epoll so that `HandleEpollOutRequest' will
// be called with `req' when epoll event reaches
@@ -1425,7 +1425,7 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
void Socket::WakeAsEpollOut() {
_epollout_butex->fetch_add(1, butil::memory_order_release);
- bthread::butex_wake_except(_epollout_butex, 0);
+ bthread::butex_wake_except(_epollout_butex, INVALID_BTHREAD);
}
int Socket::OnOutputEvent(void* user_data, uint32_t,
@@ -1436,7 +1436,7 @@ int Socket::OnOutputEvent(void* user_data, uint32_t,
// added into epoll, these sockets miss the signal inside
// `SetFailed' and therefore must be signalled here using
// `AddressFailedAsWell' to prevent waiting forever
- if (Socket::AddressFailedAsWell(id, &s) < 0) {
+ if (AddressFailedAsWell(id, &s) < 0) {
// Ignore recycled sockets
return -1;
}
@@ -1456,7 +1456,7 @@ int Socket::OnOutputEvent(void* user_data, uint32_t,
void Socket::HandleEpollOutTimeout(void* arg) {
SocketId id = (SocketId)arg;
SocketUniquePtr s;
- if (Socket::Address(id, &s) != 0) {
+ if (Address(id, &s) != 0) {
return;
}
EpollOutRequest* req = dynamic_cast<EpollOutRequest*>(s->user());
@@ -1532,12 +1532,11 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) {
// Run ssl connect in a new bthread to avoid blocking
// the current bthread (thus blocking the EventDispatcher)
bthread_t th;
- std::unique_ptr<google::protobuf::Closure> thrd_func(brpc::NewCallback(
- Socket::CheckConnectedAndKeepWrite, fd, err, data));
+ std::unique_ptr<google::protobuf::Closure> thrd_func(
+ NewCallback(CheckConnectedAndKeepWrite, fd, err, data));
bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
bthread_attr_set_name(&attr, "CheckConnectedAndKeepWrite");
- if ((err = bthread_start_background(&th, &attr,
- RunClosure, thrd_func.get())) == 0) {
+ if ((err = bthread_start_background(&th, &attr, RunClosure, thrd_func.get())) == 0) {
thrd_func.release();
return 0;
} else {
@@ -2323,7 +2322,7 @@ std::ostream& operator<<(std::ostream& os, const ObjectPtr<T>& obj) {
void Socket::DebugSocket(std::ostream& os, SocketId id) {
SocketUniquePtr ptr;
- int ret = Socket::AddressFailedAsWell(id, &ptr);
+ int ret = AddressFailedAsWell(id, &ptr);
if (ret < 0) {
os << "SocketId=" << id << " is invalid or recycled";
return;
@@ -2920,7 +2919,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) {
opt.app_connect = _app_connect;
opt.use_rdma = (_rdma_ep) ? true : false;
if (get_client_side_messenger()->Create(opt, &id) != 0 ||
- Socket::Address(id, short_socket) != 0) {
+ Address(id, short_socket) != 0) {
return -1;
}
(*short_socket)->ShareStats(this);
@@ -2931,7 +2930,7 @@ int Socket::GetAgentSocket(SocketUniquePtr* out, bool (*checkfn)(Socket*)) {
SocketId id = _agent_socket_id.load(butil::memory_order_relaxed);
SocketUniquePtr tmp_sock;
do {
- if (Socket::Address(id, &tmp_sock) == 0) {
+ if (Address(id, &tmp_sock) == 0) {
if (checkfn == NULL || checkfn(tmp_sock.get())) {
out->swap(tmp_sock);
return 0;
diff --git a/test/brpc_rdma_unittest.cpp b/test/brpc_rdma_unittest.cpp
index 066d0127..ccb280f1 100644
--- a/test/brpc_rdma_unittest.cpp
+++ b/test/brpc_rdma_unittest.cpp
@@ -209,8 +209,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_magic_str) {
uint8_t data[RDMA_HELLO_MSG_LEN];
memcpy(data, "PRPC", 4); // send as normal baidu_std protocol
- memset(data + 4, 0, 32);
- ASSERT_EQ(38, write(sockfd, data, 38));
+ ASSERT_EQ(4, write(sockfd, data, 4));
usleep(100000); // wait for server to handle the msg
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
@@ -660,9 +659,6 @@ TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) {
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
ASSERT_EQ(sizeof(flags), write(sockfd1, &flags, sizeof(flags)));
usleep(100000);
- ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
- close(sockfd1);
- usleep(100000); // wait for server to handle the msg
ASSERT_EQ(NULL, GetSocketFromServer(0));
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));
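
Editor's note on the core control-flow change above: the patched `RdmaEndpoint::PollCq` now drains the recv CQ first, then the send CQ, and because `ibv_req_notify_cq` is a one-shot notification it re-arms both CQs and re-polls once before blocking, to close the race where a CQE lands between the last poll and the notify. Below is a minimal runnable sketch of that loop shape. `FakeCq`, `poll_cq`, and `req_notify_cq` are stubbed stand-ins invented for illustration, not the libibverbs API, and the sketch omits the real code's ack bookkeeping and `MoreReadEvents` check:

```cpp
#include <cassert>
#include <queue>

// Stand-in for an ibv_cq: a queue of pending completions plus a flag
// recording whether one-shot notification has been requested.
struct FakeCq {
    std::queue<int> cqes;   // pending work completions (payload sizes)
    bool armed = false;     // one-shot notification armed?
};

// Mimics ibv_poll_cq: drain up to `max` completions into `out`.
static int poll_cq(FakeCq& cq, int max, int* out) {
    int n = 0;
    while (n < max && !cq.cqes.empty()) {
        out[n++] = cq.cqes.front();
        cq.cqes.pop();
    }
    return n;
}

// Mimics ibv_req_notify_cq: arm a one-shot completion notification.
static void req_notify_cq(FakeCq& cq) { cq.armed = true; }

// Poll recv CQ, then send CQ; once both are dry, arm notifications and
// re-poll from the recv CQ once before giving up, mirroring the
// notify-then-repoll pattern in the patched PollCq.
static int drain_both(FakeCq& recv_cq, FakeCq& send_cq) {
    int wc[4];
    int total = 0;
    bool send = false;       // which CQ's turn it is
    bool notified = false;   // have we armed and re-polled yet?
    FakeCq* cq = &recv_cq;
    while (true) {
        const int cnt = poll_cq(*cq, 4, wc);
        if (cnt == 0) {
            if (!send) {                 // recv CQ dry: send CQ's turn
                send = true;
                cq = &send_cq;
                continue;
            }
            if (!notified) {             // both dry: arm, then re-poll once
                req_notify_cq(recv_cq);
                req_notify_cq(send_cq);
                notified = true;
                send = false;
                cq = &recv_cq;
                continue;
            }
            break;                       // still dry after re-poll: block on events
        }
        for (int i = 0; i < cnt; ++i) {
            total += wc[i];              // real code handles recv CQEs here;
        }                                // send CQEs carry no message payload
    }
    return total;
}
```

With three recv completions of sizes 1, 2, 3 and one send completion of size 10, `drain_both` consumes all four and leaves both CQs armed for the next event.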
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]