http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_controller.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_controller.h b/be/src/kudu/rpc/rpc_controller.h new file mode 100644 index 0000000..ab611a8 --- /dev/null +++ b/be/src/kudu/rpc/rpc_controller.h @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_RPC_CONTROLLER_H +#define KUDU_RPC_RPC_CONTROLLER_H + +#include <memory> +#include <unordered_set> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/util/locks.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace kudu { + +namespace rpc { + +class ErrorStatusPB; +class OutboundCall; +class RequestIdPB; +class RpcSidecar; + +// Authentication credentials policy for outbound RPCs. Some RPC methods +// (e.g. MasterService::ConnectToMaster) behave differently depending on the +// type of credentials used for authentication when establishing the connection. +// The client expecting some particular results from the call should specify +// the required policy on a per-call basis using RpcController. By default, +// RpcController uses ANY_CREDENTIALS. +enum class CredentialsPolicy { + // It's acceptable to use authentication credentials of any type, primary or + // secondary ones. + ANY_CREDENTIALS, + + // Only primary credentials are acceptable. Primary credentials are Kerberos + // tickets, TLS certificate. Secondary credentials are authentication tokens: + // they are 'derived' in the sense that it's possible to acquire them using + // 'primary' credentials. + PRIMARY_CREDENTIALS, +}; + +// Controller for managing properties of a single RPC call, on the client side. +// +// An RpcController maps to exactly one call and is not thread-safe. The client +// may use this class prior to sending an RPC in order to set properties such +// as the call's timeout. +// +// After the call has been sent (e.g using Proxy::AsyncRequest()) the user +// may invoke methods on the RpcController object in order to probe the status +// of the call. +class RpcController { + public: + RpcController(); + ~RpcController(); + + // Swap the state of the controller (including ownership of sidecars, buffers, + // etc) with another one. + void Swap(RpcController* other); + + // Reset this controller so it may be used with another call. + // Note that this resets the required server features. + void Reset(); + + // Return true if the call has finished. + // A call is finished if the server has responded, or if the call + // has timed out. + bool finished() const; + + // Whether the call failed due to connection negotiation error. + bool negotiation_failed() const; + + // Return the current status of a call. + // + // A call is "OK" status until it finishes, at which point it may + // either remain in "OK" status (if the call was successful), or + // change to an error status. Error status indicates that there was + // some RPC-layer issue with making the call, for example, one of: + // + // * failed to establish a connection to the server + // * the server was too busy to handle the request + // * the server was unable to interpret the request (eg due to a version + // mismatch) + // * a network error occurred which caused the connection to be torn + // down + // * the call timed out + Status status() const; + + // If status() returns a RemoteError object, then this function returns + // the error response provided by the server. Service implementors may + // use protobuf Extensions to add application-specific data to this PB. + // + // If Status was not a RemoteError, this returns NULL. + // The returned pointer is only valid as long as the controller object. + const ErrorStatusPB* error_response() const; + + // Set the timeout for the call to be made with this RPC controller. + // + // The configured timeout applies to the entire time period between + // the AsyncRequest() method call and getting a response. For example, + // if it takes too long to establish a connection to the remote host, + // or to DNS-resolve the remote host, those will be accounted as part + // of the timeout period. + // + // Timeouts must be set prior to making the request -- the timeout may + // not currently be adjusted for an already-sent call. + // + // Using an uninitialized timeout will result in a call which never + // times out (not recommended!) + void set_timeout(const MonoDelta& timeout); + + // Like a timeout, but based on a fixed point in time instead of a delta. + // + // Using an uninitialized deadline means the call won't time out. + void set_deadline(const MonoTime& deadline); + + // Allows setting the request id for the next request sent to the server. + // A request id allows the server to identify each request sent by the client uniquely, + // in some cases even when sent to multiple servers, enabling exactly once semantics. + void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id); + + // Returns whether a request id has been set on RPC header. + bool has_request_id() const; + + // Returns the currently set request id. + // When the request is sent to the server, it gets "moved" from RpcController + // so an absence of a request after send doesn't mean one wasn't sent. + // REQUIRES: the controller has a request ID set. + const RequestIdPB& request_id() const; + + // Add a requirement that the server side must support a feature with the + // given identifier. The set of required features is sent to the server + // with the RPC call, and if any required feature is not supported, the + // call will fail with a NotSupported() status. + // + // This can be used when an RPC call changes in a way that is protobuf-compatible, + // but for which it would not be appropriate for the server to simply ignore + // an added field. For example, consider an API call like: + // + // message DeleteAccount { + // optional string username = 1; + // optional bool dry_run = 2; // ADDED LATER! + // } + // + // In this case, if a new client which supports the 'dry_run' flag sends the RPC + // to an old server, the old server will simply ignore the unrecognized parameter, + // with highly problematic results. To solve this problem, the new version can + // add a feature flag: + // + // In .proto file + // ---------------- + // enum MyFeatureFlags { + // UNKNOWN = 0; + // DELETE_ACCOUNT_SUPPORTS_DRY_RUN = 1; + // } + // + // In client code: + // --------------- + // if (dry_run) { + // rpc.RequireServerFeature(DELETE_ACCOUNT_SUPPORTS_DRY_RUN); + // req.set_dry_run(true); + // } + // + // This has the effect of (a) maintaining compatibility when dry_run is not specified + // and (b) rejecting the RPC with a "NotSupported" error when it is. + // + // NOTE: 'feature' is an int rather than an enum type because each service + // must define its own enum of supported features, and protobuf doesn't support + // any ability to 'extend' enum types. Implementers should define an enum in the + // service's protobuf definition as shown above. + void RequireServerFeature(uint32_t feature); + + // Executes the provided function with a reference to the required server + // features. + const std::unordered_set<uint32_t>& required_server_features() const { + return required_server_features_; + } + + // Return the configured timeout. + MonoDelta timeout() const; + + CredentialsPolicy credentials_policy() const { + return credentials_policy_; + } + + void set_credentials_policy(CredentialsPolicy policy) { + credentials_policy_ = policy; + } + + // Fills the 'sidecar' parameter with the slice pointing to the i-th + // sidecar upon success. + // + // Should only be called if the call's finished, but the controller has not + // been Reset(). + // + // May fail if index is invalid. + Status GetInboundSidecar(int idx, Slice* sidecar) const; + + // Adds a sidecar to the outbound request. The index of the sidecar is written to + // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added + // to this request. + Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx); + + private: + friend class OutboundCall; + friend class Proxy; + + // Set the outbound call_'s request parameter, and transfer ownership of + // outbound_sidecars_ to call_ in preparation for serialization. + void SetRequestParam(const google::protobuf::Message& req); + + MonoDelta timeout_; + std::unordered_set<uint32_t> required_server_features_; + + // RPC authentication policy for outbound calls. + CredentialsPolicy credentials_policy_; + + mutable simple_spinlock lock_; + + // The id of this request. + // Ownership is transferred to OutboundCall once the call is sent. + std::unique_ptr<RequestIdPB> request_id_; + + // Once the call is sent, it is tracked here. + std::shared_ptr<OutboundCall> call_; + + std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_; + + DISALLOW_COPY_AND_ASSIGN(RpcController); +}; + +} // namespace rpc +} // namespace kudu +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_header.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_header.proto b/be/src/kudu/rpc/rpc_header.proto new file mode 100644 index 0000000..1d55b6a --- /dev/null +++ b/be/src/kudu/rpc/rpc_header.proto @@ -0,0 +1,365 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +syntax = "proto2"; + +option optimize_for = SPEED; + +package kudu.rpc; + +option java_package = "org.apache.kudu.rpc"; + +import "google/protobuf/descriptor.proto"; +import "kudu/security/token.proto"; +import "kudu/util/pb_util.proto"; + +// The Kudu RPC protocol is similar to the RPC protocol of Hadoop and HBase. +// See the following for reference on those other protocols: +// - https://issues.apache.org/jira/browse/HBASE-7898 +// - https://issues.apache.org/jira/browse/HADOOP-8990 +// +// For a description of the Kudu protocol, see 'README' in this directory. + +// User Information proto. Included in ConnectionContextPB on connection setup. +message UserInformationPB { + optional string effective_user = 1; + required string real_user = 2; +} + +// The connection context is sent as part of the connection establishment. +// It establishes the context for ALL RPC calls within the connection. +// This is sent on connection setup after the connection preamble is sent +// and SASL has been negotiated. +// No response is sent from the server to the client. +message ConnectionContextPB { + // UserInfo beyond what is determined as part of security handshake + // at connection time (kerberos, tokens etc). + // + // DEPRECATED: No longer used in Kudu 1.1 and later. + // The 'real_user' should be taken from the SASL negotiation. + // Impersonation (effective user) was never supported, so we'll have + // to add that back at some point later. + optional UserInformationPB DEPRECATED_user_info = 2; + + // If the server sends a nonce to the client during the SASL_SUCCESS + // negotiation step, the client is required to encode it with SASL integrity + // protection and return it in this field. The nonce protects the server + // against a Kerberos replay attack. + optional bytes encoded_nonce = 3 [(REDACT) = true]; +} + +// Features supported by the RPC system itself. +// +// Note that this should be used to evolve the RPC _system_, not the semantics +// or compatibility of individual calls. +// +// For example, if we were to add a feature like call or response wire +// compression in the future, we could add a flag here to indicate that the +// client or server supports that feature. Optional features which may safely be +// ignored by the receiver do not need a feature flag, instead the optional +// field feature of ProtoBuf may be utilized. +enum RpcFeatureFlag { + UNKNOWN = 0; + + // The RPC system is required to support application feature flags in the + // request and response headers. + APPLICATION_FEATURE_FLAGS = 1; + + // The RPC system supports TLS protected connections. If both sides support + // this flag, the connection will automatically be wrapped in a TLS protected + // channel following a TLS handshake. + TLS = 2; + + // If both sides advertise TLS_AUTHENTICATION_ONLY, this means that they + // agree that, after handshaking TLS, they will *not* wrap the connection + // in a TLS-protected channel. Instead, they will use TLS only for its + // handshake-based authentication. + // + // This is currently used for loopback connections only, so that compute + // frameworks which schedule for locality don't pay encryption overhead. + TLS_AUTHENTICATION_ONLY = 3; +}; + +// An authentication type. This is modeled as a oneof in case any of these +// authentication types, or any authentication types in the future, need to add +// extra type-specific parameters during negotiation. +message AuthenticationTypePB { + message Sasl {}; + message Token {}; + message Certificate {}; + + oneof type { + // The server and client mutually authenticate via SASL. + Sasl sasl = 1; + + // The server authenticates the client via a signed token, and the client + // authenticates the server by verifying its certificate has been signed by + // a trusted CA. + // + // Token authentication requires the connection to be TLS encrypted. + Token token = 2; + + // The server and client mutually authenticate by certificate. + // + // Certificate authentication requires the connection to be TLS encrypted. + Certificate certificate = 3; + } +} + +// Message type passed back & forth for the SASL negotiation. +message NegotiatePB { + enum NegotiateStep { + UNKNOWN = 999; + NEGOTIATE = 1; + SASL_SUCCESS = 0; + SASL_INITIATE = 2; + SASL_CHALLENGE = 3; + SASL_RESPONSE = 4; + TLS_HANDSHAKE = 5; + TOKEN_EXCHANGE = 6; + } + + message SaslMechanism { + // The SASL mechanism, i.e. 'PLAIN' or 'GSSAPI'. + required string mechanism = 2; + + // Deprecated: no longer used. + // optional string method = 1; + // optional bytes challenge = 5 [(REDACT) = true]; + } + + // When the client sends its NEGOTIATE step message, it sends its set of + // supported RPC system features. In the response to this message, the server + // sends back its own. This allows the two peers to agree on whether newer + // extensions of the RPC system may be used on this connection. We use a list + // of features rather than a simple version number to make it easier for the + // Java and C++ clients to implement features in different orders while still + // maintaining compatibility, as well as to simplify backporting of features + // out-of-order. + repeated RpcFeatureFlag supported_features = 1; + + // The current negotiation step. + required NegotiateStep step = 2; + + // The SASL token, containing either the challenge during the SASL_CHALLENGE + // step, or the response during the SASL_RESPONSE step. + optional bytes token = 3 [(REDACT) = true]; + + // During the TLS_HANDSHAKE step, contains the TLS handshake message. + optional bytes tls_handshake = 5 [(REDACT) = true]; + + // The tls-server-end-point channel bindings as specified in RFC 5929. Sent + // from the server to the client during the SASL_SUCCESS step when the + // Kerberos (GSSAPI) SASL mechanism is used with TLS, in order to bind the + // Kerberos authenticated channel to the TLS channel. The value is integrity + // protected through SASL. The client is responsible for validating that the + // value matches the expected value. + optional bytes channel_bindings = 6 [(REDACT) = true]; + + // A random nonce sent from the server to the client during the SASL_SUCCESS + // step when the Kerberos (GSSAPI) SASL mechanism is used with TLS. The nonce + // must be sent back to the server, wrapped in SASL integrity protection, as + // part of the connection context. + optional bytes nonce = 9 [(REDACT) = true]; + + // During the NEGOTIATE step, contains the supported SASL mechanisms. + // During the SASL_INITIATE step, contains the single chosen SASL mechanism. + repeated SaslMechanism sasl_mechanisms = 4; + + // During the client to server NEGOTIATE step, contains the supported authentication types. + // During the server to client NEGOTIATE step, contains the chosen authentication type. + repeated AuthenticationTypePB authn_types = 7; + + // During the TOKEN_EXCHANGE step, contains the client's signed authentication token. + optional security.SignedTokenPB authn_token = 8; +} + +message RemoteMethodPB { + // Service name for the RPC layer. + // The client created a proxy with this service name. + // Example: kudu.rpc_test.CalculatorService + required string service_name = 1; + + // Name of the RPC method. + required string method_name = 2; +}; + +// The Id of a retriable RPC, whose results should be tracked on the server (see result_tracker.h). +// This also includes some information that is useful for execution/garbage collection. +message RequestIdPB { + // The (globally unique) id of the client performing this RPC. + required string client_id = 1; + + // The (per-client unique) sequence number of this RPC. + required int64 seq_no = 2; + + // The sequence number of the first RPC that has not been marked as completed by the client. + // Unset if there isn't an incomplete RPC. + required int64 first_incomplete_seq_no = 3; + + // The number of times this RPC has been tried. + // Set to 1 in the first attempt. + required int64 attempt_no = 4; +} + +// The header for the RPC request frame. +message RequestHeader { + // A sequence number that uniquely identifies a call to a single remote server. This number is + // sent back in the Response and allows to match it to the original Request. + // Hadoop specifies a uint32 and casts it to a signed int. That is counterintuitive, so we use an + // int32 instead. Allowed values (inherited from Hadoop): + // 0 through INT32_MAX: Regular RPC call IDs. + // -2: Invalid call ID. + // -3: Connection context call ID. + // -33: SASL negotiation call ID. + // + // NOTE: these calls must be increasing but may have gaps. + required int32 call_id = 3; + + // RPC method being invoked. + // Not used for "connection setup" calls. + optional RemoteMethodPB remote_method = 6; + + // Propagate the timeout as specified by the user. Note that, since there is some + // transit time between the client and server, if you wait exactly this amount of + // time and then respond, you are likely to cause a timeout on the client. + optional uint32 timeout_millis = 10; + + // Feature flags that the service must support in order to properly interpret this + // request. The client can pass any set of flags, and if the server doesn't + // support any of them, then it will fail the request. + // + // NOTE: these are for evolving features at the level of the application, not + // the RPC framework. Hence, we have to use a generic int type rather than a + // particular enum. + // NOTE: the server will only interpret this field if it supports the + // APPLICATION_FEATURE_FLAGS flag. + repeated uint32 required_feature_flags = 11; + + // The unique id of this request, if it's retriable and if the results are to be tracked. + // The request id is unique per logical request, i.e. retries of the same RPC must have the + // same request id. + // Note that this is different from 'call_id' in that a call_id is unique to a server while a + // request_id is unique to a logical request (i.e. the request_id remains the same when a request + // is retried on a different server). + // Optional for requests that are naturally idempotent or to maintain compatibility with + // older clients for requests that are not. + optional RequestIdPB request_id = 15; + + // Byte offsets for side cars in the main body of the request message. + // These offsets are counted AFTER the message header, i.e., offset 0 + // is the first byte after the bytes for this protobuf. + repeated uint32 sidecar_offsets = 16; +} + +message ResponseHeader { + required int32 call_id = 1; + + // If this is set, then this is an error response and the + // response message will be of type ErrorStatusPB instead of + // the expected response type. + optional bool is_error = 2 [ default = false ]; + + // Byte offsets for side cars in the main body of the response message. + // These offsets are counted AFTER the message header, i.e., offset 0 + // is the first byte after the bytes for this protobuf. + repeated uint32 sidecar_offsets = 3; +} + +// Sent as response when is_error == true. +message ErrorStatusPB { + + // These codes have all been inherited from Hadoop's RPC mechanism. + enum RpcErrorCodePB { + FATAL_UNKNOWN = 10; + + // Non-fatal RPC errors. Connection should be left open for future RPC calls. + //------------------------------------------------------------ + // The application generated an error status. See the message field for + // more details. + ERROR_APPLICATION = 1; + + // The specified method was not valid. + ERROR_NO_SUCH_METHOD = 2; + + // The specified service was not valid. + ERROR_NO_SUCH_SERVICE = 3; + + // The server is overloaded - the client should try again shortly. + ERROR_SERVER_TOO_BUSY = 4; + + // The request parameter was not parseable, was missing required fields, + // or the server does not support the required feature flags. + ERROR_INVALID_REQUEST = 5; + + // The server might have previously received this request but its response is no + // longer cached. It's unknown whether the request was executed or not. + ERROR_REQUEST_STALE = 6; + + // The server is not able to complete the connection or request at this + // time. The client may try again later. + ERROR_UNAVAILABLE = 7; + + // FATAL_* errors indicate that the client should shut down the connection. + //------------------------------------------------------------ + // The RPC server is already shutting down. + FATAL_SERVER_SHUTTING_DOWN = 11; + // Fields of RpcHeader are invalid. + FATAL_INVALID_RPC_HEADER = 12; + // Could not deserialize RPC request. + FATAL_DESERIALIZING_REQUEST = 13; + // IPC Layer version mismatch. + FATAL_VERSION_MISMATCH = 14; + // Auth failed. + FATAL_UNAUTHORIZED = 15; + + // The authentication token is invalid or expired; + // the client should obtain a new one. + FATAL_INVALID_AUTHENTICATION_TOKEN = 16; + } + + required string message = 1; + + // TODO: Make code required? + optional RpcErrorCodePB code = 2; // Specific error identifier. + + // If the request is failed due to an unsupported feature flag, the particular + // flag(s) that were not supported will be sent back to the client. + repeated uint32 unsupported_feature_flags = 3; + + // Allow extensions. When the RPC returns ERROR_APPLICATION, the server + // should also fill in exactly one of these extension fields, which contains + // more details on the service-specific error. + extensions 100 to max; +} + +extend google.protobuf.MethodOptions { + // An option for RPC methods that allows to set whether that method's + // RPC results should be tracked with a ResultTracker. + optional bool track_rpc_result = 50006 [default=false]; + + // An option to set the authorization method for this particular + // RPC method. If this is not specified, the service's 'default_authz_method' + // is used. + optional string authz_method = 50007; +} + +extend google.protobuf.ServiceOptions { + // Set the default authorization method for the RPCs in this service. + // If this is not set, then the default authorization is to allow all + // RPCs. + optional string default_authz_method = 50007; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_introspection.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto new file mode 100644 index 0000000..9d2f9b5 --- /dev/null +++ b/be/src/kudu/rpc/rpc_introspection.proto @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Protobuf used for introspection of RPC services (eg listing in-flight RPCs, +// reflection, etc) +syntax = "proto2"; + +package kudu.rpc; + +option java_package = "org.apache.kudu"; + +import "kudu/rpc/rpc_header.proto"; + +message RpcCallInProgressPB { + required RequestHeader header = 1; + optional string trace_buffer = 2; + optional uint64 micros_elapsed = 3; + + enum State { + UNKNOWN = 999; + + // States for OutboundCall + ON_OUTBOUND_QUEUE = 1; + SENDING = 2; + SENT = 3; + TIMED_OUT = 4; + FINISHED_ERROR = 5; + FINISHED_SUCCESS = 6; + NEGOTIATION_TIMED_OUT = 7; + FINISHED_NEGOTIATION_ERROR = 8; + + // TODO(todd): add states for InboundCall + } + + optional State state = 4; +} + +message RpcConnectionPB { + enum StateType { + UNKNOWN = 999; + NEGOTIATING = 0; // Connection is still being negotiated. + OPEN = 1; // Connection is active. + }; + + required string remote_ip = 1; + required StateType state = 2; + // TODO: swap out for separate fields + optional string remote_user_credentials = 3; + repeated RpcCallInProgressPB calls_in_flight = 4; +} + +message DumpRunningRpcsRequestPB { + optional bool include_traces = 1 [ default = false ]; +} + +message DumpRunningRpcsResponsePB { + repeated RpcConnectionPB inbound_connections = 1; + repeated RpcConnectionPB outbound_connections = 2; +} + +//------------------------------------------------------------ + +// A particular TraceMetric key/value pair from a sampled RPC. +message TraceMetricPB { + // A '.'-separated path through the parent-child trace hierarchy. + optional string child_path = 1; + optional string key = 2; + optional int64 value = 3; +} + +// A single sampled RPC call. +message RpczSamplePB { + // The original request header. + optional RequestHeader header = 1; + // The stringified request trace. + optional string trace = 2; + // The number of millis that this call took to complete. + optional int32 duration_ms = 3; + // The metrics from the sampled trace. + repeated TraceMetricPB metrics = 4; +} + +// A set of samples for a particular RPC method. +message RpczMethodPB { + required string method_name = 1; + repeated RpczSamplePB samples = 2; +} + +// Request and response for dumping previously sampled RPC calls. +message DumpRpczStoreRequestPB { +} +message DumpRpczStoreResponsePB { + repeated RpczMethodPB methods = 1; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_service.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_service.h b/be/src/kudu/rpc/rpc_service.h new file mode 100644 index 0000000..dcaa9c1 --- /dev/null +++ b/be/src/kudu/rpc/rpc_service.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_SERVICE_H_ +#define KUDU_RPC_SERVICE_H_ + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace rpc { + +class RemoteMethod; +struct RpcMethodInfo; +class InboundCall; + +class RpcService : public RefCountedThreadSafe<RpcService> { + public: + virtual ~RpcService() {} + + // Enqueue a call for processing. + // On failure, the RpcService::QueueInboundCall() implementation is + // responsible for responding to the client with a failure message. + virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) = 0; + + virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) { + return nullptr; + } +}; + +} // namespace rpc +} // namespace kudu + +#endif // KUDU_RPC_SERVICE_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_sidecar.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_sidecar.cc b/be/src/kudu/rpc/rpc_sidecar.cc new file mode 100644 index 0000000..580c6eb --- /dev/null +++ b/be/src/kudu/rpc/rpc_sidecar.cc @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/rpc_sidecar.h" + +#include "kudu/util/status.h" +#include "kudu/rpc/transfer.h" +#include "kudu/gutil/strings/substitute.h" + +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +// Sidecar that simply wraps a Slice. The data associated with the slice is therefore not +// owned by this class, and it's the caller's responsibility to ensure it has a lifetime +// at least as long as this sidecar. +class SliceSidecar : public RpcSidecar { + public: + explicit SliceSidecar(Slice slice) : slice_(slice) { } + Slice AsSlice() const override { return slice_; } + + private: + const Slice slice_; +}; + +class FaststringSidecar : public RpcSidecar { + public: + explicit FaststringSidecar(unique_ptr<faststring> data) : data_(std::move(data)) { } + Slice AsSlice() const override { return *data_; } + + private: + const unique_ptr<faststring> data_; +}; + +unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) { + return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data))); +} + +unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) { + return unique_ptr<RpcSidecar>(new SliceSidecar(slice)); +} + + +Status RpcSidecar::ParseSidecars( + const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets, + Slice buffer, Slice* sidecars) { + if (offsets.size() == 0) return Status::OK(); + + int last = offsets.size() - 1; + if (last >= TransferLimits::kMaxSidecars) { + return Status::Corruption(strings::Substitute( + "Received $0 additional payload slices, expected at most %d", + last, TransferLimits::kMaxSidecars)); + } + + for (int i = 0; i < last; ++i) { + int64_t cur_offset = offsets.Get(i); + int64_t next_offset = offsets.Get(i + 1); + if (next_offset > buffer.size()) { + return Status::Corruption(strings::Substitute( + "Invalid sidecar offsets; sidecar $0 apparently starts at $1," + " has length $2, but the entire message has length $3", + i, cur_offset, (next_offset - cur_offset), buffer.size())); + } + if (next_offset < cur_offset) { + return Status::Corruption(strings::Substitute( + "Invalid sidecar offsets; sidecar $0 apparently starts at $1," + " but ends before that at offset $1.", i, cur_offset, next_offset)); + } + + sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset); + } + + int64_t cur_offset = offsets.Get(last); + if (cur_offset > buffer.size()) { + return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 " + "starts at offset $1after message ends (message length $2).", last, + cur_offset, buffer.size())); + } + sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset); + + return Status::OK(); +} + + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_sidecar.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_sidecar.h b/be/src/kudu/rpc/rpc_sidecar.h new file mode 100644 index 0000000..00d6e4b --- /dev/null +++ b/be/src/kudu/rpc/rpc_sidecar.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_RPC_SIDECAR_H +#define KUDU_RPC_RPC_SIDECAR_H + +#include <google/protobuf/repeated_field.h> +#include <memory> + +#include "kudu/util/faststring.h" +#include "kudu/util/slice.h" + +namespace kudu { +namespace rpc { + +// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data +// without extra copies. In other words, whenever a protobuf would have a large field +// where additional copies become expensive, one may opt instead to use an RpcSidecar. +// +// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and +// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the +// client and server respectively. They are ignorant of the sidecar's format, requiring +// only that it can be represented as a Slice. Data is copied from the Slice returned from +// AsSlice() to the socket that is responding to the original RPC. The slice should remain +// valid for as long as the call it is attached to takes to complete. +// +// In order to distinguish between separate sidecars, whenever a sidecar is added to the +// RPC response on the server side, an index for that sidecar is returned. This index must +// then in some way (i.e., via protobuf) be communicated to the recipient. +// +// After reconstructing the array of sidecars, servers and clients may retrieve the +// sidecar data through the RpcContext or RpcController interfaces respectively. +class RpcSidecar { + public: + static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data); + static std::unique_ptr<RpcSidecar> FromSlice(Slice slice); + + // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and + // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and + // will be filled from index 0. + // TODO(henryr): Consider a vector instead here if there's no perf. impact. + static Status ParseSidecars( + const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets, + Slice buffer, Slice* sidecars); + + // Returns a Slice representation of the sidecar's data. + virtual Slice AsSlice() const = 0; + virtual ~RpcSidecar() { } +}; + +} // namespace rpc +} // namespace kudu + + +#endif /* KUDU_RPC_RPC_SIDECAR_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc_stub-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc_stub-test.cc b/be/src/kudu/rpc/rpc_stub-test.cc new file mode 100644 index 0000000..2fe0708 --- /dev/null +++ b/be/src/kudu/rpc/rpc_stub-test.cc @@ -0,0 +1,679 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <atomic> +#include <memory> +#include <thread> +#include <vector> + +#include <glog/logging.h> +#include <glog/stl_logging.h> +#include <gtest/gtest.h> +#include <boost/bind.hpp> + +#include "kudu/gutil/stl_util.h" +#include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/rpcz_store.h" +#include "kudu/rpc/rtest.proxy.h" +#include "kudu/rpc/rtest.service.h" +#include "kudu/rpc/rpc-test-base.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/metrics.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/subprocess.h" +#include "kudu/util/test_util.h" +#include "kudu/util/user.h" + +DEFINE_bool(is_panic_test_child, false, "Used by TestRpcPanic"); +DECLARE_bool(socket_inject_short_recvs); + +using std::shared_ptr; +using std::unique_ptr; +using std::vector; + +namespace kudu { +namespace rpc { + +class RpcStubTest : public RpcTestBase { + public: + virtual void SetUp() OVERRIDE { + RpcTestBase::SetUp(); + // Use a shorter queue length since some tests below need to start enough + // threads to saturate the queue. + service_queue_length_ = 10; + StartTestServerWithGeneratedCode(&server_addr_); + client_messenger_ = CreateMessenger("Client"); + } + protected: + void SendSimpleCall() { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController controller; + AddRequestPB req; + req.set_x(10); + req.set_y(20); + AddResponsePB resp; + ASSERT_OK(p.Add(req, &resp, &controller)); + ASSERT_EQ(30, resp.result()); + } + + Sockaddr server_addr_; + shared_ptr<Messenger> client_messenger_; +}; + +TEST_F(RpcStubTest, TestSimpleCall) { + SendSimpleCall(); +} + +// Regression test for a bug in which we would not properly parse a call +// response when recv() returned a 'short read'. This injects such short +// reads and then makes a number of calls. +TEST_F(RpcStubTest, TestShortRecvs) { + FLAGS_socket_inject_short_recvs = true; + CalculatorServiceProxy p(client_messenger_, server_addr_); + + for (int i = 0; i < 100; i++) { + NO_FATALS(SendSimpleCall()); + } +} + +// Test calls which are rather large. +// This test sends many of them at once using the async API and then +// waits for them all to return. This is meant to ensure that the +// IO threads can deal with read/write calls that don't succeed +// in sending the entire data in one go. +TEST_F(RpcStubTest, TestBigCallData) { + const int kNumSentAtOnce = 20; + const size_t kMessageSize = 5 * 1024 * 1024; + string data; + data.resize(kMessageSize); + + CalculatorServiceProxy p(client_messenger_, server_addr_); + + EchoRequestPB req; + req.set_data(data); + + vector<unique_ptr<EchoResponsePB>> resps; + vector<unique_ptr<RpcController>> controllers; + + CountDownLatch latch(kNumSentAtOnce); + for (int i = 0; i < kNumSentAtOnce; i++) { + resps.emplace_back(new EchoResponsePB); + controllers.emplace_back(new RpcController); + + p.EchoAsync(req, resps.back().get(), controllers.back().get(), + boost::bind(&CountDownLatch::CountDown, boost::ref(latch))); + } + + latch.Wait(); + + for (const auto& c : controllers) { + ASSERT_OK(c->status()); + } +} + +TEST_F(RpcStubTest, TestRespondDeferred) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController controller; + SleepRequestPB req; + req.set_sleep_micros(1000); + req.set_deferred(true); + SleepResponsePB resp; + ASSERT_OK(p.Sleep(req, &resp, &controller)); +} + +// Test that the default user credentials are propagated to the server. +TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + string expected; + ASSERT_OK(GetLoggedInUser(&expected)); + + RpcController controller; + WhoAmIRequestPB req; + WhoAmIResponsePB resp; + ASSERT_OK(p.WhoAmI(req, &resp, &controller)); + ASSERT_EQ(expected, resp.credentials().real_user()); + ASSERT_FALSE(resp.credentials().has_effective_user()); +} + +// Test that the user can specify other credentials. +TEST_F(RpcStubTest, TestCustomCredentialsPropagated) { + const char* const kFakeUserName = "some fake user"; + CalculatorServiceProxy p(client_messenger_, server_addr_); + + UserCredentials creds; + creds.set_real_user(kFakeUserName); + p.set_user_credentials(creds); + + RpcController controller; + WhoAmIRequestPB req; + WhoAmIResponsePB resp; + ASSERT_OK(p.WhoAmI(req, &resp, &controller)); + ASSERT_EQ(kFakeUserName, resp.credentials().real_user()); + ASSERT_FALSE(resp.credentials().has_effective_user()); +} + +TEST_F(RpcStubTest, TestAuthorization) { + // First test calling WhoAmI() as user "alice", who is disallowed. + { + CalculatorServiceProxy p(client_messenger_, server_addr_); + UserCredentials creds; + creds.set_real_user("alice"); + p.set_user_credentials(creds); + + // Alice is disallowed by all RPCs. + RpcController controller; + WhoAmIRequestPB req; + WhoAmIResponsePB resp; + Status s = p.WhoAmI(req, &resp, &controller); + ASSERT_FALSE(s.ok()); + ASSERT_EQ(s.ToString(), + "Remote error: Not authorized: alice is not allowed to call this method"); + } + + // Try some calls as "bob". + { + CalculatorServiceProxy p(client_messenger_, server_addr_); + UserCredentials creds; + creds.set_real_user("bob"); + p.set_user_credentials(creds); + + // "bob" is allowed to call WhoAmI(). + { + RpcController controller; + WhoAmIRequestPB req; + WhoAmIResponsePB resp; + ASSERT_OK(p.WhoAmI(req, &resp, &controller)); + } + + // "bob" is not allowed to call "Sleep". + { + RpcController controller; + SleepRequestPB req; + req.set_sleep_micros(10); + SleepResponsePB resp; + Status s = p.Sleep(req, &resp, &controller); + ASSERT_EQ(s.ToString(), + "Remote error: Not authorized: bob is not allowed to call this method"); + } + } +} + +// Test that the user's remote address is accessible to the server. +TEST_F(RpcStubTest, TestRemoteAddress) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController controller; + WhoAmIRequestPB req; + WhoAmIResponsePB resp; + ASSERT_OK(p.WhoAmI(req, &resp, &controller)); + ASSERT_STR_CONTAINS(resp.address(), "127.0.0.1:"); +} + +//////////////////////////////////////////////////////////// +// Tests for error cases +//////////////////////////////////////////////////////////// + +// Test sending a PB parameter with a missing field, where the client +// thinks it has sent a full PB. (eg due to version mismatch) +TEST_F(RpcStubTest, TestCallWithInvalidParam) { + Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name()); + + rpc_test::AddRequestPartialPB req; + req.set_x(rand()); + // AddRequestPartialPB is missing the 'y' field. + AddResponsePB resp; + RpcController controller; + Status s = p.SyncRequest("Add", req, &resp, &controller); + ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), + "Invalid argument: invalid parameter for call " + "kudu.rpc_test.CalculatorService.Add: " + "missing fields: y"); +} + +// Wrapper around AtomicIncrement, since AtomicIncrement returns the 'old' +// value, and our callback needs to be a void function. +static void DoIncrement(Atomic32* count) { + base::subtle::Barrier_AtomicIncrement(count, 1); +} + +// Test sending a PB parameter with a missing field on the client side. +// This also ensures that the async callback is only called once +// (regression test for a previously-encountered bug). +TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController controller; + AddRequestPB req; + req.set_x(10); + // Request is missing the 'y' field. + AddResponsePB resp; + Atomic32 callback_count = 0; + p.AddAsync(req, &resp, &controller, boost::bind(&DoIncrement, &callback_count)); + while (NoBarrier_Load(&callback_count) == 0) { + SleepFor(MonoDelta::FromMicroseconds(10)); + } + SleepFor(MonoDelta::FromMicroseconds(100)); + ASSERT_EQ(1, NoBarrier_Load(&callback_count)); + ASSERT_STR_CONTAINS(controller.status().ToString(), + "Invalid argument: invalid parameter for call " + "kudu.rpc_test.CalculatorService.Add: missing fields: y"); +} + +TEST_F(RpcStubTest, TestResponseWithMissingField) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController rpc; + TestInvalidResponseRequestPB req; + TestInvalidResponseResponsePB resp; + req.set_error_type(rpc_test::TestInvalidResponseRequestPB_ErrorType_MISSING_REQUIRED_FIELD); + Status s = p.TestInvalidResponse(req, &resp, &rpc); + ASSERT_STR_CONTAINS(s.ToString(), + "invalid RPC response, missing fields: response"); +} + +// Test case where the server responds with a message which is larger than the maximum +// configured RPC message size. The server should send the response, but the client +// will reject it. +TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController rpc; + TestInvalidResponseRequestPB req; + TestInvalidResponseResponsePB resp; + req.set_error_type(rpc_test::TestInvalidResponseRequestPB_ErrorType_RESPONSE_TOO_LARGE); + Status s = p.TestInvalidResponse(req, &resp, &rpc); + ASSERT_STR_CONTAINS(s.ToString(), "Network error: RPC frame had a length of"); +} + +// Test sending a call which isn't implemented by the server. +TEST_F(RpcStubTest, TestCallMissingMethod) { + Proxy p(client_messenger_, server_addr_, CalculatorService::static_service_name()); + + Status s = DoTestSyncCall(p, "DoesNotExist"); + ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist"); +} + +TEST_F(RpcStubTest, TestApplicationError) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + RpcController controller; + SleepRequestPB req; + SleepResponsePB resp; + req.set_sleep_micros(1); + req.set_return_app_error(true); + Status s = p.Sleep(req, &resp, &controller); + ASSERT_TRUE(s.IsRemoteError()); + EXPECT_EQ("Remote error: Got some error", s.ToString()); + EXPECT_EQ("message: \"Got some error\"\n" + "[kudu.rpc_test.CalculatorError.app_error_ext] {\n" + " extra_error_data: \"some application-specific error data\"\n" + "}\n", + SecureDebugString(*controller.error_response())); +} + +TEST_F(RpcStubTest, TestRpcPanic) { + if (!FLAGS_is_panic_test_child) { + // This is a poor man's death test. We call this same + // test case, but set the above flag, and verify that + // it aborted. gtest death tests don't work here because + // there are already threads started up. + vector<string> argv; + string executable_path; + CHECK_OK(env_->GetExecutablePath(&executable_path)); + argv.push_back(executable_path); + argv.push_back("--is_panic_test_child"); + argv.push_back("--gtest_filter=RpcStubTest.TestRpcPanic"); + Subprocess subp(argv); + subp.ShareParentStderr(false); + CHECK_OK(subp.Start()); + FILE* in = fdopen(subp.from_child_stderr_fd(), "r"); + PCHECK(in); + + // Search for string "Test method panicking!" somewhere in stderr + char buf[1024]; + bool found_string = false; + while (fgets(buf, sizeof(buf), in)) { + if (strstr(buf, "Test method panicking!")) { + found_string = true; + break; + } + } + CHECK(found_string); + + // Check return status + int wait_status = 0; + CHECK_OK(subp.Wait(&wait_status)); + CHECK(!WIFEXITED(wait_status)); // should not have been successful + if (WIFSIGNALED(wait_status)) { + CHECK_EQ(WTERMSIG(wait_status), SIGABRT); + } else { + // On some systems, we get exit status 134 from SIGABRT rather than + // WIFSIGNALED getting flagged. + CHECK_EQ(WEXITSTATUS(wait_status), 134); + } + return; + } else { + // Before forcing the panic, explicitly remove the test directory. This + // should be safe; this test doesn't generate any data. + CHECK_OK(env_->DeleteRecursively(test_dir_)); + + // Make an RPC which causes the server to abort. + CalculatorServiceProxy p(client_messenger_, server_addr_); + RpcController controller; + PanicRequestPB req; + PanicResponsePB resp; + p.Panic(req, &resp, &controller); + } +} + +struct AsyncSleep { + AsyncSleep() : latch(1) {} + + RpcController rpc; + SleepRequestPB req; + SleepResponsePB resp; + CountDownLatch latch; +}; + +TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + vector<AsyncSleep*> sleeps; + ElementDeleter d(&sleeps); + + // Send enough sleep calls to occupy the worker threads. + for (int i = 0; i < n_worker_threads_; i++) { + gscoped_ptr<AsyncSleep> sleep(new AsyncSleep); + sleep->rpc.set_timeout(MonoDelta::FromSeconds(1)); + sleep->req.set_sleep_micros(100*1000); // 100ms + p.SleepAsync(sleep->req, &sleep->resp, &sleep->rpc, + boost::bind(&CountDownLatch::CountDown, &sleep->latch)); + sleeps.push_back(sleep.release()); + } + + // We asynchronously sent the RPCs above, but the RPCs might still + // be in the queue. Because the RPC we send next has a lower timeout, + // it would take priority over the long-timeout RPCs. So, we have to + // wait until the above RPCs are being processed before we continue + // the test. + const Histogram* queue_time_metric = service_pool_->IncomingQueueTimeMetricForTests(); + while (queue_time_metric->TotalCount() < n_worker_threads_) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + + // Send another call with a short timeout. This shouldn't get processed, because + // it'll get stuck in the queue for longer than its timeout. + RpcController rpc; + SleepRequestPB req; + SleepResponsePB resp; + req.set_sleep_micros(1000); + rpc.set_timeout(MonoDelta::FromMilliseconds(1)); + Status s = p.Sleep(req, &resp, &rpc); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); + + for (AsyncSleep* s : sleeps) { + s->latch.Wait(); + } + + // Verify that the timedout call got short circuited before being processed. + const Counter* timed_out_in_queue = service_pool_->RpcsTimedOutInQueueMetricForTests(); + ASSERT_EQ(1, timed_out_in_queue->value()); +} + +// Test which ensures that the RPC queue accepts requests with the earliest +// deadline first (EDF), and upon overflow rejects requests with the latest deadlines. +// +// In particular, this simulates a workload experienced with Impala where the local +// impalad would spawn more scanner threads than the total number of handlers plus queue +// slots, guaranteeing that some of those clients would see SERVER_TOO_BUSY rejections on +// scan requests and be forced to back off and retry. Without EDF scheduling, we saw that +// the "unlucky" threads that got rejected would likely continue to get rejected upon +// retries, and some would be starved continually until they missed their overall deadline +// and failed the query. +// +// With EDF scheduling, the retries take priority over the original requests (because +// they retain their original deadlines). This prevents starvation of unlucky threads. +TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) { + const int num_client_threads = service_queue_length_ + n_worker_threads_ + 5; + vector<std::thread> threads; + vector<int> successes(num_client_threads); + std::atomic<bool> done(false); + for (int thread_id = 0; thread_id < num_client_threads; thread_id++) { + threads.emplace_back([&, thread_id] { + Random rng(thread_id); + CalculatorServiceProxy p(client_messenger_, server_addr_); + while (!done.load()) { + // Set a deadline in the future. We'll keep using this same deadline + // on each of our retries. + MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(8); + + for (int attempt = 1; !done.load(); attempt++) { + RpcController controller; + SleepRequestPB req; + SleepResponsePB resp; + controller.set_deadline(deadline); + req.set_sleep_micros(100000); + Status s = p.Sleep(req, &resp, &controller); + if (s.ok()) { + successes[thread_id]++; + break; + } + // We expect to get SERVER_TOO_BUSY errors because we have more clients than the + // server has handlers and queue slots. No other errors are expected. + CHECK(s.IsRemoteError() && + controller.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY) + << "Unexpected RPC failure: " << s.ToString(); + // Randomized exponential backoff (similar to that done by the scanners in the Kudu + // client.). + int backoff = (0.5 + rng.NextDoubleFraction() * 0.5) * (std::min(1 << attempt, 1000)); + VLOG(1) << "backoff " << backoff << "ms"; + SleepFor(MonoDelta::FromMilliseconds(backoff)); + } + } + }); + } + // Let the threads run for 5 seconds before stopping them. + SleepFor(MonoDelta::FromSeconds(5)); + done.store(true); + for (auto& t : threads) { + t.join(); + } + + // Before switching to earliest-deadline-first scheduling, the results + // here would typically look something like: + // 1 1 0 1 10 17 6 1 12 12 17 10 8 7 12 9 16 15 + // With the fix, we see something like: + // 9 9 9 8 9 9 9 9 9 9 9 9 9 9 9 9 9 + LOG(INFO) << "thread RPC success counts: " << successes; + + int sum = 0; + int min = std::numeric_limits<int>::max(); + for (int x : successes) { + sum += x; + min = std::min(min, x); + } + int avg = sum / successes.size(); + ASSERT_GT(min, avg / 2) + << "expected the least lucky thread to have at least half as many successes " + << "as the average thread: min=" << min << " avg=" << avg; +} + +TEST_F(RpcStubTest, TestDumpCallsInFlight) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + AsyncSleep sleep; + sleep.req.set_sleep_micros(100 * 1000); // 100ms + p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc, + boost::bind(&CountDownLatch::CountDown, &sleep.latch)); + + // Check the running RPC status on the client messenger. + DumpRunningRpcsRequestPB dump_req; + DumpRunningRpcsResponsePB dump_resp; + dump_req.set_include_traces(true); + + ASSERT_OK(client_messenger_->DumpRunningRpcs(dump_req, &dump_resp)); + LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp); + ASSERT_EQ(1, dump_resp.outbound_connections_size()); + ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size()); + ASSERT_EQ("Sleep", dump_resp.outbound_connections(0).calls_in_flight(0). + header().remote_method().method_name()); + ASSERT_GT(dump_resp.outbound_connections(0).calls_in_flight(0).micros_elapsed(), 0); + + // And the server messenger. + // We have to loop this until we find a result since the actual call is sent + // asynchronously off of the main thread (ie the server may not be handling it yet) + for (int i = 0; i < 100; i++) { + dump_resp.Clear(); + ASSERT_OK(server_messenger_->DumpRunningRpcs(dump_req, &dump_resp)); + if (dump_resp.inbound_connections_size() > 0 && + dump_resp.inbound_connections(0).calls_in_flight_size() > 0) { + break; + } + SleepFor(MonoDelta::FromMilliseconds(1)); + } + + LOG(INFO) << "server messenger: " << SecureDebugString(dump_resp); + ASSERT_EQ(1, dump_resp.inbound_connections_size()); + ASSERT_EQ(1, dump_resp.inbound_connections(0).calls_in_flight_size()); + ASSERT_EQ("Sleep", dump_resp.inbound_connections(0).calls_in_flight(0). + header().remote_method().method_name()); + ASSERT_GT(dump_resp.inbound_connections(0).calls_in_flight(0).micros_elapsed(), 0); + ASSERT_STR_CONTAINS(dump_resp.inbound_connections(0).calls_in_flight(0).trace_buffer(), + "Inserting onto call queue"); + sleep.latch.Wait(); +} + +TEST_F(RpcStubTest, TestDumpSampledCalls) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + // Issue two calls that fall into different latency buckets. + AsyncSleep sleeps[2]; + sleeps[0].req.set_sleep_micros(150 * 1000); // 150ms + sleeps[1].req.set_sleep_micros(1500 * 1000); // 1500ms + + for (auto& sleep : sleeps) { + p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc, + boost::bind(&CountDownLatch::CountDown, &sleep.latch)); + } + for (auto& sleep : sleeps) { + sleep.latch.Wait(); + } + + // Dump the sampled RPCs and expect to see the calls + // above. + + DumpRpczStoreResponsePB sampled_rpcs; + server_messenger_->rpcz_store()->DumpPB(DumpRpczStoreRequestPB(), &sampled_rpcs); + EXPECT_EQ(sampled_rpcs.methods_size(), 1); + ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), + " metrics {\n" + " key: \"test_sleep_us\"\n" + " value: 150000\n" + " }\n"); + ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), + " metrics {\n" + " key: \"test_sleep_us\"\n" + " value: 1500000\n" + " }\n"); + ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), + " metrics {\n" + " child_path: \"test_child\"\n" + " key: \"related_trace_metric\"\n" + " value: 1\n" + " }"); + ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), "SleepRequestPB"); + ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), "duration_ms"); +} + +namespace { +struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> { +}; + +// Test callback which takes a refcounted pointer. +// We don't use this parameter, but it's used to validate that the bound callback +// is cleared in TestCallbackClearedAfterRunning. +void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refptr) { + latch->CountDown(); +} +} // anonymous namespace + +// Verify that, after a call has returned, no copy of the call's callback +// is held. This is important when the callback holds a refcounted ptr, +// since we expect to be able to release that pointer when the call is done. +TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) { + CalculatorServiceProxy p(client_messenger_, server_addr_); + + CountDownLatch latch(1); + scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest); + RpcController controller; + AddRequestPB req; + req.set_x(10); + req.set_y(20); + AddResponsePB resp; + p.AddAsync(req, &resp, &controller, + boost::bind(MyTestCallback, &latch, my_refptr)); + latch.Wait(); + + // The ref count should go back down to 1. However, we need to loop a little + // bit, since the deref is happening on another thread. If the other thread gets + // descheduled directly after calling our callback, we'd fail without these sleeps. + for (int i = 0; i < 100 && !my_refptr->HasOneRef(); i++) { + SleepFor(MonoDelta::FromMilliseconds(1)); + } + ASSERT_TRUE(my_refptr->HasOneRef()); +} + +// Regression test for KUDU-1409: if the client reactor thread is blocked (e.g due to a +// process-wide pause or a slow callback) then we should not cause RPC calls to time out. +TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) { + CHECK_EQ(client_messenger_->num_reactors(), 1) + << "This test requires only a single reactor. Otherwise the injected sleep might " + << "be scheduled on a different reactor than the RPC call."; + + CalculatorServiceProxy p(client_messenger_, server_addr_); + + // Schedule a 1-second sleep on the reactor thread. + // + // This will cause us the reactor to be blocked while the call response is received, and + // still be blocked when the timeout would normally occur. Despite this, the call should + // not time out. + // + // 0s 0.5s 1.2s 1.5s + // RPC call running + // |---------------------| + // Reactor blocked in sleep + // |----------------------| + // \_ RPC would normally time out + + client_messenger_->ScheduleOnReactor([](const Status& s) { + ThreadRestrictions::ScopedAllowWait allow_wait; + SleepFor(MonoDelta::FromSeconds(1)); + }, MonoDelta::FromSeconds(0.5)); + + RpcController controller; + SleepRequestPB req; + SleepResponsePB resp; + req.set_sleep_micros(800 * 1000); + controller.set_timeout(MonoDelta::FromMilliseconds(1200)); + ASSERT_OK(p.Sleep(req, &resp, &controller)); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpcz_store.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpcz_store.cc b/be/src/kudu/rpc/rpcz_store.cc new file mode 100644 index 0000000..66c23d0 --- /dev/null +++ b/be/src/kudu/rpc/rpcz_store.cc @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/rpcz_store.h" + +#include <algorithm> +#include <array> +#include <glog/stl_logging.h> +#include <mutex> // for unique_lock +#include <string> +#include <utility> +#include <vector> + +#include "kudu/gutil/walltime.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/service_if.h" +#include "kudu/util/atomic.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/monotime.h" +#include "kudu/util/trace.h" + + +DEFINE_bool(rpc_dump_all_traces, false, + "If true, dump all RPC traces at INFO level"); +TAG_FLAG(rpc_dump_all_traces, advanced); +TAG_FLAG(rpc_dump_all_traces, runtime); + +using std::pair; +using std::vector; +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +// Sample an RPC call once every N milliseconds within each +// bucket. If the current sample in a latency bucket is older +// than this threshold, a new sample will be taken. +static const int kSampleIntervalMs = 1000; + +static const int kBucketThresholdsMs[] = {10, 100, 1000}; +static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1; + +// An instance of this class is created For each RPC method implemented +// on the server. It keeps several recent samples for each RPC, currently +// based on fixed time buckets. +class MethodSampler { + public: + MethodSampler() {} + ~MethodSampler() {} + + // Potentially sample a single call. + void SampleCall(InboundCall* call); + + // Dump the current samples. + void GetSamplePBs(RpczMethodPB* pb); + + private: + // Convert the trace metrics from 't' into protobuf entries in 'sample_pb'. + // This function recurses through the parent-child relationship graph, + // keeping the current tree path in 'child_path' (empty at the root). + static void GetTraceMetrics(const Trace& t, + const std::string& child_path, + RpczSamplePB* sample_pb); + + // An individual recorded sample. + struct Sample { + RequestHeader header; + scoped_refptr<Trace> trace; + int duration_ms; + }; + + // A sample, including the particular time at which it was + // sampled, and a lock protecting it. + struct SampleBucket { + SampleBucket() : last_sample_time(0) {} + + AtomicInt<int64_t> last_sample_time; + simple_spinlock sample_lock; + Sample sample; + }; + std::array<SampleBucket, kNumBuckets> buckets_; + + DISALLOW_COPY_AND_ASSIGN(MethodSampler); +}; + +MethodSampler* RpczStore::SamplerForCall(InboundCall* call) { + if (PREDICT_FALSE(!call->method_info())) { + return nullptr; + } + + // Most likely, we already have a sampler created for the call. + { + shared_lock<rw_spinlock> l(samplers_lock_.get_lock()); + auto it = method_samplers_.find(call->method_info()); + if (PREDICT_TRUE(it != method_samplers_.end())) { + return it->second.get(); + } + } + + // If missing, create a new sampler for this method and try to insert it. + unique_ptr<MethodSampler> ms(new MethodSampler()); + std::lock_guard<percpu_rwlock> lock(samplers_lock_); + auto it = method_samplers_.find(call->method_info()); + if (it != method_samplers_.end()) { + return it->second.get(); + } + auto* ret = ms.get(); + method_samplers_[call->method_info()] = std::move(ms); + return ret; +} + +void MethodSampler::SampleCall(InboundCall* call) { + // First determine which sample bucket to put this in. + int duration_ms = call->timing().TotalDuration().ToMilliseconds(); + + SampleBucket* bucket = &buckets_[kNumBuckets - 1]; + for (int i = 0 ; i < kNumBuckets - 1; i++) { + if (duration_ms < kBucketThresholdsMs[i]) { + bucket = &buckets_[i]; + break; + } + } + + MicrosecondsInt64 now = GetMonoTimeMicros(); + int64_t us_since_trace = now - bucket->last_sample_time.Load(); + if (us_since_trace > kSampleIntervalMs * 1000) { + Sample new_sample = {call->header(), call->trace(), duration_ms}; + { + std::unique_lock<simple_spinlock> lock(bucket->sample_lock, std::try_to_lock); + // If another thread is already taking a sample, it's not worth waiting. + if (!lock.owns_lock()) { + return; + } + std::swap(bucket->sample, new_sample); + bucket->last_sample_time.Store(now); + } + VLOG(1) << "Sampled call " << call->ToString(); + } +} + +void MethodSampler::GetTraceMetrics(const Trace& t, + const string& child_path, + RpczSamplePB* sample_pb) { + auto m = t.metrics().Get(); + for (const auto& e : m) { + auto* pb = sample_pb->add_metrics(); + pb->set_key(e.first); + pb->set_value(e.second); + if (!child_path.empty()) { + pb->set_child_path(child_path); + } + } + + for (const auto& child_pair : t.ChildTraces()) { + string path = child_path; + if (!path.empty()) { + path += "."; + } + path += child_pair.first.ToString(); + GetTraceMetrics(*child_pair.second.get(), path, sample_pb); + } +} + +void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) { + for (auto& bucket : buckets_) { + if (bucket.last_sample_time.Load() == 0) continue; + + std::unique_lock<simple_spinlock> lock(bucket.sample_lock); + auto* sample_pb = method_pb->add_samples(); + sample_pb->mutable_header()->CopyFrom(bucket.sample.header); + sample_pb->set_trace(bucket.sample.trace->DumpToString(Trace::INCLUDE_TIME_DELTAS)); + + GetTraceMetrics(*bucket.sample.trace.get(), "", sample_pb); + sample_pb->set_duration_ms(bucket.sample.duration_ms); + } +} + +RpczStore::RpczStore() {} +RpczStore::~RpczStore() {} + +void RpczStore::AddCall(InboundCall* call) { + LogTrace(call); + auto* sampler = SamplerForCall(call); + if (PREDICT_FALSE(!sampler)) return; + + sampler->SampleCall(call); +} + +void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req, + DumpRpczStoreResponsePB* resp) { + vector<pair<RpcMethodInfo*, MethodSampler*>> samplers; + { + shared_lock<rw_spinlock> l(samplers_lock_.get_lock()); + for (const auto& p : method_samplers_) { + samplers.emplace_back(p.first, p.second.get()); + } + } + + for (const auto& p : samplers) { + auto* sampler = p.second; + + RpczMethodPB* method_pb = resp->add_methods(); + // TODO: use the actual RPC name instead of the request type name. + // Currently this isn't conveniently plumbed here, but the type name + // is close enough. + method_pb->set_method_name(p.first->req_prototype->GetTypeName()); + sampler->GetSamplePBs(method_pb); + } +} + +void RpczStore::LogTrace(InboundCall* call) { + int duration_ms = call->timing().TotalDuration().ToMilliseconds(); + + if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) { + double log_threshold = call->header_.timeout_millis() * 0.75f; + if (duration_ms > log_threshold) { + // TODO: consider pushing this onto another thread since it may be slow. + // The traces may also be too large to fit in a log message. + LOG(WARNING) << call->ToString() << " took " << duration_ms << "ms (client timeout " + << call->header_.timeout_millis() << ")."; + std::string s = call->trace()->DumpToString(); + if (!s.empty()) { + LOG(WARNING) << "Trace:\n" << s; + } + return; + } + } + + if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) { + LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:"; + call->trace()->Dump(&LOG(INFO), true); + } else if (duration_ms > 1000) { + LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. " + << "Request Metrics: " << call->trace()->MetricsAsJSON(); + } +} + + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpcz_store.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpcz_store.h b/be/src/kudu/rpc/rpcz_store.h new file mode 100644 index 0000000..48e4474 --- /dev/null +++ b/be/src/kudu/rpc/rpcz_store.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "kudu/gutil/macros.h" + +#include <memory> +#include <unordered_map> + +#include "kudu/util/locks.h" + +namespace kudu { +namespace rpc { + +class DumpRpczStoreRequestPB; +class DumpRpczStoreResponsePB; +class InboundCall; +class MethodSampler; +struct RpcMethodInfo; + +// Responsible for storing sampled traces associated with completed calls. +// Before each call is responded to, it is added to this store. +class RpczStore { + public: + RpczStore(); + ~RpczStore(); + + // Process a single call, potentially sampling it for later analysis. + // + // If the call is sampled, it might be mutated. For example, the request + // and response might be taken from the call and stored as part of the + // sample. This should be called just before a call response is sent + // to the client. + void AddCall(InboundCall* c); + + // Dump all of the collected RPC samples in response to a user query. + void DumpPB(const DumpRpczStoreRequestPB& req, + DumpRpczStoreResponsePB* resp); + + private: + // Look up or create the particular MethodSampler instance which should + // store samples for this call. + MethodSampler* SamplerForCall(InboundCall* call); + + // Log a WARNING message if the RPC response was slow enough that the + // client likely timed out. This is based on the client-provided timeout + // value. + // Also can be configured to log _all_ RPC traces for help debugging. + void LogTrace(InboundCall* call); + + percpu_rwlock samplers_lock_; + + // Protected by samplers_lock_. + std::unordered_map<RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_; + + DISALLOW_COPY_AND_ASSIGN(RpczStore); +}; + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rtest.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rtest.proto b/be/src/kudu/rpc/rtest.proto new file mode 100644 index 0000000..1ef5ca6 --- /dev/null +++ b/be/src/kudu/rpc/rtest.proto @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Test protocol for kudu RPC. +syntax = "proto2"; +package kudu.rpc_test; + +import "kudu/rpc/rpc_header.proto"; +import "kudu/rpc/rtest_diff_package.proto"; + +message AddRequestPB { + required uint32 x = 1; + required uint32 y = 2; +} + +// Used by tests to simulate an old client which is missing +// a newly added required field. +message AddRequestPartialPB { + required uint32 x = 1; +} + +message AddResponsePB { + required uint32 result = 1; +} + +message SleepRequestPB { + required uint32 sleep_micros = 1; + + // Used in rpc_stub-test: if this is true, it will respond from a different + // thread than the one that receives the request. + optional bool deferred = 2 [ default = false ]; + + // If set, returns a CalculatorError response. + optional bool return_app_error = 3 [ default = false ]; + + // Used in rpc-test: if this is set to true and no client timeout is set, + // the service will respond to the client with an error. + optional bool client_timeout_defined = 4 [ default = false ]; +} + +message SleepResponsePB { +} + +message SendTwoStringsRequestPB { + required uint32 random_seed = 1; + required uint64 size1 = 2; + required uint64 size2 = 3; +} + +message SendTwoStringsResponsePB { + required uint32 sidecar1 = 1; + required uint32 sidecar2 = 2; +} + +// Push two strings to the server as part of the request, in sidecars. +message PushTwoStringsRequestPB { + required uint32 sidecar1_idx = 1; + required uint32 sidecar2_idx = 2; +} + +message PushTwoStringsResponsePB { + required uint32 size1 = 1; + required string data1 = 2; + required uint32 size2 = 3; + required string data2 = 4; +} + +message EchoRequestPB { + required string data = 1; +} +message EchoResponsePB { + required string data = 1; +} + +message WhoAmIRequestPB { +} +message WhoAmIResponsePB { + required kudu.rpc.UserInformationPB credentials = 1; + required string address = 2; +} + +message CalculatorError { + extend kudu.rpc.ErrorStatusPB { + optional CalculatorError app_error_ext = 101; + } + + required string extra_error_data = 1; +} + +message PanicRequestPB {} +message PanicResponsePB {} + +message TestInvalidResponseRequestPB { + enum ErrorType { + MISSING_REQUIRED_FIELD = 1; + RESPONSE_TOO_LARGE = 2; + } + required ErrorType error_type = 1; +} + +message TestInvalidResponseResponsePB { + required bytes response = 1; +} + +enum FeatureFlags { + UNKNOWN=0; + FOO=1; +} + +message ExactlyOnceRequestPB { + optional uint32 sleep_for_ms = 1 [default = 0]; + required uint32 value_to_add = 2; + optional bool randomly_fail = 3 [default = false]; +} +message ExactlyOnceResponsePB { + required uint32 current_val = 1; + required fixed64 current_time_micros = 2; +} + +service CalculatorService { + option (kudu.rpc.default_authz_method) = "AuthorizeDisallowAlice"; + + rpc Add(AddRequestPB) returns(AddResponsePB); + rpc Sleep(SleepRequestPB) returns(SleepResponsePB) { + option (kudu.rpc.authz_method) = "AuthorizeDisallowBob"; + }; + rpc Echo(EchoRequestPB) returns(EchoResponsePB); + rpc WhoAmI(WhoAmIRequestPB) returns (WhoAmIResponsePB); + rpc TestArgumentsInDiffPackage(kudu.rpc_test_diff_package.ReqDiffPackagePB) + returns(kudu.rpc_test_diff_package.RespDiffPackagePB); + rpc Panic(PanicRequestPB) returns (PanicResponsePB); + rpc AddExactlyOnce(ExactlyOnceRequestPB) returns (ExactlyOnceResponsePB) { + option (kudu.rpc.track_rpc_result) = true; + } + rpc TestInvalidResponse(TestInvalidResponseRequestPB) returns (TestInvalidResponseResponsePB); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rtest_diff_package.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rtest_diff_package.proto b/be/src/kudu/rpc/rtest_diff_package.proto new file mode 100644 index 0000000..f6f9b60 --- /dev/null +++ b/be/src/kudu/rpc/rtest_diff_package.proto @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Request/Response in different package to test that RPC methods +// handle arguments with packages different from the service itself. +syntax = "proto2"; +package kudu.rpc_test_diff_package; + +message ReqDiffPackagePB { +} +message RespDiffPackagePB { +}
