Repository: kudu
Updated Branches:
  refs/heads/master 0ee40b661 -> 1ae56048b


KUDU-2301: (Part-1) Add instrumentation on a per connection level

This patch exposes the OutboundTransfer queue size on a
per-connection level and makes it accessible via the
DumpRunningRpcs() call.

A test is added in rpc-test to ensure that this metric works as
expected.

A future patch will add more metrics.

Change-Id: Iae1a5fe0066adf644a9cac41ad6696e1bbf00465
Reviewed-on: http://gerrit.cloudera.org:8080/9343
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/68d797da
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/68d797da
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/68d797da

Branch: refs/heads/master
Commit: 68d797da1c11e15ebde06b16bb7e6ef3f36f9994
Parents: 0ee40b6
Author: Sailesh Mukil <sail...@apache.org>
Authored: Mon Feb 12 14:30:49 2018 -0800
Committer: Todd Lipcon <t...@apache.org>
Committed: Thu Feb 22 18:06:22 2018 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.cc           |  5 ++-
 src/kudu/rpc/connection.h            |  5 +++
 src/kudu/rpc/messenger.h             |  1 +
 src/kudu/rpc/rpc-test.cc             | 59 +++++++++++++++++++++++++++++++
 src/kudu/rpc/rpc_introspection.proto |  1 +
 5 files changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/68d797da/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 857649b..7332b50 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -19,8 +19,6 @@
 
 #include <algorithm>
 #include <cerrno>
-#include <cstddef>
-#include <functional>
 #include <iostream>
 #include <memory>
 #include <set>
@@ -49,7 +47,6 @@
 #include "kudu/util/net/socket.h"
 #include "kudu/util/status.h"
 
-using std::function;
 using std::includes;
 using std::set;
 using std::shared_ptr;
@@ -749,6 +746,8 @@ Status Connection::DumpPB(const DumpRunningRpcsRequestPB& 
req,
         c->call->DumpPB(req, resp->add_calls_in_flight());
       }
     }
+
+    resp->set_outbound_queue_size(num_queued_outbound_transfers());
   } else if (direction_ == SERVER) {
     if (negotiation_complete_) {
       // It's racy to dump credentials while negotiating, since the Connection

http://git-wip-us.apache.org/repos/asf/kudu/blob/68d797da/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index a1f5911..362a35b 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_RPC_CONNECTION_H
 #define KUDU_RPC_CONNECTION_H
 
+#include <cstddef>
 #include <cstdint>
 #include <limits>
 #include <memory>
@@ -232,6 +233,10 @@ class Connection : public RefCountedThreadSafe<Connection> 
{
     scheduled_for_shutdown_ = true;
   }
 
+  size_t num_queued_outbound_transfers() const {
+    return outbound_transfers_.size();  // Reported by DumpPB() as outbound_queue_size.
+  }
+
  private:
   friend struct CallAwaitingResponse;
   friend class QueueTransferTask;

http://git-wip-us.apache.org/repos/asf/kudu/blob/68d797da/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 129ad0b..3835cea 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -323,6 +323,7 @@ class Messenger {
  private:
   FRIEND_TEST(TestRpc, TestConnectionKeepalive);
   FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
+  FRIEND_TEST(TestRpc, TestClientConnectionMetrics);
   FRIEND_TEST(TestRpc, TestCredentialsPolicy);
   FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/68d797da/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index b653706..b6e563e 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -25,11 +25,13 @@
 #include <ostream>
 #include <set>
 #include <string>
+#include <unistd.h>
 #include <unordered_map>
 #include <vector>
 
 #include <boost/bind.hpp>
 #include <boost/core/ref.hpp>
+#include <boost/function.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -47,6 +49,7 @@
 #include "kudu/rpc/proxy.h"
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/rtest.pb.h"
 #include "kudu/rpc/serialization.h"
@@ -445,6 +448,62 @@ TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
   ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 
client connections";
 }
 
+// Test that per-connection metrics (the outbound queue size) are exposed via DumpRunningRpcs().
+TEST_P(TestRpc, TestClientConnectionMetrics) {
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+  keepalive_time_ms_ = -1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, 
enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  // Cause the server's reactor thread to be blocked for 2 seconds.
+  server_messenger_->ScheduleOnReactor(boost::bind(sleep, 2), 
MonoDelta::FromSeconds(0));
+
+  RpcController controller;  // NOTE(review): unused; the range-for below shadows this name.
+  DumpRunningRpcsRequestPB dump_req;
+  DumpRunningRpcsResponsePB dump_resp;
+  dump_req.set_include_traces(false);
+
+  // We'll send several calls asynchronously to force RPC queueing on the 
sender side.
+  int n_calls = 1000;
+  AddRequestPB add_req;
+  add_req.set_x(rand());
+  add_req.set_y(rand());
+  AddResponsePB add_resp;
+
+  vector<unique_ptr<RpcController>> controllers;
+  CountDownLatch latch(n_calls);
+  for (int i = 0; i < n_calls; i++) {
+    controllers.emplace_back(new RpcController());
+    p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, 
&add_resp,
+        controllers.back().get(), boost::bind(&CountDownLatch::CountDown, 
boost::ref(latch)));
+  }
+
+  // Since we blocked the only reactor thread for some time, we should see RPCs 
queued on the
+  // OutboundTransfer queue, unless the main thread is very slow.
+  ASSERT_OK(client_messenger->DumpRunningRpcs(dump_req, &dump_resp));
+  ASSERT_EQ(1, dump_resp.outbound_connections_size());
+  ASSERT_GT(dump_resp.outbound_connections(0).outbound_queue_size(), 0);
+
+  // Wait for the calls to be marked finished.
+  latch.Wait();
+
+  // Verify that all the RPCs have finished.
+  for (const auto& controller : controllers) {
+    ASSERT_TRUE(controller->finished());
+  }
+}
+
 // Test that outbound connections to the same server are reopen upon every RPC
 // call when the 'rpc_reopen_outbound_connections' flag is set.
 TEST_P(TestRpc, TestReopenOutboundConnections) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/68d797da/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_introspection.proto 
b/src/kudu/rpc/rpc_introspection.proto
index 5c4f4f1..7685903 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -62,6 +62,7 @@ message RpcConnectionPB {
   // TODO: swap out for separate fields
   optional string remote_user_credentials = 3;
   repeated RpcCallInProgressPB calls_in_flight = 4;
+  optional int64 outbound_queue_size = 5;
 }
 
 message DumpRunningRpcsRequestPB {

Reply via email to