This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d27603ec7 [util] fine-grained versioning for diagnostic socket
d27603ec7 is described below

commit d27603ec7ea6adb4e89c25a37ab1fed314011a47
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Aug 23 20:55:33 2024 -0700

    [util] fine-grained versioning for diagnostic socket
    
    The motivation for this patch is to be able to build Kudu on
    legacy Linux OS releases (kernel version < 3.3).
    
    This patch is a follow-up to f15a1ca5476b256671c2369dd4d9faeccfee6b7c
    and c0c44a8acd8d6366987af687d5665f751249a95a.
    
    Change-Id: I4fad2ad5c5a177678f52bff080429568a20ac67c
    Reviewed-on: http://gerrit.cloudera.org:8080/21722
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Mahesh Reddy <[email protected]>
    Reviewed-by: Yingchun Lai <[email protected]>
---
 CMakeLists.txt                         | 19 +++++++++++
 src/kudu/rpc/acceptor_pool.cc          |  9 ++++--
 src/kudu/rpc/messenger.cc              |  4 +--
 src/kudu/rpc/rpc-test.cc               | 16 ++++-----
 src/kudu/util/CMakeLists.txt           |  3 ++
 src/kudu/util/net/diagnostic_socket.cc | 59 ++++++++++++++++------------------
 6 files changed, 67 insertions(+), 43 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4f0471264..ad8fb3737 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -737,6 +737,25 @@ endif()
   endif()
 endfunction()
 
+############################################################
+# Features that depend on Linux kernel version
+############################################################
+
+# The diagnostic socket is a Linux-specific functionality based on
+# the sock_diag netlink subsystem: see the DiagnosticSocket class
+# in src/kudu/util/net/diagnostic_socket.cc and its usage elsewhere.
+# For more information on the sock_diag netlink subsystem in Linux see [1].
+#
+# [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html
+if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux" AND
+    "${CMAKE_HOST_SYSTEM_VERSION}" VERSION_GREATER_EQUAL "3.3")
+  set(KUDU_HAS_DIAGNOSTIC_SOCKET 1)
+  add_definitions(-DKUDU_HAS_DIAGNOSTIC_SOCKET)
+  message(STATUS "Diagnostic Socket is AVAILABLE")
+else()
+  message(STATUS "Diagnostic Socket is NOT AVAILABLE")
+endif()
+
 ############################################################
 # Testing
 ############################################################
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 63748604d..6b473f88b 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -90,7 +90,12 @@ DEFINE_int32(rpc_acceptor_listen_backlog,
              "the server ride over bursts of new inbound connection requests.");
 TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
 
-DEFINE_int32(rpc_listen_socket_stats_every_log2, 3,
+DEFINE_int32(rpc_listen_socket_stats_every_log2,
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
+             3,
+#else
+             -1,
+#endif
              "Listening RPC socket's statistics sampling frequency. With "
              "--rpc_listen_socket_stats_every_log2=N, the statistics are "
              "sampled every 2^N connection request by each acceptor thread. "
@@ -155,7 +160,7 @@ AcceptorPool::~AcceptorPool() {
 
 Status AcceptorPool::Start(int num_threads) {
   RETURN_NOT_OK(socket_.Listen(listen_backlog_));
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   WARN_NOT_OK(diag_socket_.Init(), "could not initialize diagnostic socket");
 #endif
 
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 6dbed0e3f..2d337e1c2 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -269,7 +269,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr,
         this, &sock, addr, acceptor_listen_backlog_));
     *pool = acceptor_pools_.back();
 
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
     if (acceptor_pools_.size() == 1) {
       // 'rpc_pending_connections' metric is instantiated when the messenger
       // contains exactly one acceptor pool: this metric makes sense
@@ -279,7 +279,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr,
           metric_entity_, [this]() { return this->GetPendingConnectionsNum(); })->
           AutoDetachToLastValue(&metric_detacher_);
     }
-#endif // #if defined(__linux__) ...
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
   }
 
   return Status::OK();
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 1efbe8ebb..704a27735 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -1482,11 +1482,11 @@ TEST_P(TestRpc, RpcPendingConnectionsMetric) {
           server_messenger_->metric_entity(), []() { return -3; });
 
   // No connection attempts have been made yet.
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   ASSERT_EQ(0, pending_connections_gauge->value());
 #else
-  ASSERT_EQ(-1, pending_connections_gauge->value());
-#endif
+  ASSERT_EQ(-3, pending_connections_gauge->value());
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
   {
     Socket socket;
@@ -1501,11 +1501,11 @@ TEST_P(TestRpc, RpcPendingConnectionsMetric) {
 
   // At this point, there should be no connection pending: the only received
   // connection request has already been handled above.
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   ASSERT_EQ(0, pending_connections_gauge->value());
 #else
-  ASSERT_EQ(-1, pending_connections_gauge->value());
-#endif
+  ASSERT_EQ(-3, pending_connections_gauge->value());
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 }
 
 static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
@@ -1897,7 +1897,7 @@ TEST_P(TestRpc, TestCallId) {
   }
 }
 
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
 // A test to verify collecting information on the RX queue size of a listening
 // socket using the DiagnosticSocket wrapper.
 class TestRpcSocketTxRxQueue : public TestRpc {
@@ -2104,7 +2104,7 @@ TEST_P(TestRpcSocketTxRxQueue, CustomAcceptorRxQueueSamplingFrequency) {
     ASSERT_EQ(2, rx_queue_size->TotalCount());
   });
 }
-#endif
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
 } // namespace rpc
 } // namespace kudu
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index e4887cf1b..acc95d822 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -595,6 +595,9 @@ ADD_KUDU_TEST(yamlreader-test)
 
 if (NOT APPLE)
   ADD_KUDU_TEST(minidump-test)
+endif()
+
+if (KUDU_HAS_DIAGNOSTIC_SOCKET)
   ADD_KUDU_TEST(net/diagnostic_socket-test)
 endif()
 
diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc
index 6c3ad3260..5eee7265b 100644
--- a/src/kudu/util/net/diagnostic_socket.cc
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -17,12 +17,12 @@
 
 #include "kudu/util/net/diagnostic_socket.h"
 
-#if defined(__linux__)
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
 #include <linux/inet_diag.h>
 #include <linux/netlink.h>
 #include <linux/sock_diag.h>
 #include <linux/types.h>
-#endif
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
 #include <netinet/in.h>
 #include <sys/socket.h>
@@ -65,12 +65,12 @@ const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildca
   return kSocketStateWildcard;
 }
 
-#if !defined(__linux__)
+#if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
 namespace {
   constexpr const char* const kNotSupportedMsg =
       "DiagnosticSocket functionality is currently supported on Linux only";
 } // anonymous namespace
-#endif // #if !defined(__linux__)
+#endif // #if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
 DiagnosticSocket::DiagnosticSocket()
     : fd_(-1) {
@@ -81,9 +81,7 @@ DiagnosticSocket::~DiagnosticSocket() {
 }
 
 Status DiagnosticSocket::Init() {
-#if !defined(__linux__)
-  return Status::NotSupported(kNotSupportedMsg);
-#else
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   auto fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
   if (fd < 0) {
     int err = errno;
@@ -93,7 +91,9 @@ Status DiagnosticSocket::Init() {
   fd_ = fd;
 
   return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+  return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::Close() {
@@ -114,12 +114,10 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
                                const Sockaddr& socket_dst_addr,
                                const vector<SocketState>& socket_states,
                                vector<TcpSocketInfo>* info) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
-#if !defined(__linux__)
-  return Status::NotSupported(kNotSupportedMsg);
-#else
   uint32_t socket_states_bitmask = 0;
   for (auto state : socket_states) {
     socket_states_bitmask |= (1U << state);
@@ -131,17 +129,17 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
   RETURN_NOT_OK(ReceiveResponse(&result));
   *info = std::move(result);
   return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+  return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::Query(const Socket& socket,
                                TcpSocketInfo* info) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
-#if !defined(__linux__)
-  return Status::NotSupported(kNotSupportedMsg);
-#else
   RETURN_NOT_OK(SendRequest(socket));
   vector<TcpSocketInfo> result;
   RETURN_NOT_OK(ReceiveResponse(&result));
@@ -154,16 +152,15 @@ Status DiagnosticSocket::Query(const Socket& socket,
 
   *info = result.front();
   return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+  return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 // Send query about the specified socket.
 Status DiagnosticSocket::SendRequest(const Socket& socket) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0);
-
-#if !defined(__linux__)
-  return Status::NotSupported(kNotSupportedMsg);
-#else
   static constexpr const char* const kNonIpErrMsg =
       "netlink diagnostics is currently supported only on IPv4 TCP sockets";
 
@@ -191,17 +188,16 @@ Status DiagnosticSocket::SendRequest(const Socket& socket) const {
   const uint32_t socket_state_bitmask =
       dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED);
   return SendRequest(src_addr, dst_addr, socket_state_bitmask);
-#endif // #if !defined(__linux__) ... #else ...
+#else
+  return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
                                      const Sockaddr& socket_dst_addr,
                                      uint32_t socket_states_bitmask) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0);
-
-#if !defined(__linux__)
-  return Status::NotSupported(kNotSupportedMsg);
-#else
   const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
   const auto src_port = socket_src_addr.port();
   const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr;
@@ -258,15 +254,14 @@ Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
     return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
   }
   return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+  return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0);
-
-#if !defined(__linux__)
-  return Status::NotSupported(kNotSupportedMsg);
-#else
   uint8_t buf[8192];
   struct iovec iov = {
     .iov_base = buf,
@@ -356,7 +351,9 @@ Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
     }
   }
   return Status::OK();
-#endif // #if !defined(__linux__) ... #else ...
+#else
+  return Status::NotSupported(kNotSupportedMsg);
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 } // namespace kudu

Reply via email to