This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f17ae0804 [rpc] instantiate DiagnosticSocket only if sock_diag available
f17ae0804 is described below

commit f17ae080413f4f62046d6732d05ea4f5fdddefb0
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Aug 26 12:47:01 2024 -0700

    [rpc] instantiate DiagnosticSocket only if sock_diag available
    
    In the review feedback on [1], Yingchun Lai pointed to the fact that
    the DiagnosticSocket field would be instantiated in AcceptorPool even if
    the sock_diag netlink facility was not available.  An alternative
    approach would be not instantiating it at all in that case.  This patch
    addresses the point.
    
    This is a follow-up to d27603ec7ea6adb4e89c25a37ab1fed314011a47.
    
    [1] http://gerrit.cloudera.org:8080/21722
    
    Change-Id: Ic70b6cca94b50f070efbc159a767d207c3a7b230
    Reviewed-on: http://gerrit.cloudera.org:8080/21727
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Yingchun Lai <[email protected]>
---
 src/kudu/rpc/acceptor_pool.cc          | 28 +++++++++++++++++++++++++---
 src/kudu/rpc/acceptor_pool.h           |  2 ++
 src/kudu/util/CMakeLists.txt           |  5 ++++-
 src/kudu/util/net/diagnostic_socket.cc | 33 ---------------------------------
 4 files changed, 31 insertions(+), 37 deletions(-)

diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 6b473f88b..009031b19 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -36,13 +36,16 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
-#include "kudu/util/net/diagnostic_socket.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
+#include "kudu/util/net/diagnostic_socket.h"
+#endif
+
 using std::string;
 using strings::Substitute;
 
@@ -205,7 +208,9 @@ void AcceptorPool::Shutdown() {
   }
   threads_.clear();
 
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   WARN_NOT_OK(diag_socket_.Close(), "error closing diagnostic socket");
+#endif
 
   // Close the socket: keeping the descriptor open and, possibly, receiving late
   // not-to-be-read messages from the peer does not make much sense. The
@@ -231,11 +236,16 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const {
 }
 
 Status AcceptorPool::GetPendingConnectionsNum(uint32_t* result) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DiagnosticSocket::TcpSocketInfo info;
   RETURN_NOT_OK(diag_socket_.Query(socket_, &info));
   *result = info.rx_queue_size;
 
   return Status::OK();
+#else // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
+  return Status::NotSupported(
+      "pending connections metric is not available for this platform");
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... else ...
 }
 
 void AcceptorPool::RunThread() {
@@ -249,11 +259,14 @@ void AcceptorPool::RunThread() {
               "unable to get address info on RPC socket");
   const auto& cur_addr_str = cur_addr.ToString();
 
-  DiagnosticSocket ds;
   const int ds_query_freq_log2 = FLAGS_rpc_listen_socket_stats_every_log2;
   const bool ds_query_enabled = (ds_query_freq_log2 >= 0);
   const uint64_t ds_query_freq_mask =
       ds_query_enabled ? (1ULL << ds_query_freq_log2) - 1 : 0;
+
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
+  DiagnosticSocket ds;
+  uint64_t counter = 0;
   if (ds_query_enabled) {
     if (const auto s = ds.Init(); s.ok()) {
       LOG(INFO) << Substitute(
@@ -267,8 +280,15 @@ void AcceptorPool::RunThread() {
         "collecting diagnostics on the listening RPC socket $0 is disabled",
         cur_addr_str);
   }
+#else
+  if (ds_query_enabled) {
+    LOG(WARNING) << Substitute(
+        "--rpc_listen_socket_stats_every_log2 is set to $0, but collecting "
+        "stats on listening RPC sockets is not supported on this platform",
+        ds_query_freq_log2);
+  }
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... else ...
 
-  uint64_t counter = 0;
   while (true) {
     Socket new_sock;
     Sockaddr remote;
@@ -277,6 +297,7 @@ void AcceptorPool::RunThread() {
     const auto s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
     const auto accepted_at = CycleClock::Now();
 
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
     if (ds_query_enabled && ds.IsInitialized() &&
         (counter & ds_query_freq_mask) == ds_query_freq_mask) {
       VLOG(2) << "getting stats on the listening socket";
@@ -293,6 +314,7 @@ void AcceptorPool::RunThread() {
       }
     }
     ++counter;
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
     const auto dispatch_times_recorder = MakeScopedCleanup([&]() {
       // The timings are captured for both success and failure paths, so the
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 3ffcfe5e7..cf1086da8 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -84,7 +84,9 @@ class AcceptorPool {
   const Sockaddr bind_address_;
   const int listen_backlog_;
   std::vector<scoped_refptr<Thread>> threads_;
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DiagnosticSocket diag_socket_;
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
   std::atomic<bool> closing_;
 
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index acc95d822..501ac5ec1 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -222,7 +222,6 @@ set(UTIL_SRCS
   minidump.cc
   monotime.cc
   mutex.cc
-  net/diagnostic_socket.cc
   net/dns_resolver.cc
   net/net_util.cc
   net/sockaddr.cc
@@ -269,6 +268,10 @@ set(UTIL_SRCS
   zlib.cc
 )
 
+if (KUDU_HAS_DIAGNOSTIC_SOCKET)
+  set(UTIL_SRCS ${UTIL_SRCS} net/diagnostic_socket.cc)
+endif()
+
 if(NOT NO_TESTS)
   set(UTIL_SRCS ${UTIL_SRCS} test_graph.cc)
 endif()
diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc
index 5eee7265b..da75ce062 100644
--- a/src/kudu/util/net/diagnostic_socket.cc
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -17,12 +17,10 @@
 
 #include "kudu/util/net/diagnostic_socket.h"
 
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
 #include <linux/inet_diag.h>
 #include <linux/netlink.h>
 #include <linux/sock_diag.h>
 #include <linux/types.h>
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
 
 #include <netinet/in.h>
 #include <sys/socket.h>
@@ -65,13 +63,6 @@ const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildca
   return kSocketStateWildcard;
 }
 
-#if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
-namespace {
-  constexpr const char* const kNotSupportedMsg =
-      "DiagnosticSocket functionality is currently supported on Linux only";
-} // anonymous namespace
-#endif // #if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
-
 DiagnosticSocket::DiagnosticSocket()
     : fd_(-1) {
 }
@@ -81,7 +72,6 @@ DiagnosticSocket::~DiagnosticSocket() {
 }
 
 Status DiagnosticSocket::Init() {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   auto fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
   if (fd < 0) {
     int err = errno;
@@ -91,9 +81,6 @@ Status DiagnosticSocket::Init() {
   fd_ = fd;
 
   return Status::OK();
-#else
-  return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::Close() {
@@ -114,7 +101,6 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
                                const Sockaddr& socket_dst_addr,
                                const vector<SocketState>& socket_states,
                                vector<TcpSocketInfo>* info) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
@@ -129,14 +115,10 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
   RETURN_NOT_OK(ReceiveResponse(&result));
   *info = std::move(result);
   return Status::OK();
-#else
-  return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::Query(const Socket& socket,
                                TcpSocketInfo* info) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
@@ -152,14 +134,10 @@ Status DiagnosticSocket::Query(const Socket& socket,
 
   *info = result.front();
   return Status::OK();
-#else
-  return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 // Send query about the specified socket.
 Status DiagnosticSocket::SendRequest(const Socket& socket) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0);
   static constexpr const char* const kNonIpErrMsg =
       "netlink diagnostics is currently supported only on IPv4 TCP sockets";
@@ -188,15 +166,11 @@ Status DiagnosticSocket::SendRequest(const Socket& socket) const {
   const uint32_t socket_state_bitmask =
       dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED);
   return SendRequest(src_addr, dst_addr, socket_state_bitmask);
-#else
-  return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
                                      const Sockaddr& socket_dst_addr,
                                      uint32_t socket_states_bitmask) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0);
   const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
   const auto src_port = socket_src_addr.port();
@@ -254,13 +228,9 @@ Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
     return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
   }
   return Status::OK();
-#else
-  return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
   DCHECK_GE(fd_, 0);
   uint8_t buf[8192];
   struct iovec iov = {
@@ -351,9 +321,6 @@ Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
     }
   }
   return Status::OK();
-#else
-  return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
 }
 
 } // namespace kudu

Reply via email to