This is an automated email from the ASF dual-hosted git repository.

achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d837c5aa [rpc] KUDU-3514 add metric to track per-RPC timeouts
0d837c5aa is described below

commit 0d837c5aac482fbe46170e26ac45e5ad951eecec
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Sep 26 19:35:58 2023 -0700

    [rpc] KUDU-3514 add metric to track per-RPC timeouts
    
    This patch introduces a new per-RPC counter to accumulate stats on the
    responses that have been sent to the client side past the deadline.
    It should help troubleshoot particular scenarios and provide a glimpse
    of what portion of requests to a particular RPC method timed out while
    being processed.
    
    This patch also includes test scenarios to cover the functionality
    of the newly introduced metric.
    
    Change-Id: I0a3cef63475c26f7936d5be7a0072a72332b7c88
    Reviewed-on: http://gerrit.cloudera.org:8080/20517
    Tested-by: Kudu Jenkins
    Reviewed-by: Zoltan Martonka <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/rpc/inbound_call.cc    |   7 ++
 src/kudu/rpc/protoc-gen-krpc.cc |  11 +++
 src/kudu/rpc/rpc-test-base.h    |   3 +
 src/kudu/rpc/rpc-test.cc        | 194 +++++++++++++++++++++++++++++++++++++++-
 src/kudu/rpc/service_if.h       |   4 +
 5 files changed, 216 insertions(+), 3 deletions(-)
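
For illustration, here is roughly what the counter definition emitted by the
protoc-gen-krpc template below expands to for the CalculatorService.Sleep
method exercised in rpc-test.cc (a sketch, not actual generated output: the
plainchars name matches the METRIC_DECLARE_counter in the test, and
$rpc_full_name$ is assumed to resolve to kudu.rpc_test.CalculatorService.Sleep):

    METRIC_DEFINE_counter(server,
        timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep,
        "Late kudu.rpc_test.CalculatorService.Sleep RPC Responses",
        kudu::MetricUnit::kRequests,
        "Number of times kudu.rpc_test.CalculatorService.Sleep responses were sent past the RPC's deadline",
        kudu::MetricLevel::kInfo);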

diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index e2c916125..b204eae3e 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -306,6 +306,13 @@ void InboundCall::RecordHandlingCompleted() {
   DCHECK(!timing_.time_completed.Initialized());  // Protect against multiple calls.
   timing_.time_completed = MonoTime::Now();
 
+  // If it's now past the deadline defined by the client, the yet-to-be-sent
+  // response is already late and the RPC will be declared as "timed out"
+  // (if detected) when it arrives at the client side.
+  if (method_info_ && timing_.time_completed > deadline_) {
+    method_info_->timed_out_on_response->Increment();
+  }
+
   if (!timing_.time_handled.Initialized()) {
     // Sometimes we respond to a call before we begin handling it (e.g. due to queue
     // overflow, etc). These cases should not be counted against the histogram.
diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc
index 2a3b1a763..cbfa87be5 100644
--- a/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/src/kudu/rpc/protoc-gen-krpc.cc
@@ -451,6 +451,14 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
             "    \"Number of rejected $rpc_full_name$ requests due to RPC queue overflow\",\n"
             "    kudu::MetricLevel::kInfo);\n"
             "\n");
+        Print(printer, *subs,
+            "METRIC_DEFINE_counter(server,\n"
+            "    timed_out_on_response_$rpc_full_name_plainchars$,\n"
+            "    \"Late $rpc_full_name$ RPC Responses\",\n"
+            "    kudu::MetricUnit::kRequests,\n"
+            "    \"Number of times $rpc_full_name$ responses were sent past the RPC's deadline\",\n"
+            "    kudu::MetricLevel::kInfo);\n"
+            "\n");
         subs->Pop(); // method
       }
 
@@ -508,6 +516,9 @@ class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
             "    mi->queue_overflow_rejections =\n"
             "        METRIC_queue_overflow_rejections_$rpc_full_name_plainchars$.Instantiate("
             "entity);\n"
+            "    mi->timed_out_on_response =\n"
+            "        METRIC_timed_out_on_response_$rpc_full_name_plainchars$.Instantiate("
+            "entity);\n"
             "    mi->func = [this](const Message* req, Message* resp, RpcContext* ctx) {\n"
             "      this->$rpc_name$(\n"
             "          static_cast<const $request$*>(req),\n"
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 745ece846..97eca8e64 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -422,6 +422,7 @@ class RpcTestBase : public KuduTest {
         n_server_reactor_threads_(3),
         n_worker_threads_(3),
         keepalive_time_ms_(1000),
+        rpc_negotiation_timeout_ms_(3000),
         service_queue_length_(200),
         metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
   }
@@ -467,6 +468,7 @@ class RpcTestBase : public KuduTest {
           MonoDelta::FromMilliseconds(std::min(keepalive_time_ms_ / 5, 100)));
     }
     bld.set_metric_entity(metric_entity_);
+    bld.set_rpc_negotiation_timeout_ms(rpc_negotiation_timeout_ms_);
     return bld.Build(messenger);
   }
 
@@ -667,6 +669,7 @@ static void DoTestSidecar(Proxy* p, int size1, int size2) {
   int n_server_reactor_threads_;
   int n_worker_threads_;
   int keepalive_time_ms_;
+  int rpc_negotiation_timeout_ms_;
   int service_queue_length_;
 
   std::string service_name_;
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 67fb75028..b18bc2981 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -56,6 +56,7 @@
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/rtest.pb.h"
 #include "kudu/rpc/serialization.h"
+#include "kudu/rpc/service_pool.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/test/test_certs.h"
 #include "kudu/util/countdown_latch.h"
@@ -80,6 +81,7 @@ class AcceptorPool;
 }  // namespace kudu
 
 
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
+METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep);
 
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
@@ -96,11 +98,9 @@ using std::thread;
 using std::unique_ptr;
 using std::unordered_map;
 using std::vector;
-
-namespace kudu {
-
 using strings::Substitute;
 
+namespace kudu {
 namespace rpc {
 
 // RPC proxies require a hostname to be passed. In this test we're just connecting to
@@ -1234,6 +1234,194 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
   ASSERT_TRUE(FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time));
 }
 
+// Set of basic test scenarios for the per-RPC 'timed_out_on_response' metric.
+TEST_P(TestRpc, TimedOutOnResponseMetric) {
+  constexpr uint64_t kSleepMicros = 20 * 1000;
+  const string kMethodName = "Sleep";
+
+  // Set RPC connection negotiation timeout to be very high to avoid flakiness
+  // if name resolution is very slow.
+  rpc_negotiation_timeout_ms_ = 60 * 1000;
+
+  Sockaddr server_addr = bind_addr();
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
+
+  shared_ptr<Messenger> cm;
+  ASSERT_OK(CreateMessenger("client", &cm, 1/*n_reactors*/, enable_ssl()));
+  Proxy p(cm, server_addr, kRemoteHostName, CalculatorService::static_service_name());
+
+  // Get references to the metrics map and a couple of relevant metrics.
+  const auto& mm = server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
+  const auto* latency_histogram = down_cast<Histogram*>(FindOrDie(
+      mm, &METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+  const auto* timed_out_on_response = down_cast<Counter*>(FindOrDie(
+      mm, &METRIC_timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep).get());
+  const auto* timed_out_in_queue = service_pool_->RpcsTimedOutInQueueMetricForTests();
+
+  ASSERT_EQ(0, latency_histogram->TotalCount());
+  ASSERT_EQ(0, timed_out_on_response->value());
+  ASSERT_EQ(0, timed_out_in_queue->value());
+
+  // Make a dry-run call to avoid flakiness in this test scenario if name
+  // resolution is slow. This primes the DNS resolver and its cache.
+  {
+    SleepRequestPB req;
+    req.set_sleep_micros(kSleepMicros);
+    SleepResponsePB resp;
+    RpcController ctl;
+    ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+    ASSERT_EQ(1, latency_histogram->TotalCount());
+    ASSERT_EQ(0, timed_out_on_response->value());
+  }
+
+  // Run the sequence of sub-scenarios where the requests successfully complete
+  // at the server side, but the client might mark them as timed out.
+  SleepRequestPB req;
+  req.set_sleep_micros(kSleepMicros);
+
+  // Client side doesn't set the timeout for the RPC at all.
+  {
+    RpcController ctl;
+    SleepResponsePB resp;
+    ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+    ASSERT_EQ(2, latency_histogram->TotalCount());
+    ASSERT_EQ(0, timed_out_on_response->value());
+  }
+
+  // Set the timeout for the RPC much higher than the request would take
+  // to process (assuming scheduler anomalies do not spill over 30 seconds).
+  {
+    RpcController ctl;
+    ctl.set_timeout(MonoDelta::FromSeconds(30));
+    SleepResponsePB resp;
+    ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+    ASSERT_EQ(3, latency_histogram->TotalCount());
+    ASSERT_EQ(0, timed_out_on_response->value());
+  }
+
+  // Set the timeout for the RPC to the minimum value greater than 0 that
+  // the server side can recognize.
+  {
+    RpcController ctl;
+    ctl.set_timeout(MonoDelta::FromMilliseconds(1));
+    SleepResponsePB resp;
+    const auto s = p.SyncRequest(kMethodName, req, &resp, &ctl);
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Sleep RPC");
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(4, latency_histogram->TotalCount());
+    });
+    ASSERT_EQ(1, timed_out_on_response->value());
+  }
+
+  // Set the timeout for the RPC to be close to the time it takes to process
+  // the request.
+  {
+    RpcController ctl;
+    ctl.set_timeout(MonoDelta::FromMicroseconds(kSleepMicros));
+    SleepResponsePB resp;
+    const auto s = p.SyncRequest(kMethodName, req, &resp, &ctl);
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Sleep RPC");
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(5, latency_histogram->TotalCount());
+    });
+    ASSERT_EQ(2, timed_out_on_response->value());
+  }
+
+  // Not a single RPC times out in the service queue -- all the RPCs have been
+  // successfully processed by the server; just a couple of them were responded
+  // to after the client-defined deadline.
+  ASSERT_EQ(0, timed_out_in_queue->value());
+}
+
+// A special scenario for the per-RPC 'timed_out_on_response' metric when an
+// RPC times out while waiting in the queue, so it's not actually processed.
+TEST_P(TestRpc, TimedOutOnResponseMetricServiceQueue) {
+  constexpr uint64_t kSleepMicros = 20 * 1000;
+  const string kMethodName = "Sleep";
+
+  // Set RPC connection negotiation timeout to be very high to avoid flakiness
+  // if name resolution is very slow.
+  rpc_negotiation_timeout_ms_ = 60 * 1000;
+
+  // Limit the capacity of the service's thread pool, so requests are processed
+  // sequentially by a single worker thread.
+  n_worker_threads_ = 1;
+
+  Sockaddr server_addr = bind_addr();
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, enable_ssl()));
+
+  shared_ptr<Messenger> cm;
+  ASSERT_OK(CreateMessenger("client", &cm, 1/*n_reactors*/, enable_ssl()));
+  Proxy p(cm, server_addr, kRemoteHostName, CalculatorService::static_service_name());
+
+  // Get the reference to the metrics map and create handles to the needed metrics.
+  const auto& mm = server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
+
+  const auto* latency_histogram = down_cast<Histogram*>(FindOrDie(
+      mm, &METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+  const auto* timed_out_on_response = down_cast<Counter*>(FindOrDie(
+      mm, &METRIC_timed_out_on_response_kudu_rpc_test_CalculatorService_Sleep).get());
+  const auto* timed_out_in_queue = service_pool_->RpcsTimedOutInQueueMetricForTests();
+
+  // In the beginning, all the related metrics should be zeroed.
+  ASSERT_EQ(0, latency_histogram->TotalCount());
+  ASSERT_EQ(0, timed_out_on_response->value());
+  ASSERT_EQ(0, timed_out_in_queue->value());
+
+  // Make a dry-run call to avoid flakiness in this test scenario if name
+  // resolution is slow. This primes the DNS resolver and its cache.
+  {
+    SleepRequestPB req;
+    req.set_sleep_micros(kSleepMicros);
+    SleepResponsePB resp;
+    RpcController ctl;
+    ASSERT_OK(p.SyncRequest(kMethodName, req, &resp, &ctl));
+  }
+  ASSERT_EQ(1, latency_histogram->TotalCount());
+  ASSERT_EQ(0, timed_out_on_response->value());
+  ASSERT_EQ(0, timed_out_in_queue->value());
+
+  CountDownLatch latch(2);
+
+  // The first RPC should be successful: sleep for the specified time.
+  SleepRequestPB req0;
+  req0.set_sleep_micros(kSleepMicros);
+  SleepResponsePB resp0;
+  RpcController ctl0;
+  p.AsyncRequest(kMethodName, req0, &resp0, &ctl0,
+                 [&latch]() { latch.CountDown(); });
+
+  // The second RPC should wait in the RPC queue while the first is being
+  // processed by the only thread in the RPC service thread pool.  Eventually,
+  // it should time out.
+  SleepRequestPB req1;
+  req1.set_return_app_error(true);
+  SleepResponsePB resp1;
+  RpcController ctl1;
+  ctl1.set_timeout(MonoDelta::FromMicroseconds(kSleepMicros));
+  p.AsyncRequest(kMethodName, req1, &resp1, &ctl1,
+                 [&latch]() { latch.CountDown(); });
+
+  // Wait for the completion of both requests sent asynchronously above.
+  latch.Wait();
+
+  // There have been 3 requests in total in this scenario.
+  ASSERT_EQ(3, latency_histogram->TotalCount());
+
+  // The first RPC should return OK.
+  ASSERT_OK(ctl0.status());
+
+  // The second RPC should time out while waiting in the queue
+  // and the corresponding metrics should be incremented.
+  const auto& s = ctl1.status();
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Timed out: Sleep RPC");
+  ASSERT_EQ(1, timed_out_on_response->value());
+  ASSERT_EQ(1, timed_out_in_queue->value());
+}
+
 static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
                                      CountDownLatch* latch) {
   messenger->reset();
diff --git a/src/kudu/rpc/service_if.h b/src/kudu/rpc/service_if.h
index 185e29191..bb378b617 100644
--- a/src/kudu/rpc/service_if.h
+++ b/src/kudu/rpc/service_if.h
@@ -49,6 +49,10 @@ struct RpcMethodInfo : public RefCountedThreadSafe<RpcMethodInfo> {
   scoped_refptr<Histogram> handler_latency_histogram;
   scoped_refptr<Counter> queue_overflow_rejections;
 
+  // The number of times the service sent back a response (both success and
+  // failure responses are counted) past the deadline set by the client side.
+  scoped_refptr<Counter> timed_out_on_response;
+
   // Whether we should track this method's result, using ResultTracker.
   bool track_result;
 
