http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/inbound_call.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/inbound_call.h b/be/src/kudu/rpc/inbound_call.h new file mode 100644 index 0000000..6bed18f --- /dev/null +++ b/be/src/kudu/rpc/inbound_call.h @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_INBOUND_CALL_H +#define KUDU_RPC_INBOUND_CALL_H + +#include <glog/logging.h> +#include <string> +#include <vector> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/remote_method.h" +#include "kudu/rpc/service_if.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/faststring.h" +#include "kudu/util/monotime.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace kudu { + +class Histogram; +class Trace; + +namespace rpc { + +class Connection; +class DumpRunningRpcsRequestPB; +class RemoteUser; +class RpcCallInProgressPB; +struct RpcMethodInfo; +class RpcSidecar; + +struct InboundCallTiming { + MonoTime time_received; // Time the call was first accepted. + MonoTime time_handled; // Time the call handler was kicked off. + MonoTime time_completed; // Time the call handler completed. + + MonoDelta TotalDuration() const { + return time_completed - time_received; + } +}; + +// Inbound call on server +class InboundCall { + public: + explicit InboundCall(Connection* conn); + ~InboundCall(); + + // Parse an inbound call message. + // + // This only deserializes the call header, populating the 'header_' and + // 'serialized_request_' member variables. The actual call parameter is + // not deserialized, as this may be CPU-expensive, and this is called + // from the reactor thread. + Status ParseFrom(gscoped_ptr<InboundTransfer> transfer); + + // Return the serialized request parameter protobuf. + const Slice& serialized_request() const { + DCHECK(transfer_) << "Transfer discarded before parameter parsing"; + return serialized_request_; + } + + const RemoteMethod& remote_method() const { + return remote_method_; + } + + const int32_t call_id() const { + return header_.call_id(); + } + + // Serializes 'response' into the InboundCall's internal buffer, and marks + // the call as a success. Enqueues the response back to the connection + // that made the call. + // + // This method deletes the InboundCall object, so no further calls may be + // made after this one. + void RespondSuccess(const google::protobuf::MessageLite& response); + + // Serializes a failure response into the internal buffer, marking the + // call as a failure. Enqueues the response back to the connection that + // made the call. + // + // This method deletes the InboundCall object, so no further calls may be + // made after this one. + void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, + const Status &status); + + void RespondUnsupportedFeature(const std::vector<uint32_t>& unsupported_features); + + void RespondApplicationError(int error_ext_id, const std::string& message, + const google::protobuf::MessageLite& app_error_pb); + + // Convert an application error extension to an ErrorStatusPB. + // These ErrorStatusPB objects are what are returned in application error responses. + static void ApplicationErrorToPB(int error_ext_id, const std::string& message, + const google::protobuf::MessageLite& app_error_pb, + ErrorStatusPB* err); + + // Serialize the response packet for the finished call. + // The resulting slices refer to memory in this object. + void SerializeResponseTo(std::vector<Slice>* slices) const; + + // See RpcContext::AddRpcSidecar() + Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); + + std::string ToString() const; + + void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp); + + const RemoteUser& remote_user() const; + + const Sockaddr& remote_address() const; + + const scoped_refptr<Connection>& connection() const; + + Trace* trace(); + + const InboundCallTiming& timing() const { + return timing_; + } + + const RequestHeader& header() const { + return header_; + } + + // Associate this call with a particular method that will be invoked + // by the service. + void set_method_info(scoped_refptr<RpcMethodInfo> info) { + method_info_ = std::move(info); + } + + // Return the method associated with this call. This is set just before + // the call is enqueued onto the service queue, and therefore may be + // 'nullptr' for much of the lifecycle of a call. + RpcMethodInfo* method_info() { + return method_info_.get(); + } + + // When this InboundCall was received (instantiated). + // Should only be called once on a given instance. + // Not thread-safe. Should only be called by the current "owner" thread. + void RecordCallReceived(); + + // When RPC call Handle() was called on the server side. + // Updates the Histogram with time elapsed since the call was received, + // and should only be called once on a given instance. + // Not thread-safe. Should only be called by the current "owner" thread. + void RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time); + + // Return true if the deadline set by the client has already elapsed. + // In this case, the server may stop processing the call, since the + // call response will be ignored anyway. + bool ClientTimedOut() const; + + // Return an upper bound on the client timeout deadline. This does not + // account for transmission delays between the client and the server. + // If the client did not specify a deadline, returns MonoTime::Max(). + MonoTime GetClientDeadline() const; + + // Return the time when this call was received. + MonoTime GetTimeReceived() const; + + // Returns the set of application-specific feature flags required to service + // the RPC. + std::vector<uint32_t> GetRequiredFeatures() const; + + // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1, + // returns an error. + Status GetInboundSidecar(int idx, Slice* sidecar) const; + + // Releases the buffer that contains the request + sidecar data. It is an error to + // access sidecars or serialized_request() after this method is called. + void DiscardTransfer(); + + // Returns the size of the transfer buffer that backs this call. If the transfer does + // not exist (e.g. GetTransferSize() is called after DiscardTransfer()), returns 0. + size_t GetTransferSize(); + + private: + friend class RpczStore; + + // Serialize and queue the response. + void Respond(const google::protobuf::MessageLite& response, + bool is_success); + + // Serialize a response message for either success or failure. If it is a success, + // 'response' should be the user-defined response type for the call. If it is a + // failure, 'response' should be an ErrorStatusPB instance. + void SerializeResponseBuffer(const google::protobuf::MessageLite& response, + bool is_success); + + // When RPC call Handle() completed execution on the server side. + // Updates the Histogram with time elapsed since the call was started, + // and should only be called once on a given instance. + // Not thread-safe. Should only be called by the current "owner" thread. + void RecordHandlingCompleted(); + + // The connection on which this inbound call arrived. + scoped_refptr<Connection> conn_; + + // The header of the incoming call. Set by ParseFrom() + RequestHeader header_; + + // The serialized bytes of the request param protobuf. Set by ParseFrom(). + // This references memory held by 'transfer_'. + Slice serialized_request_; + + // The transfer that produced the call. + // This is kept around because it retains the memory referred to + // by 'serialized_request_' above. + gscoped_ptr<InboundTransfer> transfer_; + + // The buffers for serialized response. Set by SerializeResponseBuffer(). + faststring response_hdr_buf_; + faststring response_msg_buf_; + + // Vector of additional sidecars that are tacked on to the call's response + // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info. + std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_; + + // Inbound sidecars from the request. The slices are views onto transfer_. There are as + // many slices as header_.sidecar_offsets_size(). + Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars]; + + // The trace buffer. + scoped_refptr<Trace> trace_; + + // Timing information related to this RPC call. + InboundCallTiming timing_; + + // Proto service this calls belongs to. Used for routing. + // This field is filled in when the inbound request header is parsed. + RemoteMethod remote_method_; + + // After the method has been looked up within the service, this is filled in + // to point to the information about this method. Acts as a pointer back to + // per-method info such as tracing. + scoped_refptr<RpcMethodInfo> method_info_; + + DISALLOW_COPY_AND_ASSIGN(InboundCall); +}; + +} // namespace rpc +} // namespace kudu + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/messenger.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc new file mode 100644 index 0000000..28fea55 --- /dev/null +++ b/be/src/kudu/rpc/messenger.cc @@ -0,0 +1,488 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/messenger.h" + +#include <arpa/inet.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <list> +#include <mutex> +#include <set> +#include <string> + +#include <boost/algorithm/string/predicate.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/acceptor_pool.h" +#include "kudu/rpc/connection.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/reactor.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_service.h" +#include "kudu/rpc/rpcz_store.h" +#include "kudu/rpc/sasl_common.h" +#include "kudu/rpc/server_negotiation.h" +#include "kudu/rpc/transfer.h" +#include "kudu/security/tls_context.h" +#include "kudu/security/token_verifier.h" +#include "kudu/util/errno.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/flag_validators.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/status.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/trace.h" + +using std::string; +using std::shared_ptr; +using std::make_shared; +using strings::Substitute; + +DEFINE_string(rpc_authentication, "optional", + "Whether to require RPC connections to authenticate. Must be one " + "of 'disabled', 'optional', or 'required'. If 'optional', " + "authentication will be used when the remote end supports it. If " + "'required', connections which are not able to authenticate " + "(because the remote end lacks support) are rejected. Secure " + "clusters should use 'required'."); +DEFINE_string(rpc_encryption, "optional", + "Whether to require RPC connections to be encrypted. Must be one " + "of 'disabled', 'optional', or 'required'. If 'optional', " + "encryption will be used when the remote end supports it. If " + "'required', connections which are not able to use encryption " + "(because the remote end lacks support) are rejected. If 'disabled', " + "encryption will not be used, and RPC authentication " + "(--rpc_authentication) must also be disabled as well. " + "Secure clusters should use 'required'."); +TAG_FLAG(rpc_authentication, evolving); +TAG_FLAG(rpc_encryption, evolving); + +DEFINE_string(rpc_certificate_file, "", + "Path to a PEM encoded X509 certificate to use for securing RPC " + "connections with SSL/TLS. If set, '--rpc_private_key_file' and " + "'--rpc_ca_certificate_file' must be set as well."); +DEFINE_string(rpc_private_key_file, "", + "Path to a PEM encoded private key paired with the certificate " + "from '--rpc_certificate_file'"); +DEFINE_string(rpc_ca_certificate_file, "", + "Path to the PEM encoded X509 certificate of the trusted external " + "certificate authority. The provided certificate should be the root " + "issuer of the certificate passed in '--rpc_certificate_file'."); + +// Setting TLS certs and keys via CLI flags is only necessary for external +// PKI-based security, which is not yet production ready. Instead, see +// internal PKI (ipki) and Kerberos-based authentication. +TAG_FLAG(rpc_certificate_file, experimental); +TAG_FLAG(rpc_private_key_file, experimental); +TAG_FLAG(rpc_ca_certificate_file, experimental); + +DEFINE_int32(rpc_default_keepalive_time_ms, 65000, + "If an RPC connection from a client is idle for this amount of time, the server " + "will disconnect the client."); +TAG_FLAG(rpc_default_keepalive_time_ms, advanced); + +DECLARE_string(keytab_file); + +namespace kudu { +namespace rpc { + +class Messenger; +class ServerBuilder; + +template <typename T> +static Status ParseTriState(const char* flag_name, const string& flag_value, T* tri_state) { + if (boost::iequals(flag_value, "required")) { + *tri_state = T::REQUIRED; + } else if (boost::iequals(flag_value, "optional")) { + *tri_state = T::OPTIONAL; + } else if (boost::iequals(flag_value, "disabled")) { + *tri_state = T::DISABLED; + } else { + return Status::InvalidArgument(Substitute( + "$0 flag must be one of 'required', 'optional', or 'disabled'", + flag_name)); + } + return Status::OK(); +} + +static bool ValidateRpcAuthentication(const char* flag_name, const string& flag_value) { + RpcAuthentication result; + Status s = ParseTriState(flag_name, flag_value, &result); + if (!s.ok()) { + LOG(ERROR) << s.message().ToString(); + return false; + } + return true; +} +DEFINE_validator(rpc_authentication, &ValidateRpcAuthentication); + +static bool ValidateRpcEncryption(const char* flag_name, const string& flag_value) { + RpcEncryption result; + Status s = ParseTriState(flag_name, flag_value, &result); + if (!s.ok()) { + LOG(ERROR) << s.message().ToString(); + return false; + } + return true; +} +DEFINE_validator(rpc_encryption, &ValidateRpcEncryption); + +static bool ValidateRpcAuthnFlags() { + RpcAuthentication authentication; + CHECK_OK(ParseTriState("--rpc_authentication", FLAGS_rpc_authentication, &authentication)); + + RpcEncryption encryption; + CHECK_OK(ParseTriState("--rpc_encryption", FLAGS_rpc_encryption, &encryption)); + + if (encryption == RpcEncryption::DISABLED && authentication != RpcAuthentication::DISABLED) { + LOG(ERROR) << "RPC authentication (--rpc_authentication) must be disabled " + "if RPC encryption (--rpc_encryption) is disabled"; + return false; + } + + const bool has_keytab = !FLAGS_keytab_file.empty(); + const bool has_cert = !FLAGS_rpc_certificate_file.empty(); + if (authentication == RpcAuthentication::REQUIRED && !has_keytab && !has_cert) { + LOG(ERROR) << "RPC authentication (--rpc_authentication) may not be " + "required unless Kerberos (--keytab_file) or external PKI " + "(--rpc_certificate_file et al) are configured"; + return false; + } + + return true; +} +GROUP_FLAG_VALIDATOR(rpc_authn_flags, ValidateRpcAuthnFlags); + +static bool ValidateExternalPkiFlags() { + bool has_cert = !FLAGS_rpc_certificate_file.empty(); + bool has_key = !FLAGS_rpc_private_key_file.empty(); + bool has_ca = !FLAGS_rpc_ca_certificate_file.empty(); + + if (has_cert != has_key || has_cert != has_ca) { + LOG(ERROR) << "--rpc_certificate_file, --rpc_private_key_file, and " + "--rpc_ca_certificate_file flags must be set as a group; " + "i.e. either set all or none of them."; + return false; + } + + return true; +} +GROUP_FLAG_VALIDATOR(external_pki_flags, ValidateExternalPkiFlags); + +MessengerBuilder::MessengerBuilder(std::string name) + : name_(std::move(name)), + connection_keepalive_time_( + MonoDelta::FromMilliseconds(FLAGS_rpc_default_keepalive_time_ms)), + num_reactors_(4), + min_negotiation_threads_(0), + max_negotiation_threads_(4), + coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)), + enable_inbound_tls_(false) { +} + +MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(const MonoDelta &keepalive) { + connection_keepalive_time_ = keepalive; + return *this; +} + +MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) { + num_reactors_ = num_reactors; + return *this; +} + +MessengerBuilder& MessengerBuilder::set_min_negotiation_threads(int min_negotiation_threads) { + min_negotiation_threads_ = min_negotiation_threads; + return *this; +} + +MessengerBuilder& MessengerBuilder::set_max_negotiation_threads(int max_negotiation_threads) { + max_negotiation_threads_ = max_negotiation_threads; + return *this; +} + +MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity(const MonoDelta &granularity) { + coarse_timer_granularity_ = granularity; + return *this; +} + +MessengerBuilder &MessengerBuilder::set_metric_entity( + const scoped_refptr<MetricEntity>& metric_entity) { + metric_entity_ = metric_entity; + return *this; +} + +MessengerBuilder& MessengerBuilder::enable_inbound_tls() { + enable_inbound_tls_ = true; + return *this; +} + +Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) { + RETURN_NOT_OK(SaslInit()); // Initialize SASL library before we start making requests + + Messenger* new_msgr(new Messenger(*this)); + + auto cleanup = MakeScopedCleanup([&] () { + new_msgr->AllExternalReferencesDropped(); + }); + + RETURN_NOT_OK(ParseTriState("--rpc_authentication", + FLAGS_rpc_authentication, + &new_msgr->authentication_)); + + RETURN_NOT_OK(ParseTriState("--rpc_encryption", + FLAGS_rpc_encryption, + &new_msgr->encryption_)); + + RETURN_NOT_OK(new_msgr->Init()); + if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) { + auto* tls_context = new_msgr->mutable_tls_context(); + + if (!FLAGS_rpc_certificate_file.empty()) { + CHECK(!FLAGS_rpc_private_key_file.empty()); + CHECK(!FLAGS_rpc_ca_certificate_file.empty()); + // TODO(KUDU-1920): should we try and enforce that the server + // is in the subject or alt names of the cert? + RETURN_NOT_OK(tls_context->LoadCertificateAuthority(FLAGS_rpc_ca_certificate_file)); + RETURN_NOT_OK(tls_context->LoadCertificateAndKey(FLAGS_rpc_certificate_file, + FLAGS_rpc_private_key_file)); + } else { + RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey()); + } + } + + // See docs on Messenger::retain_self_ for info about this odd hack. + cleanup.cancel(); + *msgr = shared_ptr<Messenger>(new_msgr, std::mem_fun(&Messenger::AllExternalReferencesDropped)); + return Status::OK(); +} + +// See comment on Messenger::retain_self_ member. +void Messenger::AllExternalReferencesDropped() { + Shutdown(); + CHECK(retain_self_.get()); + // If we have no more external references, then we no longer + // need to retain ourself. We'll destruct as soon as all our + // internal-facing references are dropped (ie those from reactor + // threads). + retain_self_.reset(); +} + +void Messenger::Shutdown() { + // Since we're shutting down, it's OK to block. + ThreadRestrictions::ScopedAllowWait allow_wait; + + std::lock_guard<percpu_rwlock> guard(lock_); + if (closing_) { + return; + } + VLOG(1) << "shutting down messenger " << name_; + closing_ = true; + + DCHECK(rpc_services_.empty()) << "Unregister RPC services before shutting down Messenger"; + rpc_services_.clear(); + + for (const shared_ptr<AcceptorPool>& acceptor_pool : acceptor_pools_) { + acceptor_pool->Shutdown(); + } + acceptor_pools_.clear(); + + // Need to shut down negotiation pool before the reactors, since the + // reactors close the Connection sockets, and may race against the negotiation + // threads' blocking reads & writes. + negotiation_pool_->Shutdown(); + + for (Reactor* reactor : reactors_) { + reactor->Shutdown(); + } + tls_context_.reset(); +} + +Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr, + shared_ptr<AcceptorPool>* pool) { + // Before listening, if we expect to require Kerberos, we want to verify + // that everything is set up correctly. This way we'll generate errors on + // startup rather than later on when we first receive a client connection. + if (!FLAGS_keytab_file.empty()) { + RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(), + "GSSAPI/Kerberos not properly configured"); + } + + Socket sock; + RETURN_NOT_OK(sock.Init(0)); + RETURN_NOT_OK(sock.SetReuseAddr(true)); + RETURN_NOT_OK(sock.Bind(accept_addr)); + Sockaddr remote; + RETURN_NOT_OK(sock.GetSocketAddress(&remote)); + auto acceptor_pool(make_shared<AcceptorPool>(this, &sock, remote)); + + std::lock_guard<percpu_rwlock> guard(lock_); + acceptor_pools_.push_back(acceptor_pool); + pool->swap(acceptor_pool); + return Status::OK(); +} + +// Register a new RpcService to handle inbound requests. +Status Messenger::RegisterService(const string& service_name, + const scoped_refptr<RpcService>& service) { + DCHECK(service); + std::lock_guard<percpu_rwlock> guard(lock_); + if (InsertIfNotPresent(&rpc_services_, service_name, service)) { + return Status::OK(); + } else { + return Status::AlreadyPresent("This service is already present"); + } +} + +Status Messenger::UnregisterAllServices() { + std::lock_guard<percpu_rwlock> guard(lock_); + rpc_services_.clear(); + return Status::OK(); +} + +// Unregister an RpcService. +Status Messenger::UnregisterService(const string& service_name) { + std::lock_guard<percpu_rwlock> guard(lock_); + if (rpc_services_.erase(service_name)) { + return Status::OK(); + } else { + return Status::ServiceUnavailable(Substitute("service $0 not registered on $1", + service_name, name_)); + } +} + +void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { + Reactor *reactor = RemoteToReactor(call->conn_id().remote()); + reactor->QueueOutboundCall(call); +} + +void Messenger::QueueInboundCall(gscoped_ptr<InboundCall> call) { + shared_lock<rw_spinlock> guard(lock_.get_lock()); + scoped_refptr<RpcService>* service = FindOrNull(rpc_services_, + call->remote_method().service_name()); + if (PREDICT_FALSE(!service)) { + Status s = Status::ServiceUnavailable(Substitute("service $0 not registered on $1", + call->remote_method().service_name(), name_)); + LOG(INFO) << s.ToString(); + call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, s); + return; + } + + call->set_method_info((*service)->LookupMethod(call->remote_method())); + + // The RpcService will respond to the client on success or failure. + WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call"); +} + +void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) { + Reactor *reactor = RemoteToReactor(remote); + reactor->RegisterInboundSocket(new_socket, remote); +} + +Messenger::Messenger(const MessengerBuilder &bld) + : name_(bld.name_), + closing_(false), + authentication_(RpcAuthentication::REQUIRED), + encryption_(RpcEncryption::REQUIRED), + tls_context_(new security::TlsContext()), + token_verifier_(new security::TokenVerifier()), + rpcz_store_(new RpczStore()), + metric_entity_(bld.metric_entity_), + retain_self_(this) { + for (int i = 0; i < bld.num_reactors_; i++) { + reactors_.push_back(new Reactor(retain_self_, i, bld)); + } + CHECK_OK(ThreadPoolBuilder("negotiator") + .set_min_threads(bld.min_negotiation_threads_) + .set_max_threads(bld.max_negotiation_threads_) + .Build(&negotiation_pool_)); +} + +Messenger::~Messenger() { + std::lock_guard<percpu_rwlock> guard(lock_); + CHECK(closing_) << "Should have already shut down"; + STLDeleteElements(&reactors_); +} + +Reactor* Messenger::RemoteToReactor(const Sockaddr &remote) { + uint32_t hashCode = remote.HashCode(); + int reactor_idx = hashCode % reactors_.size(); + // This is just a static partitioning; we could get a lot + // fancier with assigning Sockaddrs to Reactors. + return reactors_[reactor_idx]; +} + +Status Messenger::Init() { + RETURN_NOT_OK(tls_context_->Init()); + for (Reactor* r : reactors_) { + RETURN_NOT_OK(r->Init()); + } + + return Status::OK(); +} + +Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, + DumpRunningRpcsResponsePB* resp) { + shared_lock<rw_spinlock> guard(lock_.get_lock()); + for (Reactor* reactor : reactors_) { + RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp)); + } + return Status::OK(); +} + +void Messenger::ScheduleOnReactor(const boost::function<void(const Status&)>& func, + MonoDelta when) { + DCHECK(!reactors_.empty()); + + // If we're already running on a reactor thread, reuse it. + Reactor* chosen = nullptr; + for (Reactor* r : reactors_) { + if (r->IsCurrentThread()) { + chosen = r; + } + } + if (chosen == nullptr) { + // Not running on a reactor thread, pick one at random. + chosen = reactors_[rand() % reactors_.size()]; + } + + DelayedTask* task = new DelayedTask(func, when); + chosen->ScheduleReactorTask(task); +} + +const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_name) const { + std::lock_guard<percpu_rwlock> guard(lock_); + scoped_refptr<RpcService> service; + if (FindCopy(rpc_services_, service_name, &service)) { + return service; + } else { + return scoped_refptr<RpcService>(nullptr); + } +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/messenger.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h new file mode 100644 index 0000000..1ba76a7 --- /dev/null +++ b/be/src/kudu/rpc/messenger.h @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_MESSENGER_H +#define KUDU_RPC_MESSENGER_H + +#include <stdint.h> + +#include <list> +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> + +#include <boost/optional.hpp> +#include <gtest/gtest_prod.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/response_callback.h" +#include "kudu/security/token.pb.h" +#include "kudu/util/locks.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Socket; +class ThreadPool; + +namespace security { +class TlsContext; +class TokenVerifier; +} + +namespace rpc { + +class AcceptorPool; +class DumpRunningRpcsRequestPB; +class DumpRunningRpcsResponsePB; +class InboundCall; +class Messenger; +class OutboundCall; +class Reactor; +class ReactorThread; +class RpcService; +class RpczStore; + +struct AcceptorPoolInfo { + public: + explicit AcceptorPoolInfo(Sockaddr bind_address) + : bind_address_(std::move(bind_address)) {} + + Sockaddr bind_address() const { + return bind_address_; + } + + private: + Sockaddr bind_address_; +}; + +// Authentication configuration for RPC connections. +enum class RpcAuthentication { + DISABLED, + OPTIONAL, + REQUIRED, +}; + +// Encryption configuration for RPC connections. +enum class RpcEncryption { + DISABLED, + OPTIONAL, + REQUIRED, +}; + +// Used to construct a Messenger. +class MessengerBuilder { + public: + friend class Messenger; + friend class ReactorThread; + + explicit MessengerBuilder(std::string name); + + // Set the length of time we will keep a TCP connection will alive with no traffic. + MessengerBuilder &set_connection_keepalive_time(const MonoDelta &keepalive); + + // Set the number of reactor threads that will be used for sending and + // receiving. + MessengerBuilder &set_num_reactors(int num_reactors); + + // Set the minimum number of connection-negotiation threads that will be used + // to handle the blocking connection-negotiation step. + MessengerBuilder &set_min_negotiation_threads(int min_negotiation_threads); + + // Set the maximum number of connection-negotiation threads that will be used + // to handle the blocking connection-negotiation step. + MessengerBuilder &set_max_negotiation_threads(int max_negotiation_threads); + + // Set the granularity with which connections are checked for keepalive. + MessengerBuilder &set_coarse_timer_granularity(const MonoDelta &granularity); + + // Set metric entity for use by RPC systems. + MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity); + + // Configure the messenger to enable TLS encryption on inbound connections. + MessengerBuilder& enable_inbound_tls(); + + Status Build(std::shared_ptr<Messenger> *msgr); + + private: + const std::string name_; + MonoDelta connection_keepalive_time_; + int num_reactors_; + int min_negotiation_threads_; + int max_negotiation_threads_; + MonoDelta coarse_timer_granularity_; + scoped_refptr<MetricEntity> metric_entity_; + bool enable_inbound_tls_; +}; + +// A Messenger is a container for the reactor threads which run event loops +// for the RPC services. If the process is a server, a Messenger can also have +// one or more attached AcceptorPools which accept RPC connections. In this case, +// calls received over the connection are enqueued into the messenger's service_queue +// for processing by a ServicePool. +// +// Users do not typically interact with the Messenger directly except to create +// one as a singleton, and then make calls using Proxy objects. +// +// See rpc-test.cc and rpc-bench.cc for example usages. +class Messenger { + public: + friend class MessengerBuilder; + friend class Proxy; + friend class Reactor; + typedef std::vector<std::shared_ptr<AcceptorPool> > acceptor_vec_t; + typedef std::unordered_map<std::string, scoped_refptr<RpcService> > RpcServicesMap; + + static const uint64_t UNKNOWN_CALL_ID = 0; + + ~Messenger(); + + // Stop all communication and prevent further use. + // It's not required to call this -- dropping the shared_ptr provided + // from MessengerBuilder::Build will automatically call this method. + void Shutdown(); + + // Add a new acceptor pool listening to the given accept address. + // You can create any number of acceptor pools you want, including none. + // + // The created pool is returned in *pool. The Messenger also retains + // a reference to the pool, so the caller may safely drop this reference + // and the pool will remain live. + // + // NOTE: the returned pool is not initially started. You must call + // pool->Start(...) to begin accepting connections. + // + // If Kerberos is enabled, this also runs a pre-flight check that makes + // sure the environment is appropriately configured to authenticate + // clients via Kerberos. If not, this returns a RuntimeError. + Status AddAcceptorPool(const Sockaddr &accept_addr, + std::shared_ptr<AcceptorPool>* pool); + + // Register a new RpcService to handle inbound requests. + Status RegisterService(const std::string& service_name, + const scoped_refptr<RpcService>& service); + + // Unregister currently-registered RpcService. + Status UnregisterService(const std::string& service_name); + + Status UnregisterAllServices(); + + // Queue a call for transmission. This will pick the appropriate reactor, + // and enqueue a task on that reactor to assign and send the call. + void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call); + + // Enqueue a call for processing on the server. + void QueueInboundCall(gscoped_ptr<InboundCall> call); + + // Take ownership of the socket via Socket::Release + void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote); + + // Dump the current RPCs into the given protobuf. + Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, + DumpRunningRpcsResponsePB* resp); + + // Run 'func' on a reactor thread after 'when' time elapses. + // + // The status argument conveys whether 'func' was run correctly (i.e. + // after the elapsed time) or not. + void ScheduleOnReactor(const boost::function<void(const Status&)>& func, + MonoDelta when); + + const security::TlsContext& tls_context() const { return *tls_context_; } + security::TlsContext* mutable_tls_context() { return tls_context_.get(); } + + const security::TokenVerifier& token_verifier() const { return *token_verifier_; } + security::TokenVerifier* mutable_token_verifier() { return token_verifier_.get(); } + std::shared_ptr<security::TokenVerifier> shared_token_verifier() const { + return token_verifier_; + } + + boost::optional<security::SignedTokenPB> authn_token() const { + std::lock_guard<simple_spinlock> l(authn_token_lock_); + return authn_token_; + } + void set_authn_token(const security::SignedTokenPB& token) { + std::lock_guard<simple_spinlock> l(authn_token_lock_); + authn_token_ = token; + } + + RpcAuthentication authentication() const { return authentication_; } + RpcEncryption encryption() const { return encryption_; } + + ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); } + + RpczStore* rpcz_store() { return rpcz_store_.get(); } + + int num_reactors() const { return reactors_.size(); } + + const std::string& name() const { + return name_; + } + + bool closing() const { + shared_lock<rw_spinlock> l(lock_.get_lock()); + return closing_; + } + + scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_.get(); } + + const scoped_refptr<RpcService> rpc_service(const std::string& service_name) const; + + private: + FRIEND_TEST(TestRpc, TestConnectionKeepalive); + FRIEND_TEST(TestRpc, TestCredentialsPolicy); + FRIEND_TEST(TestRpc, TestReopenOutboundConnections); + + explicit Messenger(const MessengerBuilder &bld); + + Reactor* RemoteToReactor(const Sockaddr &remote); + Status Init(); + void RunTimeoutThread(); + void UpdateCurTime(); + + // Called by external-facing shared_ptr when the user no longer holds + // any references. See 'retain_self_' for more info. + void AllExternalReferencesDropped(); + + const std::string name_; + + // Protects closing_, acceptor_pools_, rpc_services_. + mutable percpu_rwlock lock_; + + bool closing_; + + // Whether to require authentication and encryption on the connections managed + // by this messenger. + // TODO(KUDU-1928): scope these to individual proxies, so that messengers can be + // reused by different clients. + RpcAuthentication authentication_; + RpcEncryption encryption_; + + // Pools which are listening on behalf of this messenger. + // Note that the user may have called Shutdown() on one of these + // pools, so even though we retain the reference, it may no longer + // be listening. + acceptor_vec_t acceptor_pools_; + + // RPC services that handle inbound requests. + RpcServicesMap rpc_services_; + + std::vector<Reactor*> reactors_; + + gscoped_ptr<ThreadPool> negotiation_pool_; + + std::unique_ptr<security::TlsContext> tls_context_; + + // A TokenVerifier, which can verify client provided authentication tokens. + std::shared_ptr<security::TokenVerifier> token_verifier_; + + // An optional token, which can be used to authenticate to a server. + mutable simple_spinlock authn_token_lock_; + boost::optional<security::SignedTokenPB> authn_token_; + + std::unique_ptr<RpczStore> rpcz_store_; + + scoped_refptr<MetricEntity> metric_entity_; + + // The ownership of the Messenger object is somewhat subtle. The pointer graph + // looks like this: + // + // [User Code ] | [ Internal code ] + // | + // shared_ptr[1] | + // | | + // v + // Messenger <------------ shared_ptr[2] --- Reactor + // ^ | ----------- bare pointer --> Reactor + // \__/ + // shared_ptr[2] + // (retain_self_) + // + // shared_ptr[1] instances use Messenger::AllExternalReferencesDropped() + // as a deleter. + // shared_ptr[2] are "traditional" shared_ptrs which call 'delete' on the + // object. + // + // The teardown sequence is as follows: + // Option 1): User calls "Shutdown()" explicitly: + // - Messenger::Shutdown tells Reactors to shut down + // - When each reactor thread finishes, it drops its shared_ptr[2] + // - the Messenger::retain_self instance remains, keeping the Messenger + // alive. + // - The user eventually drops its shared_ptr[1], which calls + // Messenger::AllExternalReferencesDropped. This drops retain_self_ + // and results in object destruction. + // Option 2): User drops all of its shared_ptr[1] references + // - Though the Reactors still reference the Messenger, AllExternalReferencesDropped + // will get called, which triggers Messenger::Shutdown. + // - AllExternalReferencesDropped drops retain_self_, so the only remaining + // references are from Reactor threads. But the reactor threads are shutting down. + // - When the last Reactor thread dies, there will be no more shared_ptr[1] references + // and the Messenger will be destroyed. + // + // The main goal of all of this confusion is that the reactor threads need to be able + // to shut down asynchronously, and we need to keep the Messenger alive until they + // do so. So, handing out a normal shared_ptr to users would force the Messenger + // destructor to Join() the reactor threads, which causes a problem if the user + // tries to destruct the Messenger from within a Reactor thread itself. + std::shared_ptr<Messenger> retain_self_; + + DISALLOW_COPY_AND_ASSIGN(Messenger); +}; + +} // namespace rpc +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/mt-rpc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/mt-rpc-test.cc b/be/src/kudu/rpc/mt-rpc-test.cc new file mode 100644 index 0000000..73e3a13 --- /dev/null +++ b/be/src/kudu/rpc/mt-rpc-test.cc @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/rpc-test-base.h" + +#include <string> + +#include <boost/bind.hpp> +#include <gtest/gtest.h> + +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/metrics.h" +#include "kudu/util/test_util.h" + +METRIC_DECLARE_counter(rpc_connections_accepted); +METRIC_DECLARE_counter(rpcs_queue_overflow); + +using std::string; +using std::shared_ptr; +using strings::Substitute; + +namespace kudu { +namespace rpc { + +class MultiThreadedRpcTest : public RpcTestBase { + public: + // Make a single RPC call. + void SingleCall(Sockaddr server_addr, const char* method_name, + Status* result, CountDownLatch* latch) { + LOG(INFO) << "Connecting to " << server_addr.ToString(); + shared_ptr<Messenger> client_messenger(CreateMessenger("ClientSC")); + Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name()); + *result = DoTestSyncCall(p, method_name); + latch->CountDown(); + } + + // Make RPC calls until we see a failure. + void HammerServer(Sockaddr server_addr, const char* method_name, + Status* last_result) { + shared_ptr<Messenger> client_messenger(CreateMessenger("ClientHS")); + HammerServerWithMessenger(server_addr, method_name, last_result, client_messenger); + } + + void HammerServerWithMessenger( + Sockaddr server_addr, const char* method_name, Status* last_result, + const shared_ptr<Messenger>& messenger) { + LOG(INFO) << "Connecting to " << server_addr.ToString(); + Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name()); + + int i = 0; + while (true) { + i++; + Status s = DoTestSyncCall(p, method_name); + if (!s.ok()) { + // Return on first failure. + LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: " + << s.ToString(); + *last_result = s; + return; + } + } + } +}; + +static void AssertShutdown(kudu::Thread* thread, const Status* status) { + ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join()); + string msg = status->ToString(); + ASSERT_TRUE(msg.find("Service unavailable") != string::npos || + msg.find("Network error") != string::npos) + << "Status is actually: " << msg; +} + +// Test making several concurrent RPC calls while shutting down. +// Simply verify that we don't hit any CHECK errors. +TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) { + // Set up server. + Sockaddr server_addr; + StartTestServer(&server_addr); + + const int kNumThreads = 4; + scoped_refptr<kudu::Thread> threads[kNumThreads]; + Status statuses[kNumThreads]; + for (int i = 0; i < kNumThreads; i++) { + ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), + &MultiThreadedRpcTest::HammerServer, this, server_addr, + GenericCalculatorService::kAddMethodName, &statuses[i], &threads[i])); + } + + SleepFor(MonoDelta::FromMilliseconds(50)); + + // Shut down server. + ASSERT_OK(server_messenger_->UnregisterService(service_name_)); + service_pool_->Shutdown(); + server_messenger_->Shutdown(); + + for (int i = 0; i < kNumThreads; i++) { + AssertShutdown(threads[i].get(), &statuses[i]); + } +} + +// Test shutting down the client messenger exactly as a thread is about to start +// a new connection. This is a regression test for KUDU-104. +TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) { + // Set up server. + Sockaddr server_addr; + StartTestServer(&server_addr); + + shared_ptr<Messenger> client_messenger(CreateMessenger("Client")); + + scoped_refptr<kudu::Thread> thread; + Status status; + ASSERT_OK(kudu::Thread::Create("test", "test", + &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr, + GenericCalculatorService::kAddMethodName, &status, client_messenger, &thread)); + + // Shut down the messenger after a very brief sleep. This often will race so that the + // call gets submitted to the messenger before shutdown, but the negotiation won't have + // started yet. In a debug build this fails about half the time without the bug fix. + // See KUDU-104. + SleepFor(MonoDelta::FromMicroseconds(10)); + client_messenger->Shutdown(); + client_messenger.reset(); + + ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join()); + ASSERT_TRUE(status.IsAborted() || + status.IsServiceUnavailable()); + string msg = status.ToString(); + SCOPED_TRACE(msg); + ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos || + msg.find("reactor is shutting down") != string::npos || + msg.find("Unable to start connection negotiation thread") != string::npos) + << "Status is actually: " << msg; +} + +// This bogus service pool leaves the service queue full. +class BogusServicePool : public ServicePool { + public: + BogusServicePool(gscoped_ptr<ServiceIf> service, + const scoped_refptr<MetricEntity>& metric_entity, + size_t service_queue_length) + : ServicePool(std::move(service), metric_entity, service_queue_length) { + } + virtual Status Init(int num_threads) OVERRIDE { + // Do nothing + return Status::OK(); + } +}; + +void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) { + string msg = status->ToString(); + if (msg.find("service queue is full") != string::npos) { + ++(*backpressure); + } else if (msg.find("shutting down") != string::npos) { + ++(*shutdown); + } else if (msg.find("got EOF from remote") != string::npos) { + ++(*shutdown); + } else { + FAIL() << "Unexpected status message: " << msg; + } +} + +// Test that we get a Service Unavailable error when we max out the incoming RPC service queue. +TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) { + const size_t kMaxConcurrency = 2; + + MessengerBuilder bld("messenger1"); + bld.set_num_reactors(kMaxConcurrency); + bld.set_metric_entity(metric_entity_); + CHECK_OK(bld.Build(&server_messenger_)); + + shared_ptr<AcceptorPool> pool; + ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool)); + ASSERT_OK(pool->Start(kMaxConcurrency)); + Sockaddr server_addr = pool->bind_address(); + + gscoped_ptr<ServiceIf> service(new GenericCalculatorService()); + service_name_ = service->service_name(); + service_pool_ = new BogusServicePool(std::move(service), + server_messenger_->metric_entity(), + kMaxConcurrency); + ASSERT_OK(service_pool_->Init(n_worker_threads_)); + server_messenger_->RegisterService(service_name_, service_pool_); + + scoped_refptr<kudu::Thread> threads[3]; + Status status[3]; + CountDownLatch latch(1); + for (int i = 0; i < 3; i++) { + ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), + &MultiThreadedRpcTest::SingleCall, this, server_addr, + GenericCalculatorService::kAddMethodName, &status[i], &latch, &threads[i])); + } + + // One should immediately fail due to backpressure. The latch is only initialized + // to wait for the first of three threads to finish. + latch.Wait(); + + // The rest would time out after 10 sec, but we help them along. + ASSERT_OK(server_messenger_->UnregisterService(service_name_)); + service_pool_->Shutdown(); + server_messenger_->Shutdown(); + + for (const auto& thread : threads) { + ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join()); + } + + // Verify that one error was due to backpressure. + int errors_backpressure = 0; + int errors_shutdown = 0; + + for (const auto& s : status) { + IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown); + } + + ASSERT_EQ(1, errors_backpressure); + ASSERT_EQ(2, errors_shutdown); + + // Check that RPC queue overflow metric is 1 + Counter *rpcs_queue_overflow = + METRIC_rpcs_queue_overflow.Instantiate(server_messenger_->metric_entity()).get(); + ASSERT_EQ(1, rpcs_queue_overflow->value()); +} + +static void HammerServerWithTCPConns(const Sockaddr& addr) { + while (true) { + Socket socket; + CHECK_OK(socket.Init(0)); + Status s; + LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") { + s = socket.Connect(addr); + } + if (!s.ok()) { + CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString(); + return; + } + CHECK_OK(socket.Close()); + } +} + +// Regression test for KUDU-128. +// Test that shuts down the server while new TCP connections are incoming. +TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) { + // Set up server. + Sockaddr server_addr; + StartTestServer(&server_addr); + + // Start a number of threads which just hammer the server with TCP connections. + vector<scoped_refptr<kudu::Thread> > threads; + for (int i = 0; i < 8; i++) { + scoped_refptr<kudu::Thread> new_thread; + CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i), + &HammerServerWithTCPConns, server_addr, &new_thread)); + threads.push_back(new_thread); + } + + // Sleep until the server has started to actually accept some connections from the + // test threads. + scoped_refptr<Counter> conns_accepted = + METRIC_rpc_connections_accepted.Instantiate(server_messenger_->metric_entity()); + while (conns_accepted->value() == 0) { + SleepFor(MonoDelta::FromMicroseconds(100)); + } + + // Shutdown while there are still new connections appearing. + ASSERT_OK(server_messenger_->UnregisterService(service_name_)); + service_pool_->Shutdown(); + server_messenger_->Shutdown(); + + for (scoped_refptr<kudu::Thread>& t : threads) { + ASSERT_OK(ThreadJoiner(t.get()).warn_every_ms(500).Join()); + } +} + +} // namespace rpc +} // namespace kudu +
