This is an automated email from the ASF dual-hosted git repository.
achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 0d837c5aa [rpc] KUDU-3514 add metric to track per-RPC timeouts
0d837c5aa is described below
commit 0d837c5aac482fbe46170e26ac45e5ad951eecec
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Sep 26 19:35:58 2023 -0700
[rpc] KUDU-3514 add metric to track per-RPC timeouts
This patch introduces a new per-RPC counter to accumulate stats on the
responses that have been sent to the client side past the deadline.
It should help troubleshooting of particular scenarios and provide
a glimpse of what part of requests to a particular RPC method timed out
while being processed.
This patch also includes test scenarios to cover the functionality
of the newly introduced metric.
Change-Id: I0a3cef63475c26f7936d5be7a0072a72332b7c88
Reviewed-on: http://gerrit.cloudera.org:8080/20517
Tested-by: Kudu Jenkins
Reviewed-by: Zoltan Martonka <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/inbound_call.cc | 7 ++
src/kudu/rpc/protoc-gen-krpc.cc | 11 +++
src/kudu/rpc/rpc-test-base.h | 3 +
src/kudu/rpc/rpc-test.cc | 194 +++++++++++++++++++++++++++++++++++++++-
src/kudu/rpc/service_if.h | 4 +
5 files changed, 216 insertions(+), 3 deletions(-)
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index e2c916125..b204eae3e 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -306,6 +306,13 @@ void InboundCall::RecordHandlingCompleted() {
DCHECK(!timing_.time_completed.Initialized()); // Protect against multiple
calls.
timing_.time_completed = MonoTime::Now();
+ // If it's now past the deadline defined by the client, the yet-to-be-sent
+ // response is already late and the RPC will be declared as "timed out"
+ // (if detected) when it arrives to the client side.
+ if (method_info_ && timing_.time_completed > deadline_) {
+ method_info_->timed_out_on_response->Increment();
+ }
+
if (!timing_.time_handled.Initialized()) {
// Sometimes we respond to a call before we begin handling it (e.g. due to
queue
// overflow, etc). These cases should not be counted against the histogram.
diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc
index 2a3b1a763..cbfa87be5 100644
--- a/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/src/kudu/rpc/protoc-gen-krpc.cc
@@ -451,6 +451,14 @@ class CodeGenerator : public
::google::protobuf::compiler::CodeGenerator {
" \"Number of rejected $rpc_full_name$ requests due to RPC
queue overflow\",\n"
" kudu::MetricLevel::kInfo);\n"
"\n");
+ Print(printer, *subs,
+ "METRIC_DEFINE_counter(server,\n"
+ " timed_out_on_response_$rpc_full_name_plainchars$,\n"
+ " \"Late $rpc_full_name$ RPC Responses\",\n"
+ " kudu::MetricUnit::kRequests,\n"
+ " \"Number of times $rpc_full_name$ responses were sent past
the RPC's deadline\",\n"
+ " kudu::MetricLevel::kInfo);\n"
+ "\n");
subs->Pop(); // method
}
@@ -508,6 +516,9 @@ class CodeGenerator : public
::google::protobuf::compiler::CodeGenerator {
" mi->queue_overflow_rejections =\n"
"
METRIC_queue_overflow_rejections_$rpc_full_name_plainchars$.Instantiate("
"entity);\n"
+ " mi->timed_out_on_response =\n"
+ "
METRIC_timed_out_on_response_$rpc_full_name_plainchars$.Instantiate("
+ "entity);\n"
" mi->func = [this](const Message* req, Message* resp,
RpcContext* ctx) {\n"
" this->$rpc_name$(\n"
" static_cast<const $request$*>(req),\n"
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 745ece846..97eca8e64 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -422,6 +422,7 @@ class RpcTestBase : public KuduTest {
n_server_reactor_threads_(3),
n_worker_threads_(3),
keepalive_time_ms_(1000),
+ rpc_negotiation_timeout_ms_(3000),
service_queue_length_(200),
metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
"test.rpc_test")) {
}
@@ -467,6 +468,7 @@ class RpcTestBase : public KuduTest {
MonoDelta::FromMilliseconds(std::min(keepalive_time_ms_ / 5, 100)));
}
bld.set_metric_entity(metric_entity_);
+ bld.set_rpc_negotiation_timeout_ms(rpc_negotiation_timeout_ms_);
return bld.Build(messenger);
}
@@ -667,6 +669,7 @@ static void DoTestSidecar(Proxy* p, int size1, int size2) {
int n_server_reactor_threads_;
int n_worker_threads_;
int keepalive_time_ms_;
+ int rpc_negotiation_timeout_ms_;
int service_queue_length_;
std::string service_name_;
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 67fb75028..b18bc2981 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -56,6 +56,7 @@
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/rtest.pb.h"
#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/service_pool.h"
#include "kudu/rpc/transfer.h"
#include "kudu/security/test/test_certs.h"
#include "kudu/util/countdown_latch.h"
@@ -80,6 +81,7 @@ class AcceptorPool;
} // namespace kudu
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
@@ -96,11 +98,9 @@ using std::thread;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
-
-namespace kudu {
-
using strings::Substitute;
+namespace kudu {
namespace rpc {
// RPC proxies require a hostname to be passed. In this test we're just
connecting to
@@ -1234,6 +1234,194 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
ASSERT_TRUE(FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time));
}
+// Set of basic test scenarios for the per-RPC 'timed_out_on_response' metric.
+TEST_P(TestRpc, TimedOutOnResponseMetric) {
+ constexpr uint64_t kSleepMicros = 20 * 1000;
+ const string kMethodName = "Sleep";
+
+ // Set RPC connection negotiation timeout to be very high to avoid flakiness
+ // if name resolution is very slow.
+ rpc_negotiation_timeout_ms_ = 60 * 1000;
+
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
+
+ shared_ptr<Messenger> cm;
+ ASSERT_OK(CreateMessenger("client", &cm, 1/*n_reactors*/, enable_ssl()));
+ Proxy p(cm, server_addr, kRemoteHostName,
CalculatorService::static_service_name());
+
+ // Get references to the metrics map and a couple of relevant metrics.
+ const auto& mm =
server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
+ const auto* latency_histogram = down_cast<Histogram*>(FindOrDie(
+ mm,
&METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+ const auto* timed_out_on_response = down_cast<Counter*>(FindOrDie(
+ mm,
&METRIC_timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep).get());
+ const auto* timed_out_in_queue =
service_pool_->RpcsTimedOutInQueueMetricForTests();
+
+ ASSERT_EQ(0, latency_histogram->TotalCount());
+ ASSERT_EQ(0, timed_out_on_response->value());
+ ASSERT_EQ(0, timed_out_in_queue->value());
+
+ // Make a dry-run call to avoid flakiness in this test scenario if name
+ // resolution is slow. This primes the DNS resolver and its cache.
+ {
+ SleepRequestPB req;
+ req.set_sleep_micros(kSleepMicros);
+ SleepResponsePB resp;
+ RpcController ctl;
+ ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+ ASSERT_EQ(1, latency_histogram->TotalCount());
+ ASSERT_EQ(0, timed_out_on_response->value());
+ }
+
+ // Run the sequence of sub-scenarios where the requests successfully complete
+ // at the server side, but the client might mark them as timed out.
+ SleepRequestPB req;
+ req.set_sleep_micros(kSleepMicros);
+
+ // Client side doesn't set the timeout for the RPC at all.
+ {
+ RpcController ctl;
+ SleepResponsePB resp;
+ ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+ ASSERT_EQ(2, latency_histogram->TotalCount());
+ ASSERT_EQ(0, timed_out_on_response->value());
+ }
+
+ // Set the timeout for the RPC much higher than the request would take
+ // to process (assuming scheduler anomalies do not spill over 30 seconds).
+ {
+ RpcController ctl;
+ ctl.set_timeout(MonoDelta::FromSeconds(30));
+ SleepResponsePB resp;
+ ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+ ASSERT_EQ(3, latency_histogram->TotalCount());
+ ASSERT_EQ(0, timed_out_on_response->value());
+ }
+
+ // Set the timeout for the RPC to the minimum recognizable by the server
+ // side value greater than 0.
+ {
+ RpcController ctl;
+ ctl.set_timeout(MonoDelta::FromMilliseconds(1));
+ SleepResponsePB resp;
+ const auto s = p.SyncRequest(kMethodName, req, &resp, &ctl);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Sleep RPC");
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(4, latency_histogram->TotalCount());
+ });
+ ASSERT_EQ(1, timed_out_on_response->value());
+ }
+
+ // Set the timeout for the RPC to be close to the time it takes to process
+ // the request.
+ {
+ RpcController ctl;
+ ctl.set_timeout(MonoDelta::FromMicroseconds(kSleepMicros));
+ SleepResponsePB resp;
+ const auto s = p.SyncRequest(kMethodName, req, &resp, &ctl);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Sleep RPC");
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(5, latency_histogram->TotalCount());
+ });
+ ASSERT_EQ(2, timed_out_on_response->value());
+ }
+
+ // Not a single RPC times out in the service queue -- all the RPCs have been
+ // successfully processed by the server, just a couple of them were responded
+ // after the client-defined deadline.
+ ASSERT_EQ(0, timed_out_in_queue->value());
+}
+
+// A special scenario for the per-RPC 'timed_out_on_response' metric when an
+// RPC times out while waiting in the queue, so it's not actually processed.
+TEST_P(TestRpc, TimedOutOnResponseMetricServiceQueue) {
+ constexpr uint64_t kSleepMicros = 20 * 1000;
+ const string kMethodName = "Sleep";
+
+ // Set RPC connection negotiation timeout to be very high to avoid flakiness
+ // if name resolution is very slow.
+ rpc_negotiation_timeout_ms_ = 60 * 1000;
+
+ // Limit the capacity of the service's thread pool, so requests are processed
+ // sequentially by a single worker thread.
+ n_worker_threads_ = 1;
+
+ Sockaddr server_addr = bind_addr();
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
+
+ shared_ptr<Messenger> cm;
+ ASSERT_OK(CreateMessenger("client", &cm, 1/*n_reactors*/, enable_ssl()));
+ Proxy p(cm, server_addr, kRemoteHostName,
CalculatorService::static_service_name());
+
+ // Get the reference to the metrics map and create handles to the needed
metrics.
+ const auto& mm =
server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
+
+ const auto* latency_histogram = down_cast<Histogram*>(FindOrDie(
+ mm,
&METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+ const auto* timed_out_on_response = down_cast<Counter*>(FindOrDie(
+ mm,
&METRIC_timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep).get());
+ const auto* timed_out_in_queue =
service_pool_->RpcsTimedOutInQueueMetricForTests();
+
+ // In the beginning, all the related metrics should be zeroed.
+ ASSERT_EQ(0, latency_histogram->TotalCount());
+ ASSERT_EQ(0, timed_out_on_response->value());
+ ASSERT_EQ(0, timed_out_in_queue->value());
+
+ // Make a dry-run call to avoid flakiness in this test scenario if name
+ // resolution is slow. This primes the DNS resolver and its cache.
+ {
+ SleepRequestPB req;
+ req.set_sleep_micros(kSleepMicros);
+ SleepResponsePB resp;
+ RpcController ctl;
+ ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+ }
+ ASSERT_EQ(1, latency_histogram->TotalCount());
+ ASSERT_EQ(0, timed_out_on_response->value());
+ ASSERT_EQ(0, timed_out_in_queue->value());
+
+ CountDownLatch latch(2);
+
+ // The first RPC should be successful: sleep for the specified time.
+ SleepRequestPB req0;
+ req0.set_sleep_micros(kSleepMicros);
+ SleepResponsePB resp0;
+ RpcController ctl0;
+ p.AsyncRequest(kMethodName, req0, &resp0, &ctl0,
+ [&latch]() { latch.CountDown(); });
+
+ // The second RPC should wait in the RPC queue while the first is being
+ // processed by the only thread in the RPC service thread pool. Eventually,
+ // it should time out.
+ SleepRequestPB req1;
+ req1.set_return_app_error(true);
+ SleepResponsePB resp1;
+ RpcController ctl1;
+ ctl1.set_timeout(MonoDelta::FromMicroseconds(kSleepMicros));
+ p.AsyncRequest(kMethodName, req1, &resp1, &ctl1,
+ [&latch]() { latch.CountDown(); });
+
+ // Wait for the completion of both requests sent asynchronously above.
+ latch.Wait();
+
+ // There have been 3 requests total in this scenario.
+ ASSERT_EQ(3, latency_histogram->TotalCount());
+
+ // The first RPC should return OK.
+ ASSERT_OK(ctl0.status());
+
+ // The second RPC should time out while waiting in the queue
+ // and the corresponding metrics should be incremented.
+ const auto& s = ctl1.status();
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Sleep RPC");
+ ASSERT_EQ(1, timed_out_on_response->value());
+ ASSERT_EQ(1, timed_out_in_queue->value());
+}
+
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
CountDownLatch* latch) {
messenger->reset();
diff --git a/src/kudu/rpc/service_if.h b/src/kudu/rpc/service_if.h
index 185e29191..bb378b617 100644
--- a/src/kudu/rpc/service_if.h
+++ b/src/kudu/rpc/service_if.h
@@ -49,6 +49,10 @@ struct RpcMethodInfo : public
RefCountedThreadSafe<RpcMethodInfo> {
scoped_refptr<Histogram> handler_latency_histogram;
scoped_refptr<Counter> queue_overflow_rejections;
+ // The number of times the service sent back a response (both success and
+ // failure responses are counted in) past the deadline set by the client
side.
+ scoped_refptr<Counter> timed_out_on_response;
+
// Whether we should track this method's result, using ResultTracker.
bool track_result;