This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new d27603ec7 [util] fine-grained versioning for diagnostic socket
d27603ec7 is described below
commit d27603ec7ea6adb4e89c25a37ab1fed314011a47
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Aug 23 20:55:33 2024 -0700
[util] fine-grained versioning for diagnostic socket
The motivation for this patch is to be able to build Kudu on
legacy Linux OS releases (kernel version < 3.3).
This patch is a follow-up to f15a1ca5476b256671c2369dd4d9faeccfee6b7c
and c0c44a8acd8d6366987af687d5665f751249a95a.
Change-Id: I4fad2ad5c5a177678f52bff080429568a20ac67c
Reviewed-on: http://gerrit.cloudera.org:8080/21722
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Yingchun Lai <[email protected]>
---
CMakeLists.txt | 19 +++++++++++
src/kudu/rpc/acceptor_pool.cc | 9 ++++--
src/kudu/rpc/messenger.cc | 4 +--
src/kudu/rpc/rpc-test.cc | 16 ++++-----
src/kudu/util/CMakeLists.txt | 3 ++
src/kudu/util/net/diagnostic_socket.cc | 59 ++++++++++++++++------------------
6 files changed, 67 insertions(+), 43 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4f0471264..ad8fb3737 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -737,6 +737,25 @@ endif()
endif()
endfunction()
+############################################################
+# Features that depend on Linux kernel version
+############################################################
+
+# The diagnostic socket is a Linux-specific functionality based on
+# the sock_diag netlink subsystem: see the DiagnosticSocket class
+# in src/kudu/util/net/diagnostic_socket.cc and its usage elsewhere.
+# For more information on the sock_diag netlink subsystem in Linux see [1].
+#
+# [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html
+if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux" AND
+ "${CMAKE_HOST_SYSTEM_VERSION}" VERSION_GREATER_EQUAL "3.3")
+ set(KUDU_HAS_DIAGNOSTIC_SOCKET 1)
+ add_definitions(-DKUDU_HAS_DIAGNOSTIC_SOCKET)
+ message(STATUS "Diagnostic Socket is AVAILABLE")
+else()
+ message(STATUS "Diagnostic Socket is NOT AVAILABLE")
+endif()
+
############################################################
# Testing
############################################################
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 63748604d..6b473f88b 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -90,7 +90,12 @@ DEFINE_int32(rpc_acceptor_listen_backlog,
"the server ride over bursts of new inbound connection
requests.");
TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
-DEFINE_int32(rpc_listen_socket_stats_every_log2, 3,
+DEFINE_int32(rpc_listen_socket_stats_every_log2,
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
+ 3,
+#else
+ -1,
+#endif
"Listening RPC socket's statistics sampling frequency. With "
"--rpc_listen_socket_stats_every_log2=N, the statistics are "
"sampled every 2^N connection request by each acceptor thread. "
@@ -155,7 +160,7 @@ AcceptorPool::~AcceptorPool() {
Status AcceptorPool::Start(int num_threads) {
RETURN_NOT_OK(socket_.Listen(listen_backlog_));
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
WARN_NOT_OK(diag_socket_.Init(), "could not initialize diagnostic socket");
#endif
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 6dbed0e3f..2d337e1c2 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -269,7 +269,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr&
accept_addr,
this, &sock, addr, acceptor_listen_backlog_));
*pool = acceptor_pools_.back();
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
if (acceptor_pools_.size() == 1) {
// 'rpc_pending_connections' metric is instantiated when the messenger
// contains exactly one acceptor pool: this metric makes sense
@@ -279,7 +279,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr&
accept_addr,
metric_entity_, [this]() { return this->GetPendingConnectionsNum();
})->
AutoDetachToLastValue(&metric_detacher_);
}
-#endif // #if defined(__linux__) ...
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
}
return Status::OK();
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 1efbe8ebb..704a27735 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -1482,11 +1482,11 @@ TEST_P(TestRpc, RpcPendingConnectionsMetric) {
server_messenger_->metric_entity(), []() { return -3; });
// No connection attempts have been made yet.
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
ASSERT_EQ(0, pending_connections_gauge->value());
#else
- ASSERT_EQ(-1, pending_connections_gauge->value());
-#endif
+ ASSERT_EQ(-3, pending_connections_gauge->value());
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
{
Socket socket;
@@ -1501,11 +1501,11 @@ TEST_P(TestRpc, RpcPendingConnectionsMetric) {
// At this point, there should be no connection pending: the only received
// connection request has already been handled above.
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
ASSERT_EQ(0, pending_connections_gauge->value());
#else
- ASSERT_EQ(-1, pending_connections_gauge->value());
-#endif
+ ASSERT_EQ(-3, pending_connections_gauge->value());
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
}
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
@@ -1897,7 +1897,7 @@ TEST_P(TestRpc, TestCallId) {
}
}
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
// A test to verify collecting information on the RX queue size of a listening
// socket using the DiagnosticSocket wrapper.
class TestRpcSocketTxRxQueue : public TestRpc {
@@ -2104,7 +2104,7 @@ TEST_P(TestRpcSocketTxRxQueue,
CustomAcceptorRxQueueSamplingFrequency) {
ASSERT_EQ(2, rx_queue_size->TotalCount());
});
}
-#endif
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
} // namespace rpc
} // namespace kudu
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index e4887cf1b..acc95d822 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -595,6 +595,9 @@ ADD_KUDU_TEST(yamlreader-test)
if (NOT APPLE)
ADD_KUDU_TEST(minidump-test)
+endif()
+
+if (KUDU_HAS_DIAGNOSTIC_SOCKET)
ADD_KUDU_TEST(net/diagnostic_socket-test)
endif()
diff --git a/src/kudu/util/net/diagnostic_socket.cc
b/src/kudu/util/net/diagnostic_socket.cc
index 6c3ad3260..5eee7265b 100644
--- a/src/kudu/util/net/diagnostic_socket.cc
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -17,12 +17,12 @@
#include "kudu/util/net/diagnostic_socket.h"
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
#include <linux/inet_diag.h>
#include <linux/netlink.h>
#include <linux/sock_diag.h>
#include <linux/types.h>
-#endif
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
#include <netinet/in.h>
#include <sys/socket.h>
@@ -65,12 +65,12 @@ const vector<DiagnosticSocket::SocketState>&
DiagnosticSocket::SocketStateWildca
return kSocketStateWildcard;
}
-#if !defined(__linux__)
+#if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
namespace {
constexpr const char* const kNotSupportedMsg =
"DiagnosticSocket functionality is currently supported on Linux only";
} // anonymous namespace
-#endif // #if !defined(__linux__)
+#endif // #if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
DiagnosticSocket::DiagnosticSocket()
: fd_(-1) {
@@ -81,9 +81,7 @@ DiagnosticSocket::~DiagnosticSocket() {
}
Status DiagnosticSocket::Init() {
-#if !defined(__linux__)
- return Status::NotSupported(kNotSupportedMsg);
-#else
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
auto fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
if (fd < 0) {
int err = errno;
@@ -93,7 +91,9 @@ Status DiagnosticSocket::Init() {
fd_ = fd;
return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+ return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::Close() {
@@ -114,12 +114,10 @@ Status DiagnosticSocket::Query(const Sockaddr&
socket_src_addr,
const Sockaddr& socket_dst_addr,
const vector<SocketState>& socket_states,
vector<TcpSocketInfo>* info) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0) << "requires calling Init() first";
DCHECK(info);
-#if !defined(__linux__)
- return Status::NotSupported(kNotSupportedMsg);
-#else
uint32_t socket_states_bitmask = 0;
for (auto state : socket_states) {
socket_states_bitmask |= (1U << state);
@@ -131,17 +129,17 @@ Status DiagnosticSocket::Query(const Sockaddr&
socket_src_addr,
RETURN_NOT_OK(ReceiveResponse(&result));
*info = std::move(result);
return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+ return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::Query(const Socket& socket,
TcpSocketInfo* info) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0) << "requires calling Init() first";
DCHECK(info);
-#if !defined(__linux__)
- return Status::NotSupported(kNotSupportedMsg);
-#else
RETURN_NOT_OK(SendRequest(socket));
vector<TcpSocketInfo> result;
RETURN_NOT_OK(ReceiveResponse(&result));
@@ -154,16 +152,15 @@ Status DiagnosticSocket::Query(const Socket& socket,
*info = result.front();
return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+ return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
// Send query about the specified socket.
Status DiagnosticSocket::SendRequest(const Socket& socket) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0);
-
-#if !defined(__linux__)
- return Status::NotSupported(kNotSupportedMsg);
-#else
static constexpr const char* const kNonIpErrMsg =
"netlink diagnostics is currently supported only on IPv4 TCP sockets";
@@ -191,17 +188,16 @@ Status DiagnosticSocket::SendRequest(const Socket&
socket) const {
const uint32_t socket_state_bitmask =
dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED);
return SendRequest(src_addr, dst_addr, socket_state_bitmask);
-#endif // #if !defined(__linux__) ... #else ...
+#else
+ return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
const Sockaddr& socket_dst_addr,
uint32_t socket_states_bitmask) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0);
-
-#if !defined(__linux__)
- return Status::NotSupported(kNotSupportedMsg);
-#else
const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
const auto src_port = socket_src_addr.port();
const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr;
@@ -258,15 +254,14 @@ Status DiagnosticSocket::SendRequest(const Sockaddr&
socket_src_addr,
return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
}
return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+ return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0);
-
-#if !defined(__linux__)
- return Status::NotSupported(kNotSupportedMsg);
-#else
uint8_t buf[8192];
struct iovec iov = {
.iov_base = buf,
@@ -356,7 +351,9 @@ Status
DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
}
}
return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+ return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
} // namespace kudu