This is an automated email from the ASF dual-hosted git repository.
achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 4648c9603 [rpc] introduce rpc_pending_connections metric
4648c9603 is described below
commit 4648c96037d90438b2ac955d3f2ddfe37e260263
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jan 23 22:47:40 2024 -0800
[rpc] introduce rpc_pending_connections metric
One of the prior patches introduced the 'rpc_listen_socket_rx_queue_size'
histogram-type metric. That one provides information on the history of
the RPC listening sockets' backlog of pending connections, whereas this new
'rpc_pending_connections' metric reports on the current total number of
pending RPC connections across all the RPC endpoints of a Kudu server.
The newly introduced metric is useful for assessing the current status
of the listening RPC sockets. It's also useful for post-mortem analysis
using the diagnostic logs that store snapshots of this new metric tied
to the timestamps when they were captured.
Change-Id: I0bfdaf9047f43495df85edba3200286f743330a8
Reviewed-on: http://gerrit.cloudera.org:8080/20949
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/acceptor_pool.cc | 11 +++++++++++
src/kudu/rpc/acceptor_pool.h | 5 +++++
src/kudu/rpc/messenger.cc | 45 ++++++++++++++++++++++++++++++++++++++-----
src/kudu/rpc/messenger.h | 8 ++++++++
src/kudu/rpc/rpc-test.cc | 30 +++++++++++++++++++++++++++++
5 files changed, 94 insertions(+), 5 deletions(-)
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index fca0f1e85..a5038526e 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -220,6 +220,17 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const
{
return rpc_connections_accepted_->value();
}
+Status AcceptorPool::GetPendingConnectionsNum(uint32_t* result) const {
+  // Fetch the TCP-level information on the listening socket through a
+  // DiagnosticSocket. For a listening socket, the RX queue size is the
+  // backlog of connections which are waiting to be accepted.
+  DiagnosticSocket diag;
+  RETURN_NOT_OK(diag.Init());
+  DiagnosticSocket::TcpSocketInfo sock_info;
+  RETURN_NOT_OK(diag.Query(socket_, &sock_info));
+
+  // 'result' is written only after both steps above have succeeded.
+  *result = sock_info.rx_queue_size;
+  return Status::OK();
+}
+
void AcceptorPool::RunThread() {
const int64_t kCyclesPerSecond =
static_cast<int64_t>(base::CyclesPerSecond());
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 440c81903..91ebe3b56 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -70,6 +70,11 @@ class AcceptorPool {
// Return the number of connections accepted by this messenger. Thread-safe.
int64_t num_rpc_connections_accepted() const;
+  // Upon success, return Status::OK() and write the current size of the
+  // listening socket's RX queue (i.e. its backlog of pending connections)
+  // into the 'result' out parameter. Otherwise, return the corresponding
+  // non-OK status and leave the 'result' out parameter untouched.
+  Status GetPendingConnectionsNum(uint32_t* result) const;
+
private:
void RunThread();
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index df1046005..663a27cb3 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -46,6 +46,7 @@
#include "kudu/security/tls_context.h"
#include "kudu/security/token_verifier.h"
#include "kudu/util/flags.h"
+#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/socket.h"
@@ -54,6 +55,14 @@
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/threadpool.h"
+// Reported by Messenger::GetPendingConnectionsNum(), which sums up the
+// backlogs of all the messenger's listening sockets.
+METRIC_DEFINE_gauge_int32(server, rpc_pending_connections,
+                          "Pending RPC Connections",
+                          kudu::MetricUnit::kUnits,
+                          "The current total number of pending "
+                          "connections across all the listening sockets "
+                          "of this RPC server",
+                          kudu::MetricLevel::kInfo);
+
using kudu::security::RpcAuthentication;
using kudu::security::RpcEncryption;
using std::string;
@@ -156,6 +165,23 @@ void Messenger::AllExternalReferencesDropped() {
retain_self_.reset();
}
+// Sum up the pending connections across all the acceptor pools. Pools which
+// fail to report are skipped (with a throttled warning); -1 is returned if
+// no pool reported, including the case of no acceptor pools at all.
+int32_t Messenger::GetPendingConnectionsNum() {
+  // 'acceptor_pools_' is protected by 'lock_' and is modified under it in
+  // AddAcceptorPool(). Since this method is the callback of the
+  // 'rpc_pending_connections' function gauge, it may be invoked concurrently
+  // with such modifications. So take a snapshot under the lock and run the
+  // socket queries outside of the critical section.
+  decltype(acceptor_pools_) acceptor_pools;
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    acceptor_pools = acceptor_pools_;
+  }
+
+  int pools_reported_num = 0;
+  int32_t total_count = 0;
+  for (const auto& p : acceptor_pools) {
+    uint32_t count = 0;
+    if (auto s = p->GetPendingConnectionsNum(&count); !s.ok()) {
+      KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
+          "$0: no data on pending connections for acceptor pool at $1",
+          s.ToString(), p->bind_address().ToString()) << THROTTLE_MSG;
+      continue;
+    }
+    ++pools_reported_num;
+    total_count += static_cast<int32_t>(count);
+  }
+  return pools_reported_num == 0 ? -1 : total_count;
+}
+
void Messenger::Shutdown() {
ShutdownInternal(ShutdownMode::SYNC);
}
@@ -223,12 +249,21 @@ Status Messenger::AddAcceptorPool(const Sockaddr& accept_addr,
RETURN_NOT_OK(sock.Bind(accept_addr));
Sockaddr addr;
RETURN_NOT_OK(sock.GetSocketAddress(&addr));
- auto acceptor_pool(std::make_shared<AcceptorPool>(
- this, &sock, addr, acceptor_listen_backlog_));
- std::lock_guard<percpu_rwlock> guard(lock_);
- acceptor_pools_.push_back(acceptor_pool);
- pool->swap(acceptor_pool);
+ {
+ std::lock_guard<percpu_rwlock> guard(lock_);
+ acceptor_pools_.emplace_back(std::make_shared<AcceptorPool>(
+ this, &sock, addr, acceptor_listen_backlog_));
+ *pool = acceptor_pools_.back();
+ }
+
+ // 'rpc_pending_connections' metric is instantiated only when a messenger
+ // contains at least one acceptor pool. So, this metric is instantiated
+ // only for a server-side messenger.
+ METRIC_rpc_pending_connections.InstantiateFunctionGauge(
+ metric_entity_, [this]() { return this->GetPendingConnectionsNum(); })->
+ AutoDetachToLastValue(&metric_detacher_);
+
return Status::OK();
}
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index a8044fca4..65dcfc01f 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -491,6 +491,12 @@ class Messenger {
// any references. See 'retain_self_' for more info.
void AllExternalReferencesDropped();
+  // Get the total number of currently pending connections across all the RPC
+  // endpoints this messenger is bound to. Endpoints whose listening socket's
+  // backlog cannot be retrieved are skipped. This utility method returns -1
+  // if the information cannot be retrieved from any of the RPC endpoints
+  // (including the case when the messenger has no RPC endpoints at all).
+  int32_t GetPendingConnectionsNum();
+
const std::string name_;
// Protects closing_, acceptor_pools_, rpc_services_.
@@ -618,6 +624,8 @@ class Messenger {
// within a Reactor thread itself.
std::shared_ptr<Messenger> retain_self_;
+  // Passed to AutoDetachToLastValue() when instantiating the
+  // 'rpc_pending_connections' function gauge, whose callback captures 'this'.
+  // Presumably detaches the gauge from that callback when the messenger is
+  // destroyed (see FunctionGaugeDetacher) so it never calls into a dead object.
+  FunctionGaugeDetacher metric_detacher_;
+
DISALLOW_COPY_AND_ASSIGN(Messenger);
};
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index f90130aaf..ab9bdbb85 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -84,6 +84,7 @@ class AcceptorPool;
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_gauge_int32(rpc_pending_connections);
METRIC_DECLARE_histogram(acceptor_dispatch_times);
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
@@ -1457,6 +1458,35 @@ TEST_P(TestRpc, AcceptorDispatchingTimesMetric) {
});
}
+// Basic verification of the 'rpc_pending_connections' metric.
+TEST_P(TestRpc, RpcPendingConnectionsMetric) {
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  {
+    Socket socket;
+    ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0));
+    ASSERT_OK(socket.Connect(server_addr));
+  }
+
+  // Get a reference to the metric already registered by the server messenger
+  // (with the proper callback) when it added its acceptor pool. The fake
+  // { return -3; } callback below is not expected to be used: if it were,
+  // the gauge would report -3 and the assertions below would fail.
+  auto pending_connections_gauge =
+      METRIC_rpc_pending_connections.InstantiateFunctionGauge(
+          server_messenger_->metric_entity(), []() { return -3; });
+
+  // Connect() returns once the connection is established and queued in the
+  // listening socket's backlog, not once the acceptor thread has accepted it.
+  // So, wait for the backlog to drain. The number of pending connections is
+  // properly reported at Linux only as of now; on macOS it should report -1.
+#if defined(__linux__)
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, pending_connections_gauge->value());
+  });
+#else
+  ASSERT_EQ(-1, pending_connections_gauge->value());
+#endif
+}
+
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
CountDownLatch* latch) {
messenger->reset();