ivanallen commented on code in PR #2920:
URL: https://github.com/apache/brpc/pull/2920#discussion_r2002770610


##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1486,6 +1527,84 @@ void RdmaEndpoint::GlobalRelease() {
             delete res;
         }
     }
+
+    if (FLAGS_rdma_use_polling) {
+        PollingModeRelease();
+    }
+}
+
+std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
+std::atomic<bool> RdmaEndpoint::_running(false);
+std::function<void()> RdmaEndpoint::_callback(nullptr);
+butil::Mutex RdmaEndpoint::_cb_mutex;
+
+void RdmaEndpoint::SetCallbackFn(std::function<void()> cb) { _callback = cb; }
+
+int RdmaEndpoint::PollingModeInitialize() {
+    auto fn = [](void* args) -> void* {
+        auto poller = static_cast<Poller*>(args);
+        while (_running.load(butil::memory_order_relaxed)) {
+            std::list<Socket*> sockets;
+            {
+                std::unique_lock<butil::Mutex> lk(poller->mutex);
+                sockets = poller->sockets;  // copy all sockets is not good
+            }
+            for (auto m : sockets) {
+                PollCq(m);
+            }
+            {
+                std::unique_lock<butil::Mutex> lk(_cb_mutex);
+                if (_callback) {
+                    _callback();

Review Comment:
   这里带着 pthread 锁运行用户的函数,感觉会比较危险?如果用户内部调用切协程的函数,可能会死锁。
   我理解这里可以不用带着锁跑?



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1486,6 +1527,84 @@ void RdmaEndpoint::GlobalRelease() {
             delete res;
         }
     }
+
+    if (FLAGS_rdma_use_polling) {
+        PollingModeRelease();
+    }
+}
+
+std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
+std::atomic<bool> RdmaEndpoint::_running(false);
+std::function<void()> RdmaEndpoint::_callback(nullptr);
+butil::Mutex RdmaEndpoint::_cb_mutex;
+
+void RdmaEndpoint::SetCallbackFn(std::function<void()> cb) { _callback = cb; }
+
+int RdmaEndpoint::PollingModeInitialize() {
+    auto fn = [](void* args) -> void* {
+        auto poller = static_cast<Poller*>(args);
+        while (_running.load(butil::memory_order_relaxed)) {
+            std::list<Socket*> sockets;
+            {
+                std::unique_lock<butil::Mutex> lk(poller->mutex);
+                sockets = poller->sockets;  // copy all sockets is not good

Review Comment:
   这里怎么保证 socket 还是有效的?感觉还是只能保存弱引用?



##########
src/brpc/socket.cpp:
##########
@@ -2273,6 +2273,14 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
         attr.tag = bthread_self_tag();
         if (FLAGS_usercode_in_coroutine) {
             ProcessEvent(p);
+#if BRPC_WITH_RDMA
+        } else if (rdma::FLAGS_rdma_use_polling && p->_rdma_state == RDMA_ON) {
+            auto rc = bthread_start_background(&tid, &attr, ProcessEvent, p);

Review Comment:
   加这个分支的目的是啥?



##########
src/brpc/rdma/rdma_endpoint.cpp:
##########
@@ -1486,6 +1527,84 @@ void RdmaEndpoint::GlobalRelease() {
             delete res;
         }
     }
+
+    if (FLAGS_rdma_use_polling) {
+        PollingModeRelease();
+    }
+}
+
+std::vector<RdmaEndpoint::Poller> RdmaEndpoint::_pollers;
+std::atomic<bool> RdmaEndpoint::_running(false);
+std::function<void()> RdmaEndpoint::_callback(nullptr);
+butil::Mutex RdmaEndpoint::_cb_mutex;
+
+void RdmaEndpoint::SetCallbackFn(std::function<void()> cb) { _callback = cb; }
+
+int RdmaEndpoint::PollingModeInitialize() {
+    auto fn = [](void* args) -> void* {
+        auto poller = static_cast<Poller*>(args);
+        while (_running.load(butil::memory_order_relaxed)) {
+            std::list<Socket*> sockets;
+            {
+                std::unique_lock<butil::Mutex> lk(poller->mutex);
+                sockets = poller->sockets;  // copy all sockets is not good
+            }
+            for (auto m : sockets) {
+                PollCq(m);
+            }
+            {
+                std::unique_lock<butil::Mutex> lk(_cb_mutex);
+                if (_callback) {
+                    _callback();
+                }
+            }
+            if (FLAGS_rdma_poller_yield) {
+                bthread_yield();
+            }
+        }
+        return nullptr;
+    };
+    _pollers = std::vector<Poller>(FLAGS_rdma_poller_num);
+    _running.store(true, butil::memory_order_relaxed);
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        auto rc = bthread_start_background(
+            &_pollers[i].tid, &BTHREAD_ATTR_NORMAL, fn, &_pollers[i]);
+        if (rc != 0) {
+            LOG(ERROR) << "Fail to start rdma polling bthread";
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void RdmaEndpoint::PollingModeRelease() {
+    _running.store(false, butil::memory_order_relaxed);
+    for (int i = 0; i < FLAGS_rdma_poller_num; ++i) {
+        bthread_join(_pollers[i].tid, nullptr);
+    }
+}
+
+// Add socket to poller
+void RdmaEndpoint::PollerAddCqSocket() {
+    SocketUniquePtr s;

Review Comment:
   SocketUniquePtr 析构后,再保存 socket 指针会比较危险吧。
   要么保存 SocketUniquePtr,要么保存弱引用 id



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to