This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8ae4fe1b0 KUDU-1457 [5/n] Enable IPv6-only mode for rpc
8ae4fe1b0 is described below
commit 8ae4fe1b04a0625aae59269f675cee424196603d
Author: Ashwani Raina <[email protected]>
AuthorDate: Sat Nov 8 20:07:43 2025 +0530
KUDU-1457 [5/n] Enable IPv6-only mode for rpc
As part of adding IPv6 support, a config flag was added to represent
the modes in which a Kudu cluster can be configured. Of the three
possible modes, if the user selects 'ipv6' mode, the server should
accept TCP requests only from 'ipv6' clients, and any incoming 'ipv4'
request should fail with an 'address family not supported' error.
- This patch takes care of that by setting a socket option (IPV6_V6ONLY)
at the protocol level.
- Add a few unit tests to test the added code for different modes and
clients from different families.
- Miscellaneous changes to improve readability.
Change-Id: Ic84ba301d0faa36e928eec54872d02f28a4f793f
Reviewed-on: http://gerrit.cloudera.org:8080/23651
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Alexey Serbin <[email protected]>
---
src/kudu/rpc/messenger.cc | 11 +++++
src/kudu/rpc/rpc-test.cc | 100 ++++++++++++++++++++++++++++++++++++++---
src/kudu/server/server_base.cc | 23 +++++++---
src/kudu/util/net/sockaddr.cc | 19 ++++++++
src/kudu/util/net/sockaddr.h | 5 ++-
src/kudu/util/net/socket.cc | 11 ++++-
src/kudu/util/net/socket.h | 3 ++
7 files changed, 159 insertions(+), 13 deletions(-)
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 87c2bc1be..a4020da58 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -17,6 +17,8 @@
#include "kudu/rpc/messenger.h"
+#include <sys/socket.h>
+
#include <cstdlib>
#include <functional>
#include <ostream>
@@ -50,6 +52,7 @@
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/openssl_util.h"
#include "kudu/util/status.h"
@@ -260,6 +263,14 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr,
Socket sock;
RETURN_NOT_OK(sock.Init(accept_addr.family(), 0));
RETURN_NOT_OK(sock.SetReuseAddr(true));
+ if (GetIPFamily() == AF_INET6) {
+ // IPV6_V6ONLY socket option is not applicable to Unix domain sockets.
+ if (PREDICT_FALSE(accept_addr.is_unix())) {
+ return Status::ConfigurationError(
+ "IPV6_V6ONLY socket option is not applicable to Unix domain sockets.");
+ }
+ RETURN_NOT_OK(sock.SetIPv6Only(true));
+ }
if (reuseport_) {
// SO_REUSEPORT socket option is not applicable to Unix domain sockets.
if (PREDICT_FALSE(accept_addr.is_unix())) {
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 35ba6c163..a4bdf1723 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -31,7 +31,6 @@
#include <set>
#include <string>
#include <thread>
-#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
@@ -67,6 +66,7 @@
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/diagnostic_socket.h"
+#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/net/socket_info.pb.h"
@@ -100,9 +100,9 @@ DECLARE_int32(rpc_negotiation_inject_delay_ms);
DECLARE_int32(tcp_keepalive_probe_period_s);
DECLARE_int32(tcp_keepalive_retry_period_s);
DECLARE_int32(tcp_keepalive_retry_count);
+DECLARE_string(ip_config_mode);
using std::map;
-using std::tuple;
using std::shared_ptr;
using std::string;
using std::thread;
@@ -159,9 +159,6 @@ string ModeEnumToString(enum RpcSocketMode mode) {
class TestRpc: public RpcTestBase,
public ::testing::WithParamInterface<RpcSocketMode> {
protected:
- TestRpc() {
- }
-
static bool enable_ssl() {
switch (GetParam()) {
case TCP_IPv4_SSL:
@@ -2236,6 +2233,99 @@ TEST_P(TestRpcSocketTxRxQueue, CustomAcceptorRxQueueSamplingFrequency) {
ASSERT_EQ(2, rx_queue_size->TotalCount());
});
}
+
+class TestRpcWithIpModes: public RpcTestBase,
+ public ::testing::WithParamInterface<string> {
+protected:
+ void SetUp() override {
+ RpcTestBase::SetUp();
+ FLAGS_ip_config_mode = GetParam();
+ ASSERT_OK(ParseIPModeFlag(FLAGS_ip_config_mode, &mode_));
+ }
+ Sockaddr bind_ip_addr() const {
+ switch (mode_) {
+ case IPMode::IPV6:
+ case IPMode::DUAL:
+ return Sockaddr::Wildcard(AF_INET6);
+ default:
+ return Sockaddr::Wildcard(AF_INET);
+ }
+ }
+
+ IPMode mode_;
+};
+
+// This is used to run all parameterized tests with every
+// possible ip_config_mode options.
+INSTANTIATE_TEST_SUITE_P(Parameters, TestRpcWithIpModes,
+ testing::Values("ipv4", "ipv6", "dual"));
+
+TEST_P(TestRpcWithIpModes, TestRpcWithDifferentIpConfigModes) {
+ // Set up server with wildcard address.
+ Sockaddr server_addr = bind_ip_addr();
+ // Request OS to choose port.
+ server_addr.set_port(0);
+
+ MessengerBuilder mb("TestRpc.TestRpcWithDifferentIpConfigModes");
+ mb.set_metric_entity(metric_entity_);
+
+ shared_ptr<Messenger> messenger;
+ ASSERT_OK(mb.Build(&messenger));
+
+ // Start server on IP address based on ip_config_mode flag.
+ ASSERT_OK(StartTestServerWithCustomMessenger(&server_addr, messenger));
+
+ // IPv4 socket client tests.
+ {
+ Socket s4;
+ ASSERT_OK(s4.Init(AF_INET, 0));
+
+ // Target address is required mainly for dual mode. This is required to rule out
+ // any possibility of 'connect' call failing due to invalid target address.
+ Sockaddr target_addr = server_addr;
+
+ // Determine the specific target address based on the server's mode.
+ // For DUAL mode, we explicitly target the IPv4 loopback (127.0.0.1) for clarity.
+ if (mode_ == IPMode::DUAL) {
+ target_addr = Sockaddr::Loopback(AF_INET);
+ target_addr.set_port(server_addr.port());
+ }
+
+ Status s = s4.Connect(target_addr);
+
+ if (mode_ == IPMode::IPV6) {
+ // IPv4 socket's connect call to 'IPv6 only' server should fail.
+ ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Address family not supported by protocol");
+ } else {
+ ASSERT_OK(s);
+ }
+ }
+
+ // IPv6 socket client tests.
+ {
+ Socket s6;
+ ASSERT_OK(s6.Init(AF_INET6, 0));
+
+ // Determine the specific target address based on the server's mode.
+ // For DUAL mode, we explicitly target the IPv6 loopback (::1) for clarity.
+ Sockaddr target_addr = server_addr;
+ if (mode_ == IPMode::DUAL) {
+ target_addr = Sockaddr::Loopback(AF_INET6);
+ target_addr.set_port(server_addr.port());
+ }
+
+ Status s = s6.Connect(target_addr);
+ if (mode_ == IPMode::IPV4) {
+ // IPv6 socket's connect call to 'IPv4 only' server should fail.
+ ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Invalid argument");
+ } else {
+ ASSERT_OK(s);
+ }
+ }
+}
+
#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
} // namespace rpc
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 7798742a4..dc38a030e 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -17,6 +17,8 @@
#include "kudu/server/server_base.h"
+#include <sys/socket.h>
+
#if defined(__APPLE__)
#include <sys/sysctl.h>
#endif
@@ -514,16 +516,27 @@ bool ValidateEitherJWKSFilePathOrUrlSet() {
GROUP_FLAG_VALIDATOR(jwks_file_or_url_set_validator,
&ValidateEitherJWKSFilePathOrUrlSet);
bool ValidateSocketOptionCompatibility() {
- if (FLAGS_rpc_listen_on_unix_domain_socket && FLAGS_rpc_reuseport) {
- LOG(ERROR) << Substitute(
- "Either --rpc_listen_on_unix_domain_socket or --rpc_reuseport should "
- "be set, reuse of port is not applicable to Unix domain sockets.");
+ // If we aren't using Unix domain sockets, all other options are compatible.
+ if (!FLAGS_rpc_listen_on_unix_domain_socket) {
+ return true;
+ }
+
+ // At this point, we know we are using Unix domain sockets.
+ // Check for incompatible flags.
+ if (FLAGS_rpc_reuseport) {
+ LOG(ERROR) << "Either --rpc_listen_on_unix_domain_socket or --rpc_reuseport "
+ "should be set, reuse of port is not applicable to Unix domain sockets.";
+ return false;
+ }
+
+ if (GetIPFamily() == AF_INET6) {
+ LOG(ERROR) << "Either --rpc_listen_on_unix_domain_socket or --ip_config_mode should be set "
+ "to ipv6, restriction to only IPv6 traffic is not applicable to Unix domain sockets.";
return false;
}
return true;
}
-
GROUP_FLAG_VALIDATOR(socket_options_compatibility,
ValidateSocketOptionCompatibility);
} // namespace
diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc
index fa1c2f895..6f19f5e58 100644
--- a/src/kudu/util/net/sockaddr.cc
+++ b/src/kudu/util/net/sockaddr.cc
@@ -81,6 +81,25 @@ Sockaddr Sockaddr::Wildcard(sa_family_t family) {
return Sockaddr(addr);
}
+Sockaddr Sockaddr::Loopback(sa_family_t family) {
+ // IPv4.
+ if (family == AF_INET) {
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
+ return Sockaddr(addr);
+ }
+
+ // IPv6.
+ DCHECK(family == AF_INET6);
+ struct sockaddr_in6 addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin6_family = AF_INET6;
+ inet_pton(AF_INET6, "::1", &addr.sin6_addr);
+ return Sockaddr(addr);
+}
+
Sockaddr& Sockaddr::operator=(const Sockaddr& other) noexcept {
if (&other == this) {
return *this;
diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h
index 911a33ca5..82a1eac6c 100644
--- a/src/kudu/util/net/sockaddr.h
+++ b/src/kudu/util/net/sockaddr.h
@@ -53,9 +53,12 @@ class Sockaddr {
// Construct from a generic socket address.
explicit Sockaddr(const struct sockaddr& addr, socklen_t len);
- // Return an IP wildcard address.
+ // Return an IP wildcard address for a given family.
static Sockaddr Wildcard(sa_family_t family = AF_INET);
+ // Return loopback IP address for a given family.
+ static Sockaddr Loopback(sa_family_t family);
+
// Assignment operators.
Sockaddr& operator=(const Sockaddr& other) noexcept;
Sockaddr& operator=(const struct sockaddr_in& addr);
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index 78cda06eb..488f0ee3d 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -53,8 +53,8 @@
DEFINE_string(local_ip_for_outbound_sockets, "",
"IP to bind to when making outgoing socket connections. "
- "This must be an IP address of the form A.B.C.D, not a hostname. "
- "Advanced parameter, subject to change.");
+ "This must be an IP address of the form A.B.C.D or A:B:C:D:E:F:G:H, "
+ "not a hostname. Advanced parameter, subject to change.");
TAG_FLAG(local_ip_for_outbound_sockets, experimental);
DEFINE_bool(socket_inject_short_recvs, false,
@@ -448,6 +448,13 @@ Status Socket::SetReusePort(bool flag) {
return Status::OK();
}
+Status Socket::SetIPv6Only(bool flag) {
+ int int_flag = flag ? 1 : 0;
+ RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, int_flag),
+ "failed to set IPV6_V6ONLY");
+ return Status::OK();
+}
+
Status Socket::BindAndListen(const Sockaddr& sockaddr,
int listen_queue_size) {
RETURN_NOT_OK(SetReuseAddr(true));
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 1ab3c74ba..779c1c64e 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -98,6 +98,9 @@ class Socket {
// Sets SO_REUSEPORT to 'flag'. Should be used prior to Bind().
Status SetReusePort(bool flag);
+ // Sets IPV6_V6ONLY to 'flag'. Should be used prior to Bind().
+ Status SetIPv6Only(bool flag);
+
// Convenience method to invoke the common sequence:
// 1) SetReuseAddr(true)
// 2) Bind()