Copilot commented on code in PR #3145:
URL: https://github.com/apache/brpc/pull/3145#discussion_r2532993304


##########
src/brpc/socket.cpp:
##########
@@ -1425,7 +1425,7 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
 
 void Socket::WakeAsEpollOut() {
     _epollout_butex->fetch_add(1, butil::memory_order_release);
-    bthread::butex_wake_except(_epollout_butex, 0);
+    bthread::butex_wake_except(_epollout_butex, INVALID_BTHREAD);

Review Comment:
   [nitpick] The parameter change from `0` to `INVALID_BTHREAD` should be 
verified. The function `bthread::butex_wake_except` likely expects a bthread ID 
to exclude from waking. Using `INVALID_BTHREAD` instead of `0` is semantically 
clearer, but please ensure that `INVALID_BTHREAD` is the correct constant for 
this purpose and that both values represent the same thing (i.e., no thread to 
exclude).



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1276,73 +1371,49 @@ void RdmaEndpoint::DeallocateResources() {
             move_to_rdma_resource_list = true;
         }
     }
-    int fd = -1;
-    if (_resource->comp_channel) {
-        fd = _resource->comp_channel->fd;
-    }
-    int err;
     if (!move_to_rdma_resource_list) {
-        if (_resource->qp) {
-            err = IbvDestroyQp(_resource->qp);
-            if (err != 0) {
-                LOG(WARNING) << "Fail to destroy QP: " << berror(err);
-            }
+        if (NULL == _resource->qp) {

Review Comment:
   Logic error: The condition checks if `_resource->qp` is NULL, but then 
attempts to destroy it. The condition should be `!= NULL` or the logic inside 
should be inverted. This will cause `IbvDestroyQp` to be called with a NULL 
pointer.
   ```suggestion
           if (_resource->qp != NULL) {
   ```



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -938,8 +968,16 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
     bool zerocopy = FLAGS_rdma_recv_zerocopy;
     switch (wc.opcode) {
     case IBV_WC_SEND: {  // send completion
-        // Do nothing
-        break;
+        if (SEND_TYPE_RDMA_IMM == wc.wr_id) {
+            // Do nothing for imm.
+            return 0;
+        }
+        // Update window
+        uint16_t wnd_to_update = _local_window_capacity / 4;
+        _sq_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed);

Review Comment:
   The window update logic only triggers on signaled send completions (every 
~1/4 of window capacity based on `_sq_unsignaled`). However, `_sq_window_size` 
is decremented for every send in `CutFromIOBufList` but only incremented here 
in batches of `_local_window_capacity / 4`. This could lead to window size 
underflow or incorrect accounting if the signaling interval doesn't match the 
update interval.



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1276,73 +1371,49 @@ void RdmaEndpoint::DeallocateResources() {
             move_to_rdma_resource_list = true;
         }
     }
-    int fd = -1;
-    if (_resource->comp_channel) {
-        fd = _resource->comp_channel->fd;
-    }
-    int err;
     if (!move_to_rdma_resource_list) {
-        if (_resource->qp) {
-            err = IbvDestroyQp(_resource->qp);
-            if (err != 0) {
-                LOG(WARNING) << "Fail to destroy QP: " << berror(err);
-            }
+        if (NULL == _resource->qp) {
+            int err = IbvDestroyQp(_resource->qp);
+            LOG_IF(WARNING, 0 != err) << "Fail to destroy QP: " << berror(err);
             _resource->qp = NULL;
         }
-        if (_resource->cq) {
-            IbvAckCqEvents(_resource->cq, _cq_events);
-            err = IbvDestroyCq(_resource->cq);
-            if (err != 0) {
-                PLOG(WARNING) << "Fail to destroy CQ: " << berror(err);
-            }
-            _resource->cq = NULL;
-        }
-        if (_resource->comp_channel) {
-            // destroy comp_channel will destroy this fd
-            // so that we should remove it from epoll fd first
-            _socket->_io_event.RemoveConsumer(fd);
-            fd = -1;
-            err = IbvDestroyCompChannel(_resource->comp_channel);
-            if (err != 0) {
-                LOG(WARNING) << "Fail to destroy CQ channel: " << berror(err);
-            }
-            _resource->comp_channel = NULL;
-        }
+
+        DeallocateCq(_resource->send_cq, _resource->send_comp_channel,
+                     _send_cq_events, _socket->_io_event.bthread_tag());
+        DeallocateCq(_resource->recv_cq, _resource->recv_comp_channel,
+                     _recv_cq_events, _socket->_io_event.bthread_tag());
+        _resource->send_cq = NULL;
+        _resource->recv_cq = NULL;
+        _resource->send_comp_channel = NULL;
+        _resource->recv_comp_channel = NULL;
         delete _resource;
-        _resource = NULL;
     }
 
-    SocketUniquePtr s;
-    if (_cq_sid != INVALID_SOCKET_ID) {
-        if (Socket::Address(_cq_sid, &s) == 0) {
-            s->_user = NULL;  // do not release user (this RdmaEndpoint)
-            if (fd >= 0) {
-                _socket->_io_event.RemoveConsumer(fd);
-            }
-            s->_fd = -1;  // already remove fd from epoll fd
-            s->SetFailed();
-        }
-        _cq_sid = INVALID_SOCKET_ID;
-    }
+    SetSocketFailed(_send_cq_sid, move_to_rdma_resource_list);
+    SetSocketFailed(_recv_cq_sid, move_to_rdma_resource_list);
+    SetSocketFailed(_polling_cq_sid, false);
 
     if (move_to_rdma_resource_list) {
-        if (_resource->cq) {
-            IbvAckCqEvents(_resource->cq, _cq_events);
+        if (NULL != _resource->send_cq) {
+            LOG(INFO) << "Remove send " << _resource->send_cq  << " " << _send_cq_events;

Review Comment:
   Debug log statement left in production code. This LOG(INFO) statement should 
be removed or changed to a debug-level log (e.g., LOG_IF with a flag check).
   ```suggestion
               LOG(DEBUG) << "Remove send " << _resource->send_cq  << " " << _send_cq_events;
   ```



##########
src/brpc/rdma/rdma_endpoint.h:
##########
@@ -129,6 +132,17 @@ friend class brpc::Socket;
     // Process handshake at the server
     static void* ProcessHandshakeAtServer(void* arg);
 
+    // Cearte a socket which wrap the comp channel of CQ.

Review Comment:
   Spelling error: "Cearte" should be "Create".
   ```suggestion
       // Create a socket which wraps the comp channel of CQ.
   ```



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -253,6 +266,7 @@ void RdmaConnect::StartConnect(const Socket* socket,
     if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL,
                 RdmaEndpoint::ProcessHandshakeAtClient, socket->_rdma_ep) < 0) {
         LOG(FATAL) << "Fail to start handshake bthread";

Review Comment:
   After the bthread fails to start, `Run()` is called which will invoke the 
done callback. However, the SocketUniquePtr `s` is not released in this error 
path, potentially causing a resource leak. The `s.release()` at line 271 only 
happens in the success case. Consider calling `s.release()` before `Run()` or 
handling the cleanup differently.
   ```suggestion
           LOG(FATAL) << "Fail to start handshake bthread";
           s.release();
   ```



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1046,71 +1082,100 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) {
     return 0;
 }
 
-static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
-    RdmaResource* res = new (std::nothrow) RdmaResource;
-    if (!res) {
-        return NULL;
+static bool AllocateCq(ibv_comp_channel*& comp_channel, ibv_cq*& cq) {
+    comp_channel = IbvCreateCompChannel(GetRdmaContext());
+    if (NULL == comp_channel) {
+        PLOG(WARNING) << "Fail to create comp channel for CQ";
+        return false;
     }
 
+    if (butil::make_close_on_exec(comp_channel->fd) < 0) {
+        PLOG(WARNING) << "Fail to set comp channel close-on-exec";
+        return false;
+    }
+    if (butil::make_non_blocking(comp_channel->fd) < 0) {
+        PLOG(WARNING) << "Fail to set comp channel nonblocking";
+        return false;
+    }
+
+    cq = IbvCreateCq(GetRdmaContext(), FLAGS_rdma_prepared_qp_size,
+                               NULL, comp_channel, GetRdmaCompVector());
+    if (NULL == cq) {
+        PLOG(WARNING) << "Fail to create CQ";
+        return false;
+    }
+
+    return true;
+}
+
+static ibv_qp* AllocateQp(ibv_cq* send_cq, ibv_cq* recv_cq, uint32_t sq_size, uint32_t rq_size) {
+    ibv_qp_init_attr attr;
+    memset(&attr, 0, sizeof(attr));
+    attr.send_cq = send_cq;
+    attr.recv_cq = recv_cq;
+    attr.cap.max_send_wr = sq_size;
+    attr.cap.max_recv_wr = rq_size;
+    attr.cap.max_send_sge = GetRdmaMaxSge();
+    attr.cap.max_recv_sge = 1;
+    attr.qp_type = IBV_QPT_RC;
+    return IbvCreateQp(GetRdmaPd(), &attr);
+}
+
+static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
+    std::unique_ptr<RdmaResource> resource(new RdmaResource);
     if (!FLAGS_rdma_use_polling) {
-        res->comp_channel = IbvCreateCompChannel(GetRdmaContext());
-        if (!res->comp_channel) {
-            PLOG(WARNING) << "Fail to create comp channel for CQ";
-            delete res;
+        if (!AllocateCq(resource->send_comp_channel, resource->send_cq)) {
+            PLOG(WARNING) << "Fail to create send CQ";
             return NULL;
         }
 
-        butil::make_close_on_exec(res->comp_channel->fd);
-        if (butil::make_non_blocking(res->comp_channel->fd) < 0) {
-            PLOG(WARNING) << "Fail to set comp channel nonblocking";
-            delete res;
+        if (!AllocateCq(resource->recv_comp_channel, resource->recv_cq)) {
+            PLOG(WARNING) << "Fail to create recv CQ";
             return NULL;
         }
 
-        res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
-                              NULL, res->comp_channel, GetRdmaCompVector());
-        if (!res->cq) {
-            PLOG(WARNING) << "Fail to create CQ";
-            delete res;
+        resource->qp = AllocateQp(resource->send_cq, resource->recv_cq, sq_size, rq_size);
+        if (NULL == resource->qp) {
+            PLOG(WARNING) << "Fail to create QP";
             return NULL;
         }
     } else {
-        res->cq = IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size,
-                              NULL, NULL, 0);
-        if (!res->cq) {
-            PLOG(WARNING) << "Fail to create CQ";
-            delete res;
+        resource->polling_cq =
+            IbvCreateCq(GetRdmaContext(), 2 * FLAGS_rdma_prepared_qp_size, NULL, NULL, 0);
+        if (NULL == resource->polling_cq) {
+            PLOG(WARNING) << "Fail to create polling CQ";
+            return NULL;
+        }
+        resource->qp = AllocateQp(resource->polling_cq,
+                                  resource->polling_cq,
+                                  sq_size, rq_size);
+        if (NULL == resource->qp) {
+            PLOG(WARNING) << "Fail to create QP";
             return NULL;
         }
     }
 
-    ibv_qp_init_attr attr;
-    memset(&attr, 0, sizeof(attr));
-    attr.send_cq = res->cq;
-    attr.recv_cq = res->cq;
-    // NOTE: Since we hope to reduce send completion events, we set signaled
-    // send_wr every 1/4 of the total wnd. The wnd will increase when the ack
-    // is received, which means the receive side has already received the data
-    // in the corresponding send_wr. However, the ack does not mean the send_wr
-    // has been removed from SQ if it is set unsignaled. The reason is that
-    // the unsignaled send_wr is removed from SQ only after the CQE of next
-    // signaled send_wr is polled. Thus in a rare case, a new send_wr cannot be
-    // posted to SQ even in the wnd is not empty. In order to solve this
-    // problem, we enlarge the size of SQ to contain redundant 1/4 of the wnd,
-    // which is the maximum number of unsignaled send_wrs.
-    attr.cap.max_send_wr = sq_size * 5 / 4; /*NOTE*/
-    attr.cap.max_recv_wr = rq_size;
-    attr.cap.max_send_sge = GetRdmaMaxSge();
-    attr.cap.max_recv_sge = 1;
-    attr.qp_type = IBV_QPT_RC;
-    res->qp = IbvCreateQp(GetRdmaPd(), &attr);
-    if (!res->qp) {
-        PLOG(WARNING) << "Fail to create QP";
-        delete res;
-        return NULL;
+    return resource.release();
+}
+
+SocketId RdmaEndpoint::CreateSocket(int fd, ibv_cq* cq, int solicited_only) {
+    SocketId socket_id = INVALID_SOCKET_ID;
+    int err = ibv_req_notify_cq(cq, solicited_only);
+    if (err != 0) {
+        LOG(WARNING) << "Fail to arm CQ comp channel: " << berror(err);
+        return socket_id;
     }
 
-    return res;
+    SocketOptions options;
+    options.user = this;
+    options.keytable_pool = _socket->_keytable_pool;
+    options.fd = fd;

Review Comment:
   [nitpick] Inconsistent NULL comparison style. The codebase uses both `NULL 
== pointer` (Yoda conditions) and `pointer != NULL`. For consistency and 
readability, consider using a uniform style throughout the file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to