This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit be68ce81beeb708edfc0545695e72282506f3845 Author: Todd Lipcon <[email protected]> AuthorDate: Thu Apr 9 16:42:44 2020 -0700 client/tserver: add support for connecting over unix domain sockets This adds new experimental flags -rpc_listen_on_unix_domain_socket and -client_use_unix_domain_sockets. The former makes the RPC server bind to a unix socket and advertise this to the kudu master as part of the TS registration. The latter makes the client attempt to connect via a domain socket when it sees such a socket path advertised. Note that this makes one behavioral change even when those flags are not enabled: we now consider any tablet server with a loopback IP to be "local" (and thus a candidate for unix domain socket connection). This mostly affects the MiniCluster where tablet servers register using various IPs in the loopback range 127.0.0.0/8, and was necessary in order to test unix socket connections from the client. I perf tested by scanning an int32 column from a table with 800M rows and using 'perf stat -a -r10' to look at total CPU consumption across the tserver and system. There's a fair amount of variability here due to inconsistent scheduling to cores/numa nodes, but seems like the unix socket on average is 10% faster or so in terms of total cycles. 
TCP sockets: Performance counter stats for 'system wide' (10 runs): 148,367.78 msec cpu-clock # 87.755 CPUs utilized ( +- 4.82% ) 101,755 context-switches # 0.686 K/sec ( +- 9.03% ) 866 cpu-migrations # 0.006 K/sec ( +- 6.42% ) 21,440 page-faults # 0.145 K/sec ( +- 19.32% ) 43,847,792,445 cycles # 0.296 GHz ( +- 3.77% ) (1.01%) 50,668,281,554 instructions # 1.16 insn per cycle ( +- 1.80% ) (1.11%) 7,676,337,185 branches # 51.739 M/sec ( +- 4.61% ) (0.85%) 69,634,718 branch-misses # 0.91% of all branches ( +- 4.72% ) (0.84%) 1.6907 +- 0.0811 seconds time elapsed ( +- 4.80% ) Unix sockets: Performance counter stats for 'system wide' (10 runs): 136,877.86 msec cpu-clock # 87.638 CPUs utilized ( +- 2.67% ) 77,376 context-switches # 0.565 K/sec ( +- 14.16% ) 846 cpu-migrations # 0.006 K/sec ( +- 6.58% ) 23,430 page-faults # 0.171 K/sec ( +- 39.77% ) 39,106,012,185 cycles # 0.286 GHz ( +- 4.26% ) (0.99%) 48,957,283,894 instructions # 1.25 insn per cycle ( +- 2.24% ) (1.08%) 7,635,756,771 branches # 55.785 M/sec ( +- 3.54% ) (0.83%) 69,900,882 branch-misses # 0.92% of all branches ( +- 5.14% ) (0.82%) 1.5619 +- 0.0415 seconds time elapsed ( +- 2.66% ) Change-Id: I0c390b4209ac7e08cd45239c49499fb0b96405d0 Reviewed-on: http://gerrit.cloudera.org:8080/15701 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/client/client-internal.cc | 9 ++++++++- src/kudu/client/client-test.cc | 30 ++++++++++++++++++++++++++-- src/kudu/client/meta_cache.cc | 40 +++++++++++++++++++++++++++++++++++-- src/kudu/client/meta_cache.h | 7 +++++++ src/kudu/common/wire_protocol.cc | 2 ++ src/kudu/common/wire_protocol.proto | 5 +++++ src/kudu/master/catalog_manager.cc | 3 +++ src/kudu/master/master.proto | 5 +++++ src/kudu/rpc/acceptor_pool.cc | 15 +++++++++++--- src/kudu/server/rpc_server.cc | 12 ++++++++++- src/kudu/server/rpc_server.h | 4 ++++ src/kudu/server/server_base.cc | 17 ++++++++++++++++ src/kudu/tserver/heartbeater.cc | 8 ++++++++ src/kudu/util/net/sockaddr.cc 
| 3 +++ src/kudu/util/net/sockaddr.h | 12 +++++++++++ 15 files changed, 163 insertions(+), 9 deletions(-) diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc index 9243836..f7bf98c 100644 --- a/src/kudu/client/client-internal.cc +++ b/src/kudu/client/client-internal.cc @@ -443,7 +443,14 @@ Status KuduClient::Data::InitLocalHostNames() { } bool KuduClient::Data::IsLocalHostPort(const HostPort& hp) const { - return ContainsKey(local_host_names_, hp.host()); + if (ContainsKey(local_host_names_, hp.host())) { + return true; + } + + // It may be that HostPort is a numeric form (non-reversable) address like + // 127.0.1.1, etc. In that case we can still consider it local. + Sockaddr addr; + return addr.ParseFromNumericHostPort(hp).ok() && addr.IsAnyLocalAddress(); } bool KuduClient::Data::IsTabletServerLocal(const RemoteTabletServer& rts) const { diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index 6961ff3..4c57900 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -125,11 +125,13 @@ DECLARE_bool(allow_unsafe_replication_factor); DECLARE_bool(catalog_manager_support_live_row_count); DECLARE_bool(catalog_manager_support_on_disk_size); +DECLARE_bool(client_use_unix_domain_sockets); DECLARE_bool(fail_dns_resolution); DECLARE_bool(location_mapping_by_uuid); DECLARE_bool(log_inject_latency); DECLARE_bool(master_support_connect_to_master_rpc); DECLARE_bool(mock_table_metrics_for_testing); +DECLARE_bool(rpc_listen_on_unix_domain_socket); DECLARE_bool(rpc_trace_negotiation); DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan); DECLARE_int32(flush_threshold_mb); @@ -156,13 +158,14 @@ DECLARE_string(user_acl); DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan"); METRIC_DECLARE_counter(block_manager_total_bytes_read); -METRIC_DECLARE_counter(rpcs_queue_overflow); METRIC_DECLARE_counter(location_mapping_cache_hits); 
METRIC_DECLARE_counter(location_mapping_cache_queries); +METRIC_DECLARE_counter(rpc_connections_accepted_unix_domain_socket); +METRIC_DECLARE_counter(rpcs_queue_overflow); METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetMasterRegistration); -METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations); METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations); METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableSchema); +METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations); METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerService_Scan); using base::subtle::Atomic32; @@ -6646,5 +6649,28 @@ TEST_F(ClientTest, TestProjectionPredicatesFuzz) { unordered_set<string>(rows.begin(), rows.end())) << rows; } +class ClientTestUnixSocket : public ClientTest { + public: + void SetUp() override { + FLAGS_rpc_listen_on_unix_domain_socket = true; + FLAGS_client_use_unix_domain_sockets = true; + ClientTest::SetUp(); + } +}; + +TEST_F(ClientTestUnixSocket, TestConnectViaUnixSocket) { + static constexpr int kNumRows = 100; + NO_FATALS(InsertTestRows(client_table_.get(), kNumRows)); + ASSERT_EQ(kNumRows, CountRowsFromClient(client_table_.get())); + + int total_unix_conns = 0; + for (int i = 0; i < cluster_->num_tablet_servers(); i++) { + auto counter = METRIC_rpc_connections_accepted_unix_domain_socket.Instantiate( + cluster_->mini_tablet_server(0)->server()->metric_entity()); + total_unix_conns += counter->value(); + } + ASSERT_EQ(1, total_unix_conns); +} + } // namespace client } // namespace kudu diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc index b005256..5fc227d 100644 --- a/src/kudu/client/meta_cache.cc +++ b/src/kudu/client/meta_cache.cc @@ -27,6 +27,7 @@ #include <utility> #include <vector> +#include <gflags/gflags.h> #include <glog/logging.h> #include <google/protobuf/repeated_field.h> // IWYU pragma: 
keep @@ -48,11 +49,13 @@ #include "kudu/rpc/rpc.h" #include "kudu/rpc/rpc_controller.h" #include "kudu/tserver/tserver_service.proxy.h" +#include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" #include "kudu/util/net/dns_resolver.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" #include "kudu/util/pb_util.h" +#include "kudu/util/scoped_cleanup.h" using kudu::consensus::RaftPeerPB; using kudu::master::ANY_REPLICA; @@ -71,6 +74,16 @@ using std::unique_ptr; using std::vector; using strings::Substitute; +// TODO(todd) before enabling by default, need to think about how this works with +// docker/k8s -- I think the abstract namespace is scoped to a given k8s pod. We +// probably need to have the client blacklist the socket if it attempts to use it +// and can't connect. +DEFINE_bool(client_use_unix_domain_sockets, false, + "Whether to try to connect to tablet servers using unix domain sockets. " + "This will only be attempted if the server has indicated that it is listening " + "on such a socket and the client is running on the same host."); +TAG_FLAG(client_use_unix_domain_sockets, experimental); + namespace kudu { namespace client { namespace internal { @@ -85,8 +98,7 @@ void RemoteTabletServer::DnsResolutionFinished(const HostPort& hp, KuduClient* client, const StatusCallback& user_callback, const Status &result_status) { - unique_ptr<vector<Sockaddr>> scoped_addrs(addrs); - + SCOPED_CLEANUP({ delete addrs; }); Status s = result_status; if (s.ok() && addrs->empty()) { @@ -129,6 +141,25 @@ void RemoteTabletServer::InitProxy(KuduClient* client, const StatusCallback& cb) } auto addrs = new vector<Sockaddr>(); + + if (FLAGS_client_use_unix_domain_sockets && unix_domain_socket_path_ && + client->data_->IsLocalHostPort(hp)) { + Sockaddr unix_socket; + Status parse_status = unix_socket.ParseUnixDomainPath(*unix_domain_socket_path_); + if (!parse_status.ok()) { + KLOG_EVERY_N_SECS(WARNING, 60) + << Substitute("Tablet server $0 ($1) reported 
an invalid UNIX domain socket path '$2'", + hp.ToString(), uuid_, *unix_domain_socket_path_); + // Fall through to normal TCP path. + } else { + VLOG(1) << Substitute("Will try to connect to UNIX socket $0 for local tablet server $1 ($2)", + unix_socket.ToString(), hp.ToString(), uuid_); + addrs->emplace_back(unix_socket); + this->DnsResolutionFinished(hp, addrs, client, cb, Status::OK()); + return; + } + } + client->data_->dns_resolver_->ResolveAddressesAsync( hp, addrs, [=](const Status& s) { this->DnsResolutionFinished(hp, addrs, client, cb, s); @@ -145,6 +176,11 @@ void RemoteTabletServer::Update(const master::TSInfoPB& pb) { rpc_hostports_.emplace_back(hostport_pb.host(), hostport_pb.port()); } location_ = pb.location(); + if (pb.has_unix_domain_socket_path()) { + unix_domain_socket_path_ = pb.unix_domain_socket_path(); + } else { + unix_domain_socket_path_ = boost::none; + } } const string& RemoteTabletServer::permanent_uuid() const { diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h index 2e9a216..f02305f 100644 --- a/src/kudu/client/meta_cache.h +++ b/src/kudu/client/meta_cache.h @@ -28,6 +28,7 @@ #include <utility> #include <vector> +#include <boost/optional/optional.hpp> #include <glog/logging.h> #include <gtest/gtest_prod.h> @@ -124,6 +125,12 @@ class RemoteTabletServer { std::string location_; std::vector<HostPort> rpc_hostports_; + + // The path on which this server is listening for unix domain socket connections. + // This should only be used in the case that it can be determined that the tablet + // server is local to the client. 
+ boost::optional<std::string> unix_domain_socket_path_; + std::shared_ptr<tserver::TabletServerServiceProxy> proxy_; DISALLOW_COPY_AND_ASSIGN(RemoteTabletServer); diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc index c423a02..cdc5dde 100644 --- a/src/kudu/common/wire_protocol.cc +++ b/src/kudu/common/wire_protocol.cc @@ -199,6 +199,8 @@ HostPort HostPortFromPB(const HostPortPB& host_port_pb) { Status AddHostPortPBs(const vector<Sockaddr>& addrs, RepeatedPtrField<HostPortPB>* pbs) { for (const Sockaddr& addr : addrs) { + // Don't add unix domain sockets to the list of HostPorts. + if (!addr.is_ip()) continue; HostPortPB* pb = pbs->Add(); if (addr.IsWildcard()) { RETURN_NOT_OK(GetFQDN(pb->mutable_host())); diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto index 3b3bec2..3dce5a0 100644 --- a/src/kudu/common/wire_protocol.proto +++ b/src/kudu/common/wire_protocol.proto @@ -96,6 +96,11 @@ message ServerRegistrationPB { // Seconds since the epoch. optional int64 start_time = 5; + + // The path of a UNIX domain socket where the server is listening. + // An '@' prefix indicates the abstract namespace. May be missing + // if this feature is not enabled. 
+ optional string unix_domain_socket_path = 6; } message ServerEntryPB { diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 0f98119..05645f8 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -4940,6 +4940,9 @@ Status CatalogManager::BuildLocationsForTablet( ServerRegistrationPB reg; ts_desc->GetRegistration(&reg); tsinfo_pb->mutable_rpc_addresses()->Swap(reg.mutable_rpc_addresses()); + if (reg.has_unix_domain_socket_path()) { + tsinfo_pb->set_unix_domain_socket_path(reg.unix_domain_socket_path()); + } if (ts_desc->location()) tsinfo_pb->set_location(*(ts_desc->location())); } else { // If we've never received a heartbeat from the tserver, we'll fall back diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto index bda76e2..9a7dbc6 100644 --- a/src/kudu/master/master.proto +++ b/src/kudu/master/master.proto @@ -431,6 +431,11 @@ message TSInfoPB { repeated HostPortPB rpc_addresses = 2; optional string location = 3; + + // The path of a UNIX domain socket where the server is listening. + // An '@' prefix indicates the abstract namespace. May be missing + // if this feature is not enabled. 
+ optional string unix_domain_socket_path = 4; } // Selector to specify policy for listing tablet replicas in diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc index f466c59..84fbd18 100644 --- a/src/kudu/rpc/acceptor_pool.cc +++ b/src/kudu/rpc/acceptor_pool.cc @@ -54,6 +54,12 @@ METRIC_DEFINE_counter(server, rpc_connections_accepted, "Number of incoming TCP connections made to the RPC server", kudu::MetricLevel::kInfo); +METRIC_DEFINE_counter(server, rpc_connections_accepted_unix_domain_socket, + "RPC Connections Accepted via UNIX Domain Socket", + kudu::MetricUnit::kConnections, + "Number of incoming UNIX Domain Socket connections made to the RPC server", + kudu::MetricLevel::kInfo); + DEFINE_int32(rpc_acceptor_listen_backlog, 128, "Socket backlog parameter used when listening for RPC connections. " "This defines the maximum length to which the queue of pending " @@ -71,9 +77,12 @@ AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket, : messenger_(messenger), socket_(socket->Release()), bind_address_(bind_address), - rpc_connections_accepted_(METRIC_rpc_connections_accepted.Instantiate( - messenger->metric_entity())), - closing_(false) {} + closing_(false) { + auto& accept_metric = bind_address.is_ip() ? 
+ METRIC_rpc_connections_accepted : + METRIC_rpc_connections_accepted_unix_domain_socket; + rpc_connections_accepted_ = accept_metric.Instantiate(messenger->metric_entity()); +} AcceptorPool::~AcceptorPool() { Shutdown(); diff --git a/src/kudu/server/rpc_server.cc b/src/kudu/server/rpc_server.cc index e1d8872..632a35e 100644 --- a/src/kudu/server/rpc_server.cc +++ b/src/kudu/server/rpc_server.cc @@ -121,7 +121,10 @@ Status RpcServer::Init(const shared_ptr<Messenger>& messenger) { RETURN_NOT_OK(ParseAddressList(options_.rpc_bind_addresses, options_.default_port, &rpc_bind_addresses_)); + for (const Sockaddr& addr : rpc_bind_addresses_) { + if (!addr.is_ip()) continue; + if (IsPrivilegedPort(addr.port())) { LOG(WARNING) << "May be unable to bind to privileged port for address " << addr.ToString(); @@ -171,6 +174,12 @@ Status RpcServer::RegisterService(unique_ptr<rpc::ServiceIf> service) { return Status::OK(); } +Status RpcServer::AddBindAddress(const Sockaddr& addr) { + CHECK_EQ(server_state_, INITIALIZED) << "must add bind addresses between Init() and Bind()"; + rpc_bind_addresses_.emplace_back(addr); + return Status::OK(); +} + Status RpcServer::Bind() { CHECK_EQ(server_state_, INITIALIZED); @@ -182,8 +191,9 @@ Status RpcServer::Bind() { RETURN_NOT_OK(messenger_->AddAcceptorPool( bind_addr, &pool)); - new_acceptor_pools.push_back(pool); + new_acceptor_pools.emplace_back(std::move(pool)); } + acceptor_pools_.swap(new_acceptor_pools); server_state_ = BOUND; diff --git a/src/kudu/server/rpc_server.h b/src/kudu/server/rpc_server.h index 06f462e..314b542 100644 --- a/src/kudu/server/rpc_server.h +++ b/src/kudu/server/rpc_server.h @@ -70,6 +70,10 @@ class RpcServer { } Status Init(const std::shared_ptr<rpc::Messenger>& messenger) WARN_UNUSED_RESULT; + + // Add an additional address to bind and accept connections on. + Status AddBindAddress(const Sockaddr& addr) WARN_UNUSED_RESULT; + // Services need to be registered after Init'ing, but before Start'ing. 
// The service's ownership will be given to a ServicePool. Status RegisterService(std::unique_ptr<rpc::ServiceIf> service) WARN_UNUSED_RESULT; diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc index d8a12bf..89bb5b8 100644 --- a/src/kudu/server/server_base.cc +++ b/src/kudu/server/server_base.cc @@ -151,6 +151,12 @@ DEFINE_string(rpc_encryption, "optional", TAG_FLAG(rpc_authentication, evolving); TAG_FLAG(rpc_encryption, evolving); +DEFINE_bool(rpc_listen_on_unix_domain_socket, false, + "Whether the RPC server should listen on a Unix domain socket. If enabled, " + "the RPC server will bind to a socket in the \"abstract namespace\" using " + "a name which uniquely identifies the server instance."); +TAG_FLAG(rpc_listen_on_unix_domain_socket, experimental); + DEFINE_string(rpc_tls_ciphers, kudu::security::SecurityDefaults::kDefaultTlsCiphers, "The cipher suite preferences to use for TLS-secured RPC connections. " @@ -526,6 +532,17 @@ Status ServerBase::Init() { }); RETURN_NOT_OK(rpc_server_->Init(messenger_)); + + if (FLAGS_rpc_listen_on_unix_domain_socket) { + VLOG(1) << "Enabling listening on unix domain socket."; + Sockaddr addr; + RETURN_NOT_OK_PREPEND(addr.ParseUnixDomainPath(Substitute("@kudu-$0", fs_manager_->uuid())), + "unable to parse provided UNIX socket path"); + RETURN_NOT_OK_PREPEND(rpc_server_->AddBindAddress(addr), + "unable to add configured UNIX socket path to list of bind addresses " + "for RPC server"); + } + RETURN_NOT_OK(rpc_server_->Bind()); RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging"); diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc index f3e6025..e3bf926 100644 --- a/src/kudu/tserver/heartbeater.cc +++ b/src/kudu/tserver/heartbeater.cc @@ -17,6 +17,7 @@ #include "kudu/tserver/heartbeater.h" +#include <algorithm> #include <atomic> #include <cstdint> #include <functional> @@ -337,6 +338,13 @@ Status 
Heartbeater::Thread::SetupRegistration(ServerRegistrationPB* reg) { RETURN_NOT_OK(CHECK_NOTNULL(server_->rpc_server())->GetAdvertisedAddresses(&addrs)); RETURN_NOT_OK_PREPEND(AddHostPortPBs(addrs, reg->mutable_rpc_addresses()), "Failed to add RPC addresses to registration"); + auto unix_socket_it = std::find_if(addrs.begin(), addrs.end(), + [](const Sockaddr& addr) { + return addr.is_unix(); + }); + if (unix_socket_it != addrs.end()) { + reg->set_unix_domain_socket_path(unix_socket_it->UnixDomainPath()); + } addrs.clear(); if (server_->web_server()) { diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc index b680294..f5d616c 100644 --- a/src/kudu/util/net/sockaddr.cc +++ b/src/kudu/util/net/sockaddr.cc @@ -89,7 +89,10 @@ Sockaddr::Sockaddr(const struct sockaddr_in& addr) : Status Sockaddr::ParseString(const string& s, uint16_t default_port) { HostPort hp; RETURN_NOT_OK(hp.ParseString(s, default_port)); + return ParseFromNumericHostPort(hp); +} +Status Sockaddr::ParseFromNumericHostPort(const HostPort& hp) { struct in_addr addr; if (inet_pton(AF_INET, hp.host().c_str(), &addr) != 1) { return Status::InvalidArgument("Invalid IP address", hp.host()); diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h index cd50e50..e473c8b 100644 --- a/src/kudu/util/net/sockaddr.h +++ b/src/kudu/util/net/sockaddr.h @@ -32,6 +32,8 @@ namespace kudu { +class HostPort; + /// Represents a sockaddr. /// /// Typically this wraps a sockaddr_in, but in the future will be extended to support @@ -86,6 +88,12 @@ class Sockaddr { // Returns a bad Status if the input is malformed. Status ParseString(const std::string& s, uint16_t default_port); + // Parse a HostPort instance which must contain a hostname in numeric notation + // as described above. + // + // Note that this function will not handle resolving hostnames. + Status ParseFromNumericHostPort(const HostPort& hp); + // Parse a UNIX domain path, storing the result in this Sockaddr object. 
// A leading '@' indicates the address should be in the UNIX domain "abstract // namespace" (see man unix(7)). @@ -143,6 +151,10 @@ class Sockaddr { return family() == AF_INET; } + bool is_unix() const { + return family() == AF_UNIX; + } + // Returns the stringified address in '1.2.3.4:<port>' format. std::string ToString() const;
