This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f15a1ca54 [net] DiagnosticSocket wrapper for sock_diag API
f15a1ca54 is described below
commit f15a1ca5476b256671c2369dd4d9faeccfee6b7c
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jan 12 12:26:25 2024 -0800
[net] DiagnosticSocket wrapper for sock_diag API
This patch introduces DiagnosticSocket wrapper for sock_diag() netlink
subsystem [1][2]. The primary intention behind DiagnosticSocket is to
fetch information on the RX queue size for a listening IPv4 TCP socket.
A follow-up patch will use this new facility to populate a new
server-level metric: that's to track connection request backlog for a
Kudu server's RPC socket (a.k.a. listened socket backlog, pending
connections queue, etc.).
Since netlink is a Linux-specific API/subsystem, the DiagnosticSocket
API is available only on Linux, correspondingly.
This patch includes a few unit tests to provide basic coverage
for the newly introduced functionality.
[1] https://man7.org/linux/man-pages/man7/sock_diag.7.html
[2] https://man7.org/linux/man-pages/man7/netlink.7.html
Change-Id: I4a4f37ca4b0df8ca543ec383d89766cbf1b1e526
Reviewed-on: http://gerrit.cloudera.org:8080/20892
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Yingchun Lai <[email protected]>
---
src/kudu/util/CMakeLists.txt | 5 +
src/kudu/util/net/diagnostic_socket-test.cc | 165 ++++++++++++++
src/kudu/util/net/diagnostic_socket.cc | 328 ++++++++++++++++++++++++++++
src/kudu/util/net/diagnostic_socket.h | 121 ++++++++++
4 files changed, 619 insertions(+)
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index f4d8e5a09..5a4940131 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -267,6 +267,10 @@ set(UTIL_SRCS
zlib.cc
)
+if(NOT APPLE)
+ set(UTIL_SRCS ${UTIL_SRCS} net/diagnostic_socket.cc)
+endif()
+
if(NOT NO_TESTS)
set(UTIL_SRCS ${UTIL_SRCS} test_graph.cc)
endif()
@@ -592,6 +596,7 @@ ADD_KUDU_TEST(yamlreader-test)
if (NOT APPLE)
ADD_KUDU_TEST(minidump-test)
+ ADD_KUDU_TEST(net/diagnostic_socket-test)
endif()
#######################################
diff --git a/src/kudu/util/net/diagnostic_socket-test.cc b/src/kudu/util/net/diagnostic_socket-test.cc
new file mode 100644
index 000000000..9f192a45f
--- /dev/null
+++ b/src/kudu/util/net/diagnostic_socket-test.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/diagnostic_socket.h"
+
+#include <netinet/in.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture owning a socket that the scenarios below bind and put into the
+// listening state; the address reported by the socket after that is kept in
+// 'listen_addr_'.
+class DiagnosticSocketTest : public KuduTest {
+ protected:
+  // Parse 'addr_str' and 'port' into a socket address, then bind the fixture's
+  // socket to it and start listening with the specified backlog.
+  Status BindAndListen(const string& addr_str, uint16_t port, int listen_backlog = 1) {
+    Sockaddr parsed;
+    RETURN_NOT_OK(parsed.ParseString(addr_str, port));
+    return BindAndListen(parsed, listen_backlog);
+  }
+
+  // Initialize the fixture's socket for the address family of 'address', bind
+  // it and start listening, and store the socket's address in 'listen_addr_'.
+  Status BindAndListen(const Sockaddr& address, int listen_backlog) {
+    RETURN_NOT_OK(listener_.Init(address.family(), 0));
+    RETURN_NOT_OK(listener_.BindAndListen(address, listen_backlog));
+    return listener_.GetSocketAddress(&listen_addr_);
+  }
+
+  Socket listener_;
+  Sockaddr listen_addr_;
+};
+
+// Smoke test: a diagnostic (netlink) socket can be opened via Init() and then
+// closed explicitly via Close(), with both calls reporting success.
+TEST_F(DiagnosticSocketTest, Basic) {
+ DiagnosticSocket ds;
+ // Make sure it's possible to create a netlink socket.
+ ASSERT_OK(ds.Init());
+ // Call Close() on the socket explicitly.
+ ASSERT_OK(ds.Close());
+}
+
+// Query the diagnostic information on a listening socket using the
+// Query(const Socket&, ...) shortcut and verify every reported field.
+TEST_F(DiagnosticSocketTest, ListeningSocket) {
+  constexpr const char* const kIpAddr = "127.254.254.254";
+  // Bind to port 0 so the kernel picks an unused port: a hard-coded port
+  // number might be already taken by some other process on the test machine,
+  // making the scenario fail with EADDRINUSE.
+  constexpr uint16_t kAnyPort = 0;
+  constexpr int kListenBacklog = 8;
+
+  ASSERT_OK(BindAndListen(kIpAddr, kAnyPort, kListenBacklog));
+  // BindAndListen() stores the socket's actual address in listen_addr_.
+  const auto port = listen_addr_.port();
+  ASSERT_NE(0, port);
+
+  DiagnosticSocket ds;
+  ASSERT_OK(ds.Init());
+  DiagnosticSocket::TcpSocketInfo info;
+  ASSERT_OK(ds.Query(listener_, &info));
+
+  // Make sure the result matches the input parameters.
+  ASSERT_EQ(listen_addr_.ipv4_addr().sin_addr.s_addr, info.src_addr);
+  ASSERT_EQ(INADDR_ANY, info.dst_addr);
+  ASSERT_EQ(port, ntohs(info.src_port));
+  ASSERT_EQ(0, ntohs(info.dst_port));
+  ASSERT_EQ(DiagnosticSocket::SS_LISTEN, info.state);
+
+  // TX queue size for a listening socket is the size of the backlog queue.
+  ASSERT_EQ(kListenBacklog, info.tx_queue_size);
+
+  // Nothing is connecting to the listen port: no pending connections expected.
+  ASSERT_EQ(0, info.rx_queue_size);
+}
+
+// Exercise the pattern-based Query() overload: match the listening socket with
+// an exact pattern, find it among the results of a catch-all pattern, and make
+// sure it is no longer reported once the socket has been closed.
+TEST_F(DiagnosticSocketTest, SimplePattern) {
+  // Open a socket, bind it and start listening. The socket stays open for the
+  // first two sub-scenarios and is closed before the last one; its address
+  // captured in listen_addr_ is used in the query patterns throughout.
+  constexpr const char* const kIpAddr = "127.254.254.254";
+  // Bind to port 0 so the kernel picks an unused port: a hard-coded port
+  // number might be already taken by some other process on the test machine,
+  // making the scenario fail with EADDRINUSE.
+  constexpr uint16_t kAnyPort = 0;
+  constexpr int kListenBacklog = 5;
+  ASSERT_OK(BindAndListen(kIpAddr, kAnyPort, kListenBacklog));
+  const auto port = listen_addr_.port();
+  ASSERT_NE(0, port);
+
+  const auto& src_addr = listen_addr_;
+  const auto& dst_addr = Sockaddr::Wildcard();
+  const vector<DiagnosticSocket::SocketState> socket_states{ DiagnosticSocket::SS_LISTEN };
+
+  DiagnosticSocket ds;
+  ASSERT_OK(ds.Init());
+
+  // Use a pattern to match only the listened server socket.
+  {
+    vector<DiagnosticSocket::TcpSocketInfo> info;
+    // The query should return success.
+    ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info));
+    ASSERT_EQ(1, info.size());
+    const auto& entry = info.front();
+
+    // Make sure the result matches the input parameters.
+    ASSERT_EQ(src_addr.ipv4_addr().sin_addr.s_addr, entry.src_addr);
+    ASSERT_EQ(INADDR_ANY, entry.dst_addr);
+    ASSERT_EQ(port, ntohs(entry.src_port));
+    ASSERT_EQ(0, ntohs(entry.dst_port));
+    ASSERT_EQ(DiagnosticSocket::SS_LISTEN, entry.state);
+
+    // Verify the expected statistics on the server socket.
+    ASSERT_EQ(0, entry.rx_queue_size); // no pending connections
+    ASSERT_EQ(kListenBacklog, entry.tx_queue_size);
+  }
+
+  // Use a pattern to match any IPv4 TCP socket.
+  {
+    const auto& addr_wildcard = Sockaddr::Wildcard();
+    const auto& state_wildcard = DiagnosticSocket::SocketStateWildcard();
+    vector<DiagnosticSocket::TcpSocketInfo> info;
+    // The query should return success.
+    ASSERT_OK(ds.Query(addr_wildcard, addr_wildcard, state_wildcard, &info));
+    ASSERT_GE(info.size(), 1);
+
+    // Make sure the server's socket is one of the reported ones.
+    size_t matched_entries = 0;
+    for (const auto& entry : info) {
+      if (src_addr.ipv4_addr().sin_addr.s_addr != entry.src_addr ||
+          INADDR_ANY != entry.dst_addr ||
+          port != ntohs(entry.src_port) ||
+          0 != ntohs(entry.dst_port) ||
+          DiagnosticSocket::SS_LISTEN != entry.state) {
+        continue;
+      }
+      ++matched_entries;
+    }
+    ASSERT_EQ(1, matched_entries);
+  }
+
+  // Close the socket; the socket's address in listen_addr_ still valid.
+  ASSERT_OK(listener_.Close());
+
+  {
+    vector<DiagnosticSocket::TcpSocketInfo> info;
+    // The query should return success.
+    ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info));
+    // However, the list of matching sockets should be empty since the socket
+    // that could match the pattern has been just closed.
+    ASSERT_TRUE(info.empty());
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc
new file mode 100644
index 000000000..6a2f6a7a6
--- /dev/null
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/diagnostic_socket.h"
+
+#include <linux/inet_diag.h>
+#include <linux/netlink.h>
+#include <linux/sock_diag.h>
+#include <linux/types.h>
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Return the list of all the concrete socket states, i.e. every enumerator
+// from SS_ESTABLISHED through SS_CLOSING inclusive (SS_UNKNOWN and SS_MAX are
+// not included). The list is built once, upon the first call.
+const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildcard() {
+  static const vector<DiagnosticSocket::SocketState> kSocketStateWildcard = [] {
+    vector<DiagnosticSocket::SocketState> states;
+    // The enumerators in the [SS_ESTABLISHED, SS_CLOSING] range have
+    // consecutive values, so iterating over the range yields each state
+    // exactly once, in the order of their declaration.
+    for (int s = SS_ESTABLISHED; s <= SS_CLOSING; ++s) {
+      states.push_back(static_cast<DiagnosticSocket::SocketState>(s));
+    }
+    return states;
+  }();
+  return kSocketStateWildcard;
+}
+
+// Construct an object without opening the underlying netlink socket: the
+// file descriptor is set to -1, which Close() treats as "nothing to close"
+// and Query() rejects in debug builds; Init() opens the actual socket.
+DiagnosticSocket::DiagnosticSocket()
+ : fd_(-1) {
+}
+
+// Close the diagnostic socket if it's still open. A failure to close the
+// socket is only logged as a warning since a destructor cannot report it.
+DiagnosticSocket::~DiagnosticSocket() {
+ WARN_NOT_OK(Close(), "errors on closing diagnostic socket");
+}
+
+// Open a NETLINK_SOCK_DIAG socket in the AF_NETLINK domain and store its file
+// descriptor in fd_. If this object already holds an open descriptor from
+// a prior Init() call, that descriptor is closed first.
+//
+// Returns Status::RuntimeError carrying the errno if socket() fails, or the
+// error returned by Close() if releasing the previous descriptor fails.
+Status DiagnosticSocket::Init() {
+  // Release the descriptor left from a prior Init() call, if any: otherwise
+  // overwriting fd_ below would leak it. This is a no-op when fd_ is -1.
+  RETURN_NOT_OK(Close());
+
+  // SOCK_CLOEXEC: don't let the descriptor leak into exec()'ed child processes.
+  const int fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
+  if (fd < 0) {
+    // Capture errno right away, before any other call has a chance to alter it.
+    const int err = errno;
+    return Status::RuntimeError("unable to open diagnostic socket",
+                                ErrnoToString(err), err);
+  }
+  fd_ = fd;
+
+  return Status::OK();
+}
+
+// Close the diagnostic socket if it's open; calling this on an object with
+// no open socket is a no-op returning Status::OK().
+//
+// Returns Status::IOError carrying the errno if close() reports an error.
+// Regardless of the outcome, this object no longer refers to the descriptor
+// after the call.
+Status DiagnosticSocket::Close() {
+  if (fd_ < 0) {
+    return Status::OK();
+  }
+
+  // On Linux, close() releases the file descriptor even when it returns
+  // an error, EINTR included. So, the call must not be retried, and fd_ must
+  // not keep the stale value: by the time of a second attempt the descriptor
+  // number might be already reused for an unrelated file or socket opened by
+  // another thread, and that one would be closed instead.
+  const int fd = fd_;
+  fd_ = -1;
+  if (::close(fd) < 0) {
+    const int err = errno;
+    return Status::IOError("close error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Fetch information on the sockets matching the specified source address,
+// destination address, and set of states: send a request over the diagnostic
+// socket and read back the response. The output parameter 'info' is
+// overwritten only when both steps succeed; otherwise the error of the failed
+// step is returned and 'info' is left untouched.
+Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
+                               const Sockaddr& socket_dst_addr,
+                               const vector<SocketState>& socket_states,
+                               vector<TcpSocketInfo>* info) {
+  DCHECK_GE(fd_, 0) << "requires calling Init() first";
+  DCHECK(info);
+
+  // Fold the requested states into a bitmask: bit N is set for the state
+  // with the numeric value N.
+  uint32_t states_mask = 0;
+  for (const SocketState s : socket_states) {
+    states_mask |= 1U << s;
+  }
+  RETURN_NOT_OK(SendRequest(socket_src_addr, socket_dst_addr, states_mask));
+
+  // Collect the entries into a temporary container and hand them over to the
+  // caller only after the whole response has been received successfully.
+  vector<TcpSocketInfo> received;
+  RETURN_NOT_OK(ReceiveResponse(&received));
+  info->swap(received);
+  return Status::OK();
+}
+
+// Fetch information on the specified socket: send a request describing the
+// socket and expect exactly one entry in the response.
+//
+// Returns Status::NotFound if the response contains no entries, and
+// Status::InvalidArgument if it contains more than one. Errors from sending
+// the request and receiving the response are returned as is. The output
+// parameter 'info' is written only upon success.
+Status DiagnosticSocket::Query(const Socket& socket, TcpSocketInfo* info) {
+  DCHECK_GE(fd_, 0) << "requires calling Init() first";
+  DCHECK(info);
+
+  RETURN_NOT_OK(SendRequest(socket));
+  vector<TcpSocketInfo> matches;
+  RETURN_NOT_OK(ReceiveResponse(&matches));
+
+  const auto num_matches = matches.size();
+  if (num_matches == 0) {
+    return Status::NotFound("no matching IPv4 TCP socket found");
+  }
+  if (PREDICT_FALSE(num_matches != 1)) {
+    return Status::InvalidArgument("socket address is ambiguous");
+  }
+
+  *info = matches[0];
+  return Status::OK();
+}
+
+// Send a query about the specified socket. The socket's own address is used
+// as the source address of the pattern. If the socket has a peer, the peer's
+// address is used as the destination and the query is limited to sockets in
+// the SS_ESTABLISHED state; if the socket isn't connected, the destination is
+// the wildcard address and the query is limited to the SS_LISTEN state.
+//
+// Returns Status::NotSupported if either of the addresses isn't an IP address;
+// errors from retrieving the addresses (other than ENOTCONN for the peer) and
+// from sending the request are returned as is.
+Status DiagnosticSocket::SendRequest(const Socket& socket) const {
+  DCHECK_GE(fd_, 0);
+
+  static constexpr const char* const kNonIpErrMsg =
+      "netlink diagnostics is currently supported only on IPv4 TCP sockets";
+
+  Sockaddr src_addr;
+  RETURN_NOT_OK(socket.GetSocketAddress(&src_addr));
+  if (PREDICT_FALSE(!src_addr.is_ip())) {
+    return Status::NotSupported(kNonIpErrMsg);
+  }
+
+  Sockaddr dst_addr;
+  auto s = socket.GetPeerAddress(&dst_addr);
+  if (!s.ok()) {
+    // The only tolerated failure is ENOTCONN reported as a network error.
+    if (PREDICT_FALSE(!s.IsNetworkError() || s.posix_code() != ENOTCONN)) {
+      return s;
+    }
+    // Assuming it's a listened socket if there isn't a peer at the other side.
+    dst_addr = Sockaddr::Wildcard();
+  } else if (PREDICT_FALSE(!dst_addr.is_ip())) {
+    return Status::NotSupported(kNonIpErrMsg);
+  }
+
+  const uint32_t socket_state_bitmask =
+      1U << (dst_addr.IsWildcard() ? SS_LISTEN : SS_ESTABLISHED);
+  return SendRequest(src_addr, dst_addr, socket_state_bitmask);
+}
+
+// Build a SOCK_DIAG_BY_FAMILY netlink request for IPv4 TCP sockets matching
+// the specified source address, destination address, and bitmask of states
+// (bit N corresponds to the socket state with the numeric value N), and send
+// the request over the diagnostic socket.
+//
+// Returns Status::NotSupported if either of the addresses isn't of the AF_INET
+// family, and Status::NetworkError carrying the errno if sendmsg() fails.
+Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
+                                     const Sockaddr& socket_dst_addr,
+                                     uint32_t socket_states_bitmask) const {
+  DCHECK_GE(fd_, 0);
+
+  // The request below is built from the IPv4 representation of the addresses,
+  // so reject anything else up front instead of sending out a bogus pattern.
+  if (PREDICT_FALSE(socket_src_addr.family() != AF_INET ||
+                    socket_dst_addr.family() != AF_INET)) {
+    return Status::NotSupported(
+        "netlink diagnostics is currently supported only on IPv4 TCP sockets");
+  }
+
+  const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
+  const auto src_port = socket_src_addr.port();
+  const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr;
+  const auto dst_port = socket_dst_addr.port();
+
+  // All-ones value: used below for the interface and the cookie fields to
+  // avoid restricting the query by those attributes.
+  constexpr uint32_t kWildcard = static_cast<uint32_t>(-1);
+  // All values in inet_diag_sockid are in network byte order.
+  const struct inet_diag_sockid sock_id = {
+    .idiag_sport = htons(src_port),
+    .idiag_dport = htons(dst_port),
+    .idiag_src = { src_ipv4.s_addr, 0, 0, 0, },
+    .idiag_dst = { dst_ipv4.s_addr, 0, 0, 0, },
+    .idiag_if = kWildcard,
+    .idiag_cookie = { kWildcard, kWildcard },
+  };
+
+  // The netlink message: the header immediately followed by the payload.
+  struct TcpSocketRequest {
+    struct nlmsghdr nlh;
+    struct inet_diag_req_v2 idr;
+  } req = {
+    .nlh = {
+      .nlmsg_len = sizeof(req),
+      .nlmsg_type = SOCK_DIAG_BY_FAMILY,
+      .nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH,
+    },
+    .idr = {
+      .sdiag_family = AF_INET,
+      .sdiag_protocol = IPPROTO_TCP,
+      .idiag_ext = INET_DIAG_MEMINFO,
+      .pad = 0,
+      .idiag_states = socket_states_bitmask,
+      .id = sock_id,
+    }
+  };
+
+  struct iovec iov = {
+    .iov_base = &req,
+    .iov_len = sizeof(req),
+  };
+  // Only the address family is set in the destination netlink address; the
+  // rest of the fields are zero-initialized.
+  struct sockaddr_nl nladdr = {
+    .nl_family = AF_NETLINK
+  };
+  struct msghdr msg = {
+    .msg_name = &nladdr,
+    .msg_namelen = sizeof(nladdr),
+    .msg_iov = &iov,
+    .msg_iovlen = 1,
+  };
+
+  int rc = -1;
+  RETRY_ON_EINTR(rc, ::sendmsg(fd_, &msg, 0));
+  if (rc < 0) {
+    const int err = errno;
+    return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Read the response to a previously sent request from the diagnostic socket,
+// appending one TcpSocketInfo entry to 'result' per reported socket. Datagrams
+// are read until an NLMSG_DONE message arrives or recvmsg() returns 0.
+//
+// Returns:
+//   * Status::IOError if recvmsg() fails
+//   * Status::Corruption if a datagram is truncated, malformed, comes from
+//     a non-netlink address, or contains a message of unexpected type, size,
+//     or address family
+//   * Status::RuntimeError carrying the reported errno if the response
+//     contains an NLMSG_ERROR message
+//
+// NOTE: upon an error, 'result' may contain the entries parsed so far.
+Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
+  DCHECK_GE(fd_, 0);
+  DCHECK(result);
+
+  uint8_t buf[8192];
+  struct iovec iov = {
+    .iov_base = buf,
+    .iov_len = sizeof(buf)
+  };
+
+  while (true) {
+    struct sockaddr_nl nladdr = {};
+    struct msghdr msg = {
+      .msg_name = &nladdr,
+      .msg_namelen = sizeof(nladdr),
+      .msg_iov = &iov,
+      .msg_iovlen = 1
+    };
+
+    ssize_t ret = -1;
+    RETRY_ON_EINTR(ret, ::recvmsg(fd_, &msg, 0));
+    if (PREDICT_FALSE(ret < 0)) {
+      int err = errno;
+      return Status::IOError("recvmsg()", ErrnoToString(err), err);
+    }
+    if (ret == 0) {
+      // End of stream.
+      return Status::OK();
+    }
+    // A datagram that doesn't fit into the buffer is truncated by the kernel,
+    // with the excess bytes discarded. Parsing such a datagram would silently
+    // drop entries, so report the condition instead.
+    if (PREDICT_FALSE((msg.msg_flags & MSG_TRUNC) != 0)) {
+      return Status::Corruption("netlink response is truncated");
+    }
+
+    const struct nlmsghdr* h = reinterpret_cast<const struct nlmsghdr*>(buf);
+    if (PREDICT_FALSE(!NLMSG_OK(h, ret))) {
+      return Status::Corruption(
+          Substitute("unexpected netlink response size $0", ret));
+    }
+
+    if (PREDICT_FALSE(nladdr.nl_family != AF_NETLINK)) {
+      return Status::Corruption(Substitute(
+          "$0: unexpected address family", static_cast<uint32_t>(nladdr.nl_family)));
+    }
+
+    // A single datagram may carry several netlink messages: NLMSG_NEXT()
+    // advances to the next one, decrementing 'ret' by the size consumed.
+    for (; NLMSG_OK(h, ret); h = NLMSG_NEXT(h, ret)) {
+      if (h->nlmsg_type == NLMSG_DONE) {
+        return Status::OK();
+      }
+      if (PREDICT_FALSE(h->nlmsg_type == NLMSG_ERROR)) {
+        // Below, the NLMSG_DATA(h) macro is expanded and C-style casts replaced
+        // with reinterpret_cast<>.
+        const struct nlmsgerr* errdata = reinterpret_cast<const struct nlmsgerr*>(
+            reinterpret_cast<const uint8_t*>(h) + NLMSG_LENGTH(0));
+        if (PREDICT_FALSE(h->nlmsg_len < NLMSG_LENGTH(sizeof(*errdata)))) {
+          return Status::Corruption("NLMSG error message is too short");
+        }
+        // The error code is reported as a negated errno value.
+        const int err = -errdata->error;
+        return Status::RuntimeError("netlink error", ErrnoToString(err), err);
+      }
+
+      if (PREDICT_FALSE(h->nlmsg_type != SOCK_DIAG_BY_FAMILY)) {
+        return Status::Corruption(Substitute("$0: unexpected netlink message type",
+                                             static_cast<uint32_t>(h->nlmsg_type)));
+      }
+
+      // Below, the NLMSG_DATA(h) macro is expanded and C-style casts replaced
+      // with reinterpret_cast<>.
+      const struct inet_diag_msg* msg_data = reinterpret_cast<const struct inet_diag_msg*>(
+          reinterpret_cast<const uint8_t*>(h) + NLMSG_LENGTH(0));
+      const uint32_t msg_size = h->nlmsg_len;
+      if (PREDICT_FALSE(msg_size < NLMSG_LENGTH(sizeof(*msg_data)))) {
+        return Status::Corruption(Substitute(
+            "$0: netlink response is too short", msg_size));
+      }
+      // Only IPv4 addresses are expected due to the query pattern.
+      if (PREDICT_FALSE(msg_data->idiag_family != AF_INET)) {
+        return Status::Corruption(Substitute(
+            "$0: unexpected address family in netlink response",
+            static_cast<uint32_t>(msg_data->idiag_family)));
+      }
+
+      DCHECK_LE(SocketState::SS_ESTABLISHED, msg_data->idiag_state);
+      DCHECK_GE(SocketState::SS_CLOSING, msg_data->idiag_state);
+
+      TcpSocketInfo info;
+      info.state = static_cast<SocketState>(msg_data->idiag_state);
+      info.src_addr = msg_data->id.idiag_src[0]; // IPv4 address, network byte order
+      info.dst_addr = msg_data->id.idiag_dst[0]; // IPv4 address, network byte order
+      info.src_port = msg_data->id.idiag_sport;
+      info.dst_port = msg_data->id.idiag_dport;
+      info.rx_queue_size = msg_data->idiag_rqueue;
+      info.tx_queue_size = msg_data->idiag_wqueue;
+      result->emplace_back(info);
+    }
+  }
+  // Not reachable: the loop above is exited only by returning.
+  return Status::OK();
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/net/diagnostic_socket.h b/src/kudu/util/net/diagnostic_socket.h
new file mode 100644
index 000000000..bee3ba1b1
--- /dev/null
+++ b/src/kudu/util/net/diagnostic_socket.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Sockaddr;
+class Socket;
+
+// A wrapper around Linux-specific sock_diag() API [1] based on the
+// netlink facility [2] to fetch information on IPv4 TCP sockets.
+//
+// [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html
+// [2] https://man7.org/linux/man-pages/man7/netlink.7.html
+class DiagnosticSocket final {
+ public:
+  // Enum for the socket state. This is modeled after the corresponding
+  // TCP_-prefixed enum in /usr/include/netinet/tcp.h with exact value mapping.
+  // This enum is introduced to decouple the netinet/tcp.h header and the API
+  // of this class.
+  //
+  // The enumerators from SS_ESTABLISHED through SS_CLOSING are the concrete
+  // states with consecutive values starting at 1; SS_UNKNOWN and SS_MAX only
+  // mark the boundaries of that range.
+  enum SocketState {
+    SS_UNKNOWN = 0,
+    SS_ESTABLISHED,
+    SS_SYN_SENT,
+    SS_SYN_RECV,
+    SS_FIN_WAIT1,
+    SS_FIN_WAIT2,
+    SS_TIME_WAIT,
+    SS_CLOSE,
+    SS_CLOSE_WAIT,
+    SS_LAST_ACK,
+    SS_LISTEN,
+    SS_CLOSING,
+    SS_MAX,
+  };
+
+  // Diagnostic information on a TCP IPv4 socket. That's a subset of the
+  // information available via the netlink data structures.
+  //
+  // For a socket in the SS_LISTEN state, 'rx_queue_size' is the number of
+  // pending connections and 'tx_queue_size' is the size of the backlog queue.
+  //
+  // TODO(aserbin): if using this API more broadly than fetching information on
+  //                a single socket, consider replacing { addr, port } pairs for
+  //                the source and the destination with Sockaddr class fields.
+  struct TcpSocketInfo {
+    SocketState state;      // current state of the socket
+    uint32_t src_addr;      // IPv4 source address (network byte order)
+    uint32_t dst_addr;      // IPv4 destination address (network byte order)
+    uint16_t src_port;      // source port number (network byte order)
+    uint16_t dst_port;      // destination port number (network byte order)
+    uint32_t rx_queue_size; // RX queue size
+    uint32_t tx_queue_size; // TX queue size
+  };
+
+  // Return wildcard for all the available socket states, i.e. the list of
+  // every state from SS_ESTABLISHED through SS_CLOSING.
+  static const std::vector<SocketState>& SocketStateWildcard();
+
+  // Construct an object. The diagnostic socket itself isn't opened until
+  // Init() is called.
+  DiagnosticSocket();
+
+  // Close the diagnostic socket. Errors will be logged, but ignored.
+  ~DiagnosticSocket();
+
+  // Open the diagnostic socket of the NETLINK_SOCK_DIAG protocol in the
+  // AF_NETLINK domain, so it's possible to fetch the requested information
+  // from the kernel using the netlink facility via the API of this class.
+  // This method must be called before calling any of the Query() methods.
+  Status Init() WARN_UNUSED_RESULT;
+
+  // Close the Socket, checking for errors. It's a no-op if the socket
+  // isn't open.
+  Status Close();
+
+  // Get diagnostic information on IPv4 TCP sockets of the specified states
+  // having the specified source and the destination address. Wildcard addresses
+  // are supported. Upon success, the contents of the 'info' output parameter
+  // are replaced with the result, one entry per matching socket.
+  Status Query(const Sockaddr& socket_src_addr,
+               const Sockaddr& socket_dst_addr,
+               const std::vector<SocketState>& socket_states,
+               std::vector<TcpSocketInfo>* info);
+
+  // Get diagnostic information on the specified socket. This is a handy
+  // shortcut to the Query() method above for a single active socket in the
+  // SS_ESTABLISHED or SS_LISTEN state. This method returns Status::NotFound
+  // if no matching socket is reported, and Status::InvalidArgument if more
+  // than one socket matches.
+  Status Query(const Socket& socket, TcpSocketInfo* info);
+
+ private:
+  // Build and send netlink request, writing it into the diagnostic socket.
+  Status SendRequest(const Socket& socket) const;
+  Status SendRequest(const Sockaddr& socket_src_addr,
+                     const Sockaddr& socket_dst_addr,
+                     uint32_t socket_states_bitmask) const;
+
+  // Receive response for a request sent by a method above.
+  Status ReceiveResponse(std::vector<TcpSocketInfo>* result) const;
+
+  // File descriptor of the diagnostic socket (AF_NETLINK domain);
+  // -1 if the socket isn't open.
+  int fd_;
+
+  DISALLOW_COPY_AND_ASSIGN(DiagnosticSocket);
+};
+
+} // namespace kudu