This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 924244c [rpc] per-method counter of dropped RPC requests
924244c is described below
commit 924244c303edd2fc4917204fa0a30163feea0f99
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Jul 28 20:03:12 2020 -0700
[rpc] per-method counter of dropped RPC requests
This patch adds a new metric: number of RPC requests of a particular
type dropped due to RPC service queue overflow. It also contains
a test to verify the newly introduced functionality. In addition,
I did other minor cleanup of the related code.
Change-Id: I5bdbdaee726ab3ec7c74228734014cfa884e1e24
Reviewed-on: http://gerrit.cloudera.org:8080/16250
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/rpc/mt-rpc-test.cc | 76 +++++++++++++++++++++++++++++++++++++++++
src/kudu/rpc/protoc-gen-krpc.cc | 18 +++++++---
src/kudu/rpc/rpc-test-base.h | 31 +++++++++--------
src/kudu/rpc/rpc-test.cc | 10 ++++--
src/kudu/rpc/rpc_service.h | 8 +++--
src/kudu/rpc/service_if.h | 3 +-
src/kudu/rpc/service_pool.cc | 5 +++
src/kudu/rpc/service_pool.h | 4 +--
8 files changed, 126 insertions(+), 29 deletions(-)
diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc
index d16c761..09bda4b 100644
--- a/src/kudu/rpc/mt-rpc-test.cc
+++ b/src/kudu/rpc/mt-rpc-test.cc
@@ -32,9 +32,12 @@
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rtest.pb.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_pool.h"
+#include "kudu/util/barrier.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
@@ -44,6 +47,8 @@
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
+METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Add);
+METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_counter(rpc_connections_accepted);
METRIC_DECLARE_counter(rpcs_queue_overflow);
@@ -263,6 +268,77 @@ TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
ASSERT_EQ(1, rpcs_queue_overflow->value());
}
+TEST_F(MultiThreadedRpcTest, PerMethodQueueOverflowRejectionCounter) {
+ n_acceptor_pool_threads_ = 1;
+ n_server_reactor_threads_ = 1;
+ n_worker_threads_ = 1;
+ service_queue_length_ = 1;
+
+ Sockaddr server_addr;
+ ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, false /*enable_ssl*/));
+
+ auto* rpcs_queue_overflow = METRIC_rpcs_queue_overflow.Instantiate(
+ server_messenger_->metric_entity()).get();
+ ASSERT_EQ(0, rpcs_queue_overflow->value());
+
+ auto* queue_overflow_rejections_add =
+ METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Add.Instantiate(
+ server_messenger_->metric_entity()).get();
+ ASSERT_EQ(0, queue_overflow_rejections_add->value());
+
+ auto* queue_overflow_rejections_sleep =
+ METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep.Instantiate(
+ server_messenger_->metric_entity()).get();
+ ASSERT_EQ(0, queue_overflow_rejections_sleep->value());
+
+ // It seems that even in case of scheduling anomalies, 16 concurrent
+ // requests should be enough to overflow the RPC service queue of size 1,
+ // where the queue is served by a single worker thread and it takes at least
+ // 100ms for a request to complete.
+ constexpr size_t kNumThreads = 16;
+ vector<thread> threads;
+ threads.reserve(kNumThreads);
+ vector<Status> status(kNumThreads);
+ Barrier barrier(kNumThreads);
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ const size_t idx = i;
+ threads.emplace_back([&, idx]() {
+ shared_ptr<Messenger> client_messenger;
+ CHECK_OK(CreateMessenger("ClientMessenger", &client_messenger));
+ Proxy p(client_messenger, server_addr, server_addr.host(),
+ CalculatorService::static_service_name());
+ SleepRequestPB req;
+ req.set_sleep_micros(100 * 1000); // 100ms
+ SleepResponsePB resp;
+ RpcController controller;
+ controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+ barrier.Wait();
+ status[idx] = p.SyncRequest(
+ GenericCalculatorService::kSleepMethodName, req, &resp, &controller);
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ size_t num_errors = 0;
+ for (const auto& s : status) {
+ if (!s.ok()) {
+ ++num_errors;
+ }
+ }
+
+ // Check that there were some RPC queue overflows.
+ const auto queue_overflow_num = rpcs_queue_overflow->value();
+ ASSERT_GT(num_errors, 0);
+ ASSERT_EQ(num_errors, queue_overflow_num);
+
+ // Check corresponding per-method rejection counters.
+ ASSERT_EQ(0, queue_overflow_rejections_add->value());
+ ASSERT_EQ(queue_overflow_num, queue_overflow_rejections_sleep->value());
+}
+
static void HammerServerWithTCPConns(const Sockaddr& addr) {
while (true) {
Socket socket;
diff --git a/src/kudu/rpc/protoc-gen-krpc.cc b/src/kudu/rpc/protoc-gen-krpc.cc
index 840e549..8c64e2a 100644
--- a/src/kudu/rpc/protoc-gen-krpc.cc
+++ b/src/kudu/rpc/protoc-gen-krpc.cc
@@ -268,7 +268,7 @@ class CodeGenerator : public
::google::protobuf::compiler::CodeGenerator {
const string&/* parameter */,
google::protobuf::compiler::GeneratorContext* gen_context,
string* error) const override {
- unique_ptr<FileSubstitutions> name_info(new FileSubstitutions());
+ unique_ptr<FileSubstitutions> name_info(new FileSubstitutions);
bool ret = name_info->Init(file, error);
if (!ret) {
return false;
@@ -439,6 +439,13 @@ class CodeGenerator : public
::google::protobuf::compiler::CodeGenerator {
" kudu::MetricLevel::kInfo,\n"
" 60000000LU, 2);\n"
"\n");
+ Print(printer, *subs,
+ "METRIC_DEFINE_counter(server, queue_overflow_rejections_$rpc_full_name_plainchars$,\n"
+ " \"$rpc_full_name$ RPC Rejections\",\n"
+ " kudu::MetricUnit::kRequests,\n"
+ " \"Number of rejected $rpc_full_name$() requests due to RPC queue overflow\",\n"
+ " kudu::MetricLevel::kInfo);\n"
+ "\n");
subs->Pop(); // method
}
@@ -473,9 +480,9 @@ class CodeGenerator : public
::google::protobuf::compiler::CodeGenerator {
Print(printer, *subs,
" {\n"
- " scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo());\n"
- " mi->req_prototype.reset(new $request$());\n"
- " mi->resp_prototype.reset(new $response$());\n"
+ " scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo);\n"
+ " mi->req_prototype.reset(new $request$);\n"
+ " mi->resp_prototype.reset(new $response$);\n"
" mi->authz_method = [this](const Message* req, Message*
resp,\n"
" RpcContext* ctx) {\n"
" return this->$authz_method$(static_cast<const
$request$*>(req),\n"
@@ -485,6 +492,9 @@ class CodeGenerator : public
::google::protobuf::compiler::CodeGenerator {
" mi->track_result = $track_result$;\n"
" mi->handler_latency_histogram =\n"
"
METRIC_handler_latency_$rpc_full_name_plainchars$.Instantiate(entity);\n"
+ " mi->queue_overflow_rejections =\n"
+ " METRIC_queue_overflow_rejections_$rpc_full_name_plainchars$.Instantiate(\n"
+ " entity);\n"
" mi->func = [this](const Message* req, Message* resp,
RpcContext* ctx) {\n"
" this->$rpc_name$(static_cast<const $request$*>(req),\n"
" static_cast<$response$*>(resp),\n"
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index fb88a79..3bfb391 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -415,11 +415,12 @@ const char *GenericCalculatorService::kSecondString =
class RpcTestBase : public KuduTest {
public:
RpcTestBase()
- : n_worker_threads_(3),
- service_queue_length_(200),
- n_server_reactor_threads_(3),
- keepalive_time_ms_(1000),
- metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
+ : n_acceptor_pool_threads_(2),
+ n_server_reactor_threads_(3),
+ n_worker_threads_(3),
+ keepalive_time_ms_(1000),
+ service_queue_length_(200),
+ metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
}
void TearDown() override {
@@ -624,7 +625,7 @@ class RpcTestBase : public KuduTest {
}
template<class ServiceClass>
- Status DoStartTestServer(Sockaddr *server_addr,
+ Status DoStartTestServer(Sockaddr* server_addr,
bool enable_ssl = false,
const std::string& rpc_certificate_file = "",
const std::string& rpc_private_key_file = "",
@@ -645,7 +646,7 @@ class RpcTestBase : public KuduTest {
}
std::shared_ptr<AcceptorPool> pool;
RETURN_NOT_OK(server_messenger_->AddAcceptorPool(*server_addr, &pool));
- RETURN_NOT_OK(pool->Start(2));
+ RETURN_NOT_OK(pool->Start(n_acceptor_pool_threads_));
*server_addr = pool->bind_address();
mem_tracker_ = MemTracker::CreateTracker(-1, "result_tracker");
result_tracker_.reset(new ResultTracker(mem_tracker_));
@@ -654,22 +655,22 @@ class RpcTestBase : public KuduTest {
service_name_ = service->service_name();
scoped_refptr<MetricEntity> metric_entity =
server_messenger_->metric_entity();
service_pool_ = new ServicePool(std::move(service), metric_entity,
service_queue_length_);
- server_messenger_->RegisterService(service_name_, service_pool_);
- RETURN_NOT_OK(service_pool_->Init(n_worker_threads_));
-
- return Status::OK();
+ RETURN_NOT_OK(server_messenger_->RegisterService(service_name_, service_pool_));
+ return service_pool_->Init(n_worker_threads_);
}
protected:
+ int n_acceptor_pool_threads_;
+ int n_server_reactor_threads_;
+ int n_worker_threads_;
+ int keepalive_time_ms_;
+ int service_queue_length_;
+
std::string service_name_;
std::shared_ptr<Messenger> server_messenger_;
scoped_refptr<ServicePool> service_pool_;
std::shared_ptr<kudu::MemTracker> mem_tracker_;
scoped_refptr<ResultTracker> result_tracker_;
- int n_worker_threads_;
- int service_queue_length_;
- int n_server_reactor_threads_;
- int keepalive_time_ms_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 7415877..92279f2 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -71,6 +71,7 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
@@ -1154,8 +1155,7 @@ TEST_P(TestRpc, TestServerShutsDown) {
// Test handler latency metric.
TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
-
- const uint64_t sleep_micros = 20 * 1000;
+ constexpr uint64_t sleep_micros = 20 * 1000;
// Set up server.
Sockaddr server_addr = bind_addr();
@@ -1177,9 +1177,12 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
const unordered_map<const MetricPrototype*, scoped_refptr<Metric> >
metric_map =
server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
- scoped_refptr<Histogram> latency_histogram = down_cast<Histogram *>(
+ scoped_refptr<Histogram> latency_histogram = down_cast<Histogram*>(
FindOrDie(metric_map,
&METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());
+ scoped_refptr<Counter> queue_overflow_rejections = down_cast<Counter*>(
+ FindOrDie(metric_map,
+ &METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep).get());
LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests();
LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests();
@@ -1187,6 +1190,7 @@ TEST_P(TestRpc, TestRpcHandlerLatencyMetric) {
LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount();
ASSERT_EQ(1, latency_histogram->TotalCount());
+ ASSERT_EQ(0, queue_overflow_rejections->value());
ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros);
ASSERT_TRUE(latency_histogram->MinValueForTests() ==
latency_histogram->MaxValueForTests());
diff --git a/src/kudu/rpc/rpc_service.h b/src/kudu/rpc/rpc_service.h
index 42decc7..82bbcd0 100644
--- a/src/kudu/rpc/rpc_service.h
+++ b/src/kudu/rpc/rpc_service.h
@@ -37,9 +37,11 @@ class RpcService : public RefCountedThreadSafe<RpcService> {
// responsible for responding to the client with a failure message.
virtual Status QueueInboundCall(std::unique_ptr<InboundCall> call) = 0;
- virtual RpcMethodInfo* LookupMethod(const RemoteMethod& /*method*/) {
- return nullptr;
- }
+ // Look up RPC method information.
+ //
+ // If this returns nullptr, then certain functionality like
+ // metrics collection will not be performed for this call.
+ virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) = 0;
};
} // namespace rpc
diff --git a/src/kudu/rpc/service_if.h b/src/kudu/rpc/service_if.h
index 686580e..2fc5f09 100644
--- a/src/kudu/rpc/service_if.h
+++ b/src/kudu/rpc/service_if.h
@@ -47,6 +47,7 @@ struct RpcMethodInfo : public
RefCountedThreadSafe<RpcMethodInfo> {
std::unique_ptr<google::protobuf::Message> resp_prototype;
scoped_refptr<Histogram> handler_latency_histogram;
+ scoped_refptr<Counter> queue_overflow_rejections;
// Whether we should track this method's result, using ResultTracker.
bool track_result;
@@ -80,7 +81,7 @@ class ServiceIf {
//
// If this returns nullptr, then certain functionality like
// metrics collection will not be performed for this call.
- virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) {
+ virtual RpcMethodInfo* LookupMethod(const RemoteMethod& /*method*/) {
return nullptr;
}
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index 0e7d5b8..a510937 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -28,6 +28,7 @@
#include <glog/logging.h>
#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
@@ -128,6 +129,10 @@ void ServicePool::RejectTooBusy(InboundCall* c) {
c->remote_address().ToString(),
service_queue_.max_size());
rpcs_queue_overflow_->Increment();
+ auto* minfo = c->method_info();
+ if (minfo) {
+ minfo->queue_overflow_rejections->Increment();
+ }
KLOG_EVERY_N_SECS(WARNING, 1) << err_msg << THROTTLE_MSG;
c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
Status::ServiceUnavailable(err_msg));
diff --git a/src/kudu/rpc/service_pool.h b/src/kudu/rpc/service_pool.h
index 3e7229a..a6a3da4 100644
--- a/src/kudu/rpc/service_pool.h
+++ b/src/kudu/rpc/service_pool.h
@@ -24,7 +24,6 @@
#include <vector>
#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/rpc_service.h"
#include "kudu/rpc/service_queue.h"
@@ -43,7 +42,6 @@ namespace rpc {
class InboundCall;
class RemoteMethod;
class ServiceIf;
-
struct RpcMethodInfo;
// A pool of threads that handle new incoming RPC calls.
@@ -74,7 +72,7 @@ class ServicePool : public RpcService {
RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
- virtual Status QueueInboundCall(std::unique_ptr<InboundCall> call) OVERRIDE;
+ Status QueueInboundCall(std::unique_ptr<InboundCall> call) override;
const Counter* RpcsTimedOutInQueueMetricForTests() const {
return rpcs_timed_out_in_queue_.get();