This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 924244c  [rpc] per-method counter of dropped RPC requests
924244c is described below

commit 924244c303edd2fc4917204fa0a30163feea0f99
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jul 28 20:03:12 2020 -0700

    [rpc] per-method counter of dropped RPC requests
    
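    This patch adds a new per-method metric: the number of RPC requests
    of a particular type (i.e. for a particular RPC method) that were
    dropped due to RPC service queue overflow.  It also adds a test to
    verify the newly introduced functionality.  In addition, it includes
    minor cleanup of the related code (e.g., RpcTestBase now checks the
    result of Messenger::RegisterService() and makes the number of
    acceptor pool threads configurable, and RpcService::LookupMethod()
    is now a pure virtual method).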
    
    Change-Id: I5bdbdaee726ab3ec7c74228734014cfa884e1e24
    Reviewed-on: http://gerrit.cloudera.org:8080/16250
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/rpc/mt-rpc-test.cc     | 76 +++++++++++++++++++++++++++++++++++++++++
 src/kudu/rpc/protoc-gen-krpc.cc | 18 +++++++---
 src/kudu/rpc/rpc-test-base.h    | 31 +++++++++--------
 src/kudu/rpc/rpc-test.cc        | 10 ++++--
 src/kudu/rpc/rpc_service.h      |  8 +++--
 src/kudu/rpc/service_if.h       |  3 +-
 src/kudu/rpc/service_pool.cc    |  5 +++
 src/kudu/rpc/service_pool.h     |  4 +--
 8 files changed, 126 insertions(+), 29 deletions(-)

diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc
index d16c761..09bda4b 100644
--- a/src/kudu/rpc/mt-rpc-test.cc
+++ b/src/kudu/rpc/mt-rpc-test.cc
@@ -32,9 +32,12 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/proxy.h"
 #include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rtest.pb.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_pool.h"
+#include "kudu/util/barrier.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -44,6 +47,8 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 
+METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Add);
+METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_counter(rpc_connections_accepted);
 METRIC_DECLARE_counter(rpcs_queue_overflow);
 
@@ -263,6 +268,77 @@ TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
   ASSERT_EQ(1, rpcs_queue_overflow->value());
 }
 
+TEST_F(MultiThreadedRpcTest, PerMethodQueueOverflowRejectionCounter) {
+  n_acceptor_pool_threads_ = 1;
+  n_server_reactor_threads_ = 1;
+  n_worker_threads_ = 1;
+  service_queue_length_ = 1;
+
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, false 
/*enable_ssl*/));
+
+  auto* rpcs_queue_overflow = METRIC_rpcs_queue_overflow.Instantiate(
+      server_messenger_->metric_entity()).get();
+  ASSERT_EQ(0, rpcs_queue_overflow->value());
+
+  auto* queue_overflow_rejections_add =
+      
METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Add.Instantiate(
+          server_messenger_->metric_entity()).get();
+  ASSERT_EQ(0, queue_overflow_rejections_add->value());
+
+  auto* queue_overflow_rejections_sleep =
+      
METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep.Instantiate(
+          server_messenger_->metric_entity()).get();
+  ASSERT_EQ(0, queue_overflow_rejections_sleep->value());
+
+  // It seems that even in case of scheduling anomalies, 16 concurrent
+  // requests should be enough to overflow the RPC service queue of size 1,
+  // where the queue is served by a single worker thread and it takes at least
+  // 100ms for a request to complete.
+  constexpr size_t kNumThreads = 16;
+  vector<thread> threads;
+  threads.reserve(kNumThreads);
+  vector<Status> status(kNumThreads);
+  Barrier barrier(kNumThreads);
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    const size_t idx = i;
+    threads.emplace_back([&, idx]() {
+      shared_ptr<Messenger> client_messenger;
+      CHECK_OK(CreateMessenger("ClientMessenger", &client_messenger));
+      Proxy p(client_messenger, server_addr, server_addr.host(),
+              CalculatorService::static_service_name());
+      SleepRequestPB req;
+      req.set_sleep_micros(100 * 1000); // 100ms
+      SleepResponsePB resp;
+      RpcController controller;
+      controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+      barrier.Wait();
+      status[idx] = p.SyncRequest(
+          GenericCalculatorService::kSleepMethodName, req, &resp, &controller);
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  size_t num_errors = 0;
+  for (const auto& s : status) {
+    if (!s.ok()) {
+      ++num_errors;
+    }
+  }
+
+  // Check that there were some RPC queue overflows.
+  const auto queue_overflow_num = rpcs_queue_overflow->value();
+  ASSERT_GT(num_errors, 0);
+  ASSERT_EQ(num_errors, queue_overflow_num);
+
+  // Check corresponding per-method rejection counters.
+  ASSERT_EQ(0, queue_overflow_rejections_add->value());
+  ASSERT_EQ(queue_overflow_num, queue_overflow_rejections_sleep->value());
+}
+
 static void HammerServerWithTCPConns(const Sockaddr& addr) {
   while (true) {
     Socket socket;
diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc
index 840e549..8c64e2a 100644
--- a/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/src/kudu/rpc/protoc-gen-krpc.cc
@@ -268,7 +268,7 @@ class CodeGenerator : public 
::google::protobuf::compiler::CodeGenerator {
         const string&/* parameter */,
         google::protobuf::compiler::GeneratorContext* gen_context,
         string* error) const override {
-    unique_ptr<FileSubstitutions> name_info(new FileSubstitutions());
+    unique_ptr<FileSubstitutions> name_info(new FileSubstitutions);
     bool ret = name_info->Init(file, error);
     if (!ret) {
       return false;
@@ -439,6 +439,13 @@ class CodeGenerator : public 
::google::protobuf::compiler::CodeGenerator {
           "  kudu::MetricLevel::kInfo,\n"
           "  60000000LU, 2);\n"
           "\n");
+        Print(printer, *subs,
+          "METRIC_DEFINE_counter(server, 
queue_overflow_rejections_$rpc_full_name_plainchars$,\n"
+          "  \"$rpc_full_name$ RPC Rejections\",\n"
+          "  kudu::MetricUnit::kRequests,\n"
+          "  \"Number of rejected $rpc_full_name$() requests due to RPC queue 
overflow\",\n"
+          "  kudu::MetricLevel::kInfo);\n"
+          "\n");
         subs->Pop(); // method
       }
 
@@ -473,9 +480,9 @@ class CodeGenerator : public 
::google::protobuf::compiler::CodeGenerator {
 
         Print(printer, *subs,
               "  {\n"
-              "    scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo());\n"
-              "    mi->req_prototype.reset(new $request$());\n"
-              "    mi->resp_prototype.reset(new $response$());\n"
+              "    scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo);\n"
+              "    mi->req_prototype.reset(new $request$);\n"
+              "    mi->resp_prototype.reset(new $response$);\n"
               "    mi->authz_method = [this](const Message* req, Message* 
resp,\n"
               "                              RpcContext* ctx) {\n"
               "      return this->$authz_method$(static_cast<const 
$request$*>(req),\n"
@@ -485,6 +492,9 @@ class CodeGenerator : public 
::google::protobuf::compiler::CodeGenerator {
               "    mi->track_result = $track_result$;\n"
               "    mi->handler_latency_histogram =\n"
               "        
METRIC_handler_latency_$rpc_full_name_plainchars$.Instantiate(entity);\n"
+              "    mi->queue_overflow_rejections =\n"
+              "        
METRIC_queue_overflow_rejections_$rpc_full_name_plainchars$.Instantiate(\n"
+              "            entity);\n"
               "    mi->func = [this](const Message* req, Message* resp, 
RpcContext* ctx) {\n"
               "      this->$rpc_name$(static_cast<const $request$*>(req),\n"
               "                       static_cast<$response$*>(resp),\n"
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index fb88a79..3bfb391 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -415,11 +415,12 @@ const char *GenericCalculatorService::kSecondString =
 class RpcTestBase : public KuduTest {
  public:
   RpcTestBase()
-    : n_worker_threads_(3),
-      service_queue_length_(200),
-      n_server_reactor_threads_(3),
-      keepalive_time_ms_(1000),
-      metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, 
"test.rpc_test")) {
+      : n_acceptor_pool_threads_(2),
+        n_server_reactor_threads_(3),
+        n_worker_threads_(3),
+        keepalive_time_ms_(1000),
+        service_queue_length_(200),
+        metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, 
"test.rpc_test")) {
   }
 
   void TearDown() override {
@@ -624,7 +625,7 @@ class RpcTestBase : public KuduTest {
   }
 
   template<class ServiceClass>
-  Status DoStartTestServer(Sockaddr *server_addr,
+  Status DoStartTestServer(Sockaddr* server_addr,
                            bool enable_ssl = false,
                            const std::string& rpc_certificate_file = "",
                            const std::string& rpc_private_key_file = "",
@@ -645,7 +646,7 @@ class RpcTestBase : public KuduTest {
     }
     std::shared_ptr<AcceptorPool> pool;
     RETURN_NOT_OK(server_messenger_->AddAcceptorPool(*server_addr, &pool));
-    RETURN_NOT_OK(pool->Start(2));
+    RETURN_NOT_OK(pool->Start(n_acceptor_pool_threads_));
     *server_addr = pool->bind_address();
     mem_tracker_ = MemTracker::CreateTracker(-1, "result_tracker");
     result_tracker_.reset(new ResultTracker(mem_tracker_));
@@ -654,22 +655,22 @@ class RpcTestBase : public KuduTest {
     service_name_ = service->service_name();
     scoped_refptr<MetricEntity> metric_entity = 
server_messenger_->metric_entity();
     service_pool_ = new ServicePool(std::move(service), metric_entity, 
service_queue_length_);
-    server_messenger_->RegisterService(service_name_, service_pool_);
-    RETURN_NOT_OK(service_pool_->Init(n_worker_threads_));
-
-    return Status::OK();
+    RETURN_NOT_OK(server_messenger_->RegisterService(service_name_, 
service_pool_));
+    return service_pool_->Init(n_worker_threads_);
   }
 
  protected:
+  int n_acceptor_pool_threads_;
+  int n_server_reactor_threads_;
+  int n_worker_threads_;
+  int keepalive_time_ms_;
+  int service_queue_length_;
+
   std::string service_name_;
   std::shared_ptr<Messenger> server_messenger_;
   scoped_refptr<ServicePool> service_pool_;
   std::shared_ptr<kudu::MemTracker> mem_tracker_;
   scoped_refptr<ResultTracker> result_tracker_;
-  int n_worker_threads_;
-  int service_queue_length_;
-  int n_server_reactor_threads_;
-  int keepalive_time_ms_;
 
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 7415877..92279f2 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -71,6 +71,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
 
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
@@ -1154,8 +1155,7 @@ TEST_P(TestRpc, TestServerShutsDown) {
 
 // Test handler latency metric.
 TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
-
-  const uint64_t sleep_micros = 20 * 1000;
+  constexpr uint64_t sleep_micros = 20 * 1000;
 
   // Set up server.
   Sockaddr server_addr = bind_addr();
@@ -1177,9 +1177,12 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
   const unordered_map<const MetricPrototype*, scoped_refptr<Metric> > 
metric_map =
     server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
 
-  scoped_refptr<Histogram> latency_histogram = down_cast<Histogram *>(
+  scoped_refptr<Histogram> latency_histogram = down_cast<Histogram*>(
       FindOrDie(metric_map,
                 
&METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+  scoped_refptr<Counter> queue_overflow_rejections = down_cast<Counter*>(
+      FindOrDie(metric_map,
+                
&METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep).get());
 
   LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests();
   LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests();
@@ -1187,6 +1190,7 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
   LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount();
 
   ASSERT_EQ(1, latency_histogram->TotalCount());
+  ASSERT_EQ(0, queue_overflow_rejections->value());
   ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros);
   ASSERT_TRUE(latency_histogram->MinValueForTests() == 
latency_histogram->MaxValueForTests());
 
diff --git a/src/kudu/rpc/rpc_service.h b/src/kudu/rpc/rpc_service.h
index 42decc7..82bbcd0 100644
--- a/src/kudu/rpc/rpc_service.h
+++ b/src/kudu/rpc/rpc_service.h
@@ -37,9 +37,11 @@ class RpcService : public RefCountedThreadSafe<RpcService> {
   // responsible for responding to the client with a failure message.
   virtual Status QueueInboundCall(std::unique_ptr<InboundCall> call) = 0;
 
-  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& /*method*/) {
-    return nullptr;
-  }
+  // Look up RPC method information.
+  //
+  // If this returns nullptr, then certain functionality like
+  // metrics collection will not be performed for this call.
+  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) = 0;
 };
 
 } // namespace rpc
diff --git a/src/kudu/rpc/service_if.h b/src/kudu/rpc/service_if.h
index 686580e..2fc5f09 100644
--- a/src/kudu/rpc/service_if.h
+++ b/src/kudu/rpc/service_if.h
@@ -47,6 +47,7 @@ struct RpcMethodInfo : public 
RefCountedThreadSafe<RpcMethodInfo> {
   std::unique_ptr<google::protobuf::Message> resp_prototype;
 
   scoped_refptr<Histogram> handler_latency_histogram;
+  scoped_refptr<Counter> queue_overflow_rejections;
 
   // Whether we should track this method's result, using ResultTracker.
   bool track_result;
@@ -80,7 +81,7 @@ class ServiceIf {
   //
   // If this returns nullptr, then certain functionality like
   // metrics collection will not be performed for this call.
-  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) {
+  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& /*method*/) {
     return nullptr;
   }
 
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index 0e7d5b8..a510937 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -28,6 +28,7 @@
 #include <glog/logging.h>
 
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -128,6 +129,10 @@ void ServicePool::RejectTooBusy(InboundCall* c) {
                  c->remote_address().ToString(),
                  service_queue_.max_size());
   rpcs_queue_overflow_->Increment();
+  auto* minfo = c->method_info();
+  if (minfo) {
+    minfo->queue_overflow_rejections->Increment();
+  }
   KLOG_EVERY_N_SECS(WARNING, 1) << err_msg << THROTTLE_MSG;
   c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
                     Status::ServiceUnavailable(err_msg));
diff --git a/src/kudu/rpc/service_pool.h b/src/kudu/rpc/service_pool.h
index 3e7229a..a6a3da4 100644
--- a/src/kudu/rpc/service_pool.h
+++ b/src/kudu/rpc/service_pool.h
@@ -24,7 +24,6 @@
 #include <vector>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/rpc_service.h"
 #include "kudu/rpc/service_queue.h"
@@ -43,7 +42,6 @@ namespace rpc {
 class InboundCall;
 class RemoteMethod;
 class ServiceIf;
-
 struct RpcMethodInfo;
 
 // A pool of threads that handle new incoming RPC calls.
@@ -74,7 +72,7 @@ class ServicePool : public RpcService {
 
   RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
 
-  virtual Status QueueInboundCall(std::unique_ptr<InboundCall> call) OVERRIDE;
+  Status QueueInboundCall(std::unique_ptr<InboundCall> call) override;
 
   const Counter* RpcsTimedOutInQueueMetricForTests() const {
     return rpcs_timed_out_in_queue_.get();

Reply via email to