This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit eda2aa55530b9dfd0888790081e0df3335bc1a82 Author: wzhou-code <[email protected]> AuthorDate: Mon Mar 21 16:51:18 2022 -0700 IMPALA-11129: Support running KRPC over Unix Domain Socket This patch makes the following changes to support running KRPC over UDS. - Add FLAGS_rpc_use_unix_domain_socket to enable running KRPC over UDS. Add FLAGS_uds_address_unique_id to specify the unique ID for the UDS address. It can be 'ip_address', 'backend_id', or 'none'. - Add the variable uds_address to NetworkAddressPB and TNetworkAddress. Replace TNetworkAddress with NetworkAddressPB for KRPC-related class variables and APIs. - Set the UDS address for each daemon to @impala-krpc:<unique_id> during initialization, where unique_id is specified by the startup flag FLAGS_uds_address_unique_id. - When FLAGS_rpc_use_unix_domain_socket is true, the socket of the KRPC server will be bound to the UDS address of the daemon. The KRPC client will connect to the KRPC server using the server's UDS address when creating a proxy service, which in turn calls kudu::Socket::Connect() to connect to the KRPC server. - The rpcz web page shows TCP-related stats as 'N/A' when using UDS. It also shows the remote UDS address of KRPC inbound connections as '*' when using UDS, since remote UDS addresses are not available. - Add new unit tests for UDS. - The BackendId of admissiond is not available, so admissiond's IP address is used as the unique ID for UDS. TODO: Advertise the BackendId of admissiond in global admission control mode. Testing: - Passed core tests with FLAGS_rpc_use_unix_domain_socket at its default value, false. - Passed core tests with FLAGS_rpc_use_unix_domain_socket set to true. 
Change-Id: I439f5a03eb425c17451bcaa96a154bb0bca17ee7 Reviewed-on: http://gerrit.cloudera.org:8080/18369 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/benchmarks/expr-benchmark.cc | 2 +- be/src/rpc/impala-service-pool.cc | 6 +- be/src/rpc/impala-service-pool.h | 2 +- be/src/rpc/rpc-mgr-kerberized-test.cc | 24 +++-- be/src/rpc/rpc-mgr-test.cc | 72 ++++++++------- be/src/rpc/rpc-mgr-test.h | 17 ++-- be/src/rpc/rpc-mgr.cc | 57 ++++++++++-- be/src/rpc/rpc-mgr.h | 25 ++++-- be/src/rpc/rpc-mgr.inline.h | 13 +-- be/src/runtime/coordinator-backend-state.cc | 12 +-- be/src/runtime/data-stream-test.cc | 14 ++- be/src/runtime/exec-env.cc | 17 ++-- be/src/runtime/exec-env.h | 10 +-- be/src/runtime/krpc-data-stream-mgr.cc | 2 +- be/src/runtime/krpc-data-stream-sender.cc | 3 +- be/src/runtime/query-driver.cc | 3 +- be/src/runtime/query-state.cc | 5 +- be/src/runtime/runtime-filter-bank.cc | 3 +- be/src/runtime/test-env.cc | 10 ++- be/src/runtime/tmp-file-mgr.cc | 4 +- be/src/scheduling/admissiond-env.cc | 11 +-- be/src/scheduling/admissiond-env.h | 3 + be/src/service/client-request-state.cc | 45 ++++++++-- be/src/service/control-service.cc | 2 +- be/src/service/control-service.h | 3 +- be/src/service/data-stream-service.cc | 4 +- be/src/service/data-stream-service.h | 3 +- be/src/service/impala-server.cc | 14 +-- be/src/service/impala-server.h | 2 +- be/src/testutil/in-process-servers.cc | 14 +++ be/src/util/container-util.h | 4 +- be/src/util/network-util.cc | 72 +++++++++++++-- be/src/util/network-util.h | 17 +++- common/protobuf/common.proto | 9 ++ common/thrift/Types.thrift | 5 ++ tests/custom_cluster/test_krpc_socket.py | 123 ++++++++++++++++++++++++++ tests/custom_cluster/test_restart_services.py | 67 ++++++++++++++ tests/webserver/test_web_pages.py | 23 +++-- www/rpcz.tmpl | 36 +++++--- 39 files changed, 597 insertions(+), 161 deletions(-) diff --git a/be/src/benchmarks/expr-benchmark.cc 
b/be/src/benchmarks/expr-benchmark.cc index d850a5771..be738a76c 100644 --- a/be/src/benchmarks/expr-benchmark.cc +++ b/be/src/benchmarks/expr-benchmark.cc @@ -98,7 +98,7 @@ class Planner { query_ctx.client_request.__set_stmt(query); query_ctx.client_request.__set_query_options(query_options_); string dummy_hostname = ""; - TNetworkAddress dummy_addr; + NetworkAddressPB dummy_addr; ImpalaServer::PrepareQueryContext(dummy_hostname, dummy_addr, &query_ctx); RuntimeState* state = pool_.Add(new RuntimeState(query_ctx, &exec_env_)); diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc index e204f4387..2f774bd0e 100644 --- a/be/src/rpc/impala-service-pool.cc +++ b/be/src/rpc/impala-service-pool.cc @@ -59,14 +59,14 @@ const char * ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY = ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity, int service_queue_length, kudu::rpc::GeneratedServiceIf* service, - MemTracker* service_mem_tracker, const TNetworkAddress& address, + MemTracker* service_mem_tracker, const NetworkAddressPB& address, MetricGroup* rpc_metrics) : service_mem_tracker_(service_mem_tracker), service_(service), service_queue_(service_queue_length), incoming_queue_time_(METRIC_impala_incoming_queue_time.Instantiate(entity)), - hostname_(address.hostname), - port_(SimpleItoa(address.port)) { + hostname_(address.hostname()), + port_(SimpleItoa(address.port())) { DCHECK(service_mem_tracker_ != nullptr); const TMetricDef& overflow_metric_def = MetricDefs::Get(RPC_QUEUE_OVERFLOW_METRIC_KEY, service_->service_name()); diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h index 5431111d0..6354e64d3 100644 --- a/be/src/rpc/impala-service-pool.h +++ b/be/src/rpc/impala-service-pool.h @@ -49,7 +49,7 @@ class ImpalaServicePool : public kudu::rpc::RpcService { /// 'address' is the ip address and port that 'service' runs on. 
ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity, int service_queue_length, kudu::rpc::GeneratedServiceIf* service, - MemTracker* service_mem_tracker, const TNetworkAddress& address, + MemTracker* service_mem_tracker, const NetworkAddressPB& address, MetricGroup* rpc_metrics); virtual ~ImpalaServicePool(); diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc index b803206a4..763dbdb9d 100644 --- a/be/src/rpc/rpc-mgr-kerberized-test.cc +++ b/be/src/rpc/rpc-mgr-kerberized-test.cc @@ -68,18 +68,19 @@ class RpcMgrKerberizedTest : public RpcMgrTest { } }; -TEST_F(RpcMgrKerberizedTest, MultipleServicesTls) { +TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) { // TODO: We're starting a seperate RpcMgr here instead of configuring // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce // new gtest params to turn on TLS which needs to be a coordinated change across // rpc-mgr-test and thrift-server-test. RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); // Enable TLS. ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT); @@ -91,7 +92,7 @@ TEST_F(RpcMgrKerberizedTest, MultipleServicesTls) { // This test aims to exercise the authorization function in RpcMgr by accessing // services with a principal different from FLAGS_be_principal. 
-TEST_F(RpcMgrKerberizedTest, AuthorizationFail) { +TEST_P(RpcMgrKerberizedTest, AuthorizationFail) { GeneratedServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_)); GeneratedServiceIf* scan_mem_impl = @@ -145,7 +146,7 @@ TEST_F(RpcMgrKerberizedTest, AuthorizationFail) { } // Test cases in which bad Kerberos credentials cache path is specified. -TEST_F(RpcMgrKerberizedTest, BadCredentialsCachePath) { +TEST_P(RpcMgrKerberizedTest, BadCredentialsCachePath) { FLAGS_krb5_ccname = "MEMORY:foo"; Status status = InitAuth(CURRENT_EXECUTABLE_PATH); ASSERT_TRUE(!status.ok()); @@ -166,7 +167,7 @@ TEST_F(RpcMgrKerberizedTest, BadCredentialsCachePath) { } // Test cases in which bad keytab path is specified. -TEST_F(RpcMgrKerberizedTest, BadKeytabPath) { +TEST_P(RpcMgrKerberizedTest, BadKeytabPath) { FLAGS_keytab_file = "non_existent_file_for_testing"; Status status = InitAuth(CURRENT_EXECUTABLE_PATH); ASSERT_TRUE(!status.ok()); @@ -183,7 +184,7 @@ TEST_F(RpcMgrKerberizedTest, BadKeytabPath) { // Test that configurations are passed through via env variables even if kerberos // is disabled for internal auth (i.e. --principal is not set). -TEST_F(RpcMgrKerberizedTest, DisabledKerberosConfigs) { +TEST_P(RpcMgrKerberizedTest, DisabledKerberosConfigs) { // These flags are reset in Setup, so just overwrite them. FLAGS_principal = FLAGS_be_principal = ""; FLAGS_keytab_file = "/tmp/DisabledKerberosConfigsKeytab"; @@ -222,7 +223,7 @@ TEST_F(RpcMgrKerberizedTest, DisabledKerberosConfigs) { // Test that we kinit even with --skip_internal_kerberos_auth and // --skip_external_kerberos_auth set. We do this indirectly by checking for // kinit success/failure. 
-TEST_F(RpcMgrKerberizedTest, KinitWhenIncomingAuthDisabled) { +TEST_P(RpcMgrKerberizedTest, KinitWhenIncomingAuthDisabled) { auto ia = ScopedFlagSetter<bool>::Make(&FLAGS_skip_internal_kerberos_auth, true); auto ea = @@ -247,7 +248,7 @@ TEST_F(RpcMgrKerberizedTest, KinitWhenIncomingAuthDisabled) { // This test confirms that auth is bypassed on KRPC services when // --skip_external_kerberos_auth=true -TEST_F(RpcMgrKerberizedTest, InternalAuthorizationSkip) { +TEST_P(RpcMgrKerberizedTest, InternalAuthorizationSkip) { auto ia = ScopedFlagSetter<bool>::Make(&FLAGS_skip_internal_kerberos_auth, true); GeneratedServiceIf* ping_impl = @@ -281,6 +282,11 @@ TEST_F(RpcMgrKerberizedTest, InternalAuthorizationSkip) { EXPECT_OK(FromKuduStatus(ping_proxy->Ping(ping_request, &ping_response, &controller))); } +// Run tests with Unix domain socket and TCP socket by setting +// FLAGS_rpc_use_unix_domain_socket as true and false. +INSTANTIATE_TEST_CASE_P( + UdsOnAndOff, RpcMgrKerberizedTest, ::testing::Values(true, false)); + } // namespace impala using impala::Status; diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc index 8ea6f3f38..242b6b837 100644 --- a/be/src/rpc/rpc-mgr-test.cc +++ b/be/src/rpc/rpc-mgr-test.cc @@ -37,18 +37,19 @@ DECLARE_string(debug_actions); namespace impala { // Test multiple services managed by an Rpc Manager using TLS. -TEST_F(RpcMgrTest, MultipleServicesTls) { +TEST_P(RpcMgrTest, MultipleServicesTls) { // TODO: We're starting a separate RpcMgr here instead of configuring // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce // new gtest params to turn on TLS which needs to be a coordinated change across // rpc-mgr-test and thrift-server-test. 
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT); ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); @@ -58,55 +59,58 @@ TEST_F(RpcMgrTest, MultipleServicesTls) { } // Test multiple services managed by an Rpc Manager. -TEST_F(RpcMgrTest, MultipleServices) { +TEST_P(RpcMgrTest, MultipleServices) { ASSERT_OK(RunMultipleServicesTest(&rpc_mgr_, krpc_address_)); } // Test with a misconfigured TLS certificate and verify that an error is thrown. -TEST_F(RpcMgrTest, BadCertificateTls) { +TEST_P(RpcMgrTest, BadCertificateTls) { ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, "unknown"); RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok()); tls_rpc_mgr.Shutdown(); } // Test with a bad password command for the password protected private key. 
-TEST_F(RpcMgrTest, BadPasswordTls) { +TEST_P(RpcMgrTest, BadPasswordTls) { ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT, "echo badpassword"); RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok()); tls_rpc_mgr.Shutdown(); } // Test with a correct password command for the password protected private key. -TEST_F(RpcMgrTest, CorrectPasswordTls) { +TEST_P(RpcMgrTest, CorrectPasswordTls) { ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT, "echo password"); RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); @@ -115,16 +119,17 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) { #ifndef __aarch64__ // Test with a bad TLS cipher and verify that an error is thrown. 
-TEST_F(RpcMgrTest, BadCiphersTls) { +TEST_P(RpcMgrTest, BadCiphersTls) { ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", "not_a_cipher"); RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok()); tls_rpc_mgr.Shutdown(); @@ -132,17 +137,18 @@ TEST_F(RpcMgrTest, BadCiphersTls) { #endif // Test with a valid TLS cipher. -TEST_F(RpcMgrTest, ValidCiphersTls) { +TEST_P(RpcMgrTest, ValidCiphersTls) { ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", TLS1_0_COMPATIBLE_CIPHER); RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); @@ -150,18 +156,19 @@ TEST_F(RpcMgrTest, ValidCiphersTls) { } // Test with multiple valid TLS ciphers. 
-TEST_F(RpcMgrTest, ValidMultiCiphersTls) { +TEST_P(RpcMgrTest, ValidMultiCiphersTls) { const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER, TLS1_0_COMPATIBLE_CIPHER_2); ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", cipher_list); RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress tls_krpc_address; + NetworkAddressPB tls_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t tls_service_port = FindUnusedEphemeralPort(); - tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); + tls_krpc_address = + MakeNetworkAddressPB(ip, tls_service_port, tls_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); @@ -169,7 +176,7 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) { } // Test behavior with a slow service. -TEST_F(RpcMgrTest, SlowCallback) { +TEST_P(RpcMgrTest, SlowCallback) { // Use a callback which is slow to respond. auto slow_cb = [](RpcContext* ctx) { SleepForMs(300); @@ -207,7 +214,7 @@ TEST_F(RpcMgrTest, SlowCallback) { } // Test async calls. -TEST_F(RpcMgrTest, AsyncCall) { +TEST_P(RpcMgrTest, AsyncCall) { GeneratedServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_)); ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl, @@ -243,17 +250,18 @@ TEST_F(RpcMgrTest, AsyncCall) { // Run a test with the negotiation timeout as 0 ms and ensure that connection // establishment fails. // This is to verify that FLAGS_rpc_negotiation_timeout_ms is actually effective. -TEST_F(RpcMgrTest, NegotiationTimeout) { +TEST_P(RpcMgrTest, NegotiationTimeout) { // Set negotiation timeout to 0 milliseconds. 
auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_rpc_negotiation_timeout_ms, 0); RpcMgr secondary_rpc_mgr(IsInternalTlsConfigured()); - TNetworkAddress secondary_krpc_address; + NetworkAddressPB secondary_krpc_address; IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); int32_t secondary_service_port = FindUnusedEphemeralPort(); - secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port); + secondary_krpc_address = MakeNetworkAddressPB( + ip, secondary_service_port, secondary_rpc_mgr.GetUdsAddressUniqueId()); ASSERT_OK(secondary_rpc_mgr.Init(secondary_krpc_address)); ASSERT_FALSE(RunMultipleServicesTest(&secondary_rpc_mgr, secondary_krpc_address).ok()); @@ -261,7 +269,7 @@ TEST_F(RpcMgrTest, NegotiationTimeout) { } // Test RpcMgr::DoRpcWithRetry using a fake proxy. -TEST_F(RpcMgrTest, DoRpcWithRetry) { +TEST_P(RpcMgrTest, DoRpcWithRetry) { TQueryCtx query_ctx; const int num_retries = 10; const int64_t timeout_ms = 10 * MILLIS_PER_SEC; @@ -292,7 +300,7 @@ TEST_F(RpcMgrTest, DoRpcWithRetry) { } // Test RpcMgr::DoRpcWithRetry by injecting service-too-busy failures. -TEST_F(RpcMgrTest, BusyService) { +TEST_P(RpcMgrTest, BusyService) { TQueryCtx query_ctx; auto cb = [](RpcContext* ctx) { ctx->RespondSuccess(); }; GeneratedServiceIf* ping_impl = @@ -325,7 +333,7 @@ TEST_F(RpcMgrTest, BusyService) { // service is too busy. auto s = ScopedFlagSetter<string>::Make(&FLAGS_debug_actions, Substitute("IMPALA_SERVICE_POOL:$0:$1:Ping:[email protected]@REJECT_TOO_BUSY", - krpc_address_.hostname, krpc_address_.port)); + krpc_address_.hostname(), krpc_address_.port())); PingRequestPB request; PingResponsePB response; const int64_t timeout_ms = 10 * MILLIS_PER_SEC; @@ -342,6 +350,10 @@ TEST_F(RpcMgrTest, BusyService) { ASSERT_GT(overflow_metric->GetValue(), 0); } +// Run tests with Unix domain socket and TCP socket by setting +// FLAGS_rpc_use_unix_domain_socket as true and false. 
+INSTANTIATE_TEST_CASE_P(UdsOnAndOff, RpcMgrTest, ::testing::Values(true, false)); + } // namespace impala int main(int argc, char** argv) { diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h index e35e8e363..20caceed5 100644 --- a/be/src/rpc/rpc-mgr-test.h +++ b/be/src/rpc/rpc-mgr-test.h @@ -51,6 +51,7 @@ using kudu::Slice; using namespace std; +DECLARE_bool(rpc_use_unix_domain_socket); DECLARE_int32(num_reactor_threads); DECLARE_int32(num_acceptor_threads); DECLARE_string(hostname); @@ -109,7 +110,7 @@ const string TLS1_0_COMPATIBLE_CIPHER_2 = "AES256-SHA"; #define PAYLOAD_SIZE (4096) -class RpcMgrTest : public testing::Test { +class RpcMgrTest : public testing::TestWithParam<bool> { public: // Utility function to initialize the parameter for ScanMem RPC. // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar @@ -126,16 +127,18 @@ class RpcMgrTest : public testing::Test { } // Utility function which alternately makes requests to PingService and ScanMemService. 
- Status RunMultipleServicesTest(RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address); + Status RunMultipleServicesTest(RpcMgr* rpc_mgr, const NetworkAddressPB& krpc_address); protected: - TNetworkAddress krpc_address_; + NetworkAddressPB krpc_address_; RpcMgr rpc_mgr_; virtual void SetUp() { + FLAGS_rpc_use_unix_domain_socket = GetParam(); IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); - krpc_address_ = MakeNetworkAddress(ip, FindUnusedEphemeralPort()); + krpc_address_ = MakeNetworkAddressPB( + ip, FindUnusedEphemeralPort(), rpc_mgr_.GetUdsAddressUniqueId()); exec_env_.reset(new ExecEnv()); ASSERT_OK(rpc_mgr_.Init(krpc_address_)); } @@ -173,7 +176,7 @@ class PingServiceImpl : public PingServiceIf { mem_tracker_(-1, "Ping Service"), cb_(cb) {} - Status GetProxy(const TNetworkAddress& address, const std::string& hostname, + Status GetProxy(const NetworkAddressPB& address, const std::string& hostname, std::unique_ptr<PingServiceProxy>* proxy) { return rpc_mgr_->GetProxy(address, hostname, proxy); } @@ -219,7 +222,7 @@ class ScanMemServiceImpl : public ScanMemServiceIf { mem_tracker_(-1, "ScanMem Service") { } - Status GetProxy(const TNetworkAddress& address, const std::string& hostname, + Status GetProxy(const NetworkAddressPB& address, const std::string& hostname, std::unique_ptr<ScanMemServiceProxy>* proxy) { return rpc_mgr_->GetProxy(address, hostname, proxy); } @@ -284,7 +287,7 @@ class FailingPingServiceProxy { }; Status RpcMgrTest::RunMultipleServicesTest( - RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) { + RpcMgr* rpc_mgr, const NetworkAddressPB& krpc_address) { // Test that a service can be started, and will respond to requests. 
GeneratedServiceIf* ping_impl = TakeOverService( make_unique<PingServiceImpl>(rpc_mgr)); diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc index 761b7f97e..74795a487 100644 --- a/be/src/rpc/rpc-mgr.cc +++ b/be/src/rpc/rpc-mgr.cc @@ -93,10 +93,44 @@ DEFINE_int32(rpc_negotiation_thread_count, 64, DEFINE_bool(rpc_use_loopback, false, "Always use loopback for local connections. This requires binding to all addresses, " "not just the KRPC address."); +// Cannot set rpc_use_unix_domain_socket as true when rpc_use_loopback is set as true. +DEFINE_bool(rpc_use_unix_domain_socket, false, + "Whether the KRPC client and server should use Unix domain socket. If enabled, " + "each daemon is identified with Unix Domain Socket address in the unique name in " + "\"Abstract Namespace\", in format @impala-krpc:<BackendId>. The KRPC server bind " + "to a Unix domain socket. KRPC Client attempt to connect to KRPC server via a Unix " + "domain socket."); +DEFINE_string(uds_address_unique_id, "ip_address", + "Specify unique Id for UDS address. 
It could be \"ip_address\", \"backend_id\", or " + "\"none\""); namespace impala { -Status RpcMgr::Init(const TNetworkAddress& address) { +RpcMgr::RpcMgr(bool use_tls) : use_tls_(use_tls) { + krpc_use_uds_ = FLAGS_rpc_use_unix_domain_socket; + if (krpc_use_uds_ && FLAGS_rpc_use_loopback) { + LOG(WARNING) << "Cannot use Unix Domain Socket when using loopback address."; + krpc_use_uds_ = false; + } + if (krpc_use_uds_) { + if (strcasecmp("backend_id", FLAGS_uds_address_unique_id.c_str()) == 0) { + uds_addr_unique_id_ = UdsAddressUniqueIdPB::BACKEND_ID; + } else if (strcasecmp("none", FLAGS_uds_address_unique_id.c_str()) == 0) { + uds_addr_unique_id_ = UdsAddressUniqueIdPB::NO_UNIQUE_ID; + } else if (strcasecmp("ip_address", FLAGS_uds_address_unique_id.c_str()) == 0) { + uds_addr_unique_id_ = UdsAddressUniqueIdPB::IP_ADDRESS; + } else { + LOG(ERROR) << "Invalid unique Id for UDS address."; + uds_addr_unique_id_ = UdsAddressUniqueIdPB::NO_UNIQUE_ID; + invalid_uds_config_ = true; + } + } else { + uds_addr_unique_id_ = UdsAddressUniqueIdPB::NO_UNIQUE_ID; + } +} + +Status RpcMgr::Init(const NetworkAddressPB& address) { + if (krpc_use_uds_ && invalid_uds_config_) return Status("Invalid UDS configuration"); DCHECK(IsResolvedAddress(address)); address_ = address; @@ -204,11 +238,15 @@ Status RpcMgr::StartServices() { Sockaddr sockaddr = Sockaddr::Wildcard(); if (FLAGS_rpc_use_loopback) { // Listen on all addresses, including loopback. - sockaddr.set_port(address_.port); + sockaddr.set_port(address_.port()); DCHECK(sockaddr.IsWildcard()) << sockaddr.ToString(); } else { // Only listen on the canonical address for KRPC. - RETURN_IF_ERROR(TNetworkAddressToSockaddr(address_, &sockaddr)); + RETURN_IF_ERROR(NetworkAddressPBToSockaddr(address_, krpc_use_uds_, &sockaddr)); + if (krpc_use_uds_) { + // KRPC server bind to Unix domain socket if krpc_use_uds_ is true. 
+ LOG(INFO) << "KRPC server bind to Unix domain socket: " << sockaddr.ToString(); + } } // Call the messenger to create an AcceptorPool for us. @@ -247,6 +285,9 @@ bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) { void RpcMgr::ToJson(Document* document) { if (messenger_.get() == nullptr) return; + // Add rpc_use_unix_domain_socket. + document->AddMember( + "rpc_use_unix_domain_socket", krpc_use_uds_, document->GetAllocator()); // Add acceptor metrics. int64_t num_accepted = 0; if (acceptor_pool_.get() != nullptr) { @@ -263,9 +304,11 @@ void RpcMgr::ToJson(Document* document) { Value inbound_per_conn_metrics(kArrayType); for (const RpcConnectionPB& conn : response.inbound_connections()) { Value per_conn_metrics_entry(kObjectType); - Value remote_ip_str(conn.remote_ip().c_str(), document->GetAllocator()); + // Remote addresses for UDS inbound connections are not available. + Value remote_addr_str( + (!krpc_use_uds_ ? conn.remote_ip().c_str() : "*"), document->GetAllocator()); per_conn_metrics_entry.AddMember( - "remote_ip", remote_ip_str, document->GetAllocator()); + "remote_addr", remote_addr_str, document->GetAllocator()); per_conn_metrics_entry.AddMember( "num_calls_in_flight", conn.calls_in_flight().size(), document->GetAllocator()); num_inbound_calls_in_flight += conn.calls_in_flight().size(); @@ -297,9 +340,9 @@ void RpcMgr::ToJson(Document* document) { // Add per connection metrics to an array. 
Value per_conn_metrics_entry(kObjectType); - Value remote_ip_str(conn.remote_ip().c_str(), document->GetAllocator()); + Value remote_addr_str(conn.remote_ip().c_str(), document->GetAllocator()); per_conn_metrics_entry.AddMember( - "remote_ip", remote_ip_str, document->GetAllocator()); + "remote_addr", remote_addr_str, document->GetAllocator()); per_conn_metrics_entry.AddMember( "num_calls_in_flight", conn.calls_in_flight().size(), document->GetAllocator()); per_conn_metrics_entry.AddMember( diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h index b8de4e83e..a4e661d30 100644 --- a/be/src/rpc/rpc-mgr.h +++ b/be/src/rpc/rpc-mgr.h @@ -25,6 +25,7 @@ #include "rpc/impala-service-pool.h" #include "gen-cpp/Types_types.h" +#include "gen-cpp/common.pb.h" namespace kudu { namespace rpc { @@ -100,11 +101,15 @@ namespace impala { /// certificates, and encryption is enabled and marked as required. class RpcMgr { public: - RpcMgr(bool use_tls = false) : use_tls_(use_tls) {} + RpcMgr(bool use_tls = false); + + /// The following two are initialized in the constructor and always valid to call. + bool IsKrpcUsingUDS() const { return krpc_use_uds_; } + const UdsAddressUniqueIdPB& GetUdsAddressUniqueId() { return uds_addr_unique_id_; } /// Initializes the reactor threads, and prepares for sending outbound RPC requests. All /// services will be started on 'address', which must be a resolved IP address. - Status Init(const TNetworkAddress& address) WARN_UNUSED_RESULT; + Status Init(const NetworkAddressPB& address) WARN_UNUSED_RESULT; bool is_inited() const { return messenger_.get() != nullptr; } @@ -148,7 +153,7 @@ class RpcMgr { /// 'hostname' has to match the hostname used in the Kerberos principal of the /// destination host if Kerberos is enabled. 'P' must descend from kudu::rpc::Proxy. 
template <typename P> - Status GetProxy(const TNetworkAddress& address, const std::string& hostname, + Status GetProxy(const NetworkAddressPB& address, const std::string& hostname, std::unique_ptr<P>* proxy) WARN_UNUSED_RESULT; /// Wait until all reactor threads complete execution. @@ -227,8 +232,18 @@ class RpcMgr { /// be configured to use TLS if this is set. const bool use_tls_; - /// The host/port the rpc services are run on. - TNetworkAddress address_; + /// The network address the krpc services are run on. + NetworkAddressPB address_; + + /// The following three variables are initialized in the constructor and wouldn't be + /// changed. + /// True if use Unix Domain Socket for krpc. + bool krpc_use_uds_; + /// True if UDS configuration is invalid. + bool invalid_uds_config_ = false; + + /// Unique ID for setting UDS address. + UdsAddressUniqueIdPB uds_addr_unique_id_ = UdsAddressUniqueIdPB::IP_ADDRESS; }; } // namespace impala diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h index 8053f3b01..dd7d31720 100644 --- a/be/src/rpc/rpc-mgr.inline.h +++ b/be/src/rpc/rpc-mgr.inline.h @@ -35,19 +35,20 @@ namespace impala { /// Always inline to avoid having to provide a definition for each use type P. template <typename P> -Status RpcMgr::GetProxy(const TNetworkAddress& address, const std::string& hostname, +Status RpcMgr::GetProxy(const NetworkAddressPB& address, const std::string& hostname, std::unique_ptr<P>* proxy) { DCHECK(proxy != nullptr); DCHECK(is_inited()) << "Must call Init() before GetProxy()"; DCHECK(IsResolvedAddress(address)); - TNetworkAddress address_to_use = address; + NetworkAddressPB address_to_use = address; // Talk to self via loopback. 
- if (FLAGS_rpc_use_loopback && - address_to_use.hostname == ExecEnv::GetInstance()->krpc_address().hostname) { - address_to_use.__set_hostname(LOCALHOST_IP_STR); + if (FLAGS_rpc_use_loopback + && address_to_use.hostname() == ExecEnv::GetInstance()->krpc_address().hostname()) { + address_to_use.set_hostname(LOCALHOST_IP_STR); } kudu::Sockaddr sockaddr = kudu::Sockaddr::Wildcard(); - RETURN_IF_ERROR(TNetworkAddressToSockaddr(address_to_use, &sockaddr)); + // Connect to KRPC server via Unix domain socket if krpc_use_uds_ is true. + RETURN_IF_ERROR(NetworkAddressPBToSockaddr(address_to_use, krpc_use_uds_, &sockaddr)); proxy->reset(new P(messenger_, sockaddr, hostname)); // Always set the user credentials as Proxy ctor may fail in GetLoggedInUser(). diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 5b72ae946..c376bd7af 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -280,8 +280,8 @@ void Coordinator::BackendState::ExecAsync(const DebugOptions& debug_options, } std::unique_ptr<ControlServiceProxy> proxy; - Status get_proxy_status = ControlService::GetProxy( - FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy); + Status get_proxy_status = + ControlService::GetProxy(krpc_host_, host_.hostname(), &proxy); if (!get_proxy_status.ok()) { SetExecError(get_proxy_status, exec_status_barrier); goto done; @@ -660,8 +660,8 @@ Coordinator::BackendState::CancelResult Coordinator::BackendState::Cancel( VLogForBackend("Sending CancelQueryFInstances rpc"); std::unique_ptr<ControlServiceProxy> proxy; - Status get_proxy_status = ControlService::GetProxy( - FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy); + Status get_proxy_status = + ControlService::GetProxy(krpc_host_, host_.hostname(), &proxy); if (!get_proxy_status.ok()) { status_.MergeStatus(get_proxy_status); result.became_done = true; @@ -712,8 +712,8 @@ void 
Coordinator::BackendState::PublishFilter(FilterState* state, Status status; std::unique_ptr<DataStreamServiceProxy> proxy; - Status get_proxy_status = DataStreamService::GetProxy( - FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy); + Status get_proxy_status = + DataStreamService::GetProxy(krpc_host_, host_.hostname(), &proxy); if (!get_proxy_status.ok()) { // Failing to send a filter is not a query-wide error - the remote fragment will // continue regardless. diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index a8dd51a03..a167caf97 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -206,7 +206,9 @@ class DataStreamTest : public testing::Test { IpAddr ip; ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); - krpc_address_ = MakeNetworkAddress(ip, FLAGS_port); + UUIDToUniqueIdPB(boost::uuids::random_generator()(), &backend_id_); + krpc_address_ = MakeNetworkAddressPB( + ip, FLAGS_port, backend_id_, exec_env_->rpc_mgr()->GetUdsAddressUniqueId()); StartKrpcBackend(); } @@ -270,8 +272,11 @@ class DataStreamTest : public testing::Test { int next_val_; int64_t* tuple_mem_; + // Backend Id + UniqueIdPB backend_id_; + // Only used for KRPC. Not owned. - TNetworkAddress krpc_address_; + NetworkAddressPB krpc_address_; // The test service implementation. Owned by this class. 
unique_ptr<ImpalaKRPCTestBackend> test_service_; @@ -321,8 +326,9 @@ class DataStreamTest : public testing::Test { void GetNextInstanceId(TUniqueId* instance_id) { PlanFragmentDestinationPB* dest = dest_.Add(); *dest->mutable_fragment_instance_id() = next_instance_id_; - *dest->mutable_address() = MakeNetworkAddressPB("localhost", FLAGS_port); - *dest->mutable_krpc_backend() = FromTNetworkAddress(krpc_address_); + *dest->mutable_address() = MakeNetworkAddressPB("localhost", FLAGS_port, backend_id_, + exec_env_->rpc_mgr()->GetUdsAddressUniqueId()); + *dest->mutable_krpc_backend() = krpc_address_; UniqueIdPBToTUniqueId(next_instance_id_, instance_id); next_instance_id_.set_lo(next_instance_id_.lo() + 1); } diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 8a78d3524..f529b6466 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -225,9 +225,9 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port, ABORT_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address_)); // KRPC relies on resolved IP address. 
- krpc_address_.__set_hostname(ip_address_); - krpc_address_.__set_port(krpc_port); rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured())); + krpc_address_ = MakeNetworkAddressPB( + ip_address_, krpc_port, backend_id_, rpc_mgr_->GetUdsAddressUniqueId()); stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get())); request_pool_service_.reset(new RequestPoolService(metrics_.get())); @@ -266,13 +266,12 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port, } if (AdmissionServiceEnabled()) { - admission_service_address_ = - MakeNetworkAddress(FLAGS_admission_service_host, FLAGS_admission_service_port); - if (!IsResolvedAddress(admission_service_address_)) { - IpAddr ip; - ABORT_IF_ERROR(HostnameToIpAddr(FLAGS_admission_service_host, &ip)); - admission_service_address_ = MakeNetworkAddress(ip, FLAGS_admission_service_port); - } + IpAddr ip; + ABORT_IF_ERROR(HostnameToIpAddr(FLAGS_admission_service_host, &ip)); + // TODO: get BackendId of admissiond in global admission control mode. + // Use admissiond's IP address as unique ID for UDS now. + admission_service_address_ = MakeNetworkAddressPB( + ip, FLAGS_admission_service_port, UdsAddressUniqueIdPB::IP_ADDRESS); } exec_env_ = this; diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index a62440a23..de3bcc024 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -158,7 +158,7 @@ class ExecEnv { const IpAddr& ip_address() const { return ip_address_; } - const TNetworkAddress& krpc_address() const { return krpc_address_; } + const NetworkAddressPB& krpc_address() const { return krpc_address_; } /// Initializes the exec env for running FE tests. 
Status InitForFeSupport() WARN_UNUSED_RESULT; @@ -182,7 +182,7 @@ class ExecEnv { int64_t admit_mem_limit() const { return admit_mem_limit_; } int64_t admission_slots() const { return admission_slots_; } - const TNetworkAddress& admission_service_address() const { + const NetworkAddressPB& admission_service_address() const { return admission_service_address_; } @@ -256,8 +256,8 @@ class ExecEnv { /// Resolved IP address of the host name. IpAddr ip_address_; - /// IP address of the KRPC backend service: ip_address + krpc_port. - TNetworkAddress krpc_address_; + /// Address of the KRPC backend service: ip_address + krpc_port and UDS address. + NetworkAddressPB krpc_address_; /// fs.defaultFs value set in core-site.xml std::string default_fs_; @@ -292,7 +292,7 @@ class ExecEnv { /// If the admission control service is enabled, the resolved IP address and port where /// the service is running. - TNetworkAddress admission_service_address_; + NetworkAddressPB admission_service_address_; /// Initialize ExecEnv based on Hadoop config from frontend. 
Status InitHadoopConfig(); diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc index 96d6b6286..32a62d03d 100644 --- a/be/src/runtime/krpc-data-stream-mgr.cc +++ b/be/src/runtime/krpc-data-stream-mgr.cc @@ -366,7 +366,7 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextTyp TUniqueId finst_id; finst_id.__set_lo(request->dest_fragment_instance_id().lo()); finst_id.__set_hi(request->dest_fragment_instance_id().hi()); - string remote_addr = Substitute(" $0", ctx->rpc_context->remote_address().host()); + string remote_addr = Substitute(" $0", ctx->rpc_context->remote_address().ToString()); ErrorMsg msg(TErrorCode::DATASTREAM_SENDER_TIMEOUT, remote_addr, PrintId(finst_id), ctx->request->dest_node_id()); VLOG_QUERY << msg.msg(); diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index 37d6970df..3f278af52 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -373,8 +373,7 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) { batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker())); // Create a DataStreamService proxy to the destination. 
- RETURN_IF_ERROR( - DataStreamService::GetProxy(FromNetworkAddressPB(address_), hostname_, &proxy_)); + RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_)); return Status::OK(); } diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc index 636cf527d..0c9001dca 100644 --- a/be/src/runtime/query-driver.cc +++ b/be/src/runtime/query-driver.cc @@ -83,7 +83,8 @@ Status QueryDriver::SetExternalPlan( // Update coordinator related internal addresses in the external request exec_request_->query_exec_request.query_ctx.__set_coord_hostname( ExecEnv::GetInstance()->configured_backend_address().hostname); - const TNetworkAddress& address = ExecEnv::GetInstance()->krpc_address(); + const TNetworkAddress& address = + FromNetworkAddressPB(ExecEnv::GetInstance()->krpc_address()); DCHECK(IsResolvedAddress(address)); exec_request_->query_exec_request.query_ctx.__set_coord_ip_address(address); // Update local_time_zone in the external request diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index e7badc60a..548396732 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -269,8 +269,9 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params, RETURN_IF_ERROR(InitBufferPoolState()); // Initialize the RPC proxy once and report any error. 
- RETURN_IF_ERROR(ControlService::GetProxy( - query_ctx().coord_ip_address, query_ctx().coord_hostname, &proxy_)); + NetworkAddressPB coord_addr = FromTNetworkAddress(query_ctx().coord_ip_address); + RETURN_IF_ERROR( + ControlService::GetProxy(coord_addr, query_ctx().coord_hostname, &proxy_)); // don't copy query_ctx, it's large and we already did that in the c'tor exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx()); diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 7e01c0ee5..0752eaecd 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -283,7 +283,8 @@ void RuntimeFilterBank::UpdateFilterFromLocal( DCHECK_EQ(type, TRuntimeFilterType::IN_LIST); InListFilter::ToProtobuf(in_list_filter, params.mutable_in_list_filter()); } - const TNetworkAddress& krpc_address = query_state_->query_ctx().coord_ip_address; + const NetworkAddressPB& krpc_address = + FromTNetworkAddress(query_state_->query_ctx().coord_ip_address); const std::string& hostname = query_state_->query_ctx().coord_hostname; // Use 'proxy' to send the filter to the coordinator. diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index a3151443f..7c26931ab 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -32,6 +32,7 @@ #include "util/disk-info.h" #include "util/impalad-metrics.h" #include "util/memory-metrics.h" +#include "util/uid-util.h" #include "common/names.h" @@ -81,7 +82,7 @@ Status TestEnv::Init() { // Initialize RpcMgr and control service. 
IpAddr ip_address; RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address)); - exec_env_->krpc_address_.__set_hostname(ip_address); + exec_env_->krpc_address_.set_hostname(ip_address); RETURN_IF_ERROR(exec_env_->rpc_mgr_->Init(exec_env_->krpc_address_)); exec_env_->control_svc_.reset(new ControlService(exec_env_->rpc_metrics_)); RETURN_IF_ERROR(exec_env_->control_svc_->Init()); @@ -147,9 +148,10 @@ Status TestEnv::CreateQueryState( query_ctx.query_id.lo = query_id; query_ctx.request_pool = "test-pool"; query_ctx.coord_hostname = exec_env_->configured_backend_address_.hostname; - query_ctx.coord_ip_address = exec_env_->krpc_address_; - query_ctx.coord_backend_id.hi = 0; - query_ctx.coord_backend_id.lo = 0; + query_ctx.coord_ip_address = FromNetworkAddressPB(exec_env_->krpc_address_); + TUniqueId backend_id; + UniqueIdPBToTUniqueId(exec_env_->backend_id(), &backend_id); + query_ctx.__set_coord_backend_id(backend_id); TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options; int64_t mem_limit = query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ? diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc index ce3e21c6d..51828fbaf 100644 --- a/be/src/runtime/tmp-file-mgr.cc +++ b/be/src/runtime/tmp-file-mgr.cc @@ -1474,8 +1474,8 @@ void TmpFileGroup::WriteComplete( // instances, <port> is the BE krpc port (default 27000). 
const Status* p_write_status = &write_status; Status debug_status = DebugAction(debug_action_, "IMPALA_TMP_FILE_WRITE", - {ExecEnv::GetInstance()->krpc_address().hostname, - SimpleItoa(ExecEnv::GetInstance()->krpc_address().port)}); + {ExecEnv::GetInstance()->krpc_address().hostname(), + SimpleItoa(ExecEnv::GetInstance()->krpc_address().port())}); if (UNLIKELY(!debug_status.ok())) p_write_status = &debug_status; if (!p_write_status->ok()) { diff --git a/be/src/scheduling/admissiond-env.cc b/be/src/scheduling/admissiond-env.cc index 383eadf46..7c01cc916 100644 --- a/be/src/scheduling/admissiond-env.cc +++ b/be/src/scheduling/admissiond-env.cc @@ -88,12 +88,13 @@ Status AdmissiondEnv::Init() { http_handler_->RegisterHandlers(DaemonEnv::GetInstance()->webserver()); - string ip_address; + IpAddr ip_address; RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address)); - TNetworkAddress ip_addr_port; - ip_addr_port.__set_hostname(ip_address); - ip_addr_port.__set_port(FLAGS_admission_service_port); - RETURN_IF_ERROR(rpc_mgr_->Init(ip_addr_port)); + // TODO: advertise BackendId of admissiond to coordinators via heartbeats. + // Use admissiond's IP address as unique ID for UDS now. + krpc_address_ = MakeNetworkAddressPB( + ip_address, FLAGS_admission_service_port, UdsAddressUniqueIdPB::IP_ADDRESS); + RETURN_IF_ERROR(rpc_mgr_->Init(krpc_address_)); admission_control_svc_.reset( new AdmissionControlService(DaemonEnv::GetInstance()->metrics())); RETURN_IF_ERROR(admission_control_svc_->Init()); diff --git a/be/src/scheduling/admissiond-env.h b/be/src/scheduling/admissiond-env.h index e6efff451..6f2cd6dc2 100644 --- a/be/src/scheduling/admissiond-env.h +++ b/be/src/scheduling/admissiond-env.h @@ -65,6 +65,9 @@ class AdmissiondEnv { private: static AdmissiondEnv* admissiond_env_; + /// Address of the KRPC backend service: ip_address + krpc_port and UDS address. 
+ NetworkAddressPB krpc_address_; + std::unique_ptr<AdmissionController> admission_controller_; std::unique_ptr<AdmissionControlService> admission_control_svc_; std::unique_ptr<ClusterMembershipMgr> cluster_membership_mgr_; diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index f39389b98..48c231dd4 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -42,6 +42,7 @@ #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" #include "scheduling/admission-control-client.h" +#include "scheduling/cluster-membership-mgr.h" #include "scheduling/scheduler.h" #include "service/frontend.h" #include "service/impala-server.h" @@ -861,21 +862,49 @@ Status ClientRequestState::ExecShutdownRequest() { << " to ip address, error: " << ip_status.GetDetail(); return ip_status; } - TNetworkAddress krpc_addr = MakeNetworkAddress(ip_address, port); - + // Find BackendId for the given remote ip address and port from cluster membership. + // The searching is not efficient, but Shutdown Requests are not called frequently. + // The BackendId is used to generate UDS address for Unix domain socket. Leave the + // Id value as 0 if it's not found in cluster membership. + // Note that UDS is only used when FLAGS_rpc_use_unix_domain_socket is set as true. + UniqueIdPB backend_id; + backend_id.set_hi(0); + backend_id.set_lo(0); + string krpc_error; + if (ExecEnv::GetInstance()->rpc_mgr()->IsKrpcUsingUDS()) { + if (ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId() + == UdsAddressUniqueIdPB::BACKEND_ID) { + ClusterMembershipMgr::SnapshotPtr membership_snapshot = + ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot(); + DCHECK(membership_snapshot.get() != nullptr); + for (const auto& it : membership_snapshot->current_backends) { + // Compare resolved IP addresses and ports. 
+ if (it.second.ip_address() == ip_address && it.second.address().port() == port) { + DCHECK(it.second.has_backend_id()); + backend_id = it.second.backend_id(); + break; + } + } + } + krpc_error = "RemoteShutdown() RPC failed: Network error"; + } else { + krpc_error = "RemoteShutdown() RPC failed: Timed out: connection negotiation to"; + } + NetworkAddressPB krpc_addr = MakeNetworkAddressPB(ip_address, port, backend_id, + ExecEnv::GetInstance()->rpc_mgr()->GetUdsAddressUniqueId()); std::unique_ptr<ControlServiceProxy> proxy; Status get_proxy_status = ControlService::GetProxy(krpc_addr, request.backend.hostname, &proxy); if (!get_proxy_status.ok()) { return Status( Substitute("Could not get Proxy to ControlService at $0 with error: $1.", - TNetworkAddressToString(krpc_addr), get_proxy_status.msg().msg())); + NetworkAddressPBToString(krpc_addr), get_proxy_status.msg().msg())); } RemoteShutdownParamsPB params; if (request.__isset.deadline_s) params.set_deadline_s(request.deadline_s); RemoteShutdownResultPB resp; - VLOG_QUERY << "Sending Shutdown RPC to " << TNetworkAddressToString(krpc_addr); + VLOG_QUERY << "Sending Shutdown RPC to " << NetworkAddressPBToString(krpc_addr); const int num_retries = 3; const int64_t timeout_ms = 10 * MILLIS_PER_SEC; @@ -887,14 +916,12 @@ Status ClientRequestState::ExecShutdownRequest() { if (!rpc_status.ok()) { const string& msg = rpc_status.msg().msg(); VLOG_QUERY << "RemoteShutdown query_id= " << PrintId(query_id()) - << " failed to send RPC to " << TNetworkAddressToString(krpc_addr) << " :" + << " failed to send RPC to " << NetworkAddressPBToString(krpc_addr) << " :" << msg; string err_string = Substitute( - "Rpc to $0 failed with error '$1'", TNetworkAddressToString(krpc_addr), msg); + "Rpc to $0 failed with error '$1'", NetworkAddressPBToString(krpc_addr), msg); // Attempt to detect if the the failure is because of not using a KRPC port. 
- if (backend_port_specified - && msg.find("RemoteShutdown() RPC failed: Timed out: connection negotiation to") - != string::npos) { + if (backend_port_specified && msg.find(krpc_error) != string::npos) { // Prior to IMPALA-7985 :shutdown() used the backend port. err_string.append(" This may be because the port specified is wrong. You may have" " specified the backend (thrift) port which :shutdown() can no" diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc index 4e672c5c3..8e1dd2d19 100644 --- a/be/src/service/control-service.cc +++ b/be/src/service/control-service.cc @@ -89,7 +89,7 @@ Status ControlService::Init() { return Status::OK(); } -Status ControlService::GetProxy(const TNetworkAddress& address, const string& hostname, +Status ControlService::GetProxy(const NetworkAddressPB& address, const string& hostname, unique_ptr<ControlServiceProxy>* proxy) { // Create a ControlService proxy to the destination. RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address, hostname, proxy)); diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h index acd3ae037..2411fff6f 100644 --- a/be/src/service/control-service.h +++ b/be/src/service/control-service.h @@ -18,6 +18,7 @@ #ifndef IMPALA_SERVICE_CONTROL_SERVICE_H #define IMPALA_SERVICE_CONTROL_SERVICE_H +#include "gen-cpp/common.pb.h" #include "gen-cpp/control_service.service.h" #include "kudu/rpc/rpc_context.h" @@ -77,7 +78,7 @@ class ControlService : public ControlServiceIf { /// Gets a ControlService proxy to a server with 'address' and 'hostname'. /// The newly created proxy is returned in 'proxy'. Returns error status on failure. 
- static Status GetProxy(const TNetworkAddress& address, const std::string& hostname, + static Status GetProxy(const NetworkAddressPB& address, const std::string& hostname, std::unique_ptr<ControlServiceProxy>* proxy); private: diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc index e135098c0..4cb863585 100644 --- a/be/src/service/data-stream-service.cc +++ b/be/src/service/data-stream-service.cc @@ -82,8 +82,8 @@ Status DataStreamService::Init() { return Status::OK(); } -Status DataStreamService::GetProxy(const TNetworkAddress& address, const string& hostname, - unique_ptr<DataStreamServiceProxy>* proxy) { +Status DataStreamService::GetProxy(const NetworkAddressPB& address, + const string& hostname, unique_ptr<DataStreamServiceProxy>* proxy) { // Create a DataStreamService proxy to the destination. RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address, hostname, proxy)); (*proxy)->set_network_plane("datastream"); diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h index 24a224931..9b63e8541 100644 --- a/be/src/service/data-stream-service.h +++ b/be/src/service/data-stream-service.h @@ -18,6 +18,7 @@ #ifndef IMPALA_SERVICE_DATA_STREAM_SERVICE_H #define IMPALA_SERVICE_DATA_STREAM_SERVICE_H +#include "gen-cpp/common.pb.h" #include "gen-cpp/data_stream_service.service.h" #include "common/status.h" @@ -88,7 +89,7 @@ class DataStreamService : public DataStreamServiceIf { /// Gets a DataStreamService proxy to a server with 'address' and 'hostname'. /// The newly created proxy is returned in 'proxy'. Returns error status on failure. 
- static Status GetProxy(const TNetworkAddress& address, const std::string& hostname, + static Status GetProxy(const NetworkAddressPB& address, const std::string& hostname, std::unique_ptr<DataStreamServiceProxy>* proxy); private: diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index e4e66ea42..f58f3a9e4 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1279,8 +1279,8 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) { exec_env_->krpc_address(), query_ctx); } -void ImpalaServer::PrepareQueryContext( - const std::string& hostname, const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx) { +void ImpalaServer::PrepareQueryContext(const std::string& hostname, + const NetworkAddressPB& krpc_addr, TQueryCtx* query_ctx) { query_ctx->__set_pid(getpid()); int64_t now_us = UnixMicros(); const Timezone& utc_tz = TimezoneDatabase::GetUtcTimezone(); @@ -1308,7 +1308,7 @@ void ImpalaServer::PrepareQueryContext( } query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI); query_ctx->__set_coord_hostname(hostname); - query_ctx->__set_coord_ip_address(krpc_addr); + query_ctx->__set_coord_ip_address(FromNetworkAddressPB(krpc_addr)); TUniqueId backend_id; UniqueIdPBToTUniqueId(ExecEnv::GetInstance()->backend_id(), &backend_id); query_ctx->__set_coord_backend_id(backend_id); @@ -2308,7 +2308,9 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_d *be_desc->mutable_backend_id() = exec_env_->backend_id(); *be_desc->mutable_address() = - FromTNetworkAddress(exec_env_->configured_backend_address()); + MakeNetworkAddressPB(exec_env_->configured_backend_address().hostname, + exec_env_->configured_backend_address().port, exec_env_->backend_id(), + exec_env_->rpc_mgr()->GetUdsAddressUniqueId()); be_desc->set_ip_address(exec_env_->ip_address()); be_desc->set_is_coordinator(FLAGS_is_coordinator); be_desc->set_is_executor(FLAGS_is_executor); @@ -2320,9 +2322,9 @@ void 
ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_d be_desc->set_secure_webserver(webserver->IsSecure()); } - const TNetworkAddress& krpc_address = exec_env_->krpc_address(); + const NetworkAddressPB& krpc_address = exec_env_->krpc_address(); DCHECK(IsResolvedAddress(krpc_address)); - *be_desc->mutable_krpc_address() = FromTNetworkAddress(krpc_address); + *be_desc->mutable_krpc_address() = krpc_address; be_desc->set_admit_mem_limit(exec_env_->admit_mem_limit()); be_desc->set_admission_slots(exec_env_->admission_slots()); diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 478874f65..bff9166d1 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -380,7 +380,7 @@ class ImpalaServer : public ImpalaServiceIf, /// Static helper for PrepareQueryContext() that is used from expr-benchmark. static void PrepareQueryContext(const std::string& hostname, - const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx); + const NetworkAddressPB& krpc_addr, TQueryCtx* query_ctx); /// SessionHandlerIf methods diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc index 7e72ff47a..3f4508791 100644 --- a/be/src/testutil/in-process-servers.cc +++ b/be/src/testutil/in-process-servers.cc @@ -38,12 +38,26 @@ DECLARE_string(ssl_server_certificate); DECLARE_string(ssl_private_key); DECLARE_int32(krpc_port); +DECLARE_bool(rpc_use_unix_domain_socket); using namespace apache::thrift; using namespace impala; Status InProcessImpalaServer::StartWithEphemeralPorts(const string& statestore_host, int statestore_port, InProcessImpalaServer** server) { + if (FLAGS_rpc_use_unix_domain_socket) { + // IMPALA-11129: This test utility function call WaitForServer() to wait KRPC server + // to start. WaitForServer(), which is defined in be/src/rpc/thrift-util.c, use + // TSocket (Thrift Socket) to connect KRPC server. 
Even TSocket provide a constructor + // with Unix Domain Socket address as parameter, but TSocket don't support UDS + // address in the form of name in "Abstract Namespace". So we have to disable KRPC + // running over UDS for this unit-test by setting FLAGS_rpc_use_unix_domain_socket + // as false. + // This function is called by backend unit tests: + // be/src/service/session-expiry-test.cc and be/src/exprs/expr-test.cc. + FLAGS_rpc_use_unix_domain_socket = false; + } + // This flag is read directly in several places to find the address of the backend // interface, so we must set it here. FLAGS_krpc_port = FindUnusedEphemeralPort(); diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h index c8900a414..01f586b0f 100644 --- a/be/src/util/container-util.h +++ b/be/src/util/container-util.h @@ -79,7 +79,7 @@ inline bool operator<(const UniqueIdPB& lhs, const UniqueIdPB& rhs) { } // TNetworkAddress -STATIC_ASSERT_SIZE(TNetworkAddress, 48); +STATIC_ASSERT_SIZE(TNetworkAddress, 88); inline bool operator==(const TNetworkAddress& lhs, const TNetworkAddress& rhs) { return std::tie(lhs.hostname, lhs.port) == std::tie(rhs.hostname, rhs.port); @@ -90,7 +90,7 @@ inline bool operator!=(const TNetworkAddress& lhs, const TNetworkAddress& rhs) { } // NetworkAddressPB -STATIC_ASSERT_SIZE(NetworkAddressPB, 40); +STATIC_ASSERT_SIZE(NetworkAddressPB, 48); inline bool operator==(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) { return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port(); diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index 5a7918230..484e1be2b 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -27,11 +27,14 @@ #include <random> #include <vector> #include <boost/algorithm/string.hpp> +#include <boost/uuid/uuid.hpp> +#include <boost/uuid/uuid_generators.hpp> #include "exec/kudu-util.h" #include "kudu/util/net/sockaddr.h" #include "util/debug-util.h" #include "util/error-util.h" 
+#include "util/uid-util.h" #include <util/string-parser.h> #include "common/names.h" @@ -169,6 +172,54 @@ NetworkAddressPB MakeNetworkAddressPB(const string& hostname, int port) { return ret; } +string GetUDSAddress(const std::string& hostname, int port, const UniqueIdPB& backend_id, + const UdsAddressUniqueIdPB& uds_addr_unique_id) { + stringstream ss; + switch (uds_addr_unique_id) { + case UdsAddressUniqueIdPB::IP_ADDRESS: { + string ip_addr = hostname; + kudu::Sockaddr sock; + // Check if the hostname is resolved IP address. + if (!sock.ParseString(hostname, port).ok()) { + IpAddr ip; + Status status = HostnameToIpAddr(hostname, &ip); + if (status.ok()) ip_addr = ip; + } + ss << "@impala-krpc:" << ip_addr << ":" << port; + break; + } + case UdsAddressUniqueIdPB::BACKEND_ID: + ss << "@impala-krpc:" << PrintId(backend_id); + break; + case UdsAddressUniqueIdPB::NO_UNIQUE_ID: + ss << "@impala-krpc"; + break; + } + return ss.str(); +} + +NetworkAddressPB MakeNetworkAddressPB(const std::string& hostname, int port, + const UniqueIdPB& backend_id, const UdsAddressUniqueIdPB& uds_addr_unique_id) { + NetworkAddressPB ret; + ret.set_hostname(hostname); + ret.set_port(port); + ret.set_uds_address(GetUDSAddress(hostname, port, backend_id, uds_addr_unique_id)); + return ret; +} + +NetworkAddressPB MakeNetworkAddressPB(const std::string& hostname, int port, + const UdsAddressUniqueIdPB& uds_addr_unique_id) { + NetworkAddressPB ret; + UniqueIdPB backend_id; + if (uds_addr_unique_id == UdsAddressUniqueIdPB::BACKEND_ID) { + UUIDToUniqueIdPB(boost::uuids::random_generator()(), &backend_id); + } + ret.set_hostname(hostname); + ret.set_port(port); + ret.set_uds_address(GetUDSAddress(hostname, port, backend_id, uds_addr_unique_id)); + return ret; +} + bool IsWildcardAddress(const string& ipaddress) { return ipaddress == "0.0.0.0"; } @@ -189,6 +240,7 @@ TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) { TNetworkAddress t_address; 
t_address.__set_hostname(address.hostname()); t_address.__set_port(address.port()); + t_address.__set_uds_address(address.uds_address()); return t_address; } @@ -196,6 +248,7 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address) { NetworkAddressPB address_pb; address_pb.set_hostname(address.hostname); address_pb.set_port(address.port); + address_pb.set_uds_address(address.uds_address); return address_pb; } @@ -225,13 +278,18 @@ int FindUnusedEphemeralPort() { return -1; } -Status TNetworkAddressToSockaddr(const TNetworkAddress& address, - kudu::Sockaddr* sockaddr) { - DCHECK(IsResolvedAddress(address)); - KUDU_RETURN_IF_ERROR( - sockaddr->ParseString(TNetworkAddressToString(address), address.port), - "Failed to parse address to Kudu Sockaddr."); +Status NetworkAddressPBToSockaddr( + const NetworkAddressPB& address, bool use_uds, kudu::Sockaddr* sockaddr) { + if (use_uds) { + DCHECK(!address.uds_address().empty()); + KUDU_RETURN_IF_ERROR(sockaddr->ParseUnixDomainPath(address.uds_address()), + "Invalid UNIX domain socket address."); + } else { + DCHECK(IsResolvedAddress(address)); + KUDU_RETURN_IF_ERROR( + sockaddr->ParseString(NetworkAddressPBToString(address), address.port()), + "Failed to parse IP address to Kudu Sockaddr."); + } return Status::OK(); } - } diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index cdd697591..bffcde812 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -54,9 +54,19 @@ bool FindFirstNonLocalhost(const std::vector<std::string>& addresses, std::strin /// Returns OK if a hostname can be found, false otherwise. Status GetHostname(std::string* hostname) WARN_UNUSED_RESULT; +/// Generate UDS address. 
+string GetUDSAddress(const std::string& hostname, int port, const UniqueIdPB& backend_id, + const UdsAddressUniqueIdPB& uds_addr_unique_id); + /// Utility methods because Thrift/protobuf do not supply useful constructors TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port); NetworkAddressPB MakeNetworkAddressPB(const std::string& hostname, int port); +NetworkAddressPB MakeNetworkAddressPB(const std::string& hostname, int port, + const UniqueIdPB& backend_id, const UdsAddressUniqueIdPB& uds_addr_unique_id); +/// This function generate unique ID if needed. +/// It's only used if backend ID is not available, like unit-test or Admissiond. +NetworkAddressPB MakeNetworkAddressPB(const std::string& hostname, int port, + const UdsAddressUniqueIdPB& uds_addr_unique_id); /// Utility method to parse the given string into a network address. /// Accepted format: "host:port" or "host". For the latter format the port is set to zero. @@ -83,10 +93,11 @@ TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address); /// Utility method to convert a TNetworkAddress to a NetworkAddressPB. NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address); -/// Utility method to convert TNetworkAddress to Kudu sock addr. +/// Utility method to convert NetworkAddressPB to Kudu Sockaddr. +/// If use_uds is true, set Kudu Sockaddr as UDS address. /// Note that 'address' has to contain a resolved IP address. -Status TNetworkAddressToSockaddr(const TNetworkAddress& address, - kudu::Sockaddr* sockaddr); +Status NetworkAddressPBToSockaddr( + const NetworkAddressPB& address, bool use_uds, kudu::Sockaddr* sockaddr); /// Returns a ephemeral port that is currently unused. Returns -1 on an error or if /// a free ephemeral port can't be found after 100 tries. 
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto index 99b862cdc..9c8558475 100644 --- a/common/protobuf/common.proto +++ b/common/protobuf/common.proto @@ -22,9 +22,18 @@ syntax="proto2"; package impala; // Refer to Types.thrift for documentation. +// UDS is limited to KRPC. message NetworkAddressPB { required string hostname = 1; required int32 port = 2; + optional string uds_address = 3; +} + +// Unique-id used for setting UDS address. +enum UdsAddressUniqueIdPB { + IP_ADDRESS = 0; + BACKEND_ID = 1; + NO_UNIQUE_ID = 2; } // Proto-serialized version of Impala's Status object. diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index f28d1adb4..ecd78d5e7 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -142,9 +142,14 @@ enum TPrefetchMode { // A TNetworkAddress is the standard host, port representation of a // network address. The hostname field must be resolvable to an IPv4 // address. +// uds_address is Unix Domain Socket address. UDS is limited to KRPC. +// We use the unique name in "Abstract Namespace" as UDS address in the form of +// "@impala-krpc:<unique-id>". This field is optional. It is only used for KRPC +// bind/listen/connect when FLAGS_rpc_use_unix_domain_socket is set as true. struct TNetworkAddress { 1: required string hostname 2: required i32 port + 3: optional string uds_address } // Wire format for UniqueId diff --git a/tests/custom_cluster/test_krpc_socket.py b/tests/custom_cluster/test_krpc_socket.py new file mode 100644 index 000000000..1845102cf --- /dev/null +++ b/tests/custom_cluster/test_krpc_socket.py @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import socket +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_cluster import DEFAULT_KRPC_PORT +from tests.util.shell_util import exec_process + + +class TestKrpcSocket(CustomClusterTestSuite): + """Test for different types of socket used by KRPC.""" + + @classmethod + def get_workload(self): + return 'functional-query' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--rpc_use_unix_domain_socket=false") + def test_krpc_use_tcp_socket(self, vector): + """Sanity test for KRPC running over TCP socket.""" + + # Run a query that will execute on multiple hosts. + self.execute_query_expect_success(self.client, + "select min(int_col) from functional_parquet.alltypes", vector=vector) + + # Find the listening TCP ports via netstat. + rc, netstat_output, stderr = exec_process("netstat -lnt") + assert rc == 0, "Error finding listening TCP ports\nstdout={0}\nstderr={1}".format( + netstat_output, stderr) + # Verify port number DEFAULT_KRPC_PORT, DEFAULT_KRPC_PORT+1, and DEFAULT_KRPC_PORT+2 + # are in the list. + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT)) == 1) + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT + 1)) == 1) + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT + 2)) == 1) + + # Find the listening Unix sockets via netstat. 
+ rc, netstat_output, stderr = exec_process("netstat -lx") + assert rc == 0, "Error finding Unix sockets\nstdout={0}\nstderr={1}".format( + netstat_output, stderr) + # Verify that KRPC are not binding to Unix domain socket. + assert(netstat_output.count("@impala-krpc") == 0) + + # Check that we can connect on TCP port of KRPC. + sock = socket.socket() + sock.connect(("localhost", DEFAULT_KRPC_PORT)) + sock.close() + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--rpc_use_unix_domain_socket=true") + def test_krpc_use_unix_domain_socket(self, vector): + """Sanity test for KRPC running over Unix domain socket. + Use IP address as the unique Id for UDS address. + """ + + # Run a query that will execute on multiple hosts. + self.execute_query_expect_success(self.client, + "select min(int_col) from functional_parquet.alltypes", vector=vector) + + # Find the listening TCP ports via netstat. + rc, netstat_output, stderr = exec_process("netstat -lnt") + assert rc == 0, "Error finding listening TCP port\nstdout={0}\nstderr={1}".format( + netstat_output, stderr) + # Verify port number DEFAULT_KRPC_PORT, DEFAULT_KRPC_PORT+1, and DEFAULT_KRPC_PORT+2 + # are not in the list. + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT)) == 0) + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT + 1)) == 0) + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT + 2)) == 0) + + # Find the listening Unix sockets via netstat. + rc, netstat_output, stderr = exec_process("netstat -lx") + assert rc == 0, "Error finding Unix sockets\nstdout={0}\nstderr={1}".format( + netstat_output, stderr) + # Verify that KRPC are binding to Unix domain socket. + assert(netstat_output.count("@impala-krpc") == 3) + + # Not try to connect to Unix domain socket of KRPC since "Abstract Namespace" + # may be not supported by the Python socket. 
+ + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--rpc_use_unix_domain_socket=true --uds_address_unique_id=backend_id") + def test_krpc_uds_address_backend_id(self, vector): + """Sanity test for KRPC running over Unix domain socket. + Use BackendId as the unique Id for UDS address. + """ + + # Run a query that will execute on multiple hosts. + self.execute_query_expect_success(self.client, + "select min(int_col) from functional_parquet.alltypes", vector=vector) + + # Find the listening TCP ports via netstat. + rc, netstat_output, stderr = exec_process("netstat -lnt") + assert rc == 0, "Error finding listening TCP port\nstdout={0}\nstderr={1}".format( + netstat_output, stderr) + # Verify port number DEFAULT_KRPC_PORT, DEFAULT_KRPC_PORT+1, and DEFAULT_KRPC_PORT+2 + # are not in the list. + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT)) == 0) + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT + 1)) == 0) + assert(netstat_output.count(":{0}".format(DEFAULT_KRPC_PORT + 2)) == 0) + + # Find the listening Unix sockets via netstat. + rc, netstat_output, stderr = exec_process("netstat -lx") + assert rc == 0, "Error finding Unix sockets\nstdout={0}\nstderr={1}".format( + netstat_output, stderr) + # Verify that KRPC are binding to Unix domain socket. 
+ assert(netstat_output.count("@impala-krpc") == 3) diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py index da797ec86..55b3800c2 100644 --- a/tests/custom_cluster/test_restart_services.py +++ b/tests/custom_cluster/test_restart_services.py @@ -365,6 +365,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--shutdown_grace_period_s={grace_period} \ + --rpc_use_unix_domain_socket=false \ --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S, hostname=socket.gethostname())) def test_shutdown_idle(self): @@ -425,6 +426,72 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite): shutdown_duration = time.time() - start_time assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10 + @SkipIfGCS.jira(reason="IMPALA-10562") + @SkipIfCOS.jira(reason="IMPALA-10562") + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--shutdown_grace_period_s={grace_period} \ + --rpc_use_unix_domain_socket=true \ + --hostname={hostname}".format(grace_period=IDLE_SHUTDOWN_GRACE_PERIOD_S, + hostname=socket.gethostname())) + def test_shutdown_idle_rpc_use_uds(self): + """Test that idle impalads shut down in a timely manner after the shutdown grace + period elapses.""" + impalad1 = psutil.Process(self.cluster.impalads[0].get_pid()) + impalad2 = psutil.Process(self.cluster.impalads[1].get_pid()) + impalad3 = psutil.Process(self.cluster.impalads[2].get_pid()) + + # Test that a failed shut down from a bogus host or port fails gracefully. + ex = self.execute_query_expect_failure(self.client, + ":shutdown('e6c00ca5cd67b567eb96c6ecfb26f05')") + assert "Could not find IPv4 address for:" in str(ex) + ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:100000')") + # IMPALA-11129: RPC return different error message for socket over Unix domain socket. 
+ assert "Connection refused" in str(ex) + + # Test that pointing to the wrong thrift service (the HS2 port) fails gracefully-ish. + thrift_port = 21051 # HS2 port. + ex = self.execute_query_expect_failure(self.client, + ":shutdown('localhost:{0}')".format(thrift_port)) + assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex) + assert ("This may be because the port specified is wrong.") in str(ex) + + # Test RPC error handling with debug action. + ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:27001')", + query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'}) + assert 'Rpc to 127.0.0.1:27001 failed with error \'Debug Action: ' \ + 'CRS_SHUTDOWN_RPC:FAIL' in str(ex) + + # Test remote shutdown. + LOG.info("Start remote shutdown {0}".format(time.time())) + self.execute_query_expect_success(self.client, ":shutdown('localhost:27001')", + query_options={}) + + # Remote shutdown does not require statestore. + self.cluster.statestored.kill() + self.cluster.statestored.wait_for_exit() + self.execute_query_expect_success(self.client, ":shutdown('localhost:27002')", + query_options={}) + + # Test local shutdown, which should succeed even with injected RPC error. + LOG.info("Start local shutdown {0}".format(time.time())) + self.execute_query_expect_success(self.client, + ":shutdown('{0}:27000')".format(socket.gethostname()), + query_options={'debug_action': 'CRS_SHUTDOWN_RPC:FAIL'}) + + # Make sure that the impala daemons exit after the shutdown grace period plus a 10 + # second margin of error. 
+ start_time = time.time() + LOG.info("Waiting for impalads to exit {0}".format(start_time)) + impalad1.wait() + LOG.info("First impalad exited {0}".format(time.time())) + impalad2.wait() + LOG.info("Second impalad exited {0}".format(time.time())) + impalad3.wait() + LOG.info("Third impalad exited {0}".format(time.time())) + shutdown_duration = time.time() - start_time + assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10 + EXEC_SHUTDOWN_GRACE_PERIOD_S = 5 EXEC_SHUTDOWN_DEADLINE_S = 10 diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index 36ae0fd1d..defd560c2 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -596,6 +596,10 @@ class TestWebPage(ImpalaTestSuite): functional.alltypestiny join functional.alltypessmall c2" SVC_NAME = 'impala.DataStreamService' + def is_krpc_use_unix_domain_socket(): + rpcz = self.get_debug_page(self.RPCZ_URL) + return rpcz['rpc_use_unix_domain_socket'] + def get_per_conn_metrics(inbound): """Get inbound or outbound per-connection metrics""" rpcz = self.get_debug_page(self.RPCZ_URL) @@ -615,6 +619,8 @@ class TestWebPage(ImpalaTestSuite): return sorted(s['rpc_method_metrics'], key=lambda m: m['method_name']) assert False, 'Could not find metrics for %s' % svc_name + krpc_use_uds = is_krpc_use_unix_domain_socket() + svc_before = get_svc_metrics(SVC_NAME) inbound_before = get_per_conn_metrics(True) outbound_before = get_per_conn_metrics(False) @@ -624,23 +630,26 @@ class TestWebPage(ImpalaTestSuite): outbound_after = get_per_conn_metrics(False) assert svc_before != svc_after - assert inbound_before != inbound_after - assert outbound_before != outbound_after + if not krpc_use_uds: + assert inbound_before != inbound_after + assert outbound_before != outbound_after # Some connections should have metrics after executing query assert len(inbound_after) > 0 assert len(outbound_after) > 0 # Spot-check some fields, including socket stats. 
for conn in itertools.chain(inbound_after, outbound_after): - assert conn["remote_ip"] != "" + assert conn["remote_addr"] != "" assert conn["num_calls_in_flight"] >= 0 assert conn["num_calls_in_flight"] == len(conn["calls_in_flight"]) # Check rtt, which should be present in 'struct tcp_info' even in old kernels # like 2.6.32. - assert conn["socket_stats"]["rtt"] > 0, conn - # send_queue_bytes uses TIOCOUTQ, which is also present in 2.6.32 and even older - # kernels. - assert conn["socket_stats"]["send_queue_bytes"] >= 0, conn + # Skip these checking if using UDS. + if not krpc_use_uds: + assert conn["socket_stats"]["rtt"] > 0, conn + # send_queue_bytes uses TIOCOUTQ, which is also present in 2.6.32 and even older + # kernels. + assert conn["socket_stats"]["send_queue_bytes"] >= 0, conn @pytest.mark.execute_serially def test_admission_page(self): diff --git a/www/rpcz.tmpl b/www/rpcz.tmpl index f2e2e19e2..63a2c1b23 100644 --- a/www/rpcz.tmpl +++ b/www/rpcz.tmpl @@ -90,7 +90,7 @@ under the License. <table class="table table-bordered table-hover" id="per_conn_metrics"> <thead> <tr> - <th>Remote IP</th> + <th>Remote Address</th> <th># Calls in Flight</th> <th>Outbound Queue Size (count)</th> <th>Socket RTT (us)</th> @@ -110,7 +110,7 @@ under the License. <tbody> {{#per_conn_metrics}} <tr> - <td>{{remote_ip}}</td> + <td>{{remote_addr}}</td> <td>{{num_calls_in_flight}}</td> <td>{{outbound_queue_size}}</td> <td>{{socket_stats.rtt}}</td> @@ -134,7 +134,7 @@ under the License. <table class="table table-bordered table-hover" id="inbound_per_conn_metrics"> <thead> <tr> - <th>Remote IP</th> + <th>Remote Address</th> <th># Calls in Flight</th> <th>Socket RTT (us)</th> <th>Socket RTT Variance (us)</th> @@ -153,7 +153,7 @@ under the License. 
<tbody> {{#inbound_per_conn_metrics}} <tr> - <td>{{remote_ip}}</td> + <td>{{remote_addr}}</td> <td>{{num_calls_in_flight}}</td> <td>{{socket_stats.rtt}}</td> <td>{{socket_stats.rttvar}}</td> @@ -296,8 +296,9 @@ function update_krpc_services(json) { function update_krpc_conn_metrics_datatable(json) { var table = $('#per_conn_metrics').DataTable(); - var rows = $.map(json["per_conn_metrics"], function(row) { - return [[row["remote_ip"], row["num_calls_in_flight"], row["outbound_queue_size"], + if (!json["rpc_use_unix_domain_socket"]) { + var rows = $.map(json["per_conn_metrics"], function(row) { + return [[row["remote_addr"], row["num_calls_in_flight"], row["outbound_queue_size"], row["socket_stats"]["rtt"] ?? '', row["socket_stats"]["rttvar"] ?? '', row["socket_stats"]["snd_cwnd"] ?? '', @@ -310,15 +311,23 @@ function update_krpc_conn_metrics_datatable(json) { row["socket_stats"]["segs_in"] ?? '', row["socket_stats"]["send_queue_bytes"] ?? '', row["socket_stats"]["receive_queue_bytes"] ?? '']]; - }); + }); + } else { + var rows = $.map(json["per_conn_metrics"], function(row) { + return [[row["remote_addr"], row["num_calls_in_flight"], row["outbound_queue_size"], + 'N/A', 'N/A', 'N/A', 'N/A', 'N/A', 'N/A', + 'N/A', 'N/A', 'N/A', 'N/A', 'N/A', 'N/A']]; + }); + } table.clear().rows.add(rows).draw(); } function update_krpc_inbound_conn_metrics_datatable(json) { var table = $('#inbound_per_conn_metrics').DataTable(); - var rows = $.map(json["inbound_per_conn_metrics"], function(row) { - return [[row["remote_ip"], row["num_calls_in_flight"], + if (!json["rpc_use_unix_domain_socket"]) { + var rows = $.map(json["inbound_per_conn_metrics"], function(row) { + return [[row["remote_addr"], row["num_calls_in_flight"], row["socket_stats"]["rtt"] ?? '', row["socket_stats"]["rttvar"] ?? '', row["socket_stats"]["snd_cwnd"] ?? '', @@ -331,7 +340,14 @@ function update_krpc_inbound_conn_metrics_datatable(json) { row["socket_stats"]["segs_in"] ?? 
'', row["socket_stats"]["send_queue_bytes"] ?? '', row["socket_stats"]["receive_queue_bytes"] ?? '']]; - }); + }); + } else { + var rows = $.map(json["inbound_per_conn_metrics"], function(row) { + return [[row["remote_addr"], row["num_calls_in_flight"], + 'N/A', 'N/A', 'N/A', 'N/A', 'N/A', 'N/A', + 'N/A', 'N/A', 'N/A', 'N/A', 'N/A', 'N/A']]; + }); + } table.clear().rows.add(rows).draw(); }
