This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 576c4314 feat(worker): Allow passing a socket FD to dup and listen on (#2598)
576c4314 is described below
commit 576c43145b4e78e475957d60cc71444fdf96ab94
Author: Nathan <[email protected]>
AuthorDate: Thu Oct 17 21:46:42 2024 -0400
feat(worker): Allow passing a socket FD to dup and listen on (#2598)
---
kvrocks.conf | 10 ++++++++++
src/cli/main.cc | 10 ++++++----
src/config/config.cc | 1 +
src/config/config.h | 1 +
src/server/worker.cc | 44 ++++++++++++++++++++++++++++++++------------
src/server/worker.h | 4 ++--
6 files changed, 52 insertions(+), 18 deletions(-)
diff --git a/kvrocks.conf b/kvrocks.conf
index f6b6b31c..b6a2e027 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -20,6 +20,16 @@ bind 127.0.0.1
# unixsocket /tmp/kvrocks.sock
# unixsocketperm 777
+# Allows a parent process to open a socket and pass its FD down to kvrocks as a child
+# process. Useful to reserve a port and prevent race conditions.
+#
+# PLEASE NOTE:
+# If this is overridden to a value other than -1, the bind and tls* directives will be
+# ignored.
+#
+# Default: -1 (not overridden, defer to creating a connection to the specified port)
+socket-fd -1
+
# Accept connections on the specified port, default is 6666.
port 6666
diff --git a/src/cli/main.cc b/src/cli/main.cc
index a6ed2025..3eaf4b13 100644
--- a/src/cli/main.cc
+++ b/src/cli/main.cc
@@ -30,7 +30,6 @@
#include <iomanip>
#include <ostream>
-#include "config.h"
#include "daemon_util.h"
#include "io_util.h"
#include "pid_util.h"
@@ -40,9 +39,7 @@
#include "storage/storage.h"
#include "string_util.h"
#include "time_util.h"
-#include "unique_fd.h"
#include "vendor/crc64.h"
-#include "version.h"
#include "version_util.h"
Server *srv = nullptr;
@@ -136,6 +133,11 @@ int main(int argc, char *argv[]) {
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
return 1;
}
+ const auto socket_fd_exit = MakeScopeExit([&config] {
+ if (config.socket_fd != -1) {
+ close(config.socket_fd);
+ }
+ });
crc64_init();
InitGoogleLog(&config);
@@ -143,7 +145,7 @@ int main(int argc, char *argv[]) {
// Tricky: We don't expect that different instances running on the same port,
// but the server use REUSE_PORT to support the multi listeners. So we connect
// the listen port to check if the port has already listened or not.
- if (!config.binds.empty()) {
+ if (config.socket_fd == -1 && !config.binds.empty()) {
uint32_t ports[] = {config.port, config.tls_port, 0};
for (uint32_t *port = ports; *port; ++port) {
if (util::IsPortInUse(*port)) {
diff --git a/src/config/config.cc b/src/config/config.cc
index 2ced3d08..38b8fbac 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -110,6 +110,7 @@ Config::Config() {
{"daemonize", true, new YesNoField(&daemonize, false)},
{"bind", true, new StringField(&binds_str_, "")},
{"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)},
+ {"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)},
#ifdef ENABLE_OPENSSL
{"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)},
{"tls-cert-file", false, new StringField(&tls_cert_file, "")},
diff --git a/src/config/config.h b/src/config/config.h
index 8a2ebfc4..1b7311e4 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -70,6 +70,7 @@ struct Config {
Config();
~Config() = default;
uint32_t port = 0;
+ int socket_fd = -1;
uint32_t tls_port = 0;
std::string tls_cert_file;
diff --git a/src/server/worker.cc b/src/server/worker.cc
index d5a751e1..4ddf31ad 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -22,6 +22,7 @@
#include <event2/util.h>
#include <glog/logging.h>
+#include <unistd.h>
#include <stdexcept>
#include <string>
@@ -30,7 +31,6 @@
#include "io_util.h"
#include "scope_exit.h"
#include "thread_util.h"
-#include "time_util.h"
#ifdef ENABLE_OPENSSL
#include <event2/bufferevent_ssl.h>
@@ -44,7 +44,6 @@
#include <sys/un.h>
#include <algorithm>
-#include <list>
#include <utility>
#include "redis_connection.h"
@@ -59,17 +58,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv),
base_(event_base_new())
timeval tm = {10, 0};
evtimer_add(timer_.get(), &tm);
- uint32_t ports[3] = {config->port, config->tls_port, 0};
- auto binds = config->binds;
-
- for (uint32_t *port = ports; *port; ++port) {
- for (const auto &bind : binds) {
- Status s = listenTCP(bind, *port, config->backlog);
- if (!s.IsOK()) {
- LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port
<< ". Error: " << s.Msg();
- exit(1);
+ if (config->socket_fd != -1) {
+ if (const Status s = listenFD(config->socket_fd, config->port,
config->backlog); !s.IsOK()) {
+ LOG(ERROR) << "[worker] Failed to listen to socket with fd: " <<
config->socket_fd << ". Error: " << s.Msg();
+ exit(1);
+ }
+ } else {
+ const uint32_t ports[3] = {config->port, config->tls_port, 0};
+
+ for (const uint32_t *port = ports; *port; ++port) {
+ for (const auto &bind : config->binds) {
+ if (const Status s = listenTCP(bind, *port, config->backlog);
!s.IsOK()) {
+ LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" <<
*port << ". Error: " << s.Msg();
+ exit(1);
+ }
+ LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
}
- LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
}
}
lua_ = lua::CreateState(srv);
@@ -216,6 +220,22 @@ void Worker::newUnixSocketConnection(evconnlistener
*listener, evutil_socket_t f
}
}
+Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) {  // Accept TCP connections on a duplicate of an already-open socket fd; returns NotOK on port mismatch or dup failure, OK otherwise.
+ const uint32_t port = util::GetLocalPort(fd);  // Port that util::GetLocalPort reports for the provided fd.
+ if (port != expected_port) {  // Refuse a socket whose local port differs from expected_port.
+ return {Status::NotOK, "The port of the provided socket fd doesn't match
the configured port"};
+ }
+ const int dup_fd = dup(fd);  // The listener gets its own descriptor; fd itself is not closed in this function.
+ if (dup_fd == -1) {  // dup failed: surface the current socket error text to the caller.
+ return {Status::NotOK,
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
+ }
+ evconnlistener *lev =
+ NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE |
LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);  // New connections are dispatched to newTCPConnection. NOTE(review): lev is not null-checked and dup_fd is not made non-blocking here - confirm NewEvconnlistener handles both.
+ listen_events_.emplace_back(lev);  // Stored next to the other listeners. NOTE(review): presumably listen_events_ owns and later frees lev (CLOSE_ON_FREE is requested for dup_fd) - confirm.
+ LOG(INFO) << "Listening on dup'ed fd: " << dup_fd;
+ return Status::OK();
+}
+
Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
bool ipv6_used = strchr(host.data(), ':');
diff --git a/src/server/worker.h b/src/server/worker.h
index b6918ba9..ac88ec8a 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -27,7 +27,6 @@
#include <cstdint>
#include <cstring>
-#include <iostream>
#include <lua.hpp>
#include <map>
#include <memory>
@@ -36,9 +35,9 @@
#include <utility>
#include <vector>
+#include "config/config.h"
#include "event_util.h"
#include "redis_connection.h"
-#include "storage/storage.h"
class Server;
@@ -79,6 +78,7 @@ class Worker : EventCallbackBase<Worker>,
EvconnlistenerBase<Worker> {
Server *srv;
private:
+ Status listenFD(int fd, uint32_t expected_port, int backlog);
Status listenTCP(const std::string &host, uint32_t port, int backlog);
void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr
*address, int socklen);
void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd,
sockaddr *address, int socklen);