This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 06b5558c Add EvconnlistenerBase to avoid void* casts (#1641)
06b5558c is described below
commit 06b5558ca6e5401388639ef9130256367dcef9de
Author: Twice <[email protected]>
AuthorDate: Mon Aug 7 06:06:48 2023 +0800
Add EvconnlistenerBase to avoid void* casts (#1641)
---
src/common/event_util.h | 16 ++++++++++++++++
src/server/worker.cc | 41 +++++++++++++++++++----------------------
src/server/worker.h | 7 +++----
3 files changed, 38 insertions(+), 26 deletions(-)
diff --git a/src/common/event_util.h b/src/common/event_util.h
index 8e92c062..556f40a4 100644
--- a/src/common/event_util.h
+++ b/src/common/event_util.h
@@ -27,6 +27,7 @@
#include "event2/buffer.h"
#include "event2/bufferevent.h"
#include "event2/event.h"
+#include "event2/listener.h"
template <typename F, F *f>
struct StaticFunction {
@@ -120,3 +121,18 @@ struct EventCallbackBase {
event *NewTimer(event_base *base) { return evtimer_new(base, timerCB,
reinterpret_cast<void *>(this)); }
};
+
+template <typename Derived>
+struct EvconnlistenerBase {
+ private:
+ template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *,
int)>
+ static void callback(evconnlistener *listener, evutil_socket_t fd, sockaddr
*address, int socklen, void *ctx) {
+ return (reinterpret_cast<Derived *>(ctx)->*cb)(listener, fd, address,
socklen);
+ }
+
+ public:
+ template <void (Derived::*cb)(evconnlistener *, evutil_socket_t, sockaddr *,
int)>
+ evconnlistener *NewEvconnlistener(event_base *base, unsigned flags, int
backlog, evutil_socket_t fd) {
+ return evconnlistener_new(base, callback<cb>, this, flags, backlog, fd);
+ }
+};
diff --git a/src/server/worker.cc b/src/server/worker.cc
index befaf97a..788229cf 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -106,10 +106,9 @@ void Worker::TimerCB(int, int16_t events) {
KickoutIdleClients(config->timeout);
}
-void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx) {
- auto worker = static_cast<Worker *>(ctx);
+void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen) {
int local_port = util::GetLocalPort(fd); // NOLINT
- DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " <<
local_port << " thread #" << worker->tid_;
+ DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " <<
local_port << " thread #" << tid_;
auto s = util::SockSetTcpKeepalive(fd, 120);
if (!s.IsOK()) {
@@ -132,8 +131,8 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, sock
bufferevent *bev = nullptr;
#ifdef ENABLE_OPENSSL
SSL *ssl = nullptr;
- if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
- ssl = SSL_new(worker->svr->ssl_ctx.get());
+ if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
+ ssl = SSL_new(svr->ssl_ctx.get());
if (!ssl) {
LOG(ERROR) << "Failed to construct SSL structure for new connection: "
<< SSLErrors{};
evutil_closesocket(fd);
@@ -158,15 +157,15 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, sock
return;
}
#ifdef ENABLE_OPENSSL
- if (uint32_t(local_port) == worker->svr->GetConfig()->tls_port) {
+ if (uint32_t(local_port) == svr->GetConfig()->tls_port) {
bufferevent_openssl_set_allow_dirty_shutdown(bev, 1);
}
#endif
- auto conn = new redis::Connection(bev, worker);
+ auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);
- s = worker->AddConnection(conn);
+ s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error("ERR " + s.Msg());
s = util::SockSend(fd, err_msg);
@@ -183,26 +182,24 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, sock
conn->SetAddr(ip, port);
}
- if (worker->rate_limit_group_) {
- bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
+ if (rate_limit_group_) {
+ bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}
-void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t
fd, sockaddr *address, int socklen,
- void *ctx) {
- auto worker = static_cast<Worker *>(ctx);
- DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: "
<< worker->svr->GetConfig()->unixsocket
- << " thread #" << worker->tid_;
+void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t
fd, sockaddr *address, int socklen) {
+ DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: "
<< svr->GetConfig()->unixsocket
+ << " thread #" << tid_;
event_base *base = evconnlistener_get_base(listener);
auto ev_thread_safe_flags =
BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS
| BEV_OPT_CLOSE_ON_FREE;
bufferevent *bev = bufferevent_socket_new(base, fd, ev_thread_safe_flags);
- auto conn = new redis::Connection(bev, worker);
+ auto conn = new redis::Connection(bev, this);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ);
- auto s = worker->AddConnection(conn);
+ auto s = AddConnection(conn);
if (!s.IsOK()) {
std::string err_msg = redis::Error("ERR " + s.Msg());
s = util::SockSend(fd, err_msg);
@@ -213,9 +210,9 @@ void Worker::newUnixSocketConnection(evconnlistener
*listener, evutil_socket_t f
return;
}
- conn->SetAddr(worker->svr->GetConfig()->unixsocket, 0);
- if (worker->rate_limit_group_) {
- bufferevent_add_to_rate_limit_group(bev, worker->rate_limit_group_);
+ conn->SetAddr(svr->GetConfig()->unixsocket, 0);
+ if (rate_limit_group_) {
+ bufferevent_add_to_rate_limit_group(bev, rate_limit_group_);
}
}
@@ -256,7 +253,7 @@ Status Worker::listenTCP(const std::string &host, uint32_t
port, int backlog) {
}
evutil_make_socket_nonblocking(fd);
- auto lev = evconnlistener_new(base_, newTCPConnection, this,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
+ auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}
@@ -282,7 +279,7 @@ Status Worker::ListenUnixSocket(const std::string &path,
int perm, int backlog)
}
evutil_make_socket_nonblocking(fd);
- auto lev = evconnlistener_new(base_, newUnixSocketConnection, this,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
+ auto lev = NewEvconnlistener<&Worker::newUnixSocketConnection>(base_,
LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
if (perm != 0) {
chmod(sa.sun_path, (mode_t)perm);
diff --git a/src/server/worker.h b/src/server/worker.h
index 63561ba8..70cdace2 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -42,7 +42,7 @@
class Server;
-class Worker : EventCallbackBase<Worker> {
+class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
public:
Worker(Server *svr, Config *config);
~Worker();
@@ -76,9 +76,8 @@ class Worker : EventCallbackBase<Worker> {
private:
Status listenTCP(const std::string &host, uint32_t port, int backlog);
- static void newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen, void *ctx);
- static void newUnixSocketConnection(evconnlistener *listener,
evutil_socket_t fd, sockaddr *address, int socklen,
- void *ctx);
+ void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr
*address, int socklen);
+ void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen);
redis::Connection *removeConnection(int fd);
event_base *base_;