This is an automated email from the ASF dual-hosted git repository.
achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 2a07d9598 [rpc] add metric for AcceptorPool's dispatch timing
2a07d9598 is described below
commit 2a07d95986f125b9186bff183f03d544a441d64d
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Dec 12 19:38:15 2023 -0800
[rpc] add metric for AcceptorPool's dispatch timing
This patch adds the 'acceptor_dispatch_times' histogram metric to track
the time AcceptorPool takes to dispatch newly accepted connections, along
with a very basic unit test for the newly added metric.
Change-Id: I018ddd14414c8d13aaf488fa9eb4db1bf1248cc4
Reviewed-on: http://gerrit.cloudera.org:8080/20790
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/acceptor_pool.cc | 40 +++++++++++++++++++++++++++++++++++-----
src/kudu/rpc/acceptor_pool.h | 7 +++++--
src/kudu/rpc/rpc-test.cc | 23 +++++++++++++++++++++++
3 files changed, 63 insertions(+), 7 deletions(-)
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index a753f34f8..ec7c86bf7 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -30,12 +30,15 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/gutil/walltime.h"
#include "kudu/rpc/messenger.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
@@ -52,6 +55,17 @@ METRIC_DEFINE_counter(server,
rpc_connections_accepted_unix_domain_socket,
"Number of incoming UNIX Domain Socket connections made to the RPC server",
kudu::MetricLevel::kInfo);
+METRIC_DEFINE_histogram(server, acceptor_dispatch_times,
+ "Acceptor Dispatch Times",
+ kudu::MetricUnit::kMicroseconds,
+ "A histogram of dispatching timings for accepted "
+ "connections. Outliers in this histogram contribute "
+ "to the latency of handling incoming connection "
+ "requests and growing the backlog of pending TCP "
+ "connections to the server.",
+ kudu::MetricLevel::kInfo,
+ 1000000, 2);
+
DEFINE_int32(rpc_acceptor_listen_backlog, 128,
"Socket backlog parameter used when listening for RPC connections. "
"This defines the maximum length to which the queue of pending "
@@ -74,10 +88,12 @@ AcceptorPool::AcceptorPool(Messenger* messenger,
socket_(socket->Release()),
bind_address_(bind_address),
closing_(false) {
- auto& accept_metric = bind_address.is_ip() ?
- METRIC_rpc_connections_accepted :
- METRIC_rpc_connections_accepted_unix_domain_socket;
- rpc_connections_accepted_ = accept_metric.Instantiate(messenger->metric_entity());
+ const auto& metric_entity = messenger->metric_entity();
+ auto& connections_accepted = bind_address.is_ip()
+ ? METRIC_rpc_connections_accepted
+ : METRIC_rpc_connections_accepted_unix_domain_socket;
+ rpc_connections_accepted_ = connections_accepted.Instantiate(metric_entity);
+ dispatch_times_ = METRIC_acceptor_dispatch_times.Instantiate(metric_entity);
}
AcceptorPool::~AcceptorPool() {
@@ -152,12 +168,26 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const {
}
void AcceptorPool::RunThread() {
+ const int64_t kCyclesPerSecond = static_cast<int64_t>(base::CyclesPerSecond());
+
while (true) {
Socket new_sock;
Sockaddr remote;
VLOG(2) << Substitute("calling accept() on socket $0 listening on $1",
socket_.GetFd(), bind_address_.ToString());
const auto s = socket_.Accept(&new_sock, &remote,
Socket::FLAG_NONBLOCKING);
+ const auto accepted_at = CycleClock::Now();
+ const auto dispatch_times_recorder = MakeScopedCleanup([&]() {
+ // The timings are captured for both success and failure paths, so the
+ // 'dispatch_times_' histogram accounts for all the connection attempts
+ // that lead to successfully extracting an item from the queue of pending
+ // connections for the listened RPC socket. Meanwhile, the
+ // 'rpc_connections_accepted_' counter accounts only for connections that
+ // were successfully dispatched to the messenger for further processing.
+ dispatch_times_->Increment(
+ (CycleClock::Now() - accepted_at) * 1000000 / kCyclesPerSecond);
+ });
+
if (PREDICT_FALSE(!s.ok())) {
if (closing_) {
break;
@@ -177,8 +207,8 @@ void AcceptorPool::RunThread() {
continue;
}
}
- rpc_connections_accepted_->Increment();
messenger_->RegisterInboundSocket(&new_sock, remote);
+ rpc_connections_accepted_->Increment();
}
VLOG(1) << "AcceptorPool shutting down";
}
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index aa00df66f..14b2740e1 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -29,6 +29,7 @@
namespace kudu {
class Counter;
+class Histogram;
class Thread;
namespace rpc {
@@ -70,10 +71,12 @@ class AcceptorPool {
const Sockaddr bind_address_;
std::vector<scoped_refptr<Thread>> threads_;
- scoped_refptr<Counter> rpc_connections_accepted_;
-
std::atomic<bool> closing_;
+ // Metrics.
+ scoped_refptr<Counter> rpc_connections_accepted_;
+ scoped_refptr<Histogram> dispatch_times_;
+
DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
};
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index b18bc2981..52d9b89ea 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -82,6 +82,7 @@ class AcceptorPool;
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_histogram(acceptor_dispatch_times);
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
@@ -1422,6 +1423,28 @@ TEST_P(TestRpc, TimedOutOnResponseMetricServiceQueue) {
ASSERT_EQ(1, timed_out_in_queue->value());
}
+// Basic verification for the numbers reported by 'acceptor_dispatch_times'.
+TEST_P(TestRpc, AcceptorDispatchingTimesMetric) {
+ Sockaddr server_addr;
+ ASSERT_OK(StartTestServer(&server_addr));
+
+ {
+ Socket socket;
+ ASSERT_OK(socket.Init(server_addr.family(), 0));
+ ASSERT_OK(socket.Connect(server_addr));
+ }
+
+ scoped_refptr<Histogram> dispatch_times =
+ METRIC_acceptor_dispatch_times.Instantiate(server_messenger_->metric_entity());
+ // Using ASSERT_EVENTUALLY below because of relaxed memory ordering when
+ // fetching metrics' values. Eventually, the metrics report readings that are
+ // consistent with the expected numbers.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(1, dispatch_times->TotalCount());
+ ASSERT_GT(dispatch_times->MaxValueForTests(), 0);
+ });
+}
+
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
CountDownLatch* latch) {
messenger->reset();