This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 771d36f84ef86c80334b9c322bf0748397cd8dd9 Author: Andrew Wong <[email protected]> AuthorDate: Thu Mar 5 01:10:37 2020 -0800 subprocess: add server metric for queue size Unlike the Java in/outbound queues, the C++ queues are measured in bytes. This exposes histogram metrics for the sizes of the inbound and outbound queue at Put-time. This also adds a method to get the logical size of a blocking queue. Change-Id: I62d2d5727dca3f54b59ff1044431326cbdde855d Reviewed-on: http://gerrit.cloudera.org:8080/15375 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Andrew Wong <[email protected]> --- src/kudu/ranger/ranger_client.cc | 46 +++++++++++++++-------- src/kudu/subprocess/echo_subprocess.cc | 56 ++++++++++++++++++---------- src/kudu/subprocess/server.cc | 4 ++ src/kudu/subprocess/server.h | 6 ++- src/kudu/subprocess/subprocess_proxy-test.cc | 30 ++++++++++++--- src/kudu/util/blocking_queue-test.cc | 3 ++ src/kudu/util/blocking_queue.h | 7 +++- 7 files changed, 107 insertions(+), 45 deletions(-) diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc index 8ca9812..c8ca24f 100644 --- a/src/kudu/ranger/ranger_client.cc +++ b/src/kudu/ranger/ranger_client.cc @@ -49,49 +49,63 @@ DEFINE_string(ranger_jar_path, "", "Path to the JAR file containing the Ranger subprocess."); TAG_FLAG(ranger_jar_path, experimental); +METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms, + "Ranger subprocess execution time (ms)", + kudu::MetricUnit::kMilliseconds, + "Duration of time in ms spent executing the Ranger subprocess request, excluding " + "time spent spent in the subprocess queues", + kudu::MetricLevel::kInfo, + 60000LU, 1); METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_length, "Ranger subprocess inbound queue length", kudu::MetricUnit::kMessages, "Number of request messages in the Ranger subprocess' inbound request queue", kudu::MetricLevel::kInfo, 1000, 1); -METRIC_DEFINE_histogram(server, 
ranger_subprocess_outbound_queue_length, - "Ranger subprocess outbound queue length", - kudu::MetricUnit::kMessages, - "Number of request messages in the Ranger subprocess' outbound response queue", - kudu::MetricLevel::kInfo, - 1000, 1); METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_time_ms, "Ranger subprocess inbound queue time (ms)", kudu::MetricUnit::kMilliseconds, "Duration of time in ms spent in the Ranger subprocess' inbound request queue", kudu::MetricLevel::kInfo, 60000LU, 1); +METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length, + "Ranger subprocess outbound queue length", + kudu::MetricUnit::kMessages, + "Number of request messages in the Ranger subprocess' outbound response queue", + kudu::MetricLevel::kInfo, + 1000, 1); METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_time_ms, "Ranger subprocess outbound queue time (ms)", kudu::MetricUnit::kMilliseconds, "Duration of time in ms spent in the Ranger subprocess' outbound response queue", kudu::MetricLevel::kInfo, 60000LU, 1); -METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms, - "Ranger subprocess execution time (ms)", +METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_size_bytes, + "Ranger server inbound queue size (bytes)", + kudu::MetricUnit::kBytes, + "Number of bytes in the inbound response queue of the Ranger server, recorded " + "at the time a new response is read from the pipe and added to the inbound queue", + kudu::MetricLevel::kInfo, + 4 * 1024 * 1024, 1); +METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms, + "Ranger server inbound queue time (ms)", kudu::MetricUnit::kMilliseconds, - "Duration of time in ms spent executing the Ranger subprocess request, excluding " - "time spent spent in the subprocess queues", + "Duration of time in ms spent in the Ranger server's inbound response queue", kudu::MetricLevel::kInfo, 60000LU, 1); +METRIC_DEFINE_histogram(server, 
ranger_server_outbound_queue_size_bytes, + "Ranger server outbound queue size (bytes)", + kudu::MetricUnit::kBytes, + "Number of bytes in the outbound request queue of the Ranger server, recorded " + "at the time a new request is added to the outbound request queue", + kudu::MetricLevel::kInfo, + 4 * 1024 * 1024, 1); METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms, "Ranger server outbound queue time (ms)", kudu::MetricUnit::kMilliseconds, "Duration of time in ms spent in the Ranger server's outbound request queue", kudu::MetricLevel::kInfo, 60000LU, 1); -METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms, - "Ranger server inbound queue time (ms)", - kudu::MetricUnit::kMilliseconds, - "Duration of time in ms spent in the Ranger server's inbound response queue", - kudu::MetricLevel::kInfo, - 60000LU, 1); namespace kudu { namespace ranger { diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc index 5e39114..df5634d 100644 --- a/src/kudu/subprocess/echo_subprocess.cc +++ b/src/kudu/subprocess/echo_subprocess.cc @@ -19,62 +19,78 @@ #include "kudu/util/metrics.h" +METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms, + "Echo subprocess execution time (ms)", + kudu::MetricUnit::kMilliseconds, + "Duration of time in ms spent executing the Echo subprocess request, excluding " + "time spent spent in the subprocess queues", + kudu::MetricLevel::kInfo, + 60000LU, 1); METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length, "Echo subprocess inbound queue length", kudu::MetricUnit::kMessages, "Number of request messages in the Echo subprocess' inbound request queue", kudu::MetricLevel::kInfo, 1000, 1); -METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length, - "Echo subprocess outbound queue length", - kudu::MetricUnit::kMessages, - "Number of request messages in the Echo subprocess' outbound response queue", - kudu::MetricLevel::kInfo, - 1000, 1); 
METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms, "Echo subprocess inbound queue time (ms)", kudu::MetricUnit::kMilliseconds, "Duration of time in ms spent in the Echo subprocess' inbound request queue", kudu::MetricLevel::kInfo, 60000LU, 1); +METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length, + "Echo subprocess outbound queue length", + kudu::MetricUnit::kMessages, + "Number of request messages in the Echo subprocess' outbound response queue", + kudu::MetricLevel::kInfo, + 1000, 1); METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms, "Echo subprocess outbound queue time (ms)", kudu::MetricUnit::kMilliseconds, "Duration of time in ms spent in the Echo subprocess' outbound response queue", kudu::MetricLevel::kInfo, 60000LU, 1); -METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms, - "Echo subprocess execution time (ms)", +METRIC_DEFINE_histogram(server, echo_server_inbound_queue_size_bytes, + "Echo server inbound queue size (bytes)", + kudu::MetricUnit::kBytes, + "Number of bytes in the inbound response queue of the Echo server, recorded " + "at the time a new response is read from the pipe and added to the inbound queue", + kudu::MetricLevel::kInfo, + 4 * 1024 * 1024, 1); +METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms, + "Echo server inbound queue time (ms)", kudu::MetricUnit::kMilliseconds, - "Duration of time in ms spent executing the Echo subprocess request, excluding " - "time spent spent in the subprocess queues", + "Duration of time in ms spent in the Echo server's inbound response queue", kudu::MetricLevel::kInfo, 60000LU, 1); +METRIC_DEFINE_histogram(server, echo_server_outbound_queue_size_bytes, + "Echo server outbound queue size (bytes)", + kudu::MetricUnit::kBytes, + "Number of bytes in the outbound request queue of the Echo server, recorded " + "at the time a new request is added to the outbound request queue", + kudu::MetricLevel::kInfo, + 4 * 1024 * 1024, 
1); METRIC_DEFINE_histogram(server, echo_server_outbound_queue_time_ms, "Echo server outbound queue time (ms)", kudu::MetricUnit::kMilliseconds, "Duration of time in ms spent in the Echo server's outbound request queue", kudu::MetricLevel::kInfo, 60000LU, 1); -METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms, - "Echo server inbound queue time (ms)", - kudu::MetricUnit::kMilliseconds, - "Duration of time in ms spent in the Echo server's inbound response queue", - kudu::MetricLevel::kInfo, - 60000LU, 1); namespace kudu { namespace subprocess { #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity) EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) { + HISTINIT(server_inbound_queue_size_bytes, echo_server_inbound_queue_size_bytes); + HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms); + HISTINIT(server_outbound_queue_size_bytes, echo_server_outbound_queue_size_bytes); + HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms); + HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms); HISTINIT(sp_inbound_queue_length, echo_subprocess_inbound_queue_length); - HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length); HISTINIT(sp_inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms); + HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length); HISTINIT(sp_outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms); - HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms); - HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms); - HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms); } #undef HISTINIT diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc index 744d0dc..3c7607b 100644 --- a/src/kudu/subprocess/server.cc +++ b/src/kudu/subprocess/server.cc @@ -131,6 +131,8 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req, 
req->set_id(next_id_++); Synchronizer sync; auto cb = sync.AsStdStatusCallback(); + // Before adding to the queue, record the size of the call queue. + metrics_.server_outbound_queue_size_bytes->Increment(outbound_call_queue_.size()); CallAndTimer call_and_timer = { make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} }; RETURN_NOT_OK_PREPEND( @@ -203,6 +205,8 @@ void SubprocessServer::ReceiveMessagesThread() { // subprocess. DCHECK(s.ok()); WARN_NOT_OK(s, "failed to receive response from the subprocess"); + // Before adding to the queue, record the size of the response queue. + metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size()); ResponsePBAndTimer resp_and_timer = { std::move(response), {} }; if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) { // The queue has been shut down and we should shut down too. diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h index ed03f27..512966b 100644 --- a/src/kudu/subprocess/server.h +++ b/src/kudu/subprocess/server.h @@ -66,14 +66,16 @@ struct SimpleTimer { struct SubprocessMetrics { // Metrics returned from the subprocess. + scoped_refptr<Histogram> sp_execution_time_ms; scoped_refptr<Histogram> sp_inbound_queue_length; - scoped_refptr<Histogram> sp_outbound_queue_length; scoped_refptr<Histogram> sp_inbound_queue_time_ms; + scoped_refptr<Histogram> sp_outbound_queue_length; scoped_refptr<Histogram> sp_outbound_queue_time_ms; - scoped_refptr<Histogram> sp_execution_time_ms; // Metrics recorded by the SubprocessServer. 
+ scoped_refptr<Histogram> server_inbound_queue_size_bytes; scoped_refptr<Histogram> server_inbound_queue_time_ms; + scoped_refptr<Histogram> server_outbound_queue_size_bytes; scoped_refptr<Histogram> server_outbound_queue_time_ms; }; diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc index e83490c..ff612e4 100644 --- a/src/kudu/subprocess/subprocess_proxy-test.cc +++ b/src/kudu/subprocess/subprocess_proxy-test.cc @@ -45,6 +45,8 @@ METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length); METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms); METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms); METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms); +METRIC_DECLARE_histogram(echo_server_outbound_queue_size_bytes); +METRIC_DECLARE_histogram(echo_server_inbound_queue_size_bytes); METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms); METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms); @@ -119,6 +121,16 @@ TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) { ASSERT_EQ(1, in_hist->TotalCount()); ASSERT_LE(0, in_hist->MaxValueForTests()); + // There shouldn't be any bytes in the server queues when we enqueue. + Histogram* server_in_size_hist = + GET_HIST(metric_entity_, echo_server_inbound_queue_size_bytes); + ASSERT_EQ(1, server_in_size_hist->TotalCount()); + ASSERT_EQ(0, server_in_size_hist->MaxValueForTests()); + Histogram* server_out_size_hist = + GET_HIST(metric_entity_, echo_server_outbound_queue_size_bytes); + ASSERT_EQ(1, server_out_size_hist->TotalCount()); + ASSERT_EQ(0, server_out_size_hist->MaxValueForTests()); + // We should have some non-negative queue times on the server side too.
Histogram* server_out_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms); @@ -156,8 +168,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) { Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length); Histogram* sp_out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms); Histogram* sp_in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms); - Histogram* server_out_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms); - Histogram* server_in_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms); + Histogram* server_out_time_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms); + Histogram* server_out_size_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_size_bytes); + Histogram* server_in_time_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms); + Histogram* server_in_size_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_size_bytes); ASSERT_EQ(0, exec_hist->TotalCount()); ASSERT_EQ(0, out_len_hist->TotalCount()); ASSERT_EQ(0, in_len_hist->TotalCount()); @@ -166,8 +180,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) { // We'll have sent the request from the server and not received the response. // Our metrics should reflect that. - ASSERT_EQ(1, server_out_hist->TotalCount()); - ASSERT_EQ(0, server_in_hist->TotalCount()); + ASSERT_EQ(1, server_out_time_hist->TotalCount()); + ASSERT_EQ(1, server_out_size_hist->TotalCount()); + ASSERT_EQ(0, server_in_time_hist->TotalCount()); + ASSERT_EQ(0, server_in_size_hist->TotalCount()); // Eventually the subprocess will return our call, and we'll see some // metrics. 
@@ -179,8 +195,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) { ASSERT_EQ(1, in_len_hist->TotalCount()); ASSERT_EQ(1, sp_out_hist->TotalCount()); ASSERT_EQ(1, sp_in_hist->TotalCount()); - ASSERT_EQ(1, server_out_hist->TotalCount()); - ASSERT_EQ(1, server_in_hist->TotalCount()); + ASSERT_EQ(1, server_out_time_hist->TotalCount()); + ASSERT_EQ(1, server_in_time_hist->TotalCount()); + ASSERT_EQ(1, server_out_size_hist->TotalCount()); + ASSERT_EQ(1, server_in_size_hist->TotalCount()); }); } diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc index a0f1297..38d38dd 100644 --- a/src/kudu/util/blocking_queue-test.cc +++ b/src/kudu/util/blocking_queue-test.cc @@ -207,8 +207,11 @@ struct LengthLogicalSize { TEST(BlockingQueueTest, TestLogicalSize) { BlockingQueue<string, LengthLogicalSize> test_queue(4); ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS); + ASSERT_EQ(1, test_queue.size()); ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS); + ASSERT_EQ(4, test_queue.size()); ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL); + ASSERT_EQ(4, test_queue.size()); } TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) { diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h index 40734b4..c9d59a2 100644 --- a/src/kudu/util/blocking_queue.h +++ b/src/kudu/util/blocking_queue.h @@ -244,6 +244,11 @@ class BlockingQueue { return max_size_; } + size_t size() const { + MutexLock l(lock_); + return size_; + } + std::string ToString() const { std::string ret; @@ -269,7 +274,7 @@ class BlockingQueue { bool shutdown_; size_t size_; - size_t max_size_; + const size_t max_size_; mutable Mutex lock_; ConditionVariable not_empty_; ConditionVariable not_full_;
