IMPALA-5054: [SECURITY] Enable KRPC w/ TLS in Impala KRPC has some flags that turn on TLS. This patch sets those to enable TLS communication.
Tests are added to rpc-mgr-test. TODO: Kudu kerberos testing is disabled. Will re-enable as part of IMPALA-6448. Change-Id: I9a14a44fdea9ab668f3714eb69fdb188bce38f5a Reviewed-on: http://gerrit.cloudera.org:8080/8439 Reviewed-by: Sailesh Mukil <sail...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/68b7c8b8 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/68b7c8b8 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/68b7c8b8 Branch: refs/heads/2.x Commit: 68b7c8b8aabd1a65f325f971dc861f6cb2eff5ad Parents: 885776e Author: Sailesh Mukil <sail...@apache.org> Authored: Sun Oct 29 18:38:57 2017 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Fri Feb 2 01:10:15 2018 +0000 ---------------------------------------------------------------------- be/src/catalog/catalogd-main.cc | 9 +- be/src/rpc/authentication-test.cc | 5 +- be/src/rpc/rpc-mgr-test.cc | 274 +++++++++++++++++++++--- be/src/rpc/rpc-mgr.cc | 31 ++- be/src/rpc/rpc-mgr.h | 6 + be/src/rpc/thrift-server.cc | 14 -- be/src/rpc/thrift-server.h | 4 - be/src/runtime/exec-env.cc | 3 +- be/src/service/impala-server.cc | 9 +- be/src/statestore/statestore-subscriber.cc | 5 +- be/src/statestore/statestore.cc | 5 +- be/src/statestore/statestored-main.cc | 3 +- be/src/testutil/in-process-servers.cc | 3 +- be/src/util/openssl-util.cc | 23 ++ be/src/util/openssl-util.h | 9 + 15 files changed, 332 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/catalog/catalogd-main.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc index f98a406..95e5dcd 100644 --- a/be/src/catalog/catalogd-main.cc +++ b/be/src/catalog/catalogd-main.cc @@ -29,14 +29,15 @@ #include "rpc/thrift-server.h" #include "runtime/mem-tracker.h" #include "service/fe-support.h" +#include "util/common-metrics.h" #include "util/debug-util.h" #include "util/jni-util.h" +#include "util/default-path-handlers.h" +#include "util/memory-metrics.h" #include "util/metrics.h" -#include "util/common-metrics.h" #include "util/network-util.h" -#include "util/memory-metrics.h" +#include "util/openssl-util.h" #include "util/webserver.h" -#include "util/default-path-handlers.h" DECLARE_string(classpath); DECLARE_string(principal); @@ -95,7 +96,7 @@ int CatalogdMain(int argc, char** argv) { ThriftServer* server; ThriftServerBuilder builder("CatalogService", processor, FLAGS_catalog_service_port); - if (EnableInternalSslConnections()) { + if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; ABORT_IF_ERROR( SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version)); http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/authentication-test.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/authentication-test.cc b/be/src/rpc/authentication-test.cc index 2b204ad..0dacdee 100644 --- a/be/src/rpc/authentication-test.cc +++ b/be/src/rpc/authentication-test.cc @@ -22,6 +22,7 @@ #include "rpc/thrift-server.h" #include "util/auth-util.h" #include "util/network-util.h" +#include "util/openssl-util.h" #include "util/thread.h" #include <ldap.h> @@ -32,6 +33,7 @@ DECLARE_string(keytab_file); DECLARE_string(principal); DECLARE_string(ssl_client_ca_certificate); DECLARE_string(ssl_server_certificate); +DECLARE_string(ssl_private_key); DECLARE_string(internal_principals_whitelist); // These are here so that we can grab them early in main() - the kerberos @@ -172,7 +174,8 @@ TEST(Auth, KerbAndSslEnabled) { ASSERT_OK(GetHostname(&hostname)); FLAGS_ssl_client_ca_certificate = "some_path"; FLAGS_ssl_server_certificate = "some_path"; - ASSERT_TRUE(EnableInternalSslConnections()); + FLAGS_ssl_private_key = "some_path"; + ASSERT_TRUE(IsInternalTlsConfigured()); SaslAuthProvider sa_internal(true); ASSERT_OK( sa_internal.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts")); http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc index 441619b..7effda9 100644 --- a/be/src/rpc/rpc-mgr-test.cc +++ b/be/src/rpc/rpc-mgr-test.cc @@ -28,8 +28,10 @@ #include "rpc/auth-provider.h" #include "testutil/gtest-util.h" #include "testutil/mini-kdc-wrapper.h" +#include "testutil/scoped-flag-setter.h" #include "util/counting-barrier.h" #include "util/network-util.h" +#include "util/openssl-util.h" #include "util/test-info.h" #include "gen-cpp/rpc_test.proxy.h" @@ -51,6 +53,11 @@ DECLARE_int32(num_reactor_threads); DECLARE_int32(num_acceptor_threads); DECLARE_string(hostname); +DECLARE_string(ssl_client_ca_certificate); +DECLARE_string(ssl_server_certificate); +DECLARE_string(ssl_private_key); +DECLARE_string(ssl_private_key_password_cmd); +DECLARE_string(ssl_cipher_list); // The path of the current executable file that is required for passing into the SASL // library as the 'application name'. @@ -68,9 +75,41 @@ int GetServerPort() { static int kdc_port = GetServerPort(); +const static string IMPALA_HOME(getenv("IMPALA_HOME")); +const string& SERVER_CERT = + Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME); +const string& PRIVATE_KEY = + Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME); +const string& BAD_SERVER_CERT = + Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME); +const string& BAD_PRIVATE_KEY = + Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME); +const string& PASSWORD_PROTECTED_PRIVATE_KEY = + Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME); + +// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0 +// support. +const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA"; +const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5"; + #define PAYLOAD_SIZE (4096) template <class T> class RpcMgrTestBase : public T { + public: + // Utility function to initialize the parameter for ScanMem RPC. + // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar + // to 'controller'. Also sets up 'request' with the random value and index of the + // sidecar. + void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) { + int32_t pattern = random(); + for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern; + int idx; + Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE); + controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx); + request->set_pattern(pattern); + request->set_sidecar_idx(idx); + } + protected: TNetworkAddress krpc_address_; RpcMgr rpc_mgr_; @@ -86,20 +125,6 @@ template <class T> class RpcMgrTestBase : public T { rpc_mgr_.Shutdown(); } - // Utility function to initialize the parameter for ScanMem RPC. - // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar - // to 'controller'. Also sets up 'request' with the random value and index of the - // sidecar. - void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) { - int32_t pattern = random(); - for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern; - int idx; - Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE); - controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx); - request->set_pattern(pattern); - request->set_sidecar_idx(idx); - } - private: int32_t payload_[PAYLOAD_SIZE]; }; @@ -191,35 +216,35 @@ class ScanMemServiceImpl : public ScanMemServiceIf { } }; -// TODO: Disabled 'USE_KUDU_KERBEROS' and 'USE_IMPALA_KERBEROS' due to IMPALA-6268. -// Reenable after fixing. +// TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448. +// Re-enable after fixing. INSTANTIATE_TEST_CASE_P(KerberosOnAndOff, RpcMgrKerberizedTest, - ::testing::Values(KERBEROS_OFF, - USE_KUDU_KERBEROS, - USE_IMPALA_KERBEROS)); + ::testing::Values(KERBEROS_OFF)); -TEST_P(RpcMgrKerberizedTest, MultipleServices) { +template <class T> +Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base, + RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) { // Test that a service can be started, and will respond to requests. unique_ptr<ServiceIf> ping_impl( - new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker())); - ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(ping_impl))); + new PingServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker())); + RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl))); // Test that a second service, that verifies the RPC payload is not corrupted, // can be started. unique_ptr<ServiceIf> scan_mem_impl( - new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker())); - ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl))); + new ScanMemServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker())); + RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl))); FLAGS_num_acceptor_threads = 2; FLAGS_num_reactor_threads = 10; - ASSERT_OK(rpc_mgr_.StartServices(krpc_address_)); + RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address)); unique_ptr<PingServiceProxy> ping_proxy; - ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &ping_proxy)); + RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy)); unique_ptr<ScanMemServiceProxy> scan_mem_proxy; - ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy)); + RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy)); RpcController controller; srand(0); @@ -230,17 +255,200 @@ TEST_P(RpcMgrKerberizedTest, MultipleServices) { if (random() % 2 == 0) { PingRequestPB request; PingResponsePB response; - kudu::Status status = ping_proxy->Ping(request, &response, &controller); - ASSERT_TRUE(status.ok()); - ASSERT_EQ(response.int_response(), 42); + KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller), + "unable to execute Ping() RPC."); + if (response.int_response() != 42) { + return Status(Substitute( + "Ping() failed. Incorrect response. Expected: 42; Got: $0", + response.int_response())); + } } else { ScanMemRequestPB request; ScanMemResponsePB response; - SetupScanMemRequest(&request, &controller); - kudu::Status status = scan_mem_proxy->ScanMem(request, &response, &controller); - ASSERT_TRUE(status.ok()); + test_base->SetupScanMemRequest(&request, &controller); + KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller), + "unable to execute ScanMem() RPC."); } } + + return Status::OK(); +} + + +TEST_F(RpcMgrTest, MultipleServices) { + ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_)); +} + +TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) { + // TODO: We're starting a seperate RpcMgr here instead of configuring + // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce + // new gtest params to turn on TLS which needs to be a coordinated change across + // rpc-mgr-test and thrift-server-test. + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + // Enable TLS. + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT); + ASSERT_OK(tls_rpc_mgr.Init()); + + ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address)); + tls_rpc_mgr.Shutdown(); +} + +// Test with a misconfigured TLS certificate and verify that an error is thrown. +TEST_F(RpcMgrTest, BadCertificateTls) { + + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, "unknown"); + + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + ASSERT_FALSE(tls_rpc_mgr.Init().ok()); + tls_rpc_mgr.Shutdown(); +} + +// Test with a bad password command for the password protected private key. +TEST_F(RpcMgrTest, BadPasswordTls) { + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make( + &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT); + auto password_cmd = + ScopedFlagSetter<string>::Make( + &FLAGS_ssl_private_key_password_cmd, "echo badpassword"); + + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + ASSERT_FALSE(tls_rpc_mgr.Init().ok()); + tls_rpc_mgr.Shutdown(); +} + +// Test with a correct password command for the password protected private key. +TEST_F(RpcMgrTest, CorrectPasswordTls) { + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make( + &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT); + auto password_cmd = + ScopedFlagSetter<string>::Make( + &FLAGS_ssl_private_key_password_cmd, "echo password"); + + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + ASSERT_OK(tls_rpc_mgr.Init()); + ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address)); + tls_rpc_mgr.Shutdown(); +} + +// Test with a bad TLS cipher and verify that an error is thrown. +TEST_F(RpcMgrTest, BadCiphersTls) { + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT); + auto cipher = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, "not_a_cipher"); + + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + ASSERT_FALSE(tls_rpc_mgr.Init().ok()); + tls_rpc_mgr.Shutdown(); +} + +// Test with a valid TLS cipher. +TEST_F(RpcMgrTest, ValidCiphersTls) { + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT); + auto cipher = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, TLS1_0_COMPATIBLE_CIPHER); + + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + ASSERT_OK(tls_rpc_mgr.Init()); + ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address)); + tls_rpc_mgr.Shutdown(); +} + +// Test with multiple valid TLS ciphers. +TEST_F(RpcMgrTest, ValidMultiCiphersTls) { + const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER, + TLS1_0_COMPATIBLE_CIPHER_2); + auto cert_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT); + auto pkey_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY); + auto ca_flag = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT); + auto cipher = + ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, cipher_list); + + RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); + TNetworkAddress tls_krpc_address; + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + + int32_t tls_service_port = FindUnusedEphemeralPort(nullptr); + tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + + ASSERT_OK(tls_rpc_mgr.Init()); + ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address)); + tls_rpc_mgr.Shutdown(); } TEST_F(RpcMgrTest, SlowCallback) { http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc index d4e8fe1..c70c117 100644 --- a/be/src/rpc/rpc-mgr.cc +++ b/be/src/rpc/rpc-mgr.cc @@ -27,6 +27,7 @@ #include "util/auth-util.h" #include "util/cpu-info.h" #include "util/network-util.h" +#include "util/openssl-util.h" #include "common/names.h" @@ -43,6 +44,15 @@ using kudu::Sockaddr; DECLARE_string(hostname); DECLARE_string(principal); DECLARE_string(be_principal); +DECLARE_string(keytab_file); + +// Impala's TLS flags. +DECLARE_string(ssl_client_ca_certificate); +DECLARE_string(ssl_server_certificate); +DECLARE_string(ssl_private_key); +DECLARE_string(ssl_private_key_password_cmd); +DECLARE_string(ssl_cipher_list); +DECLARE_string(ssl_minimum_version); // Defined in kudu/rpc/rpcz_store.cc DECLARE_int32(rpc_duration_too_long_ms); @@ -82,9 +92,23 @@ Status RpcMgr::Init() { RETURN_IF_ERROR(ParseKerberosPrincipal(internal_principal, &service_name, &unused_hostname, &unused_realm)); bld.set_sasl_proto_name(service_name); - // TODO: Once the Messenger can take more options pertaining to 'rpc_authentication' - // and more, we need to explicitly set those options here. (KUDU-2228) + bld.set_rpc_authentication("required"); + bld.set_keytab_file(FLAGS_keytab_file); + } + + if (use_tls_) { + LOG (INFO) << "Initing RpcMgr with TLS"; + bld.set_epki_cert_key_files(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key); + bld.set_epki_certificate_authority_file(FLAGS_ssl_client_ca_certificate); + bld.set_epki_private_password_key_cmd(FLAGS_ssl_private_key_password_cmd); + if (!FLAGS_ssl_cipher_list.empty()) { + bld.set_rpc_tls_ciphers(FLAGS_ssl_cipher_list); + } + bld.set_rpc_tls_min_protocol(FLAGS_ssl_minimum_version); + bld.set_rpc_encryption("required"); + bld.enable_inbound_tls(); } + KUDU_RETURN_IF_ERROR(bld.Build(&messenger_), "Could not build messenger"); return Status::OK(); } @@ -97,8 +121,7 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu new ImpalaServicePool(std::move(service_ptr), messenger_->metric_entity(), service_queue_depth); // Start the thread pool first before registering the service in case the startup fails. - RETURN_IF_ERROR( - service_pool->Init(num_service_threads)); + RETURN_IF_ERROR(service_pool->Init(num_service_threads)); KUDU_RETURN_IF_ERROR( messenger_->RegisterService(service_pool->service_name(), service_pool), "Could not register service"); http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h index 26dbae0..b2099f2 100644 --- a/be/src/rpc/rpc-mgr.h +++ b/be/src/rpc/rpc-mgr.h @@ -97,6 +97,8 @@ namespace impala { /// port is configurable via FLAGS_acceptor_threads. class RpcMgr { public: + RpcMgr(bool use_tls = false) : use_tls_(use_tls) {} + /// Initializes the reactor threads, and prepares for sending outbound RPC requests. Status Init() WARN_UNUSED_RESULT; @@ -176,6 +178,10 @@ class RpcMgr { /// True after StartServices() completes. bool services_started_ = false; + + /// True if TLS is configured for communication between Impala backends. messenger_ will + /// be configured to use TLS if this is set. + const bool use_tls_; }; } // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/thrift-server.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc index 75ad424..ded710e 100644 --- a/be/src/rpc/thrift-server.cc +++ b/be/src/rpc/thrift-server.cc @@ -63,9 +63,6 @@ DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated"); DECLARE_string(principal); DECLARE_string(keytab_file); -DECLARE_string(ssl_client_ca_certificate); -DECLARE_string(ssl_server_certificate); -DECLARE_string(ssl_cipher_list); namespace impala { @@ -103,17 +100,6 @@ bool SSLProtoVersions::IsSupported(const SSLProtocol& protocol) { } } -bool EnableInternalSslConnections() { - // Enable SSL between servers only if both the client validation certificate and the - // server certificate are specified. 'Client' here means clients that are used by Impala - // services to contact other Impala services (as distinct from user clients of Impala - // like the shell), and 'servers' are the processes that serve those clients. The server - // needs a certificate to demonstrate it is who the client thinks it is; the client - // needs a certificate to validate that assertion from the server. - return !FLAGS_ssl_client_ca_certificate.empty() && - !FLAGS_ssl_server_certificate.empty(); -} - // Helper class that starts a server in a separate thread, and handles // the inter-thread communication to monitor whether it started // correctly. http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/thrift-server.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h index 588904f..d95d90e 100644 --- a/be/src/rpc/thrift-server.h +++ b/be/src/rpc/thrift-server.h @@ -341,10 +341,6 @@ struct SSLProtoVersions { static bool IsSupported(const apache::thrift::transport::SSLProtocol& protocol); }; -// Returns true if, per the process configuration flags, server<->server communications -// should use SSL. -bool EnableInternalSslConnections(); - } #endif http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 0e5b0f6..0551848 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -58,6 +58,7 @@ #include "util/memory-metrics.h" #include "util/metrics.h" #include "util/network-util.h" +#include "util/openssl-util.h" #include "util/parse-util.h" #include "util/pretty-printer.h" #include "util/thread-pool.h" @@ -180,7 +181,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port, VLOG_QUERY << "Using KRPC."; // KRPC relies on resolved IP address. It's set in StartServices(). krpc_address_.__set_port(krpc_port); - rpc_mgr_.reset(new RpcMgr()); + rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured())); stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get())); } else { stream_mgr_.reset(new DataStreamMgr(metrics_.get())); http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index a62130c..af79180 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -67,6 +67,7 @@ #include "util/impalad-metrics.h" #include "util/lineage-util.h" #include "util/network-util.h" +#include "util/openssl-util.h" #include "util/parse-util.h" #include "util/redactor.h" #include "util/runtime-profile-counters.h" @@ -2025,7 +2026,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, if (FLAGS_is_coordinator) exec_env_->frontend()->WaitForCatalog(); SSLProtocol ssl_version = SSLProtocol::TLSv1_0; - if (!FLAGS_ssl_server_certificate.empty() || EnableInternalSslConnections()) { + if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) { RETURN_IF_ERROR( SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version)); } @@ -2041,7 +2042,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, ThriftServerBuilder be_builder("backend", be_processor, thrift_be_port); - if (EnableInternalSslConnections()) { + if (IsInternalTlsConfigured()) { LOG(INFO) << "Enabling SSL for backend"; be_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key) .pem_password_cmd(FLAGS_ssl_private_key_password_cmd) @@ -2067,7 +2068,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, beeswax_processor->setEventHandler(event_handler); ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port); - if (!FLAGS_ssl_server_certificate.empty()) { + if (IsExternalTlsConfigured()) { LOG(INFO) << "Enabling SSL for Beeswax"; builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key) .pem_password_cmd(FLAGS_ssl_private_key_password_cmd) @@ -2094,7 +2095,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port); - if (!FLAGS_ssl_server_certificate.empty()) { + if (IsExternalTlsConfigured()) { LOG(INFO) << "Enabling SSL for HiveServer2"; builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key) .pem_password_cmd(FLAGS_ssl_private_key_password_cmd) http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestore-subscriber.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index 99da183..e58c177 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -32,8 +32,9 @@ #include "rpc/rpc-trace.h" #include "rpc/thrift-util.h" #include "statestore/statestore-service-client-wrapper.h" -#include "util/time.h" #include "util/debug-util.h" +#include "util/openssl-util.h" +#include "util/time.h" #include "common/names.h" @@ -197,7 +198,7 @@ Status StatestoreSubscriber::Start() { ThriftServerBuilder builder( "StatestoreSubscriber", processor, heartbeat_address_.port); - if (EnableInternalSslConnections()) { + if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; RETURN_IF_ERROR( SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version)); http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index b135e38..0f72e58 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -29,6 +29,7 @@ #include "statestore/statestore-subscriber-client-wrapper.h" #include "util/debug-util.h" #include "util/logging-support.h" +#include "util/openssl-util.h" #include "util/time.h" #include "util/uid-util.h" #include "util/webserver.h" @@ -225,11 +226,11 @@ Statestore::Statestore(MetricGroup* metrics) update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0, FLAGS_statestore_update_tcp_timeout_seconds * 1000, FLAGS_statestore_update_tcp_timeout_seconds * 1000, "", - EnableInternalSslConnections())), + IsInternalTlsConfigured())), heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0, FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "", - EnableInternalSslConnections())), + IsInternalTlsConfigured())), thrift_iface_(new StatestoreThriftIf(this)), failure_detector_(new MissedHeartbeatFailureDetector( FLAGS_statestore_max_missed_heartbeats, http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestored-main.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc index 1a11237..633d449 100644 --- a/be/src/statestore/statestored-main.cc +++ b/be/src/statestore/statestored-main.cc @@ -31,6 +31,7 @@ #include "util/common-metrics.h" #include "util/debug-util.h" #include "util/metrics.h" +#include "util/openssl-util.h" #include "util/memory-metrics.h" #include "util/webserver.h" #include "util/default-path-handlers.h" @@ -91,7 +92,7 @@ int StatestoredMain(int argc, char** argv) { ThriftServer* server; ThriftServerBuilder builder("StatestoreService", processor, FLAGS_state_store_port); - if (EnableInternalSslConnections()) { + if (IsInternalTlsConfigured()) { SSLProtocol ssl_version; ABORT_IF_ERROR( SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version)); http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/testutil/in-process-servers.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc index 4817d7f..7ff44a8 100644 --- a/be/src/testutil/in-process-servers.cc +++ b/be/src/testutil/in-process-servers.cc @@ -27,6 +27,7 @@ #include "util/webserver.h" #include "util/default-path-handlers.h" #include "util/metrics.h" +#include "util/openssl-util.h" #include "runtime/exec-env.h" #include "service/impala-server.h" @@ -149,7 +150,7 @@ Status InProcessStatestore::Start() { new StatestoreServiceProcessor(statestore_->thrift_iface())); ThriftServerBuilder builder("StatestoreService", processor, statestore_port_); - if (EnableInternalSslConnections()) { + if (IsInternalTlsConfigured()) { LOG(INFO) << "Enabling SSL for Statestore"; builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key); } http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/util/openssl-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/openssl-util.cc b/be/src/util/openssl-util.cc index 264a49a..69dc676 100644 --- a/be/src/util/openssl-util.cc +++ b/be/src/util/openssl-util.cc @@ -31,6 +31,11 @@ #include "common/names.h" +DECLARE_string(ssl_client_ca_certificate); +DECLARE_string(ssl_server_certificate); +DECLARE_string(ssl_private_key); +DECLARE_string(ssl_cipher_list); + namespace impala { // Counter to track the number of encryption keys generated. Incremented before each key @@ -47,6 +52,24 @@ int MaxSupportedTlsVersion() { return SSLv23_method()->version; } +bool IsInternalTlsConfigured() { + // Enable SSL between servers only if both the client validation certificate and the + // server certificate are specified. 'Client' here means clients that are used by Impala + // services to contact other Impala services (as distinct from user clients of Impala + // like the shell), and 'servers' are the processes that serve those clients. The server + // needs a certificate (FLAGS_ssl_server_certificate) to demonstrate it is who the + // client thinks it is; the client needs a certificate (FLAGS_ssl_client_ca_certificate) + // to validate that assertion from the server. + return !FLAGS_ssl_client_ca_certificate.empty() && + !FLAGS_ssl_server_certificate.empty() && !FLAGS_ssl_private_key.empty(); +} + +bool IsExternalTlsConfigured() { + // If the ssl_server_certificate is set, then external TLS is configured, i.e. external + // clients can talk to Impala at least over unauthenticated TLS. + return !FLAGS_ssl_server_certificate.empty() && !FLAGS_ssl_private_key.empty(); +} + // Callback used by OpenSSLErr() - write the error given to us through buf to the // stringstream that's passed in through ctx. static int OpenSSLErrCallback(const char* buf, size_t len, void* ctx) { http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/util/openssl-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/openssl-util.h b/be/src/util/openssl-util.h index 67d014d..7b1b28e 100644 --- a/be/src/util/openssl-util.h +++ b/be/src/util/openssl-util.h @@ -47,6 +47,14 @@ namespace impala { /// Returns the maximum supported TLS version available in the linked OpenSSL library. int MaxSupportedTlsVersion(); +/// Returns true if, per the process configuration flags, server<->server communications +/// should use TLS. +bool IsInternalTlsConfigured(); + +/// Returns true if, per the process configuration flags, client<->server communications +/// should use TLS. +bool IsExternalTlsConfigured(); + /// Add entropy from the system RNG to OpenSSL's global RNG. Called at system startup /// and again periodically to add new entropy. void SeedOpenSSLRNG(); @@ -138,6 +146,7 @@ class EncryptionKey { /// Cipher Mode AES_CIPHER_MODE mode_; }; + } #endif