IMPALA-6448: Re-enable kerberized testing with KRPC

While working on the patch for IMPALA-5054, we realized that we needed
to make the kudu::rpc::Messenger configurable. That change was made on
the Kudu side and is tracked by KUDU-2228. One of the design decisions
in that patch was to allow Kerberos to be either on or off for the
entire lifetime of the process. This means that, with KRPC, we can no
longer switch Kerberos on and off within the same process. This
behavior comes from SaslInit() in kudu/rpc/sasl_common.cc: SaslInit()
is called once per process and hard-codes some configuration that
cannot be toggled.

This affected our kerberized rpc-mgr-tests. This patch splits out
the kerberized part of rpc-mgr-test into rpc-mgr-kerberized-test.

It also puts the code common to both files into
rpc-mgr-test-base.h.

Change-Id: I6412978316de90875c98f8fbe51c8d215c227b18
Reviewed-on: http://gerrit.cloudera.org:8080/9164
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d84657ba
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d84657ba
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d84657ba

Branch: refs/heads/2.x
Commit: d84657baa14f166a0b750c9bf410711fd27c6d43
Parents: ee74a62
Author: Sailesh Mukil <sail...@apache.org>
Authored: Tue Jan 30 21:35:28 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:53 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/CMakeLists.txt             |   7 +-
 be/src/rpc/rpc-mgr-kerberized-test.cc |  89 ++++++++
 be/src/rpc/rpc-mgr-test-base.h        | 269 +++++++++++++++++++++++
 be/src/rpc/rpc-mgr-test.cc            | 333 ++---------------------------
 4 files changed, 381 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d84657ba/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 7beb80d..e4d96e2 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -50,7 +50,12 @@ ADD_BE_TEST(rpc-mgr-test)
 add_dependencies(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test security-test-for-impala)
-target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
+
+ADD_BE_TEST(rpc-mgr-kerberized-test)
+add_dependencies(rpc-mgr-kerberized-test rpc_test_proto)
+target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto)
+target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala)
+target_link_libraries(rpc-mgr-kerberized-test ${KRB5_REALM_OVERRIDE})
 
 add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
 add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)

http://git-wip-us.apache.org/repos/asf/impala/blob/d84657ba/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc 
b/be/src/rpc/rpc-mgr-kerberized-test.cc
new file mode 100644
index 0000000..57ed3eb
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr-test-base.h"
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+
+namespace impala {
+
+static int kdc_port = GetServerPort();
+
+class RpcMgrKerberizedTest :
+    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    string spn = Substitute("impala-test/$0", ip);
+
+    kdc_wrapper_.reset(new MiniKdcWrapper(
+        std::move(spn), "KRBTEST.COM", "24h", "7d", kdc_port));
+    DCHECK(kdc_wrapper_.get() != nullptr);
+
+    ASSERT_OK(kdc_wrapper_->SetupAndStartMiniKDC(GetParam()));
+    ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
+
+    RpcMgrTestBase::SetUp();
+  }
+
+  virtual void TearDown() {
+    ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
+    RpcMgrTestBase::TearDown();
+  }
+
+ private:
+  boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
+};
+
+INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
+                        RpcMgrKerberizedTest,
+                        ::testing::Values(USE_KUDU_KERBEROS,
+                                          USE_IMPALA_KERBEROS));
+
+TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+  // TODO: We're starting a seperate RpcMgr here instead of configuring
+  // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need 
to introduce
+  // new gtest params to turn on TLS which needs to be a coordinated change 
across
+  // rpc-mgr-test and thrift-server-test.
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  // Enable TLS.
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
+  ASSERT_OK(tls_rpc_mgr.Init());
+
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, 
tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
+}
+
+} // namespace impala
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+
+  // Fill in the path of the current binary for use by the tests.
+  CURRENT_EXECUTABLE_PATH = argv[0];
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/d84657ba/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
new file mode 100644
index 0000000..43b6d83
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "rpc/auth-provider.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/gtest-util.h"
+#include "testutil/mini-kdc-wrapper.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/counting-barrier.h"
+#include "util/network-util.h"
+#include "util/openssl-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+
+#include "common/names.h"
+
+using kudu::rpc::ServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+
+// The path of the current executable file that is required for passing into 
the SASL
+// library as the 'application name'.
+static string CURRENT_EXECUTABLE_PATH;
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
+
+int GetServerPort() {
+  int port = FindUnusedEphemeralPort(nullptr);
+  EXPECT_FALSE(port == -1);
+  return port;
+}
+
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+/// Use this class to set the appropriate required TLS flags for the duration 
of the
+/// lifetime of the object.
+/// It is assumed that the flags always hold empty values by default.
+class ScopedSetTlsFlags {
+ public:
+  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& 
ca_cert,
+      const string& pkey_passwd = "", const string& ciphers = "") {
+    FLAGS_ssl_server_certificate = cert;
+    FLAGS_ssl_private_key = pkey;
+    FLAGS_ssl_client_ca_certificate = ca_cert;
+    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
+    FLAGS_ssl_cipher_list = ciphers;
+  }
+
+  ~ScopedSetTlsFlags() {
+    FLAGS_ssl_server_certificate = "";
+    FLAGS_ssl_private_key = "";
+    FLAGS_ssl_client_ca_certificate = "";
+    FLAGS_ssl_private_key_password_cmd = "";
+    FLAGS_ssl_cipher_list = "";
+  }
+};
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with 
only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
+#define PAYLOAD_SIZE (4096)
+
+template <class T> class RpcMgrTestBase : public T {
+ public:
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a 
sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index 
of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* 
controller) {
+    int32_t pattern = random();
+    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = 
pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+  MemTracker* service_tracker() { return &service_tracker_; }
+
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+
+  virtual void TearDown() {
+    rpc_mgr_.Shutdown();
+  }
+
+ private:
+  int32_t payload_[PAYLOAD_SIZE];
+  MemTracker service_tracker_;
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' is a callback used by tests to inject custom behaviour into the RPC 
handler.
+  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* 
mem_tracker,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
+
+  virtual void Ping(
+      const PingRequestPB* request, PingResponsePB* response, RpcContext* 
context) {
+    response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the 
memory.
+    mem_tracker_->Release(context->GetTransferSize());
+    cb_(context);
+  }
+
+ private:
+  MemTracker* mem_tracker_;
+  ServiceCB cb_;
+};
+
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* 
mem_tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
+  }
+
+  // The request comes with an int 'pattern' and a payload of int array sent 
with
+  // sidecar. Scan the array to make sure every element matches 'pattern'.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* 
response,
+      RpcContext* context) {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), 
&payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release 
the memory.
+        mem_tracker_->Release(context->GetTransferSize());
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $1; Found $2", pattern, val)));
+        return;
+      }
+    }
+    // Incoming requests will already be tracked and we need to release the 
memory.
+    mem_tracker_->Release(context->GetTransferSize());
+    context->RespondSuccess();
+  }
+
+ private:
+  MemTracker* mem_tracker_;
+
+};
+
+template <class T>
+Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
+    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  MemTracker* mem_tracker = test_base->service_tracker();
+  // Test that a service can be started, and will respond to requests.
+  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), 
mem_tracker));
+
+  // Test that a second service, that verifies the RPC payload is not 
corrupted,
+  // can be started.
+  unique_ptr<ServiceIf> scan_mem_impl(new 
ScanMemServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), 
mem_tracker));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, 
&ping_proxy));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, 
&scan_mem_proxy));
+
+  RpcController controller;
+  srand(0);
+  // Randomly invoke either services to make sure a RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+          "unable to execute Ping() RPC.");
+      if (response.int_response() != 42) {
+          return Status(Substitute(
+              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+                  response.int_response()));
+      }
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      test_base->SetupScanMemRequest(&request, &controller);
+      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, 
&controller),
+          "unable to execute ScanMem() RPC.");
+    }
+  }
+
+  return Status::OK();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/d84657ba/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index c525148..4c4b100 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -15,124 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr.inline.h"
-
-#include "common/init.h"
-#include "exec/kudu-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/rpc_sidecar.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "rpc/auth-provider.h"
-#include "runtime/mem-tracker.h"
-#include "testutil/gtest-util.h"
-#include "testutil/mini-kdc-wrapper.h"
-#include "testutil/scoped-flag-setter.h"
-#include "util/counting-barrier.h"
-#include "util/network-util.h"
-#include "util/openssl-util.h"
-#include "util/test-info.h"
-
-#include "gen-cpp/rpc_test.proxy.h"
-#include "gen-cpp/rpc_test.service.h"
-
-#include "common/names.h"
-
-using kudu::rpc::ErrorStatusPB;
+#include "rpc/rpc-mgr-test-base.h"
+
 using kudu::rpc::ServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
-using kudu::rpc::RpcSidecar;
 using kudu::MonoDelta;
-using kudu::Slice;
-
-using namespace std;
 
 DECLARE_int32(num_reactor_threads);
 DECLARE_int32(num_acceptor_threads);
 DECLARE_string(hostname);
 
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-
-// The path of the current executable file that is required for passing into 
the SASL
-// library as the 'application name'.
-static string CURRENT_EXECUTABLE_PATH;
-
 namespace impala {
 
-static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
-
-int GetServerPort() {
-  int port = FindUnusedEphemeralPort(nullptr);
-  EXPECT_FALSE(port == -1);
-  return port;
-}
-
-static int kdc_port = GetServerPort();
-
-const static string IMPALA_HOME(getenv("IMPALA_HOME"));
-const string& SERVER_CERT =
-    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
-const string& PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
-const string& BAD_SERVER_CERT =
-    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
-const string& BAD_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
-const string& PASSWORD_PROTECTED_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
-
-// Only use TLSv1.0 compatible ciphers, as tests might run on machines with 
only TLSv1.0
-// support.
-const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
-const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
-
-#define PAYLOAD_SIZE (4096)
-
-template <class T> class RpcMgrTestBase : public T {
- public:
-  // Utility function to initialize the parameter for ScanMem RPC.
-  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a 
sidecar
-  // to 'controller'. Also sets up 'request' with the random value and index 
of the
-  // sidecar.
-  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* 
controller) {
-    int32_t pattern = random();
-    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = 
pattern;
-    int idx;
-    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
-    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
-    request->set_pattern(pattern);
-    request->set_sidecar_idx(idx);
-  }
-
-  MemTracker* service_tracker() { return &service_tracker_; }
-
- protected:
-  TNetworkAddress krpc_address_;
-  RpcMgr rpc_mgr_;
-
-  virtual void SetUp() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
-    ASSERT_OK(rpc_mgr_.Init());
-  }
-
-  virtual void TearDown() {
-    rpc_mgr_.Shutdown();
-  }
-
- private:
-  int32_t payload_[PAYLOAD_SIZE];
-  MemTracker service_tracker_;
-};
-
 // For tests that do not require kerberized testing, we use RpcTest.
 class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
   virtual void SetUp() {
@@ -144,157 +39,7 @@ class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
   }
 };
 
-class RpcMgrKerberizedTest :
-    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-  virtual void SetUp() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    string spn = Substitute("impala-test/$0", ip);
-
-    kdc_wrapper_.reset(new MiniKdcWrapper(
-        std::move(spn), "KRBTEST.COM", "24h", "7d", kdc_port));
-    DCHECK(kdc_wrapper_.get() != nullptr);
-
-    ASSERT_OK(kdc_wrapper_->SetupAndStartMiniKDC(GetParam()));
-    ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
-
-    RpcMgrTestBase::SetUp();
-  }
-
-  virtual void TearDown() {
-    ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
-    RpcMgrTestBase::TearDown();
-  }
-
- private:
-  boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
-};
-
-typedef std::function<void(RpcContext*)> ServiceCB;
-
-class PingServiceImpl : public PingServiceIf {
- public:
-  // 'cb' is a callback used by tests to inject custom behaviour into the RPC 
handler.
-  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* 
mem_tracker,
-      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
-
-  virtual void Ping(
-      const PingRequestPB* request, PingResponsePB* response, RpcContext* 
context) {
-    response->set_int_response(42);
-    // Incoming requests will already be tracked and we need to release the 
memory.
-    mem_tracker_->Release(context->GetTransferSize());
-    cb_(context);
-  }
-
- private:
-  MemTracker* mem_tracker_;
-  ServiceCB cb_;
-};
-
-class ScanMemServiceImpl : public ScanMemServiceIf {
- public:
-  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* 
mem_tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
-  }
-
-  // The request comes with an int 'pattern' and a payload of int array sent 
with
-  // sidecar. Scan the array to make sure every element matches 'pattern'.
-  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* 
response,
-      RpcContext* context) {
-    int32_t pattern = request->pattern();
-    Slice payload;
-    ASSERT_OK(
-        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), 
&payload)));
-    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
-
-    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
-    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
-      int32_t val = v[i];
-      if (val != pattern) {
-        // Incoming requests will already be tracked and we need to release 
the memory.
-        mem_tracker_->Release(context->GetTransferSize());
-        context->RespondFailure(kudu::Status::Corruption(
-            Substitute("Expecting $1; Found $2", pattern, val)));
-        return;
-      }
-    }
-    // Incoming requests will already be tracked and we need to release the 
memory.
-    mem_tracker_->Release(context->GetTransferSize());
-    context->RespondSuccess();
-  }
-
- private:
-  MemTracker* mem_tracker_;
-};
-
-// TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to 
IMPALA-6448.
-// Re-enable after fixing.
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
-                        RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_OFF));
-
-template <class T>
-Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
-    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  MemTracker* mem_tracker = test_base->service_tracker();
-  // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), 
mem_tracker));
-
-  // Test that a second service, that verifies the RPC payload is not 
corrupted,
-  // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(new 
ScanMemServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), 
mem_tracker));
-
-  FLAGS_num_acceptor_threads = 2;
-  FLAGS_num_reactor_threads = 10;
-  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
-
-  unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, 
&ping_proxy));
-
-  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, 
&scan_mem_proxy));
-
-  RpcController controller;
-  srand(0);
-  // Randomly invoke either services to make sure a RpcMgr can host multiple
-  // services at the same time.
-  for (int i = 0; i < 100; ++i) {
-    controller.Reset();
-    if (random() % 2 == 0) {
-      PingRequestPB request;
-      PingResponsePB response;
-      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
-          "unable to execute Ping() RPC.");
-      if (response.int_response() != 42) {
-          return Status(Substitute(
-              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
-                  response.int_response()));
-      }
-    } else {
-      ScanMemRequestPB request;
-      ScanMemResponsePB response;
-      test_base->SetupScanMemRequest(&request, &controller);
-      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, 
&controller),
-          "unable to execute ScanMem() RPC.");
-    }
-  }
-
-  return Status::OK();
-}
-
-
-TEST_F(RpcMgrTest, MultipleServices) {
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
-}
-
-TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+TEST_F(RpcMgrTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
   // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need 
to introduce
   // new gtest params to turn on TLS which needs to be a coordinated change 
across
@@ -307,28 +52,20 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  // Enable TLS.
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
SERVER_CERT);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
   ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, 
tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
+TEST_F(RpcMgrTest, MultipleServices) {
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+}
+
 // Test with a misconfigured TLS certificate and verify that an error is 
thrown.
 TEST_F(RpcMgrTest, BadCertificateTls) {
-
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
"unknown");
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, "unknown");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -344,16 +81,8 @@ TEST_F(RpcMgrTest, BadCertificateTls) {
 
 // Test with a bad password command for the password protected private key.
 TEST_F(RpcMgrTest, BadPasswordTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
SERVER_CERT);
-  auto password_cmd =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key_password_cmd, "echo badpassword");
+  ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
+      "echo badpassword");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -369,16 +98,8 @@ TEST_F(RpcMgrTest, BadPasswordTls) {
 
 // Test with a correct password command for the password protected private key.
 TEST_F(RpcMgrTest, CorrectPasswordTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
SERVER_CERT);
-  auto password_cmd =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key_password_cmd, "echo password");
+  ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
+      "echo password");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -395,14 +116,7 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) {
 
 // Test with a bad TLS cipher and verify that an error is thrown.
 TEST_F(RpcMgrTest, BadCiphersTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, "not_a_cipher");
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", 
"not_a_cipher");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -418,14 +132,8 @@ TEST_F(RpcMgrTest, BadCiphersTls) {
 
 // Test with a valid TLS cipher.
 TEST_F(RpcMgrTest, ValidCiphersTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, 
TLS1_0_COMPATIBLE_CIPHER);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "",
+      TLS1_0_COMPATIBLE_CIPHER);
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -444,14 +152,7 @@ TEST_F(RpcMgrTest, ValidCiphersTls) {
 TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER,
       TLS1_0_COMPATIBLE_CIPHER_2);
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, 
SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, 
SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, cipher_list);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", cipher_list);
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;

Reply via email to