http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/outbound_call.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h new file mode 100644 index 0000000..ebed9b5 --- /dev/null +++ b/be/src/kudu/rpc/outbound_call.h @@ -0,0 +1,363 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_RPC_CLIENT_CALL_H +#define KUDU_RPC_CLIENT_CALL_H + +#include <set> +#include <string> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/macros.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/rpc/remote_method.h" +#include "kudu/rpc/response_callback.h" +#include "kudu/rpc/transfer.h" +#include "kudu/rpc/user_credentials.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/slice.h" +#include "kudu/util/status.h" + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace kudu { +namespace rpc { + +class CallResponse; +class Connection; +class DumpRunningRpcsRequestPB; +class InboundTransfer; +class RpcCallInProgressPB; +class RpcController; +class RpcSidecar; + +// Used to key on Connection information. +// For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual. +// This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that). +class ConnectionId { + public: + ConnectionId(); + + // Copy constructor required for use with STL unordered_map. + ConnectionId(const ConnectionId& other); + + // Convenience constructor. + ConnectionId(const Sockaddr& remote, UserCredentials user_credentials); + + // The remote address. + void set_remote(const Sockaddr& remote); + const Sockaddr& remote() const { return remote_; } + + // The credentials of the user associated with this connection, if any. + void set_user_credentials(UserCredentials user_credentials); + const UserCredentials& user_credentials() const { return user_credentials_; } + UserCredentials* mutable_user_credentials() { return &user_credentials_; } + + // Copy state from another object to this one. 
+ void CopyFrom(const ConnectionId& other); + + // Returns a string representation of the object, not including the password field. + std::string ToString() const; + + size_t HashCode() const; + bool Equals(const ConnectionId& other) const; + + private: + // Remember to update HashCode() and Equals() when new fields are added. + Sockaddr remote_; + UserCredentials user_credentials_; + + // Implementation of CopyFrom that can be shared with copy constructor. + void DoCopyFrom(const ConnectionId& other); + + // Disable assignment operator. + void operator=(const ConnectionId&); +}; + +class ConnectionIdHash { + public: + std::size_t operator() (const ConnectionId& conn_id) const; +}; + +class ConnectionIdEqual { + public: + bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const; +}; + +// Tracks the status of a call on the client side. +// +// This is an internal-facing class -- clients interact with the +// RpcController class. +// +// This is allocated by the Proxy when a call is first created, +// then passed to the reactor thread to send on the wire. It's typically +// kept using a shared_ptr because a call may terminate in any number +// of different threads, making it tricky to enforce single ownership. +class OutboundCall { + public: + + // Phases of an outbound RPC. Making an outbound RPC might involve establishing + // a connection to the remote server first, and the actual call is made only + // once the connection to the server is established. + enum class Phase { + // The phase of connection negotiation between the caller and the callee. + CONNECTION_NEGOTIATION, + + // The phase of sending a call over already established connection. 
+ REMOTE_CALL, + }; + + OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method, + google::protobuf::Message* response_storage, + RpcController* controller, ResponseCallback callback); + + ~OutboundCall(); + + // Serialize the given request PB into this call's internal storage, and assume + // ownership of any sidecars that should accompany this request. + // + // Because the request data is fully serialized by this call, 'req' may be subsequently + // mutated with no ill effects. + void SetRequestPayload(const google::protobuf::Message& req, + std::vector<std::unique_ptr<RpcSidecar>>&& sidecars); + + // Assign the call ID for this call. This is called from the reactor + // thread once a connection has been assigned. Must only be called once. + void set_call_id(int32_t call_id) { + DCHECK_EQ(header_.call_id(), kInvalidCallId) << "Already has a call ID"; + header_.set_call_id(call_id); + } + + // Serialize the call for the wire. Requires that SetRequestPayload() + // is called first. This is called from the Reactor thread. + Status SerializeTo(std::vector<Slice>* slices); + + // Callback after the call has been put on the outbound connection queue. + void SetQueued(); + + // Update the call state to show that the request has started being sent + // on the socket. + void SetSending(); + + // Update the call state to show that the request has been sent. + void SetSent(); + + // Mark the call as failed. This also triggers the callback to notify + // the caller. If the call failed due to a remote error, then err_pb + // should be set to the error returned by the remote server. Takes + // ownership of 'err_pb'. + void SetFailed(const Status& status, + Phase phase = Phase::REMOTE_CALL, + ErrorStatusPB* err_pb = nullptr); + + // Mark the call as timed out. This also triggers the callback to notify + // the caller. + void SetTimedOut(Phase phase); + bool IsTimedOut() const; + + bool IsNegotiationError() const; + + // Is the call finished? 
+ bool IsFinished() const; + + // Fill in the call response. + void SetResponse(gscoped_ptr<CallResponse> resp); + + const std::set<RpcFeatureFlag>& required_rpc_features() const { + return required_rpc_features_; + } + + std::string ToString() const; + + void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp); + + //////////////////////////////////////////////////////////// + // Getters + //////////////////////////////////////////////////////////// + + const ConnectionId& conn_id() const { return conn_id_; } + const RemoteMethod& remote_method() const { return remote_method_; } + const ResponseCallback &callback() const { return callback_; } + RpcController* controller() { return controller_; } + const RpcController* controller() const { return controller_; } + + // Return true if a call ID has been assigned to this call. + bool call_id_assigned() const { + return header_.call_id() != kInvalidCallId; + } + + int32_t call_id() const { + DCHECK(call_id_assigned()); + return header_.call_id(); + } + + private: + friend class RpcController; + + // Various states the call propagates through. + // NB: if adding another state, be sure to update OutboundCall::IsFinished() + // and OutboundCall::StateName(State state) as well. + enum State { + READY = 0, + ON_OUTBOUND_QUEUE, + SENDING, + SENT, + NEGOTIATION_TIMED_OUT, + TIMED_OUT, + FINISHED_NEGOTIATION_ERROR, + FINISHED_ERROR, + FINISHED_SUCCESS + }; + + static std::string StateName(State state); + + void set_state(State new_state); + State state() const; + + // Same as set_state, but requires that the caller already holds + // lock_ + void set_state_unlocked(State new_state); + + // return current status + Status status() const; + + // Time when the call was first initiatied. + MonoTime start_time_; + + // Return the error protobuf, if a remote error occurred. + // This will only be non-NULL if status().IsRemoteError(). 
+ const ErrorStatusPB* error_pb() const; + + // Lock for state_ status_, error_pb_ fields, since they + // may be mutated by the reactor thread while the client thread + // reads them. + mutable simple_spinlock lock_; + State state_; + Status status_; + gscoped_ptr<ErrorStatusPB> error_pb_; + + // Call the user-provided callback. + void CallCallback(); + + // The RPC header. + // Parts of this (eg the call ID) are only assigned once this call has been + // passed to the reactor thread and assigned a connection. + RequestHeader header_; + + // The remote method being called. + RemoteMethod remote_method_; + + // RPC-system features required to send this call. + std::set<RpcFeatureFlag> required_rpc_features_; + + const ConnectionId conn_id_; + ResponseCallback callback_; + RpcController* controller_; + + // Pointer for the protobuf where the response should be written. + google::protobuf::Message* response_; + + // Buffers for storing segments of the wire-format request. + faststring header_buf_; + faststring request_buf_; + + // Once a response has been received for this call, contains that response. + // Otherwise NULL. + gscoped_ptr<CallResponse> call_response_; + + // All sidecars to be sent with this call. + std::vector<std::unique_ptr<RpcSidecar>> sidecars_; + + // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload(). + int64_t sidecar_byte_size_ = -1; + + DISALLOW_COPY_AND_ASSIGN(OutboundCall); +}; + +// A response to a call, on the client side. +// Upon receiving a response, this is allocated in the reactor thread and filled +// into the OutboundCall instance via OutboundCall::SetResponse. +// +// This may either be a success or error response. +// +// This class takes care of separating out the distinct payload slices sent +// over. +class CallResponse { + public: + CallResponse(); + + // Parse the response received from a call. This must be called before any + // other methods on this object. 
+ Status ParseFrom(gscoped_ptr<InboundTransfer> transfer); + + // Return true if the call succeeded. + bool is_success() const { + DCHECK(parsed_); + return !header_.is_error(); + } + + // Return the call ID that this response is related to. + int32_t call_id() const { + DCHECK(parsed_); + return header_.call_id(); + } + + // Return the serialized response data. This is just the response "body" -- + // either a serialized ErrorStatusPB, or the serialized user response protobuf. + const Slice &serialized_response() const { + DCHECK(parsed_); + return serialized_response_; + } + + // See RpcController::GetSidecar() + Status GetSidecar(int idx, Slice* sidecar) const; + + private: + // True once ParseFrom() is called. + bool parsed_; + + // The parsed header. + ResponseHeader header_; + + // The slice of data for the encoded protobuf response. + // This slice refers to memory allocated by transfer_ + Slice serialized_response_; + + // Slices of data for rpc sidecars. They point into memory owned by transfer_. + Slice sidecar_slices_[TransferLimits::kMaxSidecars]; + + // The incoming transfer data - retained because serialized_response_ + // and sidecar_slices_ refer into its data. + gscoped_ptr<InboundTransfer> transfer_; + + DISALLOW_COPY_AND_ASSIGN(CallResponse); +}; + +} // namespace rpc +} // namespace kudu + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/protoc-gen-krpc.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/protoc-gen-krpc.cc b/be/src/kudu/rpc/protoc-gen-krpc.cc new file mode 100644 index 0000000..de41aa9 --- /dev/null +++ b/be/src/kudu/rpc/protoc-gen-krpc.cc @@ -0,0 +1,674 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//////////////////////////////////////////////////////////////////////////////// +// Example usage: +// protoc --plugin=protoc-gen-krpc --krpc_out . --proto_path . 
<file>.proto +//////////////////////////////////////////////////////////////////////////////// + +#include <ctype.h> + +#include <iostream> +#include <map> +#include <memory> +#include <sstream> +#include <string> + +#include <boost/optional.hpp> +#include <glog/logging.h> +#include <google/protobuf/compiler/code_generator.h> +#include <google/protobuf/compiler/plugin.h> +#include <google/protobuf/descriptor.h> +#include <google/protobuf/descriptor.pb.h> +#include <google/protobuf/io/printer.h> +#include <google/protobuf/io/zero_copy_stream.h> +#include <google/protobuf/stubs/common.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/stringpiece.h" +#include "kudu/gutil/strings/strip.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/util/status.h" +#include "kudu/util/string_case.h" + +using boost::optional; +using google::protobuf::FileDescriptor; +using google::protobuf::io::Printer; +using google::protobuf::MethodDescriptor; +using google::protobuf::ServiceDescriptor; +using std::map; +using std::shared_ptr; +using std::set; +using std::string; +using std::vector; + +namespace kudu { +namespace rpc { + +namespace { + +// Return the name of the authorization method specified for this +// RPC method, or boost::none if none is specified. +// +// This handles fallback to the service-wide default. 
+optional<string> GetAuthzMethod(const MethodDescriptor& method) { + if (method.options().HasExtension(authz_method)) { + return method.options().GetExtension(authz_method); + } + if (method.service()->options().HasExtension(default_authz_method)) { + return method.service()->options().GetExtension(default_authz_method); + } + return boost::none; +} + +} // anonymous namespace + +class Substituter { + public: + virtual ~Substituter() {} + virtual void InitSubstitutionMap(map<string, string> *map) const = 0; +}; + +// NameInfo contains information about the output names. +class FileSubstitutions : public Substituter { + public: + static const std::string PROTO_EXTENSION; + + Status Init(const FileDescriptor *file) { + string path = file->name(); + map_["path"] = path; + + // Initialize path_ + // If path = /foo/bar/baz_stuff.proto, path_ = /foo/bar/baz_stuff + if (!TryStripSuffixString(path, PROTO_EXTENSION, &path_no_extension_)) { + return Status::InvalidArgument("file name " + path + + " did not end in " + PROTO_EXTENSION); + } + map_["path_no_extension"] = path_no_extension_; + + // If path = /foo/bar/baz_stuff.proto, base_ = baz_stuff + string base; + GetBaseName(path_no_extension_, &base); + map_["base"] = base; + + // If path = /foo/bar/baz_stuff.proto, camel_case_ = BazStuff + string camel_case; + SnakeToCamelCase(base, &camel_case); + map_["camel_case"] = camel_case; + + // If path = /foo/bar/baz_stuff.proto, upper_case_ = BAZ_STUFF + string upper_case; + ToUpperCase(base, &upper_case); + map_["upper_case"] = upper_case; + + map_["open_namespace"] = GenerateOpenNamespace(file->package()); + map_["close_namespace"] = GenerateCloseNamespace(file->package()); + + return Status::OK(); + } + + virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE { + typedef std::map<string, string>::value_type kv_pair; + for (const kv_pair &pair : map_) { + (*map)[pair.first] = pair.second; + } + } + + std::string service_header() const { + return 
path_no_extension_ + ".service.h"; + } + + std::string service() const { + return path_no_extension_ + ".service.cc"; + } + + std::string proxy_header() const { + return path_no_extension_ + ".proxy.h"; + } + + std::string proxy() const { + return path_no_extension_ + ".proxy.cc"; + } + + private: + // Extract the last filename component. + static void GetBaseName(const string &path, + string *base) { + size_t last_slash = path.find_last_of("/"); + if (last_slash != string::npos) { + *base = path.substr(last_slash + 1); + } else { + *base = path; + } + } + + static string GenerateOpenNamespace(const string &str) { + vector<string> components = strings::Split(str, "."); + string out; + for (const string &c : components) { + out.append("namespace ").append(c).append(" {\n"); + } + return out; + } + + static string GenerateCloseNamespace(const string &str) { + vector<string> components = strings::Split(str, "."); + string out; + for (auto c = components.crbegin(); c != components.crend(); c++) { + out.append("} // namespace ").append(*c).append("\n"); + } + return out; + } + + std::string path_no_extension_; + map<string, string> map_; +}; + +const std::string FileSubstitutions::PROTO_EXTENSION(".proto"); + +class MethodSubstitutions : public Substituter { + public: + explicit MethodSubstitutions(const MethodDescriptor *method) + : method_(method) { + } + + virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE { + + (*map)["rpc_name"] = method_->name(); + (*map)["rpc_full_name"] = method_->full_name(); + (*map)["rpc_full_name_plainchars"] = + StringReplace(method_->full_name(), ".", "_", true); + (*map)["request"] = + ReplaceNamespaceDelimiters( + StripNamespaceIfPossible(method_->service()->full_name(), + method_->input_type()->full_name())); + (*map)["response"] = + ReplaceNamespaceDelimiters( + StripNamespaceIfPossible(method_->service()->full_name(), + method_->output_type()->full_name())); + (*map)["metric_enum_key"] = 
strings::Substitute("kMetricIndex$0", method_->name()); + bool track_result = static_cast<bool>(method_->options().GetExtension(track_rpc_result)); + (*map)["track_result"] = track_result ? " true" : "false"; + (*map)["authz_method"] = GetAuthzMethod(*method_).get_value_or("AuthorizeAllowAll"); + } + + // Strips the package from method arguments if they are in the same package as + // the service, otherwise leaves them so that we can have fully qualified + // namespaces for method arguments. + static std::string StripNamespaceIfPossible(const std::string& service_full_name, + const std::string& arg_full_name) { + StringPiece service_package(service_full_name); + if (!service_package.contains(".")) { + return arg_full_name; + } + // remove the service name so that we are left with only the package, including + // the last '.' so that we account for different packages with the same prefix. + service_package.remove_suffix(service_package.length() - + service_package.find_last_of(".") - 1); + + StringPiece argfqn(arg_full_name); + if (argfqn.starts_with(service_package)) { + argfqn.remove_prefix(argfqn.find_last_of(".") + 1); + } + return argfqn.ToString(); + } + + static std::string ReplaceNamespaceDelimiters(const std::string& arg_full_name) { + return JoinStrings(strings::Split(arg_full_name, "."), "::"); + } + + private: + const MethodDescriptor *method_; +}; + +class ServiceSubstitutions : public Substituter { + public: + explicit ServiceSubstitutions(const ServiceDescriptor *service) + : service_(service) + {} + + virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE { + (*map)["service_name"] = service_->name(); + (*map)["full_service_name"] = service_->full_name(); + (*map)["service_method_count"] = SimpleItoa(service_->method_count()); + + // TODO: upgrade to protobuf 2.5.x and attach service comments + // to the generated service classes using the SourceLocation API. 
+ } + + private: + const ServiceDescriptor *service_; +}; + + +class SubstitutionContext { + public: + // Takes ownership of the substituter + void Push(const Substituter *sub) { + subs_.push_back(shared_ptr<const Substituter>(sub)); + } + + void PushMethod(const MethodDescriptor *method) { + Push(new MethodSubstitutions(method)); + } + + void PushService(const ServiceDescriptor *service) { + Push(new ServiceSubstitutions(service)); + } + + void Pop() { + CHECK(!subs_.empty()); + subs_.pop_back(); + } + + void InitSubstitutionMap(map<string, string> *subs) const { + for (const shared_ptr<const Substituter> &sub : subs_) { + sub->InitSubstitutionMap(subs); + } + } + + private: + vector<shared_ptr<const Substituter> > subs_; +}; + + + +class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator { + public: + CodeGenerator() { } + + ~CodeGenerator() { } + + bool Generate(const google::protobuf::FileDescriptor *file, + const std::string &/* parameter */, + google::protobuf::compiler::GeneratorContext *gen_context, + std::string *error) const OVERRIDE { + auto name_info = new FileSubstitutions(); + Status ret = name_info->Init(file); + if (!ret.ok()) { + *error = "name_info.Init failed: " + ret.ToString(); + return false; + } + + SubstitutionContext subs; + subs.Push(name_info); + + gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> ih_output( + gen_context->Open(name_info->service_header())); + Printer ih_printer(ih_output.get(), '$'); + GenerateServiceIfHeader(&ih_printer, &subs, file); + + gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> i_output( + gen_context->Open(name_info->service())); + Printer i_printer(i_output.get(), '$'); + GenerateServiceIf(&i_printer, &subs, file); + + gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> ph_output( + gen_context->Open(name_info->proxy_header())); + Printer ph_printer(ph_output.get(), '$'); + GenerateProxyHeader(&ph_printer, &subs, file); + + 
gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> p_output( + gen_context->Open(name_info->proxy())); + Printer p_printer(p_output.get(), '$'); + GenerateProxy(&p_printer, &subs, file); + + return true; + } + + private: + void Print(Printer *printer, + const SubstitutionContext &sub, + const char *text) const { + map<string, string> subs; + sub.InitSubstitutionMap(&subs); + printer->Print(subs, text); + } + + void GenerateServiceIfHeader(Printer *printer, + SubstitutionContext *subs, + const FileDescriptor *file) const { + Print(printer, *subs, + "// THIS FILE IS AUTOGENERATED FROM $path$\n" + "\n" + "#ifndef KUDU_RPC_$upper_case$_SERVICE_IF_DOT_H\n" + "#define KUDU_RPC_$upper_case$_SERVICE_IF_DOT_H\n" + "\n" + "#include \"$path_no_extension$.pb.h\"\n" + "\n" + "#include <functional>\n" + "#include <memory>\n" + "#include <string>\n" + "\n" + "#include \"kudu/rpc/rpc_header.pb.h\"\n" + "#include \"kudu/rpc/service_if.h\"\n" + "\n" + "namespace kudu {\n" + "class MetricEntity;\n" + "namespace rpc {\n" + "class Messenger;\n" + "class ResultTracker;\n" + "class RpcContext;\n" + "} // namespace rpc\n" + "} // namespace kudu\n" + "\n" + "$open_namespace$" + "\n" + ); + + for (int service_idx = 0; service_idx < file->service_count(); + ++service_idx) { + const ServiceDescriptor *service = file->service(service_idx); + subs->PushService(service); + + Print(printer, *subs, + "class $service_name$If : public ::kudu::rpc::GeneratedServiceIf {\n" + " public:\n" + " explicit $service_name$If(const scoped_refptr<::kudu::MetricEntity>& entity," + " const scoped_refptr<::kudu::rpc::ResultTracker>& result_tracker);\n" + " virtual ~$service_name$If();\n" + " std::string service_name() const override;\n" + " static std::string static_service_name();\n" + "\n" + ); + + set<string> authz_methods; + for (int method_idx = 0; method_idx < service->method_count(); + ++method_idx) { + const MethodDescriptor *method = service->method(method_idx); + subs->PushMethod(method); + + 
Print(printer, *subs, + " virtual void $rpc_name$(const $request$ *req,\n" + " $response$ *resp, ::kudu::rpc::RpcContext *context) = 0;\n" + ); + subs->Pop(); + if (auto m = GetAuthzMethod(*method)) { + authz_methods.insert(m.get()); + } + } + + if (!authz_methods.empty()) { + printer->Print( + "\n\n" + " // Authorization methods\n" + " // ---------------------\n\n"); + } + for (const string& m : authz_methods) { + printer->Print({ {"m", m} }, + " virtual bool $m$(const google::protobuf::Message* req,\n" + " google::protobuf::Message* resp, ::kudu::rpc::RpcContext *context) = 0;\n"); + } + + Print(printer, *subs, + "\n" + "};\n" + ); + + subs->Pop(); // Service + } + + Print(printer, *subs, + "\n" + "$close_namespace$\n" + "#endif\n"); + } + + void GenerateServiceIf(Printer *printer, + SubstitutionContext *subs, + const FileDescriptor *file) const { + Print(printer, *subs, + "// THIS FILE IS AUTOGENERATED FROM $path$\n" + "\n" + "#include \"$path_no_extension$.pb.h\"\n" + "#include \"$path_no_extension$.service.h\"\n" + "\n" + "#include <glog/logging.h>\n" + "\n" + "#include \"kudu/rpc/inbound_call.h\"\n" + "#include \"kudu/rpc/remote_method.h\"\n" + "#include \"kudu/rpc/rpc_context.h\"\n" + "#include \"kudu/rpc/service_if.h\"\n" + "#include \"kudu/util/metrics.h\"\n" + "\n"); + + // Define metric prototypes for each method in the service. 
+ for (int service_idx = 0; service_idx < file->service_count(); + ++service_idx) { + const ServiceDescriptor *service = file->service(service_idx); + subs->PushService(service); + + for (int method_idx = 0; method_idx < service->method_count(); + ++method_idx) { + const MethodDescriptor *method = service->method(method_idx); + subs->PushMethod(method); + Print(printer, *subs, + "METRIC_DEFINE_histogram(server, handler_latency_$rpc_full_name_plainchars$,\n" + " \"$rpc_full_name$ RPC Time\",\n" + " kudu::MetricUnit::kMicroseconds,\n" + " \"Microseconds spent handling $rpc_full_name$() RPC requests\",\n" + " 60000000LU, 2);\n" + "\n"); + subs->Pop(); + } + + subs->Pop(); + } + + Print(printer, *subs, + "using google::protobuf::Message;\n" + "using kudu::MetricEntity;\n" + "using kudu::rpc::ResultTracker;\n" + "using kudu::rpc::RpcContext;\n" + "using kudu::rpc::RpcMethodInfo;\n" + "using std::unique_ptr;\n" + "\n" + "$open_namespace$" + "\n"); + + for (int service_idx = 0; service_idx < file->service_count(); + ++service_idx) { + const ServiceDescriptor *service = file->service(service_idx); + subs->PushService(service); + + Print(printer, *subs, + "$service_name$If::$service_name$If(const scoped_refptr<MetricEntity>& entity," + " const scoped_refptr<ResultTracker>& result_tracker) {\n" + "result_tracker_ = result_tracker;\n" + ); + for (int method_idx = 0; method_idx < service->method_count(); + ++method_idx) { + const MethodDescriptor *method = service->method(method_idx); + subs->PushMethod(method); + + Print(printer, *subs, + " {\n" + " scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo());\n" + " mi->req_prototype.reset(new $request$());\n" + " mi->resp_prototype.reset(new $response$());\n" + " mi->authz_method = [this](const Message* req, Message* resp,\n" + " RpcContext* ctx) {\n" + " return this->$authz_method$(static_cast<const $request$*>(req),\n" + " static_cast<$response$*>(resp),\n" + " ctx);\n" + " };\n" + " mi->track_result = $track_result$;\n" + " 
mi->handler_latency_histogram =\n" + " METRIC_handler_latency_$rpc_full_name_plainchars$.Instantiate(entity);\n" + " mi->func = [this](const Message* req, Message* resp, RpcContext* ctx) {\n" + " this->$rpc_name$(static_cast<const $request$*>(req),\n" + " static_cast<$response$*>(resp),\n" + " ctx);\n" + " };\n" + " methods_by_name_[\"$rpc_name$\"] = std::move(mi);\n" + " }\n"); + subs->Pop(); + } + + Print(printer, *subs, + "}\n" + "\n" + "$service_name$If::~$service_name$If() {\n" + "}\n" + "\n" + "std::string $service_name$If::service_name() const {\n" + " return \"$full_service_name$\";\n" + "}\n" + "std::string $service_name$If::static_service_name() {\n" + " return \"$full_service_name$\";\n" + "}\n" + "\n" + ); + + subs->Pop(); + } + + Print(printer, *subs, + "$close_namespace$" + ); + } + + void GenerateProxyHeader(Printer *printer, + SubstitutionContext *subs, + const FileDescriptor *file) const { + Print(printer, *subs, + "// THIS FILE IS AUTOGENERATED FROM $path$\n" + "\n" + "#ifndef KUDU_RPC_$upper_case$_PROXY_DOT_H\n" + "#define KUDU_RPC_$upper_case$_PROXY_DOT_H\n" + "\n" + "#include \"$path_no_extension$.pb.h\"\n" + "\n" + "#include \"kudu/rpc/proxy.h\"\n" + "#include \"kudu/util/status.h\"\n" + "\n" + "namespace kudu { class Sockaddr; }\n" + "namespace kudu { namespace rpc { class UserCredentials; } }\n" + "$open_namespace$" + "\n" + "\n" + ); + + for (int service_idx = 0; service_idx < file->service_count(); + ++service_idx) { + const ServiceDescriptor *service = file->service(service_idx); + subs->PushService(service); + + Print(printer, *subs, + "class $service_name$Proxy : public ::kudu::rpc::Proxy {\n" + " public:\n" + " $service_name$Proxy(const std::shared_ptr< ::kudu::rpc::Messenger>\n" + " &messenger, const ::kudu::Sockaddr &sockaddr);\n" + " ~$service_name$Proxy();\n" + "\n" + ); + + for (int method_idx = 0; method_idx < service->method_count(); + ++method_idx) { + const MethodDescriptor *method = service->method(method_idx); + 
subs->PushMethod(method); + + Print(printer, *subs, + "\n" + " ::kudu::Status $rpc_name$(const $request$ &req, $response$ *resp,\n" + " ::kudu::rpc::RpcController *controller);\n" + " void $rpc_name$Async(const $request$ &req,\n" + " $response$ *response,\n" + " ::kudu::rpc::RpcController *controller,\n" + " const ::kudu::rpc::ResponseCallback &callback);\n" + ); + subs->Pop(); + } + Print(printer, *subs, + "};\n"); + subs->Pop(); + } + Print(printer, *subs, + "\n" + "$close_namespace$" + "\n" + "#endif\n" + ); + } + + void GenerateProxy(Printer *printer, + SubstitutionContext *subs, + const FileDescriptor *file) const { + Print(printer, *subs, + "// THIS FILE IS AUTOGENERATED FROM $path$\n" + "\n" + "#include \"$path_no_extension$.proxy.h\"\n" + "\n" + "#include \"kudu/rpc/outbound_call.h\"\n" + "#include \"kudu/util/net/sockaddr.h\"\n" + "\n" + "$open_namespace$" + "\n" + ); + + for (int service_idx = 0; service_idx < file->service_count(); + ++service_idx) { + const ServiceDescriptor *service = file->service(service_idx); + subs->PushService(service); + Print(printer, *subs, + "$service_name$Proxy::$service_name$Proxy(\n" + " const std::shared_ptr< ::kudu::rpc::Messenger> &messenger,\n" + " const ::kudu::Sockaddr &remote)\n" + " : Proxy(messenger, remote, \"$full_service_name$\") {\n" + "}\n" + "\n" + "$service_name$Proxy::~$service_name$Proxy() {\n" + "}\n" + "\n" + "\n"); + for (int method_idx = 0; method_idx < service->method_count(); + ++method_idx) { + const MethodDescriptor *method = service->method(method_idx); + subs->PushMethod(method); + Print(printer, *subs, + "::kudu::Status $service_name$Proxy::$rpc_name$(const $request$ &req, $response$ *resp,\n" + " ::kudu::rpc::RpcController *controller) {\n" + " return SyncRequest(\"$rpc_name$\", req, resp, controller);\n" + "}\n" + "\n" + "void $service_name$Proxy::$rpc_name$Async(const $request$ &req,\n" + " $response$ *resp, ::kudu::rpc::RpcController *controller,\n" + " const ::kudu::rpc::ResponseCallback 
&callback) {\n" + " AsyncRequest(\"$rpc_name$\", req, resp, controller, callback);\n" + "}\n" + "\n"); + subs->Pop(); + } + + subs->Pop(); + } + Print(printer, *subs, + "$close_namespace$"); + } +}; +} // namespace rpc +} // namespace kudu + +int main(int argc, char *argv[]) { + kudu::rpc::CodeGenerator generator; + return google::protobuf::compiler::PluginMain(argc, argv, &generator); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/proxy.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc new file mode 100644 index 0000000..45ad5dd --- /dev/null +++ b/be/src/kudu/rpc/proxy.cc @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/proxy.h" + +#include <boost/bind.hpp> +#include <glog/logging.h> +#include <inttypes.h> +#include <memory> +#include <stdint.h> + +#include <iostream> +#include <sstream> +#include <vector> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/outbound_call.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/remote_method.h" +#include "kudu/rpc/response_callback.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/status.h" +#include "kudu/util/user.h" + +using google::protobuf::Message; +using std::string; +using std::shared_ptr; + +namespace kudu { +namespace rpc { + +Proxy::Proxy(std::shared_ptr<Messenger> messenger, + const Sockaddr& remote, string service_name) + : service_name_(std::move(service_name)), + messenger_(std::move(messenger)), + is_started_(false) { + CHECK(messenger_ != nullptr); + DCHECK(!service_name_.empty()) << "Proxy service name must not be blank"; + + // By default, we set the real user to the currently logged-in user. + // Effective user and password remain blank. 
+ string real_user; + Status s = GetLoggedInUser(&real_user); + if (!s.ok()) { + LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: " + << s.ToString() << " before connecting to remote: " << remote.ToString(); + } + + conn_id_.set_remote(remote); + conn_id_.mutable_user_credentials()->set_real_user(real_user); +} + +Proxy::~Proxy() { +} + +void Proxy::AsyncRequest(const string& method, + const google::protobuf::Message& req, + google::protobuf::Message* response, + RpcController* controller, + const ResponseCallback& callback) const { + CHECK(!controller->call_) << "Controller should be reset"; + base::subtle::NoBarrier_Store(&is_started_, true); + RemoteMethod remote_method(service_name_, method); + controller->call_.reset( + new OutboundCall(conn_id_, remote_method, response, controller, callback)); + controller->SetRequestParam(req); + + // If this fails to queue, the callback will get called immediately + // and the controller will be in an ERROR state. 
+ messenger_->QueueOutboundCall(controller->call_); +} + + +Status Proxy::SyncRequest(const string& method, + const google::protobuf::Message& req, + google::protobuf::Message* resp, + RpcController* controller) const { + CountDownLatch latch(1); + AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller, + boost::bind(&CountDownLatch::CountDown, boost::ref(latch))); + + latch.Wait(); + return controller->status(); +} + +void Proxy::set_user_credentials(const UserCredentials& user_credentials) { + CHECK(base::subtle::NoBarrier_Load(&is_started_) == false) + << "It is illegal to call set_user_credentials() after request processing has started"; + conn_id_.set_user_credentials(user_credentials); +} + +std::string Proxy::ToString() const { + return strings::Substitute("$0@$1", service_name_, conn_id_.ToString()); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/proxy.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/proxy.h b/be/src/kudu/rpc/proxy.h new file mode 100644 index 0000000..ddbbe60 --- /dev/null +++ b/be/src/kudu/rpc/proxy.h @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_PROXY_H +#define KUDU_RPC_PROXY_H + +#include <memory> +#include <string> + +#include "kudu/gutil/atomicops.h" +#include "kudu/rpc/outbound_call.h" +#include "kudu/rpc/response_callback.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/status.h" + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace kudu { +namespace rpc { + +class Messenger; + +// Interface to send calls to a remote service. +// +// Proxy objects do not map one-to-one with TCP connections. The underlying TCP +// connection is not established until the first call, and may be torn down and +// re-established as necessary by the messenger. Additionally, the messenger is +// likely to multiplex many Proxy objects on the same connection. +// +// Proxy objects are thread-safe after initialization only. +// Setters on the Proxy are not thread-safe, and calling a setter after any RPC +// request has started will cause a fatal error. +// +// After initialization, multiple threads may make calls using the same proxy object. +class Proxy { + public: + Proxy(std::shared_ptr<Messenger> messenger, const Sockaddr& remote, + std::string service_name); + ~Proxy(); + + // Call a remote method asynchronously. + // + // Typically, users will not call this directly, but rather through + // a generated Proxy subclass. + // + // method: the method name to invoke on the remote server. + // + // req: the request protobuf. This will be serialized immediately, + // so the caller may free or otherwise mutate 'req' safely. + // + // resp: the response protobuf. This protobuf will be mutated upon + // completion of the call. The RPC system does not take ownership + // of this storage. 
+ // + // NOTE: 'req' and 'resp' should be the appropriate protocol buffer implementation + // class corresponding to the parameter and result types of the service method + // defined in the service's '.proto' file. + // + // controller: the RpcController to associate with this call. Each call + // must use a unique controller object. Does not take ownership. + // + // callback: the callback to invoke upon call completion. This callback may + // be invoked before AsyncRequest() itself returns, or any time + // thereafter. It may be invoked either on the caller's thread + // or by an RPC IO thread, and thus should take care to not + // block or perform any heavy CPU work. + void AsyncRequest(const std::string& method, + const google::protobuf::Message& req, + google::protobuf::Message* resp, + RpcController* controller, + const ResponseCallback& callback) const; + + // The same as AsyncRequest(), except that the call blocks until the call + // finishes. If the call fails, returns a non-OK result. + Status SyncRequest(const std::string& method, + const google::protobuf::Message& req, + google::protobuf::Message* resp, + RpcController* controller) const; + + // Set the user credentials which should be used to log in. + void set_user_credentials(const UserCredentials& user_credentials); + + // Get the user credentials which should be used to log in. 
+ const UserCredentials& user_credentials() const { return conn_id_.user_credentials(); } + + std::string ToString() const; + + private: + const std::string service_name_; + std::shared_ptr<Messenger> messenger_; + ConnectionId conn_id_; + mutable Atomic32 is_started_; + + DISALLOW_COPY_AND_ASSIGN(Proxy); +}; + +} // namespace rpc +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/reactor-test.cc b/be/src/kudu/rpc/reactor-test.cc new file mode 100644 index 0000000..2faac2a --- /dev/null +++ b/be/src/kudu/rpc/reactor-test.cc @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/reactor.h" + +#include "kudu/rpc/rpc-test-base.h" +#include "kudu/util/countdown_latch.h" + +using std::shared_ptr; + +namespace kudu { +namespace rpc { + +class ReactorTest : public RpcTestBase { + public: + ReactorTest() + : messenger_(CreateMessenger("my_messenger", 4)), + latch_(1) { + } + + void ScheduledTask(const Status& status, const Status& expected_status) { + CHECK_EQ(expected_status.CodeAsString(), status.CodeAsString()); + latch_.CountDown(); + } + + void ScheduledTaskCheckThread(const Status& status, const Thread* thread) { + CHECK_OK(status); + CHECK_EQ(thread, Thread::current_thread()); + latch_.CountDown(); + } + + void ScheduledTaskScheduleAgain(const Status& status) { + messenger_->ScheduleOnReactor( + boost::bind(&ReactorTest::ScheduledTaskCheckThread, this, _1, + Thread::current_thread()), + MonoDelta::FromMilliseconds(0)); + latch_.CountDown(); + } + + protected: + const shared_ptr<Messenger> messenger_; + CountDownLatch latch_; +}; + +TEST_F(ReactorTest, TestFunctionIsCalled) { + messenger_->ScheduleOnReactor( + boost::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()), + MonoDelta::FromSeconds(0)); + latch_.Wait(); +} + +TEST_F(ReactorTest, TestFunctionIsCalledAtTheRightTime) { + MonoTime before = MonoTime::Now(); + messenger_->ScheduleOnReactor( + boost::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()), + MonoDelta::FromMilliseconds(100)); + latch_.Wait(); + MonoTime after = MonoTime::Now(); + MonoDelta delta = after - before; + CHECK_GE(delta.ToMilliseconds(), 100); +} + +TEST_F(ReactorTest, TestFunctionIsCalledIfReactorShutdown) { + messenger_->ScheduleOnReactor( + boost::bind(&ReactorTest::ScheduledTask, this, _1, + Status::Aborted("doesn't matter")), + MonoDelta::FromSeconds(60)); + messenger_->Shutdown(); + latch_.Wait(); +} + +TEST_F(ReactorTest, TestReschedulesOnSameReactorThread) { + // Our scheduled task will schedule yet another task. 
+ latch_.Reset(2); + + messenger_->ScheduleOnReactor( + boost::bind(&ReactorTest::ScheduledTaskScheduleAgain, this, _1), + MonoDelta::FromSeconds(0)); + latch_.Wait(); + latch_.Wait(); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc new file mode 100644 index 0000000..e235dd4 --- /dev/null +++ b/be/src/kudu/rpc/reactor.cc @@ -0,0 +1,750 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/reactor.h" + +#include <arpa/inet.h> +#include <netinet/in.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> + +#include <memory> +#include <mutex> +#include <string> + +#include <boost/intrusive/list.hpp> +#include <boost/optional.hpp> +#include <ev++.h> +#include <glog/logging.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/client_negotiation.h" +#include "kudu/rpc/connection.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/negotiation.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/server_negotiation.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/debug/sanitizer_scopes.h" +#include "kudu/util/errno.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" +#include "kudu/util/thread_restrictions.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/trace.h" + +// When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop. +// Otherwise we run into problems because 'select' can't handle connections when more than 1024 +// file descriptors are open by the process. +#if defined(__APPLE__) +static const int kDefaultLibEvFlags = ev::KQUEUE; +#else +static const int kDefaultLibEvFlags = ev::AUTO; +#endif + +using std::string; +using std::shared_ptr; +using std::unique_ptr; +using strings::Substitute; + +DEFINE_int64(rpc_negotiation_timeout_ms, 3000, + "Timeout for negotiating an RPC connection."); +TAG_FLAG(rpc_negotiation_timeout_ms, advanced); +TAG_FLAG(rpc_negotiation_timeout_ms, runtime); + +DEFINE_bool(rpc_reopen_outbound_connections, false, + "Open a new connection to the server for every RPC call. 
" + "If not enabled, an already existing connection to a " + "server is reused upon making another call to the same server. " + "When this flag is enabled, an already existing _idle_ connection " + "to the server is closed upon making another RPC call which would " + "reuse the connection otherwise. " + "Used by tests only."); +TAG_FLAG(rpc_reopen_outbound_connections, unsafe); +TAG_FLAG(rpc_reopen_outbound_connections, runtime); + +namespace kudu { +namespace rpc { + +namespace { +Status ShutdownError(bool aborted) { + const char* msg = "reactor is shutting down"; + return aborted ? + Status::Aborted(msg, "", ESHUTDOWN) : + Status::ServiceUnavailable(msg, "", ESHUTDOWN); +} +} // anonymous namespace + +ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld) + : loop_(kDefaultLibEvFlags), + cur_time_(MonoTime::Now()), + last_unused_tcp_scan_(cur_time_), + reactor_(reactor), + connection_keepalive_time_(bld.connection_keepalive_time_), + coarse_timer_granularity_(bld.coarse_timer_granularity_), + total_client_conns_cnt_(0), + total_server_conns_cnt_(0) { +} + +Status ReactorThread::Init() { + DCHECK(thread_.get() == nullptr) << "Already started"; + DVLOG(6) << "Called ReactorThread::Init()"; + // Register to get async notifications in our epoll loop. + async_.set(loop_); + async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); + async_.start(); + + // Register the timer watcher. + // The timer is used for closing old TCP connections and applying + // backpressure. + timer_.set(loop_); + timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*) + timer_.start(coarse_timer_granularity_.ToSeconds(), + coarse_timer_granularity_.ToSeconds()); + + // Create Reactor thread. 
+ return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_); +} + +void ReactorThread::Shutdown() { + CHECK(reactor_->closing()) << "Should be called after setting closing_ flag"; + + VLOG(1) << name() << ": shutting down Reactor thread."; + WakeThread(); +} + +void ReactorThread::ShutdownInternal() { + DCHECK(IsCurrentThread()); + + // Tear down any outbound TCP connections. + Status service_unavailable = ShutdownError(false); + VLOG(1) << name() << ": tearing down outbound TCP connections..."; + for (const auto& elem : client_conns_) { + const auto& conn = elem.second; + VLOG(1) << name() << ": shutting down " << conn->ToString(); + conn->Shutdown(service_unavailable); + } + client_conns_.clear(); + + // Tear down any inbound TCP connections. + VLOG(1) << name() << ": tearing down inbound TCP connections..."; + for (const auto& conn : server_conns_) { + VLOG(1) << name() << ": shutting down " << conn->ToString(); + conn->Shutdown(service_unavailable); + } + server_conns_.clear(); + + // Abort any scheduled tasks. + // + // These won't be found in the ReactorThread's list of pending tasks + // because they've been "run" (that is, they've been scheduled). + Status aborted = ShutdownError(true); // aborted + for (DelayedTask* task : scheduled_tasks_) { + task->Abort(aborted); // should also free the task. + } + scheduled_tasks_.clear(); + + // Remove the OpenSSL thread state. 
+ ERR_remove_thread_state(nullptr); +} + +ReactorTask::ReactorTask() { +} +ReactorTask::~ReactorTask() { +} + +Status ReactorThread::GetMetrics(ReactorMetrics* metrics) { + DCHECK(IsCurrentThread()); + metrics->num_client_connections_ = client_conns_.size(); + metrics->num_server_connections_ = server_conns_.size(); + metrics->total_client_connections_ = total_client_conns_cnt_; + metrics->total_server_connections_ = total_server_conns_cnt_; + return Status::OK(); +} + +Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req, + DumpRunningRpcsResponsePB* resp) { + DCHECK(IsCurrentThread()); + for (const scoped_refptr<Connection>& conn : server_conns_) { + RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections())); + } + for (const conn_multimap_t::value_type& entry : client_conns_) { + Connection* conn = entry.second.get(); + RETURN_NOT_OK(conn->DumpPB(req, resp->add_outbound_connections())); + } + return Status::OK(); +} + +void ReactorThread::WakeThread() { + // libev uses some lock-free synchronization, but doesn't have TSAN annotations. + // See http://lists.schmorp.de/pipermail/libev/2013q2/002178.html or KUDU-366 + // for examples. + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + async_.send(); +} + +// Handle async events. These events are sent to the reactor by other +// threads that want to bring something to our attention, like the fact that +// we're shutting down, or the fact that there is a new outbound Transfer +// ready to send. 
+void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) { + DCHECK(IsCurrentThread()); + + if (PREDICT_FALSE(reactor_->closing())) { + ShutdownInternal(); + loop_.break_loop(); // break the epoll loop and terminate the thread + return; + } + + boost::intrusive::list<ReactorTask> tasks; + reactor_->DrainTaskQueue(&tasks); + + while (!tasks.empty()) { + ReactorTask& task = tasks.front(); + tasks.pop_front(); + task.Run(this); + } +} + +void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) { + DCHECK(IsCurrentThread()); + + Status s = StartConnectionNegotiation(conn); + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Server connection negotiation failed: " << s.ToString(); + DestroyConnection(conn.get(), s); + return; + } + ++total_server_conns_cnt_; + server_conns_.emplace_back(std::move(conn)); +} + +void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) { + DCHECK(IsCurrentThread()); + scoped_refptr<Connection> conn; + + Status s = FindOrStartConnection(call->conn_id(), + call->controller()->credentials_policy(), + &conn); + if (PREDICT_FALSE(!s.ok())) { + call->SetFailed(s, OutboundCall::Phase::CONNECTION_NEGOTIATION); + return; + } + + conn->QueueOutboundCall(call); +} + +// +// Handles timer events. The periodic timer: +// +// 1. updates Reactor::cur_time_ +// 2. every tcp_conn_timeo_ seconds, close down connections older than +// tcp_conn_timeo_ seconds. +// +void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) { + DCHECK(IsCurrentThread()); + if (EV_ERROR & revents) { + LOG(WARNING) << "Reactor " << name() << " got an error in " + "the timer handler."; + return; + } + cur_time_ = MonoTime::Now(); + + ScanIdleConnections(); +} + +void ReactorThread::RegisterTimeout(ev::timer *watcher) { + watcher->set(loop_); +} + +void ReactorThread::ScanIdleConnections() { + DCHECK(IsCurrentThread()); + // Enforce TCP connection timeouts: server-side connections. 
+ const auto server_conns_end = server_conns_.end(); + uint64_t timed_out = 0; + for (auto it = server_conns_.begin(); it != server_conns_end; ) { + Connection* conn = it->get(); + if (!conn->Idle()) { + VLOG(10) << "Connection " << conn->ToString() << " not idle"; + ++it; + continue; + } + + const MonoDelta connection_delta(cur_time_ - conn->last_activity_time()); + if (connection_delta <= connection_keepalive_time_) { + ++it; + continue; + } + + conn->Shutdown(Status::NetworkError( + Substitute("connection timed out after $0", connection_keepalive_time_.ToString()))); + VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for " + << connection_delta.ToString(); + ++timed_out; + it = server_conns_.erase(it); + } + + // Take care of idle client-side connections marked for shutdown. + uint64_t shutdown = 0; + for (auto it = client_conns_.begin(); it != client_conns_.end();) { + Connection* conn = it->second.get(); + if (conn->scheduled_for_shutdown() && conn->Idle()) { + conn->Shutdown(Status::NetworkError( + "connection has been marked for shutdown")); + it = client_conns_.erase(it); + ++shutdown; + } else { + ++it; + } + } + // TODO(aserbin): clients may want to set their keepalive timeout for idle + // but not scheduled for shutdown connections. 
+ + VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections."; + VLOG_IF(1, shutdown > 0) << name() << ": shutdown " << shutdown << " TCP connections."; +} + +const std::string& ReactorThread::name() const { + return reactor_->name(); +} + +MonoTime ReactorThread::cur_time() const { + return cur_time_; +} + +Reactor *ReactorThread::reactor() { + return reactor_; +} + +bool ReactorThread::IsCurrentThread() const { + return thread_.get() == kudu::Thread::current_thread(); +} + +void ReactorThread::RunThread() { + ThreadRestrictions::SetWaitAllowed(false); + ThreadRestrictions::SetIOAllowed(false); + DVLOG(6) << "Calling ReactorThread::RunThread()..."; + loop_.run(0); + VLOG(1) << name() << " thread exiting."; + + // No longer need the messenger. This causes the messenger to + // get deleted when all the reactors exit. + reactor_->messenger_.reset(); +} + +Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id, + CredentialsPolicy cred_policy, + scoped_refptr<Connection>* conn) { + DCHECK(IsCurrentThread()); + const auto range = client_conns_.equal_range(conn_id); + scoped_refptr<Connection> found_conn; + for (auto it = range.first; it != range.second;) { + const auto& c = it->second.get(); + // * Do not use connections scheduled for shutdown to place new calls. + // + // * Do not use a connection with a non-compliant credentials policy. + // Instead, open a new one, while marking the former as scheduled for + // shutdown. This process converges: any connection that satisfies the + // PRIMARY_CREDENTIALS policy automatically satisfies the ANY_CREDENTIALS + // policy as well. The idea is to keep only one usable connection + // identified by the specified 'conn_id'. + // + // * If the test-only 'one-connection-per-RPC' mode is enabled, connections + // are re-established at every RPC call. 
+ if (c->scheduled_for_shutdown() || + !c->SatisfiesCredentialsPolicy(cred_policy) || + PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections)) { + if (c->Idle()) { + // Shutdown idle connections to the target destination. Non-idle ones + // will be taken care of later by the idle connection scanner. + DCHECK_EQ(Connection::CLIENT, c->direction()); + c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy")); + it = client_conns_.erase(it); + continue; + } + c->set_scheduled_for_shutdown(); + } else { + DCHECK(!found_conn); + found_conn = c; + // Appropriate connection is found; continue further to take care of the + // rest of connections to mark them for shutdown if they are not + // satisfying the policy. + } + ++it; + } + if (found_conn) { + // Found matching not-to-be-shutdown connection: return it as the result. + conn->swap(found_conn); + return Status::OK(); + } + + // No connection to this remote. Need to create one. + VLOG(2) << name() << " FindOrStartConnection: creating " + << "new connection for " << conn_id.remote().ToString(); + + // Create a new socket and start connecting to the remote. + Socket sock; + RETURN_NOT_OK(CreateClientSocket(&sock)); + RETURN_NOT_OK(StartConnect(&sock, conn_id.remote())); + + unique_ptr<Socket> new_socket(new Socket(sock.Release())); + + // Register the new connection in our map. + *conn = new Connection( + this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy); + (*conn)->set_local_user_credentials(conn_id.user_credentials()); + + // Kick off blocking client connection negotiation. + Status s = StartConnectionNegotiation(*conn); + if (s.IsIllegalState()) { + // Return a nicer error message to the user indicating -- if we just + // forward the status we'd get something generic like "ThreadPool is closing". + return Status::ServiceUnavailable("Client RPC Messenger shutting down"); + } + // Propagate any other errors as-is. 
+ RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread"); + + // Insert into the client connection map to avoid duplicate connection requests. + client_conns_.emplace(conn_id, *conn); + ++total_client_conns_cnt_; + + return Status::OK(); +} + +Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) { + DCHECK(IsCurrentThread()); + + // Set a limit on how long the server will negotiate with a new client. + MonoTime deadline = MonoTime::Now() + + MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms); + + scoped_refptr<Trace> trace(new Trace()); + ADOPT_TRACE(trace.get()); + TRACE("Submitting negotiation task for $0", conn->ToString()); + auto authentication = reactor()->messenger()->authentication(); + auto encryption = reactor()->messenger()->encryption(); + RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure( + Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline))); + return Status::OK(); +} + +void ReactorThread::CompleteConnectionNegotiation( + const scoped_refptr<Connection>& conn, + const Status& status, + unique_ptr<ErrorStatusPB> rpc_error) { + DCHECK(IsCurrentThread()); + if (PREDICT_FALSE(!status.ok())) { + DestroyConnection(conn.get(), status, std::move(rpc_error)); + return; + } + + // Switch the socket back to non-blocking mode after negotiation. 
  Status s = conn->SetNonBlocking(true);
  if (PREDICT_FALSE(!s.ok())) {
    LOG(DFATAL) << "Unable to set connection to non-blocking mode: " << s.ToString();
    DestroyConnection(conn.get(), s, std::move(rpc_error));
    return;
  }

  conn->MarkNegotiationComplete();
  conn->EpollRegister(loop_);
}

// Initializes 'sock' with FLAG_NONBLOCKING and, if that succeeds, enables the
// no-delay option on it.
//
// On failure a WARNING is logged and the failing Status is returned.
Status ReactorThread::CreateClientSocket(Socket *sock) {
  Status ret = sock->Init(Socket::FLAG_NONBLOCKING);
  if (ret.ok()) {
    ret = sock->SetNoDelay(true);
  }
  LOG_IF(WARNING, !ret.ok())
      << "failed to create an outbound connection because a new socket could not be created: "
      << ret.ToString();
  return ret;
}

// Calls connect() on 'sock' towards 'remote'.
//
// Returns OK when connect() succeeds immediately, and also when it fails with
// a temporary socket error or EINPROGRESS: both are treated as "connect in
// progress". Any other failure is logged at WARNING level and returned.
Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {
  const Status ret = sock->Connect(remote);
  if (ret.ok()) {
    VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
    return Status::OK();
  }

  int posix_code = ret.posix_code();
  if (Socket::IsTemporarySocketError(posix_code) || posix_code == EINPROGRESS) {
    VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
    return Status::OK();
  }

  LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString()
               << " because connect() failed: " << ret.ToString();
  return ret;
}

// Shuts down 'conn' with 'conn_status' (and the optional 'rpc_error') and then
// removes it from client_conns_ or server_conns_, depending on its direction.
//
// Must be called on the reactor thread (DCHECKed).
void ReactorThread::DestroyConnection(Connection *conn,
                                      const Status& conn_status,
                                      unique_ptr<ErrorStatusPB> rpc_error) {
  DCHECK(IsCurrentThread());

  conn->Shutdown(conn_status, std::move(rpc_error));

  // Unlink connection from lists.
  if (conn->direction() == Connection::CLIENT) {
    ConnectionId conn_id(conn->remote(), conn->local_user_credentials());
    const auto range = client_conns_.equal_range(conn_id);
    CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
    // The client_conns_ container is a multi-map, so several entries may share
    // 'conn_id': erase only the entry that points at 'conn'.
    for (auto it = range.first; it != range.second;) {
      if (it->second.get() == conn) {
        it = client_conns_.erase(it);
        break;
      }
      ++it;
    }
  } else if (conn->direction() == Connection::SERVER) {
    // Linear scan of the server connection list; stop at the first match.
    auto it = server_conns_.begin();
    while (it != server_conns_.end()) {
      if ((*it).get() == conn) {
        server_conns_.erase(it);
        break;
      }
      ++it;
    }
  }
}

// Stores the user function and the delay. 'thread_' stays null until Run()
// binds the task to a reactor thread.
DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
                         MonoDelta when)
    : func_(std::move(func)),
      when_(when),
      thread_(nullptr) {
}

// Binds the task to 'thread', starts its libev timer on that thread's loop and
// records the task in the thread's scheduled_tasks_ set. The user function is
// not invoked here; that happens in TimerHandler() or Abort().
void DelayedTask::Run(ReactorThread* thread) {
  DCHECK(thread_ == nullptr) << "Task has already been scheduled";
  DCHECK(thread->IsCurrentThread());

  // Schedule the task to run later.
  thread_ = thread;
  timer_.set(thread->loop_);
  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);
  timer_.start(when_.ToSeconds(), // after
               0);                // repeat
  thread_->scheduled_tasks_.insert(this);
}

// Invokes the user function with 'abort_status' and then deletes this task.
// The task must not be touched after this call returns.
void DelayedTask::Abort(const Status& abort_status) {
  func_(abort_status);
  delete this;
}

// libev timer callback. Removes the task from the owning thread's
// scheduled_tasks_ set, invokes the user function (with an Aborted status if
// libev reported EV_ERROR, OK otherwise) and deletes the task.
void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
  // We will free this task's memory.
  thread_->scheduled_tasks_.erase(this);

  if (EV_ERROR & revents) {
    string msg = "Delayed task got an error in its timer handler";
    LOG(WARNING) << msg;
    Abort(Status::Aborted(msg)); // Will delete 'this'.
  } else {
    func_(Status::OK());
    delete this;
  }
}

// The reactor name is "<messenger name>_R<index>", with the index zero-padded
// to three digits. Note that the name_ initializer reads messenger_, which
// has already been initialized by the time name_ is constructed.
Reactor::Reactor(shared_ptr<Messenger> messenger,
                 int index, const MessengerBuilder& bld)
    : messenger_(std::move(messenger)),
      name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)),
      closing_(false),
      thread_(this, bld) {
}

// Delegates initialization to the reactor thread.
Status Reactor::Init() {
  DVLOG(6) << "Called Reactor::Init()";
  return thread_.Init();
}

// Marks the reactor as closing, shuts down the reactor thread and aborts every
// task still sitting in pending_tasks_. A second call returns immediately
// because closing_ is already set.
void Reactor::Shutdown() {
  {
    std::lock_guard<LockType> l(lock_);
    if (closing_) {
      return;
    }
    closing_ = true;
  }

  thread_.Shutdown();

  // Abort all pending tasks. No new tasks can get scheduled after this
  // because ScheduleReactorTask() tests the closing_ flag set above.
  Status aborted = ShutdownError(true);
  while (!pending_tasks_.empty()) {
    // Unlink the task before calling Abort(): some tasks delete themselves
    // inside Abort().
    ReactorTask& task = pending_tasks_.front();
    pending_tasks_.pop_front();
    task.Abort(aborted);
  }
}

// Destruction implies shutdown.
Reactor::~Reactor() {
  Shutdown();
}

const std::string& Reactor::name() const {
  return name_;
}

// Reads closing_ under lock_.
bool Reactor::closing() const {
  std::lock_guard<LockType> l(lock_);
  return closing_;
}

// Task to call an arbitrary function within the reactor thread.
//
// Neither Run() nor Abort() deletes the task: both only record a Status and
// release the latch that Wait() blocks on.
class RunFunctionTask : public ReactorTask {
 public:
  explicit RunFunctionTask(boost::function<Status()> f)
      : function_(std::move(f)), latch_(1) {}

  // Runs the function and publishes its Status to the waiter.
  void Run(ReactorThread* /*reactor*/) override {
    status_ = function_();
    latch_.CountDown();
  }
  // The function is never run; the waiter receives 'status' instead.
  void Abort(const Status& status) override {
    status_ = status;
    latch_.CountDown();
  }

  // Wait until the function has completed, and return the Status
  // returned by the function.
  Status Wait() {
    latch_.Wait();
    return status_;
  }

 private:
  boost::function<Status()> function_;
  Status status_;
  CountDownLatch latch_;
};

// Runs ReactorThread::GetMetrics on the reactor thread and waits for the result.
Status Reactor::GetMetrics(ReactorMetrics *metrics) {
  return RunOnReactorThread(boost::bind(&ReactorThread::GetMetrics, &thread_, metrics));
}

// Schedules 'f' as a reactor task and blocks until it has either run or been
// aborted. The task lives on this function's stack, which is why
// RunFunctionTask never deletes itself.
Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
  RunFunctionTask task(f);
  ScheduleReactorTask(&task);
  return task.Wait();
}

// Runs ReactorThread::DumpRunningRpcs on the reactor thread and waits for the
// result. 'req' is bound by reference (boost::ref) rather than copied.
Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                                DumpRunningRpcsResponsePB* resp) {
  return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
                                        boost::ref(req), resp));
}

// Self-deleting task that hands a connection over to the reactor thread for
// registration.
class RegisterConnectionTask : public ReactorTask {
 public:
  explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
      : conn_(std::move(conn)) {
  }

  void Run(ReactorThread* reactor) override {
    reactor->RegisterConnection(std::move(conn_));
    delete this;
  }

  void Abort(const Status& /*status*/) override {
    // We don't need to Shutdown the connection since it was never registered.
    // This is only used for inbound connections, and inbound connections will
    // never have any calls added to them until they've been registered.
    delete this;
  }

 private:
  scoped_refptr<Connection> conn_;
};

// Wraps the fd released from 'socket' in a new Socket, builds a
// SERVER-direction Connection around it and queues a task that registers the
// connection with the reactor thread.
void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
  VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
  unique_ptr<Socket> new_socket(new Socket(socket->Release()));
  auto task = new RegisterConnectionTask(
      new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER));
  ScheduleReactorTask(task);
}

// Task which runs in the reactor thread to assign an outbound call
// to a connection.
+class AssignOutboundCallTask : public ReactorTask { + public: + explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call) + : call_(std::move(call)) {} + + void Run(ReactorThread* reactor) override { + reactor->AssignOutboundCall(call_); + delete this; + } + + void Abort(const Status& status) override { + // It doesn't matter what is the actual phase of the OutboundCall: just set + // it to Phase::REMOTE_CALL to finilize the state of the call. + call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL); + delete this; + } + + private: + shared_ptr<OutboundCall> call_; +}; + +void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) { + DVLOG(3) << name_ << ": queueing outbound call " + << call->ToString() << " to remote " << call->conn_id().remote().ToString(); + ScheduleReactorTask(new AssignOutboundCallTask(call)); +} + +void Reactor::ScheduleReactorTask(ReactorTask *task) { + { + std::unique_lock<LockType> l(lock_); + if (closing_) { + // We guarantee the reactor lock is not taken when calling Abort(). + l.unlock(); + task->Abort(ShutdownError(false)); + return; + } + pending_tasks_.push_back(*task); + } + thread_.WakeThread(); +} + +bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*) + std::lock_guard<LockType> l(lock_); + if (closing_) { + return false; + } + tasks->swap(pending_tasks_); + return true; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h new file mode 100644 index 0000000..37eedd4 --- /dev/null +++ b/be/src/kudu/rpc/reactor.h @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_REACTOR_H +#define KUDU_RPC_REACTOR_H + +#include <stdint.h> + +#include <list> +#include <map> +#include <memory> +#include <set> +#include <string> + +#include <boost/function.hpp> +#include <boost/intrusive/list.hpp> +#include <boost/optional.hpp> +#include <ev++.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/connection.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/thread.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Socket; + +namespace rpc { + +typedef std::list<scoped_refptr<Connection>> conn_list_t; + +class DumpRunningRpcsRequestPB; +class DumpRunningRpcsResponsePB; +class Messenger; +class MessengerBuilder; +class Reactor; +enum class CredentialsPolicy; + +// Simple metrics information from within a reactor. +struct ReactorMetrics { + // Number of client RPC connections currently connected. + int32_t num_client_connections_; + // Number of server RPC connections currently connected. + int32_t num_server_connections_; + + // Total number of client RPC connections opened during Reactor's lifetime. + uint64_t total_client_connections_; + // Total number of server RPC connections opened during Reactor's lifetime. + uint64_t total_server_connections_; +}; + +// A task which can be enqueued to run on the reactor thread. 
// Derives from the Boost.Intrusive list hook so that tasks can be linked
// directly into a boost::intrusive::list<ReactorTask>.
class ReactorTask : public boost::intrusive::list_base_hook<> {
 public:
  ReactorTask();

  // Run the task. 'reactor' is guaranteed to be the current thread.
  virtual void Run(ReactorThread *reactor) = 0;

  // Abort the task, in the case that the reactor shut down before the
  // task could be processed. This may or may not run on the reactor thread
  // itself.
  //
  // The Reactor guarantees that the Reactor lock is free when this
  // method is called.
  //
  // The default implementation does nothing.
  virtual void Abort(const Status &abort_status) {}

  virtual ~ReactorTask();

 private:
  DISALLOW_COPY_AND_ASSIGN(ReactorTask);
};

// A ReactorTask that is scheduled to run at some point in the future.
//
// Semantically it works like RunFunctionTask with a few key differences:
// 1. The user function is called during Abort. Put another way, the
//    user function is _always_ invoked, even during reactor shutdown.
// 2. To differentiate between Abort and non-Abort, the user function
//    receives a Status as its first argument.
class DelayedTask : public ReactorTask {
 public:
  // 'func' is the user function; 'when' is the delay before it is invoked.
  DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);

  // Schedules the task for running later but doesn't actually run it yet.
  void Run(ReactorThread* thread) override;

  // Behaves like ReactorTask::Abort, except that the user function is
  // invoked with 'abort_status'.
  void Abort(const Status& abort_status) override;

 private:
  // libev callback for when the registered timer fires.
  void TimerHandler(ev::timer& watcher, int revents);

  // User function to invoke when timer fires or when task is aborted.
  const boost::function<void(const Status&)> func_;

  // Delay to apply to this task.
  const MonoDelta when_;

  // Link back to registering reactor thread.
  ReactorThread* thread_;

  // libev timer. Set when Run() is invoked.
  ev::timer timer_;
};

// A ReactorThread is a libev event handler thread which manages I/O
// on a list of sockets.
//
// All methods in this class are _only_ called from the reactor thread itself
// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
// to ensure this.
class ReactorThread {
 public:
  friend class Connection;

  // Client-side connection map. Multiple connections could be open to a remote
  // server if multiple credential policies are used for individual RPCs.
  typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
                                  ConnectionIdHash, ConnectionIdEqual>
      conn_multimap_t;

  ReactorThread(Reactor *reactor, const MessengerBuilder &bld);

  // This may be called from another thread.
  Status Init();

  // Add any connections on this reactor thread into the given status dump.
  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                         DumpRunningRpcsResponsePB* resp);

  // Block until the Reactor thread is shut down
  //
  // This must be called from another thread.
  void Shutdown();

  // This method is thread-safe.
  void WakeThread();

  // libev callback for handling async notifications in our epoll thread.
  void AsyncHandler(ev::async &watcher, int revents);

  // libev callback for handling timer events in our epoll thread.
  void TimerHandler(ev::timer &watcher, int revents);

  // Register an epoll timer watcher with our event loop.
  // Does not set a timeout or start it.
  void RegisterTimeout(ev::timer *watcher);

  // This may be called from another thread.
  const std::string &name() const;

  MonoTime cur_time() const;

  // This may be called from another thread.
  Reactor *reactor();

  // Return true if this reactor thread is the thread currently
  // running. Should be used in DCHECK assertions.
  bool IsCurrentThread() const;

  // Begin the process of connection negotiation.
  // Must be called from the reactor thread.
  Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);

  // Transition back from negotiating to processing requests.
  // Must be called from the reactor thread.
  void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
                                     const Status& status,
                                     std::unique_ptr<ErrorStatusPB> rpc_error);

  // Collect metrics.
  // Must be called from the reactor thread.
  Status GetMetrics(ReactorMetrics *metrics);

 private:
  friend class AssignOutboundCallTask;
  friend class RegisterConnectionTask;
  friend class DelayedTask;

  // Run the main event loop of the reactor.
  void RunThread();

  // Find or create a new connection to the given remote.
  // If such a connection already exists, returns that, otherwise creates a new one.
  // May return a bad Status if the connect() call fails.
  // The resulting connection object is managed internally by the reactor thread.
  Status FindOrStartConnection(const ConnectionId& conn_id,
                               CredentialsPolicy cred_policy,
                               scoped_refptr<Connection>* conn);

  // Shut down the given connection, removing it from the connection tracking
  // structures of this reactor.
  //
  // The connection is not explicitly deleted -- scoped_refptr reference
  // counting may hold on to the object after this, but callers should assume
  // that it _may_ be deleted by this call.
  void DestroyConnection(Connection *conn, const Status &conn_status,
                         std::unique_ptr<ErrorStatusPB> rpc_error = {});

  // Scan any open connections for idle ones that have been idle longer than
  // connection_keepalive_time_
  void ScanIdleConnections();

  // Create a new client socket (non-blocking, NODELAY)
  static Status CreateClientSocket(Socket *sock);

  // Initiate a new connection on the given socket.
  static Status StartConnect(Socket *sock, const Sockaddr &remote);

  // Assign a new outbound call to the appropriate connection object.
  // If this fails, the call is marked failed and completed.
  void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);

  // Register a new connection.
  void RegisterConnection(scoped_refptr<Connection> conn);

  // Actually perform shutdown of the thread, tearing down any connections,
  // etc. This is called from within the thread.
  void ShutdownInternal();

  scoped_refptr<kudu::Thread> thread_;

  // our epoll object (or kqueue, etc).
  ev::dynamic_loop loop_;

  // Used by other threads to notify the reactor thread
  ev::async async_;

  // Handles the periodic timer.
  ev::timer timer_;

  // Scheduled (but not yet run) delayed tasks.
  //
  // Each task owns its own memory and frees itself in its TimerHandler and
  // Abort members, so tasks stored here must be allocated on the heap.
  std::set<DelayedTask*> scheduled_tasks_;

  // The current monotonic time. Updated every coarse_timer_granularity_.
  MonoTime cur_time_;

  // last time we did TCP timeouts.
  MonoTime last_unused_tcp_scan_;

  // Map of sockaddrs to Connection objects for outbound (client) connections.
  conn_multimap_t client_conns_;

  // List of current connections coming into the server.
  conn_list_t server_conns_;

  // The Reactor this thread belongs to.
  Reactor *reactor_;

  // If a connection has been idle for this much time, it is torn down.
  const MonoDelta connection_keepalive_time_;

  // Scan for idle connections on this granularity.
  const MonoDelta coarse_timer_granularity_;

  // Total number of client connections opened during Reactor's lifetime.
  uint64_t total_client_conns_cnt_;

  // Total number of server connections opened during Reactor's lifetime.
  uint64_t total_server_conns_cnt_;
};

// A Reactor manages a ReactorThread
class Reactor {
 public:
  // 'index' becomes part of the reactor's name, together with the name of
  // 'messenger'.
  Reactor(std::shared_ptr<Messenger> messenger,
          int index,
          const MessengerBuilder &bld);
  Status Init();

  // Block until the Reactor is shut down
  void Shutdown();

  ~Reactor();

  const std::string &name() const;

  // Collect metrics about the reactor.
  Status GetMetrics(ReactorMetrics *metrics);

  // Add any connections on this reactor thread into the given status dump.
  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                         DumpRunningRpcsResponsePB* resp);

  // Queue a new incoming connection. Takes ownership of the underlying fd from
  // 'socket', but not the Socket object itself.
  // If the reactor is already shut down, takes care of closing the socket.
  void RegisterInboundSocket(Socket *socket, const Sockaddr &remote);

  // Queue a new call to be sent. If the reactor is already shut down, marks
  // the call as failed.
  void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);

  // Schedule the given task's Run() method to be called on the
  // reactor thread.
  // If the reactor shuts down before it is run, the Abort method will be
  // called.
  // Does _not_ take ownership of 'task' -- the task should take care of
  // deleting itself after running if it is allocated on the heap.
  void ScheduleReactorTask(ReactorTask *task);

  // Schedule 'f' to run on the reactor thread and block the caller until it
  // has either run or been aborted. Returns the Status produced by 'f', or
  // the abort Status if the reactor shut down first.
  // NOTE(review): because the caller blocks, invoking this from the reactor
  // thread itself would presumably never return -- confirm.
  Status RunOnReactorThread(const boost::function<Status()>& f);

  // If the Reactor is closing, returns false.
  // Otherwise, drains the pending_tasks_ queue into the provided list.
  bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks);

  Messenger *messenger() const {
    return messenger_.get();
  }

  // Indicates whether the reactor is shutting down.
  //
  // This method is thread-safe.
  bool closing() const;

  // Is this reactor's thread the current thread?
  bool IsCurrentThread() const {
    return thread_.IsCurrentThread();
  }

 private:
  friend class ReactorThread;
  typedef simple_spinlock LockType;
  mutable LockType lock_;

  // parent messenger
  //
  // Must stay declared before name_: the constructor builds name_ from
  // messenger_->name().
  std::shared_ptr<Messenger> messenger_;

  const std::string name_;

  // Whether the reactor is shutting down.
  // Guarded by lock_.
  bool closing_;

  // Tasks to be run within the reactor thread.
  // Guarded by lock_.
  boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)

  ReactorThread thread_;

  DISALLOW_COPY_AND_ASSIGN(Reactor);
};

} // namespace rpc
} // namespace kudu

#endif // KUDU_RPC_REACTOR_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_method.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/remote_method.cc b/be/src/kudu/rpc/remote_method.cc new file mode 100644 index 0000000..32ec40d --- /dev/null +++ b/be/src/kudu/rpc/remote_method.cc @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations +// under the License.
+ +#include <glog/logging.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/remote_method.h" +#include "kudu/rpc/rpc_header.pb.h" + +namespace kudu { +namespace rpc { + +using strings::Substitute; + +RemoteMethod::RemoteMethod(std::string service_name, + const std::string method_name) + : service_name_(std::move(service_name)), method_name_(method_name) {} + +void RemoteMethod::FromPB(const RemoteMethodPB& pb) { + DCHECK(pb.IsInitialized()) << "PB is uninitialized: " << pb.InitializationErrorString(); + service_name_ = pb.service_name(); + method_name_ = pb.method_name(); +} + +void RemoteMethod::ToPB(RemoteMethodPB* pb) const { + pb->set_service_name(service_name_); + pb->set_method_name(method_name_); +} + +string RemoteMethod::ToString() const { + return Substitute("$0.$1", service_name_, method_name_); +} + +} // namespace rpc +} // namespace kudu
