This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 576c4314 feat(worker): Allow passing a socket FD to dup and listen on 
(#2598)
576c4314 is described below

commit 576c43145b4e78e475957d60cc71444fdf96ab94
Author: Nathan <[email protected]>
AuthorDate: Thu Oct 17 21:46:42 2024 -0400

    feat(worker): Allow passing a socket FD to dup and listen on (#2598)
---
 kvrocks.conf         | 10 ++++++++++
 src/cli/main.cc      | 10 ++++++----
 src/config/config.cc |  1 +
 src/config/config.h  |  1 +
 src/server/worker.cc | 44 ++++++++++++++++++++++++++++++++------------
 src/server/worker.h  |  4 ++--
 6 files changed, 52 insertions(+), 18 deletions(-)

diff --git a/kvrocks.conf b/kvrocks.conf
index f6b6b31c..b6a2e027 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -20,6 +20,16 @@ bind 127.0.0.1
 # unixsocket /tmp/kvrocks.sock
 # unixsocketperm 777
 
+# Allows a parent process to open a socket and pass its FD down to kvrocks as 
a child 
+# process. Useful to reserve a port and prevent race conditions.
+# 
+# PLEASE NOTE: 
+# If this is overridden to a value other than -1, the bind and tls* directives 
will be 
+# ignored.
+# 
+# Default: -1 (not overridden, defer to creating a connection to the specified 
port)
+socket-fd -1
+
 # Accept connections on the specified port, default is 6666.
 port 6666
 
diff --git a/src/cli/main.cc b/src/cli/main.cc
index a6ed2025..3eaf4b13 100644
--- a/src/cli/main.cc
+++ b/src/cli/main.cc
@@ -30,7 +30,6 @@
 #include <iomanip>
 #include <ostream>
 
-#include "config.h"
 #include "daemon_util.h"
 #include "io_util.h"
 #include "pid_util.h"
@@ -40,9 +39,7 @@
 #include "storage/storage.h"
 #include "string_util.h"
 #include "time_util.h"
-#include "unique_fd.h"
 #include "vendor/crc64.h"
-#include "version.h"
 #include "version_util.h"
 
 Server *srv = nullptr;
@@ -136,6 +133,11 @@ int main(int argc, char *argv[]) {
     std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
     return 1;
   }
+  const auto socket_fd_exit = MakeScopeExit([&config] {
+    if (config.socket_fd != -1) {
+      close(config.socket_fd);
+    }
+  });
 
   crc64_init();
   InitGoogleLog(&config);
@@ -143,7 +145,7 @@ int main(int argc, char *argv[]) {
   // Tricky: We don't expect that different instances running on the same port,
   // but the server use REUSE_PORT to support the multi listeners. So we 
connect
   // the listen port to check if the port has already listened or not.
-  if (!config.binds.empty()) {
+  if (config.socket_fd == -1 && !config.binds.empty()) {
     uint32_t ports[] = {config.port, config.tls_port, 0};
     for (uint32_t *port = ports; *port; ++port) {
       if (util::IsPortInUse(*port)) {
diff --git a/src/config/config.cc b/src/config/config.cc
index 2ced3d08..38b8fbac 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -110,6 +110,7 @@ Config::Config() {
       {"daemonize", true, new YesNoField(&daemonize, false)},
       {"bind", true, new StringField(&binds_str_, "")},
       {"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)},
+      {"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)},
 #ifdef ENABLE_OPENSSL
       {"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)},
       {"tls-cert-file", false, new StringField(&tls_cert_file, "")},
diff --git a/src/config/config.h b/src/config/config.h
index 8a2ebfc4..1b7311e4 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -70,6 +70,7 @@ struct Config {
   Config();
   ~Config() = default;
   uint32_t port = 0;
+  int socket_fd = -1;
 
   uint32_t tls_port = 0;
   std::string tls_cert_file;
diff --git a/src/server/worker.cc b/src/server/worker.cc
index d5a751e1..4ddf31ad 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -22,6 +22,7 @@
 
 #include <event2/util.h>
 #include <glog/logging.h>
+#include <unistd.h>
 
 #include <stdexcept>
 #include <string>
@@ -30,7 +31,6 @@
 #include "io_util.h"
 #include "scope_exit.h"
 #include "thread_util.h"
-#include "time_util.h"
 
 #ifdef ENABLE_OPENSSL
 #include <event2/bufferevent_ssl.h>
@@ -44,7 +44,6 @@
 #include <sys/un.h>
 
 #include <algorithm>
-#include <list>
 #include <utility>
 
 #include "redis_connection.h"
@@ -59,17 +58,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), 
base_(event_base_new())
   timeval tm = {10, 0};
   evtimer_add(timer_.get(), &tm);
 
-  uint32_t ports[3] = {config->port, config->tls_port, 0};
-  auto binds = config->binds;
-
-  for (uint32_t *port = ports; *port; ++port) {
-    for (const auto &bind : binds) {
-      Status s = listenTCP(bind, *port, config->backlog);
-      if (!s.IsOK()) {
-        LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port 
<< ". Error: " << s.Msg();
-        exit(1);
+  if (config->socket_fd != -1) {
+    if (const Status s = listenFD(config->socket_fd, config->port, 
config->backlog); !s.IsOK()) {
+      LOG(ERROR) << "[worker] Failed to listen to socket with fd: " << 
config->socket_fd << ". Error: " << s.Msg();
+      exit(1);
+    }
+  } else {
+    const uint32_t ports[3] = {config->port, config->tls_port, 0};
+
+    for (const uint32_t *port = ports; *port; ++port) {
+      for (const auto &bind : config->binds) {
+        if (const Status s = listenTCP(bind, *port, config->backlog); 
!s.IsOK()) {
+          LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << 
*port << ". Error: " << s.Msg();
+          exit(1);
+        }
+        LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
       }
-      LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
     }
   }
   lua_ = lua::CreateState(srv);
@@ -216,6 +220,22 @@ void Worker::newUnixSocketConnection(evconnlistener 
*listener, evutil_socket_t f
   }
 }
 
+Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) {
+  const uint32_t port = util::GetLocalPort(fd);
+  if (port != expected_port) {
+    return {Status::NotOK, "The port of the provided socket fd doesn't match 
the configured port"};
+  }
+  const int dup_fd = dup(fd);
+  if (dup_fd == -1) {
+    return {Status::NotOK, 
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
+  }
+  evconnlistener *lev =
+      NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | 
LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
+  listen_events_.emplace_back(lev);
+  LOG(INFO) << "Listening on dup'ed fd: " << dup_fd;
+  return Status::OK();
+}
+
 Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
   bool ipv6_used = strchr(host.data(), ':');
 
diff --git a/src/server/worker.h b/src/server/worker.h
index b6918ba9..ac88ec8a 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -27,7 +27,6 @@
 
 #include <cstdint>
 #include <cstring>
-#include <iostream>
 #include <lua.hpp>
 #include <map>
 #include <memory>
@@ -36,9 +35,9 @@
 #include <utility>
 #include <vector>
 
+#include "config/config.h"
 #include "event_util.h"
 #include "redis_connection.h"
-#include "storage/storage.h"
 
 class Server;
 
@@ -79,6 +78,7 @@ class Worker : EventCallbackBase<Worker>, 
EvconnlistenerBase<Worker> {
   Server *srv;
 
  private:
+  Status listenFD(int fd, uint32_t expected_port, int backlog);
   Status listenTCP(const std::string &host, uint32_t port, int backlog);
   void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr 
*address, int socklen);
   void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, 
sockaddr *address, int socklen);

Reply via email to