IMPALA-6456: Add flags to configure rpc_negotiation_timeout_ms and negotiation thread count in KRPC

The fix for KUDU-2228 retired FLAGS_rpc_negotiation_timeout_ms in KRPC.

This patch introduces an Impala-side flag of the same name
(FLAGS_rpc_negotiation_timeout_ms) so that the negotiation timeout
remains configurable.

It also introduces a flag to configure the negotiation
thread count (FLAGS_rpc_negotiation_thread_count).
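
As a rough illustration of how the two flag values reach the messenger
builder, here is a minimal standalone sketch. The NegotiationConfig struct
and the MakeNegotiationConfig function are hypothetical names used only for
this example; the actual patch passes the values directly to the KRPC
builder. The only behavior taken from the patch is that the timeout is
forwarded unchanged and the thread count is clamped to at least 1.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the two negotiation settings handed to the
// KRPC messenger builder. Not part of Impala or Kudu.
struct NegotiationConfig {
  int32_t timeout_ms;   // forwarded as-is from the timeout flag
  int32_t max_threads;  // always at least 1
};

// Mirrors the logic in the patch: the timeout is passed through unchanged,
// while the thread count is clamped so that it is never below 1.
inline NegotiationConfig MakeNegotiationConfig(
    int32_t timeout_ms_flag, int32_t thread_count_flag) {
  NegotiationConfig cfg{timeout_ms_flag, std::max<int32_t>(1, thread_count_flag)};
  assert(cfg.max_threads >= 1);
  return cfg;
}
```

With this sketch, a thread-count value of 0 or a negative number yields
max_threads == 1, while a timeout of 0 is kept as 0.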

Added a test to verify that setting FLAGS_rpc_negotiation_timeout_ms
to 0 causes negotiation failures. Unfortunately, an equivalent test
for FLAGS_rpc_negotiation_thread_count is not possible due to
DCHECKs present in the code.
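
The intuition behind the 0 ms test can be sketched as follows, assuming the
negotiation deadline is computed as the start time plus the configured
timeout and that reaching the deadline counts as a failure. That is an
assumption about the mechanism, not a description of the KRPC code, and
NegotiationTimedOut is a hypothetical helper.

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Hypothetical helper: returns true if a negotiation that began at `start`
// and finished at `finish` reached or passed its deadline, where the
// deadline is assumed to be `start + timeout`.
inline bool NegotiationTimedOut(Clock::time_point start,
                                Clock::time_point finish,
                                std::chrono::milliseconds timeout) {
  assert(finish >= start);
  return finish >= start + timeout;
}
```

Under this assumption, a timeout of 0 makes every negotiation time out,
because the deadline coincides with the start time.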

Change-Id: I108d700e7eac04b678e21a3a920aac81ba8eede5
Reviewed-on: http://gerrit.cloudera.org:8080/9186
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/83f96501
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/83f96501
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/83f96501

Branch: refs/heads/2.x
Commit: 83f9650186a2be5271142a00af6bde5128a16102
Parents: 31f3868
Author: Sailesh Mukil <sail...@cloudera.com>
Authored: Thu Feb 1 16:06:57 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:53 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/rpc-mgr-test.cc | 22 ++++++++++++++++++++++
 be/src/rpc/rpc-mgr.cc      |  6 ++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/83f96501/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 4c4b100..cd24672 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -24,6 +24,7 @@ using kudu::MonoDelta;
 
 DECLARE_int32(num_reactor_threads);
 DECLARE_int32(num_acceptor_threads);
+DECLARE_int32(rpc_negotiation_timeout_ms);
 DECLARE_string(hostname);
 
 namespace impala {
@@ -230,6 +231,27 @@ TEST_F(RpcMgrTest, AsyncCall) {
   }
 }
 
+// Run a test with the negotiation timeout as 0 ms and ensure that connection
+// establishment fails.
+// This is to verify that FLAGS_rpc_negotiation_timeout_ms is actually effective.
+TEST_F(RpcMgrTest, NegotiationTimeout) {
+  // Set negotiation timeout to 0 milliseconds.
+  auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_rpc_negotiation_timeout_ms, 0);
+
+  RpcMgr secondary_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress secondary_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t secondary_service_port = FindUnusedEphemeralPort(nullptr);
+  secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port);
+
+  ASSERT_OK(secondary_rpc_mgr.Init());
+  ASSERT_FALSE(RunMultipleServicesTestTemplate(
+      this, &secondary_rpc_mgr, secondary_krpc_address).ok());
+  secondary_rpc_mgr.Shutdown();
+}
+
 } // namespace impala
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/impala/blob/83f96501/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 7adde36..44ecc02 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -64,6 +64,10 @@ DEFINE_int32(num_reactor_threads, 0,
     "default value 0, it will be set to number of CPU cores.");
 DEFINE_int32(rpc_retry_interval_ms, 5,
     "Time in millisecond of waiting before retrying an RPC when remote is busy");
+DEFINE_int32(rpc_negotiation_timeout_ms, 60000,
+    "Time in milliseconds of waiting for a negotiation to complete before timing out.");
+DEFINE_int32(rpc_negotiation_thread_count, 4,
+    "Maximum number of threads dedicated to handling RPC connection negotiations.");
 
 namespace impala {
 
@@ -77,6 +81,8 @@ Status RpcMgr::Init() {
   int num_reactor_threads =
       FLAGS_num_reactor_threads > 0 ? FLAGS_num_reactor_threads : CpuInfo::num_cores();
   bld.set_num_reactors(num_reactor_threads).set_metric_entity(entity);
+  bld.set_rpc_negotiation_timeout_ms(FLAGS_rpc_negotiation_timeout_ms);
+  bld.set_max_negotiation_threads(max(1, FLAGS_rpc_negotiation_thread_count));
 
   // Disable idle connection detection by setting keepalive_time to -1. Idle connections
   // tend to be closed and re-opened around the same time, which may lead to negotiation
