Repository: kudu
Updated Branches:
  refs/heads/master 46f52bcde -> a97004a91


rpc: Add min / max negotiation threads

Since we use a thread pool for connection negotiation, it would be
helpful for users to be able to specify the minimum and maximum number
of threads in that pool. Prior to this patch, there were no gflags to
control these parameters, and the only builder parameter was the maximum
(however it was not labelled as such).

This allows tuning of the per-tserver negotiation pool size, both
minimum and maximum threads to allow in the thread pool.

Change-Id: Ife98b39d5f3a340702151ab27dc8026c8bac12ac
Reviewed-on: http://gerrit.cloudera.org:8080/4574
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a97004a9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a97004a9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a97004a9

Branch: refs/heads/master
Commit: a97004a9117b9c46e20a1e34de77af7f9478fe74
Parents: 46f52bc
Author: Mike Percy <[email protected]>
Authored: Thu Sep 29 17:32:00 2016 +0100
Committer: Adar Dembo <[email protected]>
Committed: Mon Nov 7 22:30:35 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/create-table-stress-test.cc    |  2 +-
 src/kudu/integration-tests/external_mini_cluster.cc  |  2 +-
 src/kudu/rpc/messenger.cc                            | 15 +++++++++++----
 src/kudu/rpc/messenger.h                             | 13 +++++++++----
 src/kudu/server/server_base.cc                       |  8 ++++++++
 5 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/integration-tests/create-table-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc 
b/src/kudu/integration-tests/create-table-stress-test.cc
index c4c63f4..00f3558 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -95,7 +95,7 @@ class CreateTableStressTest : public KuduTest {
 
     ASSERT_OK(MessengerBuilder("stress-test-msgr")
               .set_num_reactors(1)
-              .set_negotiation_threads(1)
+              .set_max_negotiation_threads(1)
               .Build(&messenger_));
     master_proxy_.reset(new MasterServiceProxy(messenger_,
                                                
cluster_->mini_master()->bound_rpc_addr()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc 
b/src/kudu/integration-tests/external_mini_cluster.cc
index 621c059..c6d875a 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -129,7 +129,7 @@ Status ExternalMiniCluster::Start() {
 
   RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
                         .set_num_reactors(1)
-                        .set_negotiation_threads(1)
+                        .set_max_negotiation_threads(1)
                         .Build(&messenger_),
                         "Failed to start Messenger for minicluster");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 437add7..c5acc20 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -94,7 +94,8 @@ MessengerBuilder::MessengerBuilder(std::string name)
       connection_keepalive_time_(
           MonoDelta::FromMilliseconds(FLAGS_rpc_default_keepalive_time_ms)),
       num_reactors_(4),
-      num_negotiation_threads_(4),
+      min_negotiation_threads_(0),
+      max_negotiation_threads_(4),
       coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)) {}
 
 MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(const 
MonoDelta &keepalive) {
@@ -107,8 +108,13 @@ MessengerBuilder& MessengerBuilder::set_num_reactors(int 
num_reactors) {
   return *this;
 }
 
-MessengerBuilder& MessengerBuilder::set_negotiation_threads(int 
num_negotiation_threads) {
-  num_negotiation_threads_ = num_negotiation_threads;
+MessengerBuilder& MessengerBuilder::set_min_negotiation_threads(int 
min_negotiation_threads) {
+  min_negotiation_threads_ = min_negotiation_threads;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_max_negotiation_threads(int 
max_negotiation_threads) {
+  max_negotiation_threads_ = max_negotiation_threads;
   return *this;
 }
 
@@ -271,7 +277,8 @@ Messenger::Messenger(const MessengerBuilder &bld)
     reactors_.push_back(new Reactor(retain_self_, i, bld));
   }
   CHECK_OK(ThreadPoolBuilder("negotiator")
-              .set_max_threads(bld.num_negotiation_threads_)
+              .set_min_threads(bld.min_negotiation_threads_)
+              .set_max_threads(bld.max_negotiation_threads_)
               .Build(&negotiation_pool_));
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 09c8b53..23b39cb 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -83,9 +83,13 @@ class MessengerBuilder {
   // receiving.
   MessengerBuilder &set_num_reactors(int num_reactors);
 
-  // Set the number of connection-negotiation threads that will be used to 
handle the
-  // blocking connection-negotiation step.
-  MessengerBuilder &set_negotiation_threads(int num_negotiation_threads);
+  // Set the minimum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_min_negotiation_threads(int min_negotiation_threads);
+
+  // Set the maximum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_max_negotiation_threads(int max_negotiation_threads);
 
   // Set the granularity with which connections are checked for keepalive.
   MessengerBuilder &set_coarse_timer_granularity(const MonoDelta &granularity);
@@ -99,7 +103,8 @@ class MessengerBuilder {
   const std::string name_;
   MonoDelta connection_keepalive_time_;
   int num_reactors_;
-  int num_negotiation_threads_;
+  int min_negotiation_threads_;
+  int max_negotiation_threads_;
   MonoDelta coarse_timer_granularity_;
   scoped_refptr<MetricEntity> metric_entity_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/a97004a9/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 4d5fb46..bbef89b 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -59,6 +59,12 @@
 DEFINE_int32(num_reactor_threads, 4, "Number of libev reactor threads to 
start.");
 TAG_FLAG(num_reactor_threads, advanced);
 
+DEFINE_int32(min_negotiation_threads, 0, "Minimum number of connection 
negotiation threads.");
+TAG_FLAG(min_negotiation_threads, advanced);
+
+DEFINE_int32(max_negotiation_threads, 50, "Maximum number of connection 
negotiation threads.");
+TAG_FLAG(max_negotiation_threads, advanced);
+
 DECLARE_bool(use_hybrid_clock);
 
 using std::ostringstream;
@@ -177,6 +183,8 @@ Status ServerBase::Init() {
   rpc::MessengerBuilder builder(name_);
 
   builder.set_num_reactors(FLAGS_num_reactor_threads);
+  builder.set_min_negotiation_threads(FLAGS_min_negotiation_threads);
+  builder.set_max_negotiation_threads(FLAGS_max_negotiation_threads);
   builder.set_metric_entity(metric_entity());
   RETURN_NOT_OK(builder.Build(&messenger_));
 

Reply via email to