This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 06b5558c Add EvconnlistenerBase to avoid void* casts (#1641)
06b5558c is described below

commit 06b5558ca6e5401388639ef9130256367dcef9de
Author: Twice <[email protected]>
AuthorDate: Mon Aug 7 06:06:48 2023 +0800

    Add EvconnlistenerBase to avoid void* casts (#1641)
---
 src/common/event_util.h | 16 ++++++++++++++++
 src/server/worker.cc    | 41 +++++++++++++++++++----------------------
 src/server/worker.h     |  7 +++----
 3 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/src/common/event_util.h b/src/common/event_util.h
index 8e92c062..556f40a4 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -27,6 +27,7 @@
 #include "event2/buffer.h"
 #include "event2/bufferevent.h"
 #include "event2/event.h"
+#include "event2/listener.h"
 
 template <typename F, F *f>
 struct StaticFunction {
@@ -120,3 +121,18 @@ struct EventCallbackBase {
 
   event *NewTimer(event_base *base) { return evtimer_new(base, timerCB, 
reinterpret_cast<void *>(this)); }
 };
+
+template <typename Derived>
+struct EvconnlistenerBase {
+ private:
+  template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *, 
int)>
+  static void callback(evconnlistener *listener, evutil_socket_t fd, sockaddr 
*address, int socklen, void *ctx) {
+    return (reinterpret_cast<Derived *>(ctx)->*cb)(listener, fd, address, 
socklen);
+  }
+
+ public:
+  template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *, 
int)>
+  evconnlistener *NewEvconnlistener(event_base *base, unsigned flags, int 
backlog, evutil_socket_t fd) {
+    return evconnlistener_new(base, callback<cb>, this, flags, backlog, fd);
+  }
+};
diff --git a/src/server/worker.cc b/src/server/worker.cc
index befaf97a..788229cf 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -106,10 +106,9 @@ void Worker::TimerCB(int, int16_t events) {
   KickoutIdleClients(config->timeout);
 }
 
-void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen, void *ctx) {
-  auto worker = static_cast<Worker *>(ctx);
+void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen) {
   int local_port = util::GetLocalPort(fd);  // NOLINT
-  DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << 
local_port << " thread #" << worker->tid_;
+  DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << 
local_port << " thread #" << tid_;
 
   auto s = util::SockSetTcpKeepalive(fd, 120);
   if (!s.IsOK()) {
@@ -132,8 +131,8 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, sock
   bufferevent *bev = nullptr;
 #ifdef ENABLE_OPENSSL
   SSL *ssl = nullptr;
-  if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
-    ssl = SSL_new(worker->svr->ssl_ctx.get());
+  if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
+    ssl = SSL_new(svr->ssl_ctx.get());
     if (!ssl) {
       LOG(ERROR) << "Failed to construct SSL structure for new connection: " 
<< SSLErrors{};
       evutil_closesocket(fd);
@@ -158,15 +157,15 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, sock
     return;
   }
 #ifdef ENABLE_OPENSSL
-  if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
+  if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
     bufferevent_openssl_set_allow_dirty_shutdown(bev, 1);
   }
 #endif
-  auto conn = new redis::Connection(bev, worker);
+  auto conn = new redis::Connection(bev, this);
   conn->SetCB(bev);
   bufferevent_enable(bev, EV_READ);
 
-  s = worker->AddConnection(conn);
+  s = AddConnection(conn);
   if (!s.IsOK()) {
     std::string err_msg = redis::Error("ERR " + s.Msg());
     s = util::SockSend(fd, err_msg);
@@ -183,26 +182,24 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, sock
     conn->SetAddr(ip, port);
   }
 
-  if (worker->rate_limit_group_) {
-    bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
+  if (rate_limit_group_) {
+    bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
   }
 }
 
-void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t 
fd, sockaddr *address, int socklen,
-                                     void *ctx) {
-  auto worker = static_cast<Worker *>(ctx);
-  DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " 
<< worker->svr->GetConfig()->unixsocket
-             << " thread #" << worker->tid_;
+void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t 
fd, sockaddr *address, int socklen) {
+  DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " 
<< svr->GetConfig()->unixsocket
+             << " thread #" << tid_;
   event_base *base = evconnlistener_get_base(listener);
   auto ev_thread_safe_flags =
       BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS 
| BEV_OPT_CLOSE_ON_FREE;
   bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
 
-  auto conn = new redis::Connection(bev, worker);
+  auto conn = new redis::Connection(bev, this);
   conn->SetCB(bev);
   bufferevent_enable(bev, EV_READ);
 
-  auto s = worker->AddConnection(conn);
+  auto s = AddConnection(conn);
   if (!s.IsOK()) {
     std::string err_msg = redis::Error("ERR " + s.Msg());
     s = util::SockSend(fd, err_msg);
@@ -213,9 +210,9 @@ void Worker::newUnixSocketConnection(evconnlistener 
*listener, evutil_socket_t f
     return;
   }
 
-  conn->SetAddr(worker->svr->GetConfig()->unixsocket, 0);
-  if (worker->rate_limit_group_) {
-    bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
+  conn->SetAddr(svr->GetConfig()->unixsocket, 0);
+  if (rate_limit_group_) {
+    bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
   }
 }
 
@@ -256,7 +253,7 @@ Status Worker::listenTCP(const std::string &host, uint32_t 
port, int backlog) {
     }
 
     evutil_make_socket_nonblocking(fd);
-    auto lev = evconnlistener_new(base_, newTCPConnection, this, 
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
+    auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, 
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
     listen_events_.emplace_back(lev);
   }
 
@@ -282,7 +279,7 @@ Status Worker::ListenUnixSocket(const std::string &path, 
int perm, int backlog)
   }
 
   evutil_make_socket_nonblocking(fd);
-  auto lev = evconnlistener_new(base_, newUnixSocketConnection, this, 
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
+  auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_, 
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
   listen_events_.emplace_back(lev);
   if (perm != 0) {
     chmod(sa.sun_path, (mode_t)perm);
diff --git a/src/server/worker.h b/src/server/worker.h
index 63561ba8..70cdace2 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -42,7 +42,7 @@
 
 class Server;
 
-class Worker : EventCallbackBase<Worker> {
+class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
  public:
   Worker(Server *svr, Config *config);
   ~Worker();
@@ -76,9 +76,8 @@ class Worker : EventCallbackBase<Worker> {
 
  private:
   Status listenTCP(const std::string &host, uint32_t port, int backlog);
-  static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen, void *ctx);
-  static void newUnixSocketConnection(evconnlistener *listener, 
evutil_socket_t fd, sockaddr *address, int socklen,
-                                      void *ctx);
+  void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr 
*address, int socklen);
+  void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen);
   redis::Connection *removeConnection(int fd);
 
   event_base *base_;

Reply via email to