This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f17ae0804 [rpc] instantiate DiagnosticSocket only if sock_diag available
f17ae0804 is described below
commit f17ae080413f4f62046d6732d05ea4f5fdddefb0
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Aug 26 12:47:01 2024 -0700
[rpc] instantiate DiagnosticSocket only if sock_diag available
In the review feedback on [1], Yingchun Lai pointed to the fact that
the DiagnosticSocket field would be instantiated in AcceptorPool even if
the sock_diag netlink facility was not available. An alternative
approach would be not instantiating it at all in that case. This patch
addresses the point.
This is a follow-up to d27603ec7ea6adb4e89c25a37ab1fed314011a47.
[1] http://gerrit.cloudera.org:8080/21722
Change-Id: Ic70b6cca94b50f070efbc159a767d207c3a7b230
Reviewed-on: http://gerrit.cloudera.org:8080/21727
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Yingchun Lai <[email protected]>
---
src/kudu/rpc/acceptor_pool.cc | 28 +++++++++++++++++++++++++---
src/kudu/rpc/acceptor_pool.h | 2 ++
src/kudu/util/CMakeLists.txt | 5 ++++-
src/kudu/util/net/diagnostic_socket.cc | 33 ---------------------------------
4 files changed, 31 insertions(+), 37 deletions(-)
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 6b473f88b..009031b19 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -36,13 +36,16 @@
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
-#include "kudu/util/net/diagnostic_socket.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
+#include "kudu/util/net/diagnostic_socket.h"
+#endif
+
using std::string;
using strings::Substitute;
@@ -205,7 +208,9 @@ void AcceptorPool::Shutdown() {
}
threads_.clear();
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
WARN_NOT_OK(diag_socket_.Close(), "error closing diagnostic socket");
+#endif
// Close the socket: keeping the descriptor open and, possibly, receiving late
// not-to-be-read messages from the peer does not make much sense. The
@@ -231,11 +236,16 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const {
}
Status AcceptorPool::GetPendingConnectionsNum(uint32_t* result) const {
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DiagnosticSocket::TcpSocketInfo info;
RETURN_NOT_OK(diag_socket_.Query(socket_, &info));
*result = info.rx_queue_size;
return Status::OK();
+#else // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
+ return Status::NotSupported(
+ "pending connections metric is not available for this platform");
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... else ...
}
void AcceptorPool::RunThread() {
@@ -249,11 +259,14 @@ void AcceptorPool::RunThread() {
"unable to get address info on RPC socket");
const auto& cur_addr_str = cur_addr.ToString();
- DiagnosticSocket ds;
const int ds_query_freq_log2 = FLAGS_rpc_listen_socket_stats_every_log2;
const bool ds_query_enabled = (ds_query_freq_log2 >= 0);
const uint64_t ds_query_freq_mask =
ds_query_enabled ? (1ULL << ds_query_freq_log2) - 1 : 0;
+
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
+ DiagnosticSocket ds;
+ uint64_t counter = 0;
if (ds_query_enabled) {
if (const auto s = ds.Init(); s.ok()) {
LOG(INFO) << Substitute(
@@ -267,8 +280,15 @@ void AcceptorPool::RunThread() {
"collecting diagnostics on the listening RPC socket $0 is disabled",
cur_addr_str);
}
+#else
+ if (ds_query_enabled) {
+ LOG(WARNING) << Substitute(
+ "--rpc_listen_socket_stats_every_log2 is set to $0, but collecting "
+ "stats on listening RPC sockets is not supported on this platform",
+ ds_query_freq_log2);
+ }
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... else ...
- uint64_t counter = 0;
while (true) {
Socket new_sock;
Sockaddr remote;
@@ -277,6 +297,7 @@ void AcceptorPool::RunThread() {
const auto s = socket_.Accept(&new_sock, &remote,
Socket::FLAG_NONBLOCKING);
const auto accepted_at = CycleClock::Now();
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
if (ds_query_enabled && ds.IsInitialized() &&
(counter & ds_query_freq_mask) == ds_query_freq_mask) {
VLOG(2) << "getting stats on the listening socket";
@@ -293,6 +314,7 @@ void AcceptorPool::RunThread() {
}
}
++counter;
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
const auto dispatch_times_recorder = MakeScopedCleanup([&]() {
// The timings are captured for both success and failure paths, so the
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 3ffcfe5e7..cf1086da8 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -84,7 +84,9 @@ class AcceptorPool {
const Sockaddr bind_address_;
const int listen_backlog_;
std::vector<scoped_refptr<Thread>> threads_;
+#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DiagnosticSocket diag_socket_;
+#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
std::atomic<bool> closing_;
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index acc95d822..501ac5ec1 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -222,7 +222,6 @@ set(UTIL_SRCS
minidump.cc
monotime.cc
mutex.cc
- net/diagnostic_socket.cc
net/dns_resolver.cc
net/net_util.cc
net/sockaddr.cc
@@ -269,6 +268,10 @@ set(UTIL_SRCS
zlib.cc
)
+if (KUDU_HAS_DIAGNOSTIC_SOCKET)
+ set(UTIL_SRCS ${UTIL_SRCS} net/diagnostic_socket.cc)
+endif()
+
if(NOT NO_TESTS)
set(UTIL_SRCS ${UTIL_SRCS} test_graph.cc)
endif()
diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc
index 5eee7265b..da75ce062 100644
--- a/src/kudu/util/net/diagnostic_socket.cc
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -17,12 +17,10 @@
#include "kudu/util/net/diagnostic_socket.h"
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
#include <linux/inet_diag.h>
#include <linux/netlink.h>
#include <linux/sock_diag.h>
#include <linux/types.h>
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
#include <netinet/in.h>
#include <sys/socket.h>
@@ -65,13 +63,6 @@ const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildca
return kSocketStateWildcard;
}
-#if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
-namespace {
- constexpr const char* const kNotSupportedMsg =
- "DiagnosticSocket functionality is currently supported on Linux only";
-} // anonymous namespace
-#endif // #if !defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ...
-
DiagnosticSocket::DiagnosticSocket()
: fd_(-1) {
}
@@ -81,7 +72,6 @@ DiagnosticSocket::~DiagnosticSocket() {
}
Status DiagnosticSocket::Init() {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
auto fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
if (fd < 0) {
int err = errno;
@@ -91,9 +81,6 @@ Status DiagnosticSocket::Init() {
fd_ = fd;
return Status::OK();
-#else
- return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::Close() {
@@ -114,7 +101,6 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
const Sockaddr& socket_dst_addr,
const vector<SocketState>& socket_states,
vector<TcpSocketInfo>* info) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0) << "requires calling Init() first";
DCHECK(info);
@@ -129,14 +115,10 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
RETURN_NOT_OK(ReceiveResponse(&result));
*info = std::move(result);
return Status::OK();
-#else
- return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::Query(const Socket& socket,
TcpSocketInfo* info) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0) << "requires calling Init() first";
DCHECK(info);
@@ -152,14 +134,10 @@ Status DiagnosticSocket::Query(const Socket& socket,
*info = result.front();
return Status::OK();
-#else
- return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
// Send query about the specified socket.
Status DiagnosticSocket::SendRequest(const Socket& socket) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0);
static constexpr const char* const kNonIpErrMsg =
"netlink diagnostics is currently supported only on IPv4 TCP sockets";
@@ -188,15 +166,11 @@ Status DiagnosticSocket::SendRequest(const Socket& socket) const {
const uint32_t socket_state_bitmask =
dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED);
return SendRequest(src_addr, dst_addr, socket_state_bitmask);
-#else
- return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
const Sockaddr& socket_dst_addr,
uint32_t socket_states_bitmask) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0);
const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
const auto src_port = socket_src_addr.port();
@@ -254,13 +228,9 @@ Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
}
return Status::OK();
-#else
- return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
-#if defined(KUDU_HAS_DIAGNOSTIC_SOCKET)
DCHECK_GE(fd_, 0);
uint8_t buf[8192];
struct iovec iov = {
@@ -351,9 +321,6 @@ Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
}
}
return Status::OK();
-#else
- return Status::NotSupported(kNotSupportedMsg);
-#endif // #if defined(KUDU_HAS_DIAGNOSTIC_SOCKET) ... #else ...
}
} // namespace kudu