sunce4t commented on issue #3132:
URL: https://github.com/apache/brpc/issues/3132#issuecomment-3466917187
> 是的, 这样可以大大简化实现
@Yangfisher1 @yanglimingcn 大佬们看看这个版本
```
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..99ef1159 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -191,6 +191,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
, _remote_window_capacity(0)
, _window_size(0)
, _new_rq_wrs(0)
+ , _remote_recv_window(0)
{
if (_sq_size < MIN_QP_SIZE) {
_sq_size = MIN_QP_SIZE;
@@ -208,6 +209,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
}
RdmaEndpoint::~RdmaEndpoint() {
+ LOG(INFO) << _window_size << " " << _remote_recv_window << " " << _sq_unsignaled;
Reset();
bthread::butex_destroy(_read_butex);
}
@@ -231,6 +233,7 @@ void RdmaEndpoint::Reset() {
_new_rq_wrs = 0;
_sq_sent = 0;
_rq_received = 0;
+ _remote_recv_window.store(0, butil::memory_order_relaxed);
}
void RdmaConnect::StartConnect(const Socket* socket,
@@ -514,7 +517,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
ep->_remote_window_capacity =
std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
ep->_window_size.store(ep->_local_window_capacity,
butil::memory_order_relaxed);
-
+ ep->_remote_recv_window.store(ep->_remote_window_capacity,
butil::memory_order_relaxed);
ep->_state = C_BRINGUP_QP;
if (ep->BringUpQp(remote_msg.lid, remote_msg.gid,
remote_msg.qp_num) < 0) {
LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" <<
s->description();
@@ -622,7 +625,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
ep->_remote_window_capacity =
std::min(ep->_rq_size, remote_msg.sq_size) - RESERVED_WR_NUM,
ep->_window_size.store(ep->_local_window_capacity,
butil::memory_order_relaxed);
-
+ ep->_remote_recv_window.store(ep->_remote_window_capacity,
butil::memory_order_relaxed);
ep->_state = S_ALLOC_QPCQ;
if (ep->AllocateResources() < 0) {
LOG(WARNING) << "Fail to allocate rdma resources, fallback to tcp:"
@@ -787,12 +790,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
size_t total_len = 0;
size_t current = 0;
uint32_t window = 0;
+ uint32_t recv_window = 0;
ibv_send_wr wr;
int max_sge = GetRdmaMaxSge();
ibv_sge sglist[max_sge];
while (current < ndata) {
- window = _window_size.load(butil::memory_order_relaxed);
- if (window == 0) {
+ window = _window_size.load(butil::memory_order_acquire);
+ recv_window = _remote_recv_window.load(butil::memory_order_acquire);
+ if (window == 0 || recv_window == 0) {
if (total_len > 0) {
break;
} else {
@@ -898,7 +903,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
// Because there is at most one thread can enter this function for each
// Socket, and the other thread of HandleCompletion can only add this
// counter.
- _window_size.fetch_sub(1, butil::memory_order_relaxed);
+ _window_size.fetch_sub(1, butil::memory_order_release);
+ _remote_recv_window.fetch_sub(1, butil::memory_order_release);
}
return total_len;
@@ -921,7 +927,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.imm_data = butil::HostToNet32(imm);
wr.send_flags |= IBV_SEND_SOLICITED;
- wr.send_flags |= IBV_SEND_SIGNALED;
+ //wr.send_flags |= IBV_SEND_SIGNALED;
ibv_send_wr* bad = NULL;
int err = ibv_post_send(_resource->qp, &wr, &bad);
@@ -938,7 +944,23 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
bool zerocopy = FLAGS_rdma_recv_zerocopy;
switch (wc.opcode) {
case IBV_WC_SEND: { // send completion
- // Do nothing
+ uint16_t wnd_to_update = _local_window_capacity / 4;
+ uint32_t num = wnd_to_update;
+ while(num > 0) {
+ _sbuf[_sq_sent++].clear();
+ if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+ _sq_sent = 0;
+ }
+ --num;
+ }
+ butil::subtle::MemoryBarrier();
+ uint32_t wnd_thresh = _local_window_capacity / 8;
+ _window_size.fetch_add(wnd_to_update, butil::memory_order_release);
+ //if ((_remote_recv_window.load(butil::memory_order_relaxed) >=
wnd_thresh)) {
+ // Do not wake up writing thread right after _window_size > 0.
+ // Otherwise the writing thread may switch to background too quickly.
+ _socket->WakeAsEpollOut();
+ //}
break;
}
case IBV_WC_RECV: { // recv completion
@@ -958,27 +980,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
}
if (wc.imm_data > 0) {
- // Clear sbuf here because we ignore event wakeup for send completions
uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
-
- // Update window
- uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
- || acks >= wnd_thresh) {
- // Do not wake up writing thread right after _window_size > 0.
- // Otherwise the writing thread may switch to background too quickly.
- _socket->WakeAsEpollOut();
- }
- }
+ _remote_recv_window.fetch_add(acks, butil::memory_order_release);
+ }
// We must re-post recv WR
if (PostRecv(1, zerocopy) < 0) {
return -1;
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..8cbaf710 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -262,6 +262,7 @@ private:
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
+ butil::atomic<uint16_t> _remote_recv_window;
// butex for inform read events on TCP fd during handshake
butil::atomic<int> *_read_butex;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]