This is an automated email from the ASF dual-hosted git repository.

chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 58fec6fe [rdma] drain CQ after moving into RESET state (#3261)
58fec6fe is described below

commit 58fec6feeb34fd67cdc3653ec2126f35152cbd0c
Author: David Lee <[email protected]>
AuthorDate: Mon May 18 10:18:00 2026 +0800

    [rdma] drain CQ after moving into RESET state (#3261)
    
    Otherwise a segfault may occur due to the race below:
    
    ```txt
    Socket::OnInputEvent()          |
      `-- ProcessEvent (bthread)    |
                                    |
      [ bthread queued ]            |  QP error -> SetFailed -> HC -> WaitAndReset()
                                    |  Reset() -> _sbuf.clear()
                                    |  CheckHealth() -> Revive()
                                    |
                                    |  Socket is now Addressable!
      RdmaEndpoint::PollCq()        |
      Socket::Address() OK!         |
      RdmaEndpoint::HandleCompletion()
      _sbuf[_sq_sent++].clear()    <= BOOM!  CQ is not drained but _sbuf is cleared.
    ```
    
    Another possible fix is to add a _generation_ field in RdmaEndpoint, such that:
     - each RdmaEndpoint::Reset() will advance the _generation_ by 1;
     - the RdmaEndpoint::PollCq(m, orig_gen) will need to compare the _generation_.
    
    But it will contaminate the existing interface, and we need to drain CQ anyway.
    
    Signed-off-by: David Lee <[email protected]>
---
 src/brpc/rdma/rdma_endpoint.cpp | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index c69bf8ec..5b333938 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -1334,6 +1334,21 @@ static void DeallocateCq(ibv_cq* cq) {
     LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err);
 }
 
+static int DrainCq(ibv_cq* cq) {
+    if (NULL == cq) {
+        return 0;
+    }
+
+    ibv_wc wc;
+    int ret;
+    do {
+        ret = ibv_poll_cq(cq, 1, &wc);
+    } while (ret > 0);
+
+    LOG_IF(ERROR, ret < 0) << "drain CQ failed: " << ret;
+    return ret;
+}
+
 void RdmaEndpoint::DeallocateResources() {
     if (!_resource) {
         return;
@@ -1360,6 +1375,7 @@ void RdmaEndpoint::DeallocateResources() {
     }
 
     bool remove_consumer = true;
+_reclaim:
     if (!move_to_rdma_resource_list) {
         if (NULL != _resource->qp) {
             int err = IbvDestroyQp(_resource->qp);
@@ -1403,6 +1419,24 @@ void RdmaEndpoint::DeallocateResources() {
     }
 
     if (move_to_rdma_resource_list) {
+        // When a QP is moved to the RESET state, all associated send and
+        // receive queues are flushed, meaning any outstanding WRs are effectively
+        // abandoned by the hardware.
+        //
+        // However, the CQ associated with that QP is *not* cleared automatically,
+        // meaning that it will still contain entries for WRs that completed before
+        // the reset.
+        //
+        // The application should finish polling the CQ to remove these obsolete
+        // entries before reusing the QP.
+        int ret = DrainCq(_resource->polling_cq);
+        ret += DrainCq(_resource->send_cq);
+        ret += DrainCq(_resource->recv_cq);
+        if (ret < 0) {
+            move_to_rdma_resource_list = false;
+            goto _reclaim;
+        }
+
         BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex);
         _resource->next = g_rdma_resource_list;
         g_rdma_resource_list = _resource;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to