This is an automated email from the ASF dual-hosted Git repository. User alexey pushed a commit to the master branch of the repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new 8c51e2768 KUDU-1457 [3/n] add IPv6 support for diagnostic socket 8c51e2768 is described below commit 8c51e27684532d1c88d7e812a8a94804a5446667 Author: Ashwani Raina <ara...@cloudera.com> AuthorDate: Fri Jun 13 20:35:20 2025 +0530 KUDU-1457 [3/n] add IPv6 support for diagnostic socket As part of adding IPv6 support, this patch adds support for IPv6 in diagnostic socket implementation and its corresponding unit tests. - Modify TcpSocketInfo structure fields to enable src_addr and dst_addr to store IPv6 addresses as well. - Add changes in SendRequest and ReceiveResponse to accommodate IPv6 addresses in request structure. - Add corresponding unit tests i.e. ListeningSocket and SimplePattern for IPv6 addresses. Depends-On: I22c773ffb2ff44b9cd765b546d6724ec5543586f Change-Id: I2d78e241a8bb794465a613e7c0a11eea8f628849 Reviewed-on: http://gerrit.cloudera.org:8080/23021 Reviewed-by: Marton Greber <greber...@gmail.com> Tested-by: Marton Greber <greber...@gmail.com> Reviewed-by: Alexey Serbin <ale...@apache.org> --- src/kudu/util/net/diagnostic_socket-test.cc | 225 +++++++++++++++++----------- src/kudu/util/net/diagnostic_socket.cc | 82 ++++++---- src/kudu/util/net/diagnostic_socket.h | 20 ++- 3 files changed, 204 insertions(+), 123 deletions(-) diff --git a/src/kudu/util/net/diagnostic_socket-test.cc b/src/kudu/util/net/diagnostic_socket-test.cc index 6e407a3e2..47772bc49 100644 --- a/src/kudu/util/net/diagnostic_socket-test.cc +++ b/src/kudu/util/net/diagnostic_socket-test.cc @@ -18,12 +18,15 @@ #include "kudu/util/net/diagnostic_socket.h" #include <netinet/in.h> +#include <sys/socket.h> +#include <sys/un.h> -#include <cstddef> #include <cstdint> +#include <cstring> #include <string> #include <vector> +#include <glog/logging.h> #include <gtest/gtest.h> #include "kudu/util/net/sockaddr.h" @@ -53,6 +56,119 @@ class DiagnosticSocketTest : public KuduTest { RETURN_NOT_OK(listener_.BindAndListen(address, 
listen_backlog)); return listener_.GetSocketAddress(&listen_addr_); } + + void GetListeningSocketInfo(const string& ip_addr, DiagnosticSocket::TcpSocketInfo* info) { + constexpr uint16_t kPort = 56789; + constexpr int kListenBacklog = 8; + + ASSERT_OK(BindAndListen(ip_addr, kPort, kListenBacklog)); + + DiagnosticSocket ds; + ASSERT_OK(ds.Init()); + ASSERT_OK(ds.Query(listener_, info)); + + // Make sure the result matches the input parameters. + ASSERT_EQ(kPort, ntohs(info->src_port)); + ASSERT_EQ(0, ntohs(info->dst_port)); + ASSERT_EQ(DiagnosticSocket::SS_LISTEN, info->state); + + // TX queue size for a listening socket is the size of the backlog queue. + ASSERT_EQ(kListenBacklog, info->tx_queue_size); + + // Nothing is connecting to the listen port: no pending connections expected. + ASSERT_EQ(0, info->rx_queue_size); + } + + void MatchSimplePattern(const string& ip_addr, sa_family_t family) { + // Open a socket, bind and listen, and then close it. This is just to make + // sure the socket has valid address, but there is no open socket at the + // specified address. + DCHECK(family == AF_INET || family == AF_INET6); + constexpr uint16_t kPort = 56789; + constexpr int kListenBacklog = 5; + ASSERT_OK(BindAndListen(ip_addr, kPort, kListenBacklog)); + + const auto& src_addr = listen_addr_; + const auto& dst_addr = Sockaddr::Wildcard(family); + const DiagnosticSocket::SocketStates socket_states{ DiagnosticSocket::SS_LISTEN }; + + DiagnosticSocket ds; + ASSERT_OK(ds.Init()); + + // Use a pattern to match only the listened server socket. + { + vector<DiagnosticSocket::TcpSocketInfo> info; + // The query should return success. + ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info)); + ASSERT_EQ(1, info.size()); + const auto& entry = info.front(); + + // Make sure the result matches the input parameters. 
+ if (family == AF_INET) { + ASSERT_EQ(src_addr.ipv4_addr().sin_addr.s_addr, entry.src_addr[0]); + ASSERT_EQ(INADDR_ANY, entry.dst_addr[0]); + } else { + ASSERT_FALSE(memcmp(src_addr.ipv6_addr().sin6_addr.s6_addr, + entry.src_addr, sizeof(entry.src_addr))); + ASSERT_FALSE(memcmp(&in6addr_any, entry.dst_addr, sizeof(entry.dst_addr))); + } + ASSERT_EQ(kPort, ntohs(entry.src_port)); + ASSERT_EQ(0, ntohs(entry.dst_port)); + ASSERT_EQ(DiagnosticSocket::SS_LISTEN, entry.state); + + // Verify the expected statistics on the server socket. + ASSERT_EQ(0, entry.rx_queue_size); // no pending connections + ASSERT_EQ(kListenBacklog, entry.tx_queue_size); + } + + // Use a pattern to match any IPv4 or IPv6 TCP socket. + { + const auto& addr_wildcard = Sockaddr::Wildcard(family); + const auto& state_wildcard = DiagnosticSocket::SocketStateWildcard(); + vector<DiagnosticSocket::TcpSocketInfo> info; + // The query should return success. + ASSERT_OK(ds.Query(addr_wildcard, addr_wildcard, state_wildcard, &info)); + ASSERT_GE(info.size(), 1); + + const auto compare_addresses = [&](DiagnosticSocket::TcpSocketInfo entry) { + if (family == AF_INET) { + // IPv4 comparison + return (src_addr.ipv4_addr().sin_addr.s_addr == entry.src_addr[0] && + INADDR_ANY == entry.dst_addr[0]); + } + // IPv6 comparison + return (memcmp(src_addr.ipv6_addr().sin6_addr.s6_addr, + entry.src_addr, sizeof(entry.src_addr)) == 0 && + memcmp(&in6addr_any, entry.dst_addr, + sizeof(entry.dst_addr)) == 0); + }; + + // Make sure the server's socket is one of the reported ones. + size_t matched_entries = 0; + for (const auto& entry : info) { + if (!compare_addresses(entry) || + kPort != ntohs(entry.src_port) || + 0 != ntohs(entry.dst_port) || + DiagnosticSocket::SS_LISTEN != entry.state) { + continue; + } + ++matched_entries; + } + ASSERT_EQ(1, matched_entries); + } + + // Close the socket; the socket's address in listen_addr_ still valid. 
+ ASSERT_OK(listener_.Close()); + + { + vector<DiagnosticSocket::TcpSocketInfo> info; + // The query should return success. + ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info)); + // However, the list of matching sockets should be empty since the socket + // that could match the pattern has been just closed. + ASSERT_TRUE(info.empty()); + } + } }; TEST_F(DiagnosticSocketTest, Basic) { @@ -63,103 +179,34 @@ TEST_F(DiagnosticSocketTest, Basic) { ASSERT_OK(ds.Close()); } -TEST_F(DiagnosticSocketTest, ListeningSocket) { - constexpr const char* const kIpAddr = "127.254.254.254"; - constexpr uint16_t kPort = 56789; - constexpr int kListenBacklog = 8; - - ASSERT_OK(BindAndListen(kIpAddr, kPort, kListenBacklog)); - - DiagnosticSocket ds; - ASSERT_OK(ds.Init()); +TEST_F(DiagnosticSocketTest, ListeningSocketIpV4) { DiagnosticSocket::TcpSocketInfo info; - ASSERT_OK(ds.Query(listener_, &info)); - // Make sure the result matches the input parameters. - ASSERT_EQ(listen_addr_.ipv4_addr().sin_addr.s_addr, info.src_addr); - ASSERT_EQ(INADDR_ANY, info.dst_addr); - ASSERT_EQ(kPort, ntohs(info.src_port)); - ASSERT_EQ(0, ntohs(info.dst_port)); - ASSERT_EQ(DiagnosticSocket::SS_LISTEN, info.state); + NO_FATALS(GetListeningSocketInfo("127.254.254.254", &info)); - // TX queue size for a listening socket is the size of the backlog queue. - ASSERT_EQ(kListenBacklog, info.tx_queue_size); - - // Nothing is connecting to the listen port: no pending connections expected. - ASSERT_EQ(0, info.rx_queue_size); + // Make sure the result matches the input parameters. + ASSERT_EQ(listen_addr_.ipv4_addr().sin_addr.s_addr, info.src_addr[0]); + ASSERT_EQ(INADDR_ANY, info.dst_addr[0]); } -TEST_F(DiagnosticSocketTest, SimplePattern) { - // Open a socket, bind and listen, and then close it. This is just to make - // sure the socket has valid address, but there is no open socket at the - // specified address. 
- constexpr const char* const kIpAddr = "127.254.254.254"; - constexpr uint16_t kPort = 56789; - constexpr int kListenBacklog = 5; - ASSERT_OK(BindAndListen(kIpAddr, kPort, kListenBacklog)); - - const auto& src_addr = listen_addr_; - const auto& dst_addr = Sockaddr::Wildcard(); - const DiagnosticSocket::SocketStates socket_states{ DiagnosticSocket::SS_LISTEN }; - - DiagnosticSocket ds; - ASSERT_OK(ds.Init()); - - // Use a pattern to match only the listened server socket. - { - vector<DiagnosticSocket::TcpSocketInfo> info; - // The query should return success. - ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info)); - ASSERT_EQ(1, info.size()); - const auto& entry = info.front(); +TEST_F(DiagnosticSocketTest, SimplePatternIpV4) { + MatchSimplePattern("127.254.254.254", AF_INET); +} - // Make sure the result matches the input parameters. - ASSERT_EQ(src_addr.ipv4_addr().sin_addr.s_addr, entry.src_addr); - ASSERT_EQ(INADDR_ANY, entry.dst_addr); - ASSERT_EQ(kPort, ntohs(entry.src_port)); - ASSERT_EQ(0, ntohs(entry.dst_port)); - ASSERT_EQ(DiagnosticSocket::SS_LISTEN, entry.state); - - // Verify the expected statistics on the server socket. - ASSERT_EQ(0, entry.rx_queue_size); // no pending connections - ASSERT_EQ(kListenBacklog, entry.tx_queue_size); - } +TEST_F(DiagnosticSocketTest, ListeningSocketIpV6) { + DiagnosticSocket::TcpSocketInfo info; - // Use a pattern to match any IPv4 TCP socket. - { - const auto& addr_wildcard = Sockaddr::Wildcard(); - const auto& state_wildcard = DiagnosticSocket::SocketStateWildcard(); - vector<DiagnosticSocket::TcpSocketInfo> info; - // The query should return success. - ASSERT_OK(ds.Query(addr_wildcard, addr_wildcard, state_wildcard, &info)); - ASSERT_GE(info.size(), 1); - - // Make sure the server's socket is one of the reported ones. 
- size_t matched_entries = 0; - for (const auto& entry : info) { - if (src_addr.ipv4_addr().sin_addr.s_addr != entry.src_addr || - INADDR_ANY != entry.dst_addr || - kPort != ntohs(entry.src_port) || - 0 != ntohs(entry.dst_port) || - DiagnosticSocket::SS_LISTEN != entry.state) { - continue; - } - ++matched_entries; - } - ASSERT_EQ(1, matched_entries); - } + NO_FATALS(GetListeningSocketInfo("::1", &info)); - // Close the socket; the socket's address in listen_addr_ still valid. - ASSERT_OK(listener_.Close()); + // Make sure the result matches the input parameters. + ASSERT_EQ(0, memcmp(listen_addr_.ipv6_addr().sin6_addr.s6_addr, + info.src_addr, sizeof(info.src_addr))); + ASSERT_EQ(0, memcmp(in6addr_any.s6_addr, info.dst_addr, + sizeof(info.dst_addr))); +} - { - vector<DiagnosticSocket::TcpSocketInfo> info; - // The query should return success. - ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info)); - // However, the list of matching sockets should be empty since the socket - // that could match the pattern has been just closed. 
- ASSERT_TRUE(info.empty()); - } +TEST_F(DiagnosticSocketTest, SimplePatternIpV6) { + MatchSimplePattern("::1", AF_INET6); } } // namespace kudu diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc index 879860a1b..a9d215f7b 100644 --- a/src/kudu/util/net/diagnostic_socket.cc +++ b/src/kudu/util/net/diagnostic_socket.cc @@ -21,13 +21,13 @@ #include <linux/netlink.h> #include <linux/sock_diag.h> #include <linux/types.h> - #include <netinet/in.h> #include <sys/socket.h> #include <unistd.h> #include <array> #include <cerrno> +#include <cstring> #include <ostream> #include <string> #include <utility> @@ -47,8 +47,8 @@ using strings::Substitute; namespace kudu { -static constexpr const char* const kNonIpV4ErrMsg = - "netlink diagnostics is currently supported only on IPv4 TCP sockets"; +static constexpr const char* const kNonIpErrMsg = + "netlink diagnostics is currently supported only on IPv4 and IPv6 TCP sockets"; const DiagnosticSocket::SocketStates& DiagnosticSocket::SocketStateWildcard() { static constexpr const SocketStates kSocketStateWildcard { @@ -133,7 +133,7 @@ Status DiagnosticSocket::Query(const Socket& socket, vector<TcpSocketInfo> result; RETURN_NOT_OK(ReceiveResponse(&result)); if (result.empty()) { - return Status::NotFound("no matching IPv4 TCP socket found"); + return Status::NotFound("no matching IPv4 or IPv6 TCP socket found"); } if (PREDICT_FALSE(result.size() > 1)) { return Status::InvalidArgument("socket address is ambiguous"); @@ -149,20 +149,21 @@ Status DiagnosticSocket::SendRequest(const Socket& socket) const { Sockaddr src_addr; RETURN_NOT_OK(socket.GetSocketAddress(&src_addr)); - if (PREDICT_FALSE(src_addr.family() != AF_INET)) { - return Status::NotSupported(kNonIpV4ErrMsg); + if (PREDICT_FALSE(!src_addr.is_ip())) { + return Status::NotSupported(kNonIpErrMsg); } Sockaddr dst_addr; auto s = socket.GetPeerAddress(&dst_addr); if (s.ok()) { - if (PREDICT_FALSE(dst_addr.family() != AF_INET)) { - return 
Status::NotSupported(kNonIpV4ErrMsg); + DCHECK(src_addr.family() == dst_addr.family()); + if (PREDICT_FALSE(!dst_addr.is_ip())) { + return Status::NotSupported(kNonIpErrMsg); } } else { if (PREDICT_TRUE(s.IsNetworkError() && s.posix_code() == ENOTCONN)) { // Assuming it's a listened socket if there isn't a peer at the other side. - dst_addr = Sockaddr::Wildcard(); + dst_addr = Sockaddr::Wildcard(src_addr.family()); } else { return s; } @@ -176,28 +177,47 @@ Status DiagnosticSocket::SendRequest(const Socket& socket) const { Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr, const Sockaddr& socket_dst_addr, uint32_t socket_states_bitmask) const { - // TODO(araina): Remove this once diagnostic socket handling is added for IPv6. - if (socket_src_addr.family() == AF_INET6 || socket_dst_addr.family() == AF_INET6) { - return Status::NotSupported(kNonIpV4ErrMsg); - } - + // Communication is supported between same address family socket interfaces. + DCHECK((socket_src_addr.family() == AF_INET && socket_dst_addr.family() == AF_INET) || + (socket_src_addr.family() == AF_INET6 && socket_dst_addr.family() == AF_INET6)); DCHECK_GE(fd_, 0); - const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr; - const auto src_port = socket_src_addr.port(); - const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr; - const auto dst_port = socket_dst_addr.port(); constexpr uint32_t kWildcard = static_cast<uint32_t>(-1); + struct inet_diag_sockid sock_id; + // All values in inet_diag_sockid are in network byte order. 
- const struct inet_diag_sockid sock_id = { - .idiag_sport = htons(src_port), - .idiag_dport = htons(dst_port), - .idiag_src = { src_ipv4.s_addr, 0, 0, 0, }, - .idiag_dst = { dst_ipv4.s_addr, 0, 0, 0, }, + sock_id = { + .idiag_sport = htons(socket_src_addr.port()), + .idiag_dport = htons(socket_dst_addr.port()), + .idiag_src = {0, 0, 0, 0}, + .idiag_dst = {0, 0, 0, 0}, .idiag_if = kWildcard, - .idiag_cookie = { kWildcard, kWildcard }, + .idiag_cookie = {kWildcard, kWildcard}, }; + if (socket_src_addr.family() == AF_INET && socket_dst_addr.family() == AF_INET) { + // All values in inet_diag_sockid and in_addr are in network byte order. + const auto& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr; + const auto& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr; + + // For IPv4, only the first element of idiag_src/idiag_dst array is used. + // It's already zero-initialized above, so we just set the first element. + sock_id.idiag_src[0] = src_ipv4.s_addr; + sock_id.idiag_dst[0] = dst_ipv4.s_addr; + } else if (socket_src_addr.family() == AF_INET6 && socket_dst_addr.family() == AF_INET6) { + // All values in inet_diag_sockid and in6_addr are in network byte order. + const auto& src_ipv6 = socket_src_addr.ipv6_addr().sin6_addr; + const auto& dst_ipv6 = socket_dst_addr.ipv6_addr().sin6_addr; + + // For IPv6, copy the entire 128-bit address (4 x 32-bit words) using memcpy. + // This is more robust than individual assignments for arrays. 
+ memcpy(sock_id.idiag_src, src_ipv6.s6_addr32, sizeof(sock_id.idiag_src)); + memcpy(sock_id.idiag_dst, dst_ipv6.s6_addr32, sizeof(sock_id.idiag_dst)); + } else { + return Status::NotSupported( + "Source and destination sockets belong to different protocol families"); + } + struct TcpSocketRequest { struct nlmsghdr nlh; struct inet_diag_req_v2 idr; @@ -208,7 +228,7 @@ Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr, .nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH, }, .idr = { - .sdiag_family = AF_INET, + .sdiag_family = static_cast<__u8>(socket_src_addr.family()), .sdiag_protocol = IPPROTO_TCP, .idiag_ext = INET_DIAG_MEMINFO, .pad = 0, @@ -309,8 +329,9 @@ Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const { return Status::Corruption(Substitute( "$0: netlink response is too short", msg_size)); } - // Only IPv4 addresses are expected due to the query pattern. - if (PREDICT_FALSE(msg_data->idiag_family != AF_INET)) { + // Only IPv4 or IPv6 addresses are expected due to the query pattern. 
+ if (PREDICT_FALSE(msg_data->idiag_family != AF_INET && + msg_data->idiag_family != AF_INET6)) { return Status::Corruption(Substitute( "$0: unexpected address family in netlink response", static_cast<uint32_t>(msg_data->idiag_family))); @@ -321,8 +342,11 @@ Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const { TcpSocketInfo info; info.state = static_cast<SocketState>(msg_data->idiag_state); - info.src_addr = msg_data->id.idiag_src[0]; // IPv4 address, network byte order - info.dst_addr = msg_data->id.idiag_dst[0]; // IPv4 address, network byte order + + // IPv4 or IPv6 address, network byte order + memcpy(info.src_addr, msg_data->id.idiag_src, sizeof(info.src_addr)); + memcpy(info.dst_addr, msg_data->id.idiag_dst, sizeof(info.dst_addr)); + info.src_port = msg_data->id.idiag_sport; info.dst_port = msg_data->id.idiag_dport; info.rx_queue_size = msg_data->idiag_rqueue; diff --git a/src/kudu/util/net/diagnostic_socket.h b/src/kudu/util/net/diagnostic_socket.h index c64b6951f..d3f4ab673 100644 --- a/src/kudu/util/net/diagnostic_socket.h +++ b/src/kudu/util/net/diagnostic_socket.h @@ -31,7 +31,7 @@ class Sockaddr; class Socket; // A wrapper around Linux-specific sock_diag() API [1] based on the -// netlink facility [2] to fetch information on IPv4 TCP sockets. +// netlink facility [2] to fetch information on IPv4 or IPv6 TCP sockets. // // [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html // [2] https://man7.org/linux/man-pages/man7/netlink.7.html @@ -59,7 +59,7 @@ class DiagnosticSocket final { typedef std::array<SocketState, SocketState::SS_MAX> SocketStates; - // Diagnostic information on a TCP IPv4 socket. That's a subset of the + // Diagnostic information on a TCP IPv4 or IPv6 socket. That's a subset of the // information available via the netlink data structures. 
// // TODO(aserbin): if using this API more broadly than fetching information on @@ -67,12 +67,22 @@ class DiagnosticSocket final { // the source and the destination with Sockaddr class fields. struct TcpSocketInfo { SocketState state; // current state of the socket - uint32_t src_addr; // IPv4 source address (network byte order) - uint32_t dst_addr; // IPv4 destination address (network byte order) + uint32_t src_addr[4]; // IPv4 or IPv6 source address (network byte order) + uint32_t dst_addr[4]; // IPv4 or IPv6 destination address (network byte order) uint16_t src_port; // source port number (network byte order) uint16_t dst_port; // destination port number (network byte order) uint32_t rx_queue_size; // RX queue size uint32_t tx_queue_size; // TX queue size + + TcpSocketInfo() : + state(SS_UNKNOWN), + src_addr{}, + dst_addr{}, + src_port(0), + dst_port(0), + rx_queue_size(0), + tx_queue_size(0) { + } }; // Return wildcard for all valid socket states. @@ -96,7 +106,7 @@ class DiagnosticSocket final { // Close the Socket, checking for errors. Status Close(); - // Get diagnostic information on IPv4 TCP sockets of the specified states + // Get diagnostic information on IPv4 or IPv6 TCP sockets of the specified states // having the specified source and the destination address. Wildcard addresses // are supported. Status Query(const Sockaddr& socket_src_addr,