This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6c02274ce [rpc] increase listened socket backlog up to 512
6c02274ce is described below
commit 6c02274ce275615fbbc83703b6f695a0a53c87f1
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Dec 14 10:36:31 2023 -0800
[rpc] increase listened socket backlog up to 512
This patch raises the default value of --rpc_acceptor_listen_backlog
from 128 to 512. This helps busy Kudu servers accommodate larger
bursts of incoming requests for new RPC connections. In addition,
a warning message is written to the log if the flag's setting is
effectively capped by the system-level limit.
I manually verified that the warning is issued as expected on Linux and
macOS, respectively:
--rpc_acceptor_listen_backlog setting 512 is capped at 128 by
/proc/sys/net/core/somaxconn
--rpc_acceptor_listen_backlog setting 512 is capped at 256 by
kern.ipc.somaxconn
Change-Id: Ib6f5791acad6ea0787e23d4c71ab2a7ac4c8c1f2
Reviewed-on: http://gerrit.cloudera.org:8080/20797
Tested-by: Kudu Jenkins
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/acceptor_pool.cc | 38 +++++++++++++++-----
src/kudu/rpc/acceptor_pool.h | 10 +++++-
src/kudu/rpc/messenger.cc | 9 +++--
src/kudu/rpc/messenger.h | 11 ++++++
src/kudu/server/server_base.cc | 80 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 135 insertions(+), 13 deletions(-)
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index ec7c86bf7..c20807a93 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -42,6 +42,8 @@
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
+using std::string;
+using strings::Substitute;
METRIC_DEFINE_counter(server, rpc_connections_accepted,
"RPC Connections Accepted",
@@ -66,27 +68,45 @@ METRIC_DEFINE_histogram(server, acceptor_dispatch_times,
kudu::MetricLevel::kInfo,
1000000, 2);
-DEFINE_int32(rpc_acceptor_listen_backlog, 128,
+DEFINE_int32(rpc_acceptor_listen_backlog,
+ kudu::rpc::AcceptorPool::kDefaultListenBacklog,
"Socket backlog parameter used when listening for RPC
connections. "
"This defines the maximum length to which the queue of pending "
- "TCP connections inbound to the RPC server may grow. If a
connection "
- "request arrives when the queue is full, the client may receive "
- "an error. Higher values may help the server ride over bursts of "
- "new inbound connection requests.");
+ "TCP connections inbound to the RPC server may grow. The value "
+ "might be silently capped by the system-level limit on the
listened "
+ "socket's backlog. The value of -1 has the semantics of the "
+ "longest possible queue with the length up to the system-level "
+ "limit. If a connection request arrives when the queue is full, "
+ "the client may receive an error. Higher values may help "
+ "the server ride over bursts of new inbound connection
requests.");
TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
-using std::string;
-using strings::Substitute;
+namespace {
+bool ValidateListenBacklog(const char* flagname, int value) {
+ if (value >= -1) {
+ return true;
+ }
+ LOG(ERROR) << Substitute(
+ "$0: invalid setting for $1; regular setting must be at least 0, and -1 "
+ "is a special value with the semantics of maximum possible setting "
+ "capped at the system-wide limit", value, flagname);
+ return false;
+}
+} // anonymous namespace
+DEFINE_validator(rpc_acceptor_listen_backlog, &ValidateListenBacklog);
+
namespace kudu {
namespace rpc {
AcceptorPool::AcceptorPool(Messenger* messenger,
Socket* socket,
- const Sockaddr& bind_address)
+ const Sockaddr& bind_address,
+ int listen_backlog)
: messenger_(messenger),
socket_(socket->Release()),
bind_address_(bind_address),
+ listen_backlog_(listen_backlog),
closing_(false) {
const auto& metric_entity = messenger->metric_entity();
auto& connections_accepted = bind_address.is_ip()
@@ -101,7 +121,7 @@ AcceptorPool::~AcceptorPool() {
}
Status AcceptorPool::Start(int num_threads) {
- RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
+ RETURN_NOT_OK(socket_.Listen(listen_backlog_));
for (int i = 0; i < num_threads; i++) {
scoped_refptr<Thread> new_thread;
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 14b2740e1..50e15e201 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -41,10 +41,17 @@ class Messenger;
// shut down, if Shutdown() is called, or if the pool object is destructed.
class AcceptorPool {
public:
+ // Default size of the pending connections queue for a socket listened by
+ // AcceptorPool::Start().
+ static constexpr int kDefaultListenBacklog = 512;
+
// Create a new acceptor pool. Calls socket::Release to take ownership of
the
// socket.
// 'socket' must be already bound, but should not yet be listening.
- AcceptorPool(Messenger* messenger, Socket* socket, const Sockaddr&
bind_address);
+ AcceptorPool(Messenger* messenger,
+ Socket* socket,
+ const Sockaddr& bind_address,
+ int listen_backlog = kDefaultListenBacklog);
~AcceptorPool();
// Start listening and accepting connections.
@@ -69,6 +76,7 @@ class AcceptorPool {
Messenger* messenger_;
Socket socket_;
const Sockaddr bind_address_;
+ const int listen_backlog_;
std::vector<scoped_refptr<Thread>> threads_;
std::atomic<bool> closing_;
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 923d85f7f..6820490b3 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -69,6 +69,7 @@ const int64_t MessengerBuilder::kRpcNegotiationTimeoutMs =
3000;
MessengerBuilder::MessengerBuilder(string name)
: name_(std::move(name)),
connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
+ acceptor_listen_backlog_(AcceptorPool::kDefaultListenBacklog),
num_reactors_(4),
min_negotiation_threads_(0),
max_negotiation_threads_(4),
@@ -220,9 +221,10 @@ Status Messenger::AddAcceptorPool(const Sockaddr&
accept_addr,
RETURN_NOT_OK(sock.SetReusePort(true));
}
RETURN_NOT_OK(sock.Bind(accept_addr));
- Sockaddr remote;
- RETURN_NOT_OK(sock.GetSocketAddress(&remote));
- auto acceptor_pool(std::make_shared<AcceptorPool>(this, &sock, remote));
+ Sockaddr addr;
+ RETURN_NOT_OK(sock.GetSocketAddress(&addr));
+ auto acceptor_pool(std::make_shared<AcceptorPool>(
+ this, &sock, addr, acceptor_listen_backlog_));
std::lock_guard<percpu_rwlock> guard(lock_);
acceptor_pools_.push_back(acceptor_pool);
@@ -330,6 +332,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
sasl_proto_name_(bld.sasl_proto_name_),
keytab_file_(bld.keytab_file_),
reuseport_(bld.reuseport_),
+ acceptor_listen_backlog_(bld.acceptor_listen_backlog_),
retain_self_(this) {
for (int i = 0; i < bld.num_reactors_; i++) {
reactors_.push_back(new Reactor(retain_self_, i, bld));
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index d8e37e99c..753681d46 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -97,6 +97,11 @@ class MessengerBuilder {
return *this;
}
+ MessengerBuilder& set_acceptor_listen_backlog(int max_queue_len) {
+ acceptor_listen_backlog_ = max_queue_len;
+ return *this;
+ }
+
// Set the number of reactor threads that will be used for sending and
// receiving.
MessengerBuilder& set_num_reactors(int num_reactors) {
@@ -270,6 +275,7 @@ class MessengerBuilder {
private:
const std::string name_;
MonoDelta connection_keepalive_time_;
+ int acceptor_listen_backlog_;
int num_reactors_;
int min_negotiation_threads_;
int max_negotiation_threads_;
@@ -562,6 +568,11 @@ class Messenger {
// Whether to set SO_REUSEPORT on the listening sockets.
const bool reuseport_;
+ // Acceptor's listened socket backlog: the capacity of the queue to
+ // accommodate incoming (but not accepted yet) connection requests to the
+ // messenger's listening sockets.
+ const int acceptor_listen_backlog_;
+
// The ownership of the Messenger object is somewhat subtle. The pointer
graph
// looks like this:
//
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 84d6a7b24..8b5f554b0 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -17,9 +17,15 @@
#include "kudu/server/server_base.h"
+#if defined(__APPLE__)
+#include <sys/sysctl.h>
+#endif
+
#include <algorithm>
+#include <cerrno> // IWYU pragma: keep
#include <cstdint>
#include <functional>
+#include <limits>
#include <mutex>
#include <optional>
#include <set>
@@ -72,6 +78,7 @@
#include "kudu/util/cloud/instance_detector.h"
#include "kudu/util/cloud/instance_metadata.h"
#include "kudu/util/env.h"
+#include "kudu/util/errno.h" // IWYU pragma: keep
#include "kudu/util/faststring.h"
#include "kudu/util/file_cache.h"
#include "kudu/util/flag_tags.h"
@@ -303,6 +310,7 @@ DECLARE_uint32(dns_resolver_cache_capacity_mb);
DECLARE_uint32(dns_resolver_cache_ttl_sec);
DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
DECLARE_int32(fs_wal_dir_available_space_cache_seconds);
+DECLARE_int32(rpc_acceptor_listen_backlog);
DECLARE_int64(fs_wal_dir_reserved_bytes);
DECLARE_int64(fs_data_dirs_reserved_bytes);
DECLARE_string(log_filename);
@@ -624,6 +632,68 @@ int64_t GetFileCacheCapacity(Env* env) {
return FLAGS_server_max_open_files;
}
+#if defined(__linux__)
+// See https://www.kernel.org/doc/Documentation/networking/ip-sysctl.txt
+// and https://man7.org/linux/man-pages/man2/listen.2.html for details.
+constexpr const char* const kListenBacklogMax = "/proc/sys/net/core/somaxconn";
+#endif
+
+#if defined(__APPLE__)
+// See https://man.freebsd.org/cgi/man.cgi?query=listen for details.
+// NOTE: it might be not exactly relevant to Darwin/macOS, but so far nothing
+// indicates it's not applicable at least for Darwin 20.6.0/macOS 11.7.
+constexpr const char* const kListenBacklogMax = "kern.ipc.somaxconn";
+#endif
+
+int32_t GetEffectiveListenSocketBacklog(Env* env, int backlog) {
+#if defined(__APPLE__)
+ uint32_t buf_val;
+ size_t len = sizeof(buf_val);
+ if (sysctlbyname(kListenBacklogMax, &buf_val, &len, nullptr, 0) == -1) {
+ int err = errno;
+ LOG(WARNING) << Substitute(
+ "could not retrieve $0 to get listened socket queue size limit: $1",
+ kListenBacklogMax, ErrnoToString(err));
+ return backlog;
+ }
+ DCHECK_EQ(sizeof(buf_val), len);
+#elif defined(__linux__)
+ faststring buf;
+ if (auto s = ReadFileToString(env, kListenBacklogMax, &buf); !s.ok()) {
+ LOG(WARNING) << Substitute(
+ "could not read $0 to get listened socket queue size limit: $1",
+ kListenBacklogMax, s.ToString());
+ return backlog;
+ }
+ uint32_t buf_val;
+ if (!safe_strtou32(buf.ToString(), &buf_val)) {
+ LOG(WARNING) << Substitute(
+ "could not parse contents of $0 ('$1') to get listened socket queue
size limit",
+ kListenBacklogMax, buf.ToString());
+ return backlog;
+ }
+#else
+ return backlog;
+#endif
+
+ // 'rpc_acceptor_listen_backlog == -1' means the highest possible value
+ // for the backlog size.
+ uint32_t effective_backlog =
+ (backlog >= 0) ? backlog : std::numeric_limits<uint32_t>::max();
+
+ // The system-wide limit caps the 'backlog' parameter of the listen()
+ // system call.
+ effective_backlog = std::min<uint32_t>(effective_backlog, buf_val);
+
+ // A bit of paranoia w.r.t. the system-level limit:
+ // * for a debug build, use DCHECK to make sure the value is sane
+ // * for a release build, cap it to INT32_MAX
+ DCHECK_LE(effective_backlog, std::numeric_limits<int32_t>::max());
+ effective_backlog = std::min<uint32_t>(effective_backlog,
+ std::numeric_limits<int32_t>::max());
+ return effective_backlog;
+}
+
} // anonymous namespace
ServerBase::ServerBase(string name, const ServerBaseOptions& options,
@@ -812,6 +882,15 @@ Status ServerBase::Init() {
vector<string> rpc_tls_excluded_protocols = strings::Split(
FLAGS_rpc_tls_excluded_protocols, ",", strings::SkipEmpty());
+ const auto listen_backlog = FLAGS_rpc_acceptor_listen_backlog;
+ const auto effective_listen_backlog = GetEffectiveListenSocketBacklog(
+ Env::Default(), listen_backlog);
+ if (effective_listen_backlog != listen_backlog) {
+ LOG(WARNING) << Substitute(
+ "--rpc_acceptor_listen_backlog setting $0 is capped at $1 by $2",
+ listen_backlog, effective_listen_backlog, kListenBacklogMax);
+ }
+
// Create the Messenger.
rpc::MessengerBuilder builder(name_);
builder.set_num_reactors(FLAGS_num_reactor_threads)
@@ -831,6 +910,7 @@ Status ServerBase::Init() {
.set_epki_private_password_key_cmd(FLAGS_rpc_private_key_password_cmd)
.set_keytab_file(FLAGS_keytab_file)
.set_hostname(hostname)
+ .set_acceptor_listen_backlog(listen_backlog)
.enable_inbound_tls();
auto username = kudu::security::GetLoggedInUsernameFromKeytab();