szy441687879 commented on code in PR #2920:
URL: https://github.com/apache/brpc/pull/2920#discussion_r2094747304


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1486,6 +1526,119 @@ void RdmaEndpoint::GlobalRelease() {
             delete res;
         }
     }
+    // release polling mode at exit or call RdmaEndpoint::PollingModeRelease
+    // explicitly
+    if (FLAGS_rdma_use_polling) {
+        for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+            PollingModeRelease(i);
+        }
+    }
+}
+
+std::vector<RdmaEndpoint::PollerGroup> RdmaEndpoint::_poller_groups;
+
+void RdmaEndpoint::SetCallbackFn(std::function<void()> cb, bthread_tag_t tag) {
+    auto& group = _poller_groups[tag];
+    auto& pollers = group.pollers;
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        auto& poller = pollers[i];
+        std::unique_lock<bthread::Mutex> lk(poller.callback_mutex);
+        poller.callback = cb;
+    }
+}
+
+int RdmaEndpoint::PollingModeInitialize(bthread_tag_t tag) {
+    if (!FLAGS_rdma_use_polling) {
+        return 0;
+    }
+    auto& group = _poller_groups[tag];
+    auto& pollers = group.pollers;
+    auto& running = group.running;
+    bool expected = false;
+    if (!running.compare_exchange_strong(expected, true)) {
+        return 0;
+    }
+    struct FnArgs {
+        Poller* poller;
+        std::atomic<bool>* running;
+    };
+    auto fn = [](void* p) -> void* {
+        std::unique_ptr<FnArgs> args(static_cast<FnArgs*>(p));
+        auto poller = args->poller;
+        auto running = args->running;
+        std::unordered_set<SocketId> cq_sids;
+        CqSidOp op;
+        while (running->load(std::memory_order_relaxed)) {
+            while (poller->op_queue.Dequeue(op)) {
+                if (op.type == CqSidOp::ADD) {
+                    cq_sids.emplace(op.sid);
+                } else if (op.type == CqSidOp::REMOVE) {
+                    cq_sids.erase(op.sid);
+                }
+            }
+            for (auto sid : cq_sids) {
+                SocketUniquePtr s;
+                if (Socket::Address(sid, &s) < 0) {
+                    continue;
+                }
+                PollCq(s.get());
+            }
+            {
+                std::unique_lock<bthread::Mutex> lk(poller->callback_mutex);
+                if (poller->callback) {

Review Comment:
   > 在setcallcack的时候,在回调函数里面执行一个一次性的设置可以吗?
   
   初始化可以,但是退出的析构没法实现



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to