http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/connection.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc new file mode 100644 index 0000000..fc46d67 --- /dev/null +++ b/be/src/kudu/rpc/connection.cc @@ -0,0 +1,732 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/connection.h" + +#include <stdint.h> + +#include <algorithm> +#include <iostream> +#include <memory> +#include <set> +#include <string> +#include <unordered_set> +#include <vector> + +#include <boost/intrusive/list.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/human_readable.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/client_negotiation.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/reactor.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/status.h" +#include "kudu/util/trace.h" + +using std::function; +using std::includes; +using std::set; +using std::shared_ptr; +using std::unique_ptr; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace rpc { + +typedef OutboundCall::Phase Phase; + +/// +/// Connection +/// +Connection::Connection(ReactorThread *reactor_thread, + Sockaddr remote, + unique_ptr<Socket> socket, + Direction direction, + CredentialsPolicy policy) + : reactor_thread_(reactor_thread), + remote_(remote), + socket_(std::move(socket)), + direction_(direction), + last_activity_time_(MonoTime::Now()), + is_epoll_registered_(false), + next_call_id_(1), + credentials_policy_(policy), + negotiation_complete_(false), + scheduled_for_shutdown_(false) { +} + +Status Connection::SetNonBlocking(bool enabled) { + return socket_->SetNonBlocking(enabled); +} + +void Connection::EpollRegister(ev::loop_ref& loop) { + DCHECK(reactor_thread_->IsCurrentThread()); + DVLOG(4) << "Registering connection for epoll: " << ToString(); + write_io_.set(loop); + write_io_.set(socket_->GetFd(), ev::WRITE); + write_io_.set<Connection, &Connection::WriteHandler>(this); + if (direction_ == CLIENT && negotiation_complete_) { + write_io_.start(); + } + read_io_.set(loop); + read_io_.set(socket_->GetFd(), ev::READ); + read_io_.set<Connection, &Connection::ReadHandler>(this); + read_io_.start(); + is_epoll_registered_ = true; +} + +Connection::~Connection() { + // Must clear the 
outbound_transfers_ list before deleting. + CHECK(outbound_transfers_.begin() == outbound_transfers_.end()); + + // It's crucial that the connection is Shutdown first -- otherwise + // our destructor will end up calling read_io_.stop() and write_io_.stop() + // from a possibly non-reactor thread context. This can then make all + // hell break loose with libev. + CHECK(!is_epoll_registered_); +} + +bool Connection::Idle() const { + DCHECK(reactor_thread_->IsCurrentThread()); + // check if we're in the middle of receiving something + InboundTransfer *transfer = inbound_.get(); + if (transfer && (transfer->TransferStarted())) { + return false; + } + // check if we still need to send something + if (!outbound_transfers_.empty()) { + return false; + } + // can't kill a connection if calls are waiting response + if (!awaiting_response_.empty()) { + return false; + } + + if (!calls_being_handled_.empty()) { + return false; + } + + // We are not idle if we are in the middle of connection negotiation. + if (!negotiation_complete_) { + return false; + } + + return true; +} + +void Connection::Shutdown(const Status &status, + unique_ptr<ErrorStatusPB> rpc_error) { + DCHECK(reactor_thread_->IsCurrentThread()); + shutdown_status_ = status.CloneAndPrepend("RPC connection failed"); + + if (inbound_ && inbound_->TransferStarted()) { + double secs_since_active = + (reactor_thread_->cur_time() - last_activity_time_).ToSeconds(); + LOG(WARNING) << "Shutting down " << ToString() + << " with pending inbound data (" + << inbound_->StatusAsString() << ", last active " + << HumanReadableElapsedTime::ToShortString(secs_since_active) + << " ago, status=" << status.ToString() << ")"; + } + + // Clear any calls which have been sent and were awaiting a response. + for (const car_map_t::value_type &v : awaiting_response_) { + CallAwaitingResponse *c = v.second; + if (c->call) { + // Make sure every awaiting call receives the error info, if any. + unique_ptr<ErrorStatusPB> error; + if (rpc_error) { + error.reset(new ErrorStatusPB(*rpc_error)); + } + c->call->SetFailed(status, + negotiation_complete_ ? Phase::REMOTE_CALL + : Phase::CONNECTION_NEGOTIATION, + error.release()); + } + // And we must return the CallAwaitingResponse to the pool + car_pool_.Destroy(c); + } + awaiting_response_.clear(); + + // Clear any outbound transfers. + while (!outbound_transfers_.empty()) { + OutboundTransfer *t = &outbound_transfers_.front(); + outbound_transfers_.pop_front(); + delete t; + } + + read_io_.stop(); + write_io_.stop(); + is_epoll_registered_ = false; + if (socket_) { + WARN_NOT_OK(socket_->Close(), "Error closing socket"); + } +} + +void Connection::QueueOutbound(gscoped_ptr<OutboundTransfer> transfer) { + DCHECK(reactor_thread_->IsCurrentThread()); + + if (!shutdown_status_.ok()) { + // If we've already shut down, then we just need to abort the + // transfer rather than bothering to queue it. + transfer->Abort(shutdown_status_); + return; + } + + DVLOG(3) << "Queueing transfer: " << transfer->HexDump(); + + outbound_transfers_.push_back(*transfer.release()); + + if (negotiation_complete_ && !write_io_.is_active()) { + // If we weren't currently in the middle of sending anything, + // then our write_io_ interest is stopped. Need to re-start it. + // Only do this after connection negotiation is done doing its work. 
+ write_io_.start(); + } +} + +Connection::CallAwaitingResponse::~CallAwaitingResponse() { + DCHECK(conn->reactor_thread_->IsCurrentThread()); +} + +void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) { + if (remaining_timeout > 0) { + if (watcher.remaining() < -1.0) { + LOG(WARNING) << "RPC call timeout handler was delayed by " + << -watcher.remaining() << "s! This may be due to a process-wide " + << "pause such as swapping, logging-related delays, or allocator lock " + << "contention. Will allow an additional " + << remaining_timeout << "s for a response."; + } + + watcher.set(remaining_timeout, 0); + watcher.start(); + remaining_timeout = 0; + return; + } + + conn->HandleOutboundCallTimeout(this); +} + +void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) { + DCHECK(reactor_thread_->IsCurrentThread()); + DCHECK(car->call); + // The timeout timer is stopped by the car destructor exiting Connection::HandleCallResponse() + DCHECK(!car->call->IsFinished()); + + // Mark the call object as failed. + car->call->SetTimedOut(negotiation_complete_ ? Phase::REMOTE_CALL + : Phase::CONNECTION_NEGOTIATION); + + // Drop the reference to the call. If the original caller has moved on after + // seeing the timeout, we no longer need to hold onto the allocated memory + // from the request. + car->call.reset(); + + // We still leave the CallAwaitingResponse in the map -- this is because we may still + // receive a response from the server, and we don't want a spurious log message + // when we do finally receive the response. The fact that CallAwaitingResponse::call + // is a NULL pointer indicates to the response processing code that the call + // already timed out. +} + +// Callbacks after sending a call on the wire. +// This notifies the OutboundCall object to change its state to SENT once it +// has been fully transmitted. +struct CallTransferCallbacks : public TransferCallbacks { + public: + explicit CallTransferCallbacks(shared_ptr<OutboundCall> call) + : call_(std::move(call)) {} + + virtual void NotifyTransferFinished() OVERRIDE { + // TODO: would be better to cancel the transfer while it is still on the queue if we + // timed out before the transfer started, but there is still a race in the case of + // a partial send that we have to handle here + if (call_->IsFinished()) { + DCHECK(call_->IsTimedOut()); + } else { + call_->SetSent(); + } + delete this; + } + + virtual void NotifyTransferAborted(const Status &status) OVERRIDE { + VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: " + << status.ToString(); + delete this; + } + + private: + shared_ptr<OutboundCall> call_; +}; + +void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) { + DCHECK(call); + DCHECK_EQ(direction_, CLIENT); + DCHECK(reactor_thread_->IsCurrentThread()); + + if (PREDICT_FALSE(!shutdown_status_.ok())) { + // Already shutdown + call->SetFailed(shutdown_status_, + negotiation_complete_ ? Phase::REMOTE_CALL + : Phase::CONNECTION_NEGOTIATION); + return; + } + + // At this point the call has a serialized request, but no call header, since we haven't + // yet assigned a call ID. + DCHECK(!call->call_id_assigned()); + + // Assign the call ID. + int32_t call_id = GetNextCallId(); + call->set_call_id(call_id); + + // Serialize the actual bytes to be put on the wire. + slices_tmp_.clear(); + Status s = call->SerializeTo(&slices_tmp_); + if (PREDICT_FALSE(!s.ok())) { + call->SetFailed(s, negotiation_complete_ ? 
Phase::REMOTE_CALL + : Phase::CONNECTION_NEGOTIATION); + return; + } + + call->SetQueued(); + + scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct())); + car->conn = this; + car->call = call; + + // Set up the timeout timer. + const MonoDelta &timeout = call->controller()->timeout(); + if (timeout.Initialized()) { + reactor_thread_->RegisterTimeout(&car->timeout_timer); + car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*) + &CallAwaitingResponse::HandleTimeout>(car.get()); + + // For calls with a timeout of at least 500ms, we actually run the timeout + // handler in two stages. The first timeout fires with a timeout 10% less + // than the user-specified one. It then schedules a second timeout for the + // remaining amount of time. + // + // The purpose of this two-stage timeout is to be more robust when the client + // has some process-wide pause, such as lock contention in tcmalloc, or a + // reactor callback that blocks in glog. Consider the following case: + // + // T = 0s user issues an RPC with 5 second timeout + // T = 0.5s - 6s process is blocked + // T = 6s process unblocks, and the timeout fires (1s late) + // + // Without the two-stage timeout, we would determine that the call had timed out, + // even though it's likely that the response is waiting on our TCP socket. + // With the two-stage timeout, we'll end up with: + // + // T = 0s user issues an RPC with 5 second timeout + // T = 0.5s - 6s process is blocked + // T = 6s process unblocks, and the first-stage timeout fires (1.5s late) + // T = 6s - 6.200s time for the client to read the response which is waiting + // T = 6.200s if the response was not actually available, we'll time out here + // + // We don't bother with this logic for calls with very short timeouts - assumedly + // a user setting such a short RPC timeout is well equipped to handle one. + double time = timeout.ToSeconds(); + if (time >= 0.5) { + car->remaining_timeout = time * 0.1; + time -= car->remaining_timeout; + } else { + car->remaining_timeout = 0; + } + + car->timeout_timer.set(time, 0); + car->timeout_timer.start(); + } + + TransferCallbacks *cb = new CallTransferCallbacks(call); + awaiting_response_[call_id] = car.release(); + QueueOutbound(gscoped_ptr<OutboundTransfer>( + OutboundTransfer::CreateForCallRequest(call_id, slices_tmp_, cb))); +} + +// Callbacks for sending an RPC call response from the server. +// This takes ownership of the InboundCall object so that, once it has +// been responded to, we can free up all of the associated memory. +struct ResponseTransferCallbacks : public TransferCallbacks { + public: + ResponseTransferCallbacks(gscoped_ptr<InboundCall> call, + Connection *conn) : + call_(std::move(call)), + conn_(conn) + {} + + ~ResponseTransferCallbacks() { + // Remove the call from the map. + InboundCall *call_from_map = EraseKeyReturnValuePtr( + &conn_->calls_being_handled_, call_->call_id()); + DCHECK_EQ(call_from_map, call_.get()); + } + + virtual void NotifyTransferFinished() OVERRIDE { + delete this; + } + + virtual void NotifyTransferAborted(const Status &status) OVERRIDE { + LOG(WARNING) << "Connection torn down before " << + call_->ToString() << " could send its response"; + delete this; + } + + private: + gscoped_ptr<InboundCall> call_; + Connection *conn_; +}; + +// Reactor task which puts a transfer on the outbound transfer queue. 
+class QueueTransferTask : public ReactorTask { + public: + QueueTransferTask(gscoped_ptr<OutboundTransfer> transfer, + Connection *conn) + : transfer_(std::move(transfer)), + conn_(conn) + {} + + virtual void Run(ReactorThread *thr) OVERRIDE { + conn_->QueueOutbound(std::move(transfer_)); + delete this; + } + + virtual void Abort(const Status &status) OVERRIDE { + transfer_->Abort(status); + delete this; + } + + private: + gscoped_ptr<OutboundTransfer> transfer_; + Connection *conn_; +}; + +void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) { + // This is usually called by the IPC worker thread when the response + // is set, but in some circumstances may also be called by the + // reactor thread (e.g. if the service has shut down) + + DCHECK_EQ(direction_, SERVER); + + // If the connection is torn down, then the QueueOutbound() call that + // eventually runs in the reactor thread will take care of calling + // ResponseTransferCallbacks::NotifyTransferAborted. + + std::vector<Slice> slices; + call->SerializeResponseTo(&slices); + + TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this); + // After the response is sent, can delete the InboundCall object. + // We set a dummy call ID and required feature set, since these are not needed + // when sending responses. + gscoped_ptr<OutboundTransfer> t(OutboundTransfer::CreateForCallResponse(slices, cb)); + + QueueTransferTask *task = new QueueTransferTask(std::move(t), this); + reactor_thread_->reactor()->ScheduleReactorTask(task); +} + +bool Connection::SatisfiesCredentialsPolicy(CredentialsPolicy policy) const { + DCHECK_EQ(direction_, CLIENT); + return (policy == CredentialsPolicy::ANY_CREDENTIALS) || + (policy == credentials_policy_); +} + +RpczStore* Connection::rpcz_store() { + return reactor_thread_->reactor()->messenger()->rpcz_store(); +} + +void Connection::ReadHandler(ev::io &watcher, int revents) { + DCHECK(reactor_thread_->IsCurrentThread()); + + DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")"; + if (revents & EV_ERROR) { + reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() + + ": ReadHandler encountered an error")); + return; + } + last_activity_time_ = reactor_thread_->cur_time(); + + while (true) { + if (!inbound_) { + inbound_.reset(new InboundTransfer()); + } + Status status = inbound_->ReceiveBuffer(*socket_); + if (PREDICT_FALSE(!status.ok())) { + if (status.posix_code() == ESHUTDOWN) { + VLOG(1) << ToString() << " shut down by remote end."; + } else { + LOG(WARNING) << ToString() << " recv error: " << status.ToString(); + } + reactor_thread_->DestroyConnection(this, status); + return; + } + if (!inbound_->TransferFinished()) { + DVLOG(3) << ToString() << ": read is not yet finished yet."; + return; + } + DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() << " bytes"; + + if (direction_ == CLIENT) { + HandleCallResponse(std::move(inbound_)); + } else if (direction_ == SERVER) { + HandleIncomingCall(std::move(inbound_)); + } else { + LOG(FATAL) << "Invalid direction: " << direction_; + } + + // TODO: it would seem that it would be good to loop around and see if + // there is more data on the socket by trying another recv(), but it turns + // out that it really hurts throughput to do so. 
A better approach + // might be for each InboundTransfer to actually try to read an extra byte, + // and if it succeeds, then we'd copy that byte into a new InboundTransfer + // and loop around, since it's likely the next call also arrived at the + // same time. + break; + } +} + +void Connection::HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer) { + DCHECK(reactor_thread_->IsCurrentThread()); + + gscoped_ptr<InboundCall> call(new InboundCall(this)); + Status s = call->ParseFrom(std::move(transfer)); + if (!s.ok()) { + LOG(WARNING) << ToString() << ": received bad data: " << s.ToString(); + // TODO: shutdown? probably, since any future stuff on this socket will be + // "unsynchronized" + return; + } + + if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get())) { + LOG(WARNING) << ToString() << ": received call ID " << call->call_id() << + " but was already processing this ID! Ignoring"; + reactor_thread_->DestroyConnection( + this, Status::RuntimeError("Received duplicate call id", + Substitute("$0", call->call_id()))); + return; + } + + reactor_thread_->reactor()->messenger()->QueueInboundCall(std::move(call)); +} + +void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) { + DCHECK(reactor_thread_->IsCurrentThread()); + gscoped_ptr<CallResponse> resp(new CallResponse); + CHECK_OK(resp->ParseFrom(std::move(transfer))); + + CallAwaitingResponse *car_ptr = + EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id()); + if (PREDICT_FALSE(car_ptr == nullptr)) { + LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which " + << "was not pending! Ignoring."; + return; + } + + // The car->timeout_timer ev::timer will be stopped automatically by its destructor. + scoped_car car(car_pool_.make_scoped_ptr(car_ptr)); + + if (PREDICT_FALSE(car->call.get() == nullptr)) { + // The call already failed due to a timeout. + VLOG(1) << "Got response to call id " << resp->call_id() << " after client already timed out"; + return; + } + + car->call->SetResponse(std::move(resp)); +} + +void Connection::WriteHandler(ev::io &watcher, int revents) { + DCHECK(reactor_thread_->IsCurrentThread()); + + if (revents & EV_ERROR) { + reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() + + ": writeHandler encountered an error")); + return; + } + DVLOG(3) << ToString() << ": writeHandler: revents = " << revents; + + OutboundTransfer *transfer; + if (outbound_transfers_.empty()) { + LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is " + "nothing to write."; + write_io_.stop(); + return; + } + + while (!outbound_transfers_.empty()) { + transfer = &(outbound_transfers_.front()); + + if (!transfer->TransferStarted()) { + + if (transfer->is_for_outbound_call()) { + CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id()); + if (!car->call) { + // If the call has already timed out, then the 'call' field will have been nulled. + // In that case, we don't need to bother sending it. + outbound_transfers_.pop_front(); + transfer->Abort(Status::Aborted("already timed out")); + delete transfer; + continue; + } + + // If this is the start of the transfer, then check if the server has the + // required RPC flags. We have to wait until just before the transfer in + // order to ensure that the negotiation has taken place, so that the flags + // are available. 
+ const set<RpcFeatureFlag>& required_features = car->call->required_rpc_features(); + if (!includes(remote_features_.begin(), remote_features_.end(), + required_features.begin(), required_features.end())) { + outbound_transfers_.pop_front(); + Status s = Status::NotSupported("server does not support the required RPC features"); + transfer->Abort(s); + car->call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL + : Phase::CONNECTION_NEGOTIATION); + car->call.reset(); + delete transfer; + continue; + } + + car->call->SetSending(); + } + } + + last_activity_time_ = reactor_thread_->cur_time(); + Status status = transfer->SendBuffer(*socket_); + if (PREDICT_FALSE(!status.ok())) { + LOG(WARNING) << ToString() << " send error: " << status.ToString(); + reactor_thread_->DestroyConnection(this, status); + return; + } + + if (!transfer->TransferFinished()) { + DVLOG(3) << ToString() << ": writeHandler: xfer not finished."; + return; + } + + outbound_transfers_.pop_front(); + delete transfer; + } + + // If we were able to write all of our outbound transfers, + // we don't have any more to write. + write_io_.stop(); +} + +std::string Connection::ToString() const { + // This may be called from other threads, so we cannot + // include anything in the output about the current state, + // which might concurrently change from another thread. + return strings::Substitute( + "$0 $1", + direction_ == SERVER ? "server connection from" : "client connection to", + remote_.ToString()); +} + +// Reactor task that transitions this Connection from connection negotiation to +// regular RPC handling. Destroys Connection on negotiation error. +class NegotiationCompletedTask : public ReactorTask { + public: + NegotiationCompletedTask(Connection* conn, + Status negotiation_status, + std::unique_ptr<ErrorStatusPB> rpc_error) + : conn_(conn), + negotiation_status_(std::move(negotiation_status)), + rpc_error_(std::move(rpc_error)) { + } + + virtual void Run(ReactorThread *rthread) OVERRIDE { + rthread->CompleteConnectionNegotiation(conn_, + negotiation_status_, + std::move(rpc_error_)); + delete this; + } + + virtual void Abort(const Status &status) OVERRIDE { + DCHECK(conn_->reactor_thread()->reactor()->closing()); + VLOG(1) << "Failed connection negotiation due to shut down reactor thread: " + << status.ToString(); + delete this; + } + + private: + scoped_refptr<Connection> conn_; + const Status negotiation_status_; + std::unique_ptr<ErrorStatusPB> rpc_error_; +}; + +void Connection::CompleteNegotiation(Status negotiation_status, + unique_ptr<ErrorStatusPB> rpc_error) { + auto task = new NegotiationCompletedTask( + this, std::move(negotiation_status), std::move(rpc_error)); + reactor_thread_->reactor()->ScheduleReactorTask(task); +} + +void Connection::MarkNegotiationComplete() { + DCHECK(reactor_thread_->IsCurrentThread()); + negotiation_complete_ = true; +} + +Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req, + RpcConnectionPB* resp) { + DCHECK(reactor_thread_->IsCurrentThread()); + resp->set_remote_ip(remote_.ToString()); + if (negotiation_complete_) { + resp->set_state(RpcConnectionPB::OPEN); + } else { + resp->set_state(RpcConnectionPB::NEGOTIATING); + } + + if (direction_ == CLIENT) { + for (const car_map_t::value_type& entry : awaiting_response_) { + CallAwaitingResponse *c = entry.second; + if (c->call) { + c->call->DumpPB(req, resp->add_calls_in_flight()); + } + } + } else if (direction_ == SERVER) { + if (negotiation_complete_) { + // It's racy to dump credentials while negotiating, since 
the Connection + // object is owned by the negotiation thread at that point. + resp->set_remote_user_credentials(remote_user_.ToString()); + } + for (const inbound_call_map_t::value_type& entry : calls_being_handled_) { + InboundCall* c = entry.second; + c->DumpPB(req, resp->add_calls_in_flight()); + } + } else { + LOG(FATAL); + } + return Status::OK(); +} + +} // namespace rpc +} // namespace kudu
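The two-stage RPC timeout set up in Connection::QueueOutboundCall() above splits the user-specified timeout into a first-stage timer and a short grace period. A minimal standalone sketch of that split, assuming hypothetical helper names that are not part of the Kudu sources:

// Sketch only: mirrors the 90%/10% split performed in
// Connection::QueueOutboundCall(). The struct and function names here are
// illustrative, not from the Kudu code base.
struct TwoStageTimeout {
  double first_stage_secs;  // armed when the call is queued
  double remaining_secs;    // re-armed by HandleTimeout() if the first stage fires
};

inline TwoStageTimeout SplitTimeout(double total_secs) {
  // Calls with timeouts under 500ms are not split; the comment in
  // QueueOutboundCall() assumes such callers can tolerate a spurious timeout.
  if (total_secs < 0.5) {
    return {total_secs, 0.0};
  }
  double remaining = total_secs * 0.1;         // second stage: 10% of the budget
  return {total_secs - remaining, remaining};  // first stage: the remaining 90%
}

For a 5-second RPC timeout this yields a 4.5s first-stage timer plus a 0.5s grace period, matching the T = 0s .. 6.2s timeline worked through in the comment above.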
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/connection.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h new file mode 100644 index 0000000..816c43c --- /dev/null +++ b/be/src/kudu/rpc/connection.h @@ -0,0 +1,360 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_CONNECTION_H +#define KUDU_RPC_CONNECTION_H + +#include <cstdint> +#include <limits> +#include <memory> +#include <set> +#include <string> +#include <unordered_map> +#include <vector> + +#include <boost/intrusive/list.hpp> +#include <ev++.h> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/rpc/outbound_call.h" +#include "kudu/rpc/remote_user.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/transfer.h" +#include "kudu/util/monotime.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" +#include "kudu/util/object_pool.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace rpc { + +class DumpRunningRpcsRequestPB; +class ErrorStatusPB; +class RpcConnectionPB; +class ReactorThread; +class RpczStore; +enum class CredentialsPolicy; + +// +// A connection between an endpoint and us. +// +// Inbound connections are created by AcceptorPools, which eventually schedule +// RegisterConnection() to be called from the reactor thread. +// +// Outbound connections are created by the Reactor thread in order to service +// outbound calls. +// +// Once a Connection is created, it can be used both for sending messages and +// receiving them, but any given connection is explicitly a client or server. +// If a pair of servers are making bidirectional RPCs, they will use two separate +// TCP connections (and Connection objects). +// +// This class is not fully thread-safe. It is accessed only from the context of a +// single ReactorThread except where otherwise specified. +// +class Connection : public RefCountedThreadSafe<Connection> { + public: + enum Direction { + // This host is sending calls via this connection. + CLIENT, + // This host is receiving calls via this connection. + SERVER + }; + + // Create a new Connection. + // reactor_thread: the reactor that owns us. + // remote: the address of the remote end + // socket: the socket to take ownership of. + // direction: whether we are the client or server side + Connection(ReactorThread *reactor_thread, + Sockaddr remote, + std::unique_ptr<Socket> socket, + Direction direction, + CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS); + + // Set underlying socket to non-blocking (or blocking) mode. 
+ Status SetNonBlocking(bool enabled); + + // Register our socket with an epoll loop. We will only ever be registered in + // one epoll loop at a time. + void EpollRegister(ev::loop_ref& loop); + + ~Connection(); + + MonoTime last_activity_time() const { + return last_activity_time_; + } + + // Returns true if we are not in the process of receiving or sending a + // message, and we have no outstanding calls. + bool Idle() const; + + // Fail any calls which are currently queued or awaiting response. + // Prohibits any future calls (they will be failed immediately with this + // same Status). + void Shutdown(const Status& status, + std::unique_ptr<ErrorStatusPB> rpc_error = {}); + + // Queue a new call to be made. If the queueing fails, the call will be + // marked failed. + // Takes ownership of the 'call' object regardless of whether it succeeds or fails. + void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call); + + // Queue a call response back to the client on the server side. + // + // This may be called from a non-reactor thread. + void QueueResponseForCall(gscoped_ptr<InboundCall> call); + + // The address of the remote end of the connection. + const Sockaddr &remote() const { return remote_; } + + // Set the user credentials for an outbound connection. + void set_local_user_credentials(UserCredentials creds) { + DCHECK_EQ(direction_, CLIENT); + local_user_credentials_ = std::move(creds); + } + + // Get the user credentials which will be used to log in. + const UserCredentials& local_user_credentials() const { + DCHECK_EQ(direction_, CLIENT); + return local_user_credentials_; + } + + // Credentials policy to start connection negotiation. + CredentialsPolicy credentials_policy() const { return credentials_policy_; } + + // Whether the connection satisfies the specified credentials policy. + // + // NOTE: The policy is set prior to connection negotiation, and the actual + // authentication credentials used for connection negotiation might + // effectively make the connection to satisfy a stronger policy. + // An example: the credentials policy for the connection was set to + // ANY_CREDENTIALS, but since the authn token was not available + // at the time of negotiation, the primary credentials were used, making + // the connection de facto satisfying the PRIMARY_CREDENTIALS policy. + bool SatisfiesCredentialsPolicy(CredentialsPolicy policy) const; + + RpczStore* rpcz_store(); + + // libev callback when data is available to read. + void ReadHandler(ev::io &watcher, int revents); + + // libev callback when we may write to the socket. + void WriteHandler(ev::io &watcher, int revents); + + // Safe to be called from other threads. + std::string ToString() const; + + Direction direction() const { return direction_; } + + Socket* socket() { return socket_.get(); } + + // Go through the process of transferring control of the underlying socket back to the Reactor. + void CompleteNegotiation(Status negotiation_status, + std::unique_ptr<ErrorStatusPB> rpc_error); + + // Indicate that negotiation is complete and that the Reactor is now in control of the socket. 
+ void MarkNegotiationComplete(); + + Status DumpPB(const DumpRunningRpcsRequestPB& req, + RpcConnectionPB* resp); + + ReactorThread* reactor_thread() const { return reactor_thread_; } + + std::unique_ptr<Socket> release_socket() { + return std::move(socket_); + } + + void adopt_socket(std::unique_ptr<Socket> socket) { + socket_ = std::move(socket); + } + + void set_remote_features(std::set<RpcFeatureFlag> remote_features) { + remote_features_ = std::move(remote_features); + } + + void set_remote_user(RemoteUser user) { + DCHECK_EQ(direction_, SERVER); + remote_user_ = std::move(user); + } + + const RemoteUser& remote_user() const { + DCHECK_EQ(direction_, SERVER); + return remote_user_; + } + + // Whether the connection is scheduled for shutdown. + bool scheduled_for_shutdown() const { + DCHECK_EQ(direction_, CLIENT); + return scheduled_for_shutdown_; + } + + // Mark the connection as scheduled to be shut down. Reactor does not dispatch + // new calls on such a connection. + void set_scheduled_for_shutdown() { + DCHECK_EQ(direction_, CLIENT); + scheduled_for_shutdown_ = true; + } + + private: + friend struct CallAwaitingResponse; + friend class QueueTransferTask; + friend struct ResponseTransferCallbacks; + + // A call which has been fully sent to the server, which we're waiting for + // the server to process. This is used on the client side only. + struct CallAwaitingResponse { + ~CallAwaitingResponse(); + + // Notification from libev that the call has timed out. + void HandleTimeout(ev::timer &watcher, int revents); + + Connection *conn; + std::shared_ptr<OutboundCall> call; + ev::timer timeout_timer; + + // We time out RPC calls in two stages. This is set to the amount of timeout + // remaining after the next timeout fires. See Connection::QueueOutboundCall(). + double remaining_timeout; + }; + + typedef std::unordered_map<uint64_t, CallAwaitingResponse*> car_map_t; + typedef std::unordered_map<uint64_t, InboundCall*> inbound_call_map_t; + + // Returns the next valid (positive) sequential call ID by incrementing a counter + // and ensuring we roll over from INT32_MAX to 0. + // Negative numbers are reserved for special purposes. + int32_t GetNextCallId() { + int32_t call_id = next_call_id_; + if (PREDICT_FALSE(next_call_id_ == std::numeric_limits<int32_t>::max())) { + next_call_id_ = 0; + } else { + next_call_id_++; + } + return call_id; + } + + // An incoming packet has completed transferring on the server side. + // This parses the call and delivers it into the call queue. + void HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer); + + // An incoming packet has completed on the client side. This parses the + // call response, looks up the CallAwaitingResponse, and calls the + // client callback. + void HandleCallResponse(gscoped_ptr<InboundTransfer> transfer); + + // The given CallAwaitingResponse has elapsed its user-defined timeout. + // Set it to Failed. + void HandleOutboundCallTimeout(CallAwaitingResponse *car); + + // Queue a transfer for sending on this connection. + // We will take ownership of the transfer. + // This must be called from the reactor thread. + void QueueOutbound(gscoped_ptr<OutboundTransfer> transfer); + + // The reactor thread that created this connection. + ReactorThread* const reactor_thread_; + + // The remote address we're talking to. + const Sockaddr remote_; + + // The socket we're communicating on. + std::unique_ptr<Socket> socket_; + + // The credentials of the user operating on this connection (if a client user). 
+ UserCredentials local_user_credentials_; + + // The authenticated remote user (if this is an inbound connection on the server). + RemoteUser remote_user_; + + // whether we are client or server + Direction direction_; + + // The last time we read or wrote from the socket. + MonoTime last_activity_time_; + + // the inbound transfer, if any + gscoped_ptr<InboundTransfer> inbound_; + + // notifies us when our socket is writable. + ev::io write_io_; + + // notifies us when our socket is readable. + ev::io read_io_; + + // Set to true when the connection is registered on a loop. + // This is used for a sanity check in the destructor that we are properly + // un-registered before shutting down. + bool is_epoll_registered_; + + // waiting to be sent + boost::intrusive::list<OutboundTransfer> outbound_transfers_; // NOLINT(*) + + // Calls which have been sent and are now waiting for a response. + car_map_t awaiting_response_; + + // Calls which have been received on the server and are currently + // being handled. + inbound_call_map_t calls_being_handled_; + + // the next call ID to use + int32_t next_call_id_; + + // Starts as Status::OK, gets set to a shutdown status upon Shutdown(). + Status shutdown_status_; + + // Temporary vector used when serializing - avoids an allocation + // when serializing calls. + std::vector<Slice> slices_tmp_; + + // RPC features supported by the remote end of the connection. + std::set<RpcFeatureFlag> remote_features_; + + // Pool from which CallAwaitingResponse objects are allocated. + // Also a funny name. + ObjectPool<CallAwaitingResponse> car_pool_; + typedef ObjectPool<CallAwaitingResponse>::scoped_ptr scoped_car; + + // The credentials policy to use for connection negotiation. It defines which + // type of user credentials used to negotiate a connection. The actual type of + // credentials used for authentication during the negotiation process depends + // on the credentials availability, but the result credentials guaranteed to + // always satisfy the specified credentials policy. In other words, the actual + // type of credentials used for connection negotiation might effectively make + // the connection to satisfy a stronger/narrower policy. + // + // An example: + // The credentials policy for the connection was set to ANY_CREDENTIALS, + // but since no secondary credentials (such authn token) were available + // at the time of negotiation, the primary credentials were used,making the + // connection satisfying the PRIMARY_CREDENTIALS policy de facto. + const CredentialsPolicy credentials_policy_; + + // Whether we completed connection negotiation. + bool negotiation_complete_; + + // Whether the connection is scheduled for shutdown. + bool scheduled_for_shutdown_; +}; + +} // namespace rpc +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/constants.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/constants.cc b/be/src/kudu/rpc/constants.cc new file mode 100644 index 0000000..bcf9985 --- /dev/null +++ b/be/src/kudu/rpc/constants.cc @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/constants.h" + +using std::set; + +namespace kudu { +namespace rpc { + +const char* const kMagicNumber = "hrpc"; +const char* const kSaslAppName = "kudu"; +const char* const kSaslProtoName = "kudu"; + +// NOTE: the TLS flag is dynamically added based on the local encryption +// configuration. +// +// NOTE: the TLS_AUTHENTICATION_ONLY flag is dynamically added on both +// sides based on the remote peer's address. +set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS }; +set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS }; + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/constants.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/constants.h b/be/src/kudu/rpc/constants.h new file mode 100644 index 0000000..e179ddc --- /dev/null +++ b/be/src/kudu/rpc/constants.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_RPC_CONSTANTS_H +#define KUDU_RPC_RPC_CONSTANTS_H + +#include <cstdint> +#include <set> + +#include "kudu/rpc/rpc_header.pb.h" + +namespace kudu { +namespace rpc { + +// Magic number bytes sent at connection setup time. +extern const char* const kMagicNumber; + +// App name for SASL library init +extern const char* const kSaslAppName; + +// Network protocol name for SASL library init +extern const char* const kSaslProtoName; + +// Current version of the RPC protocol. +static const uint32_t kCurrentRpcVersion = 9; + +// From Hadoop. +static const int32_t kInvalidCallId = -2; +static const int32_t kConnectionContextCallId = -3; +static const int32_t kNegotiateCallId = -33; + +static const uint8_t kMagicNumberLength = 4; +static const uint8_t kHeaderFlagsLength = 3; + +// There is a 4-byte length prefix before any packet. +static const uint8_t kMsgLengthPrefixLength = 4; + +// The set of RPC features that this server build supports. +// Non-const for testing. +extern std::set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags; + +// The set of RPC features that this client build supports. +// Non-const for testing. 
+extern std::set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags; + +} // namespace rpc +} // namespace kudu + +#endif // KUDU_RPC_RPC_CONSTANTS_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/exactly_once_rpc-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/exactly_once_rpc-test.cc b/be/src/kudu/rpc/exactly_once_rpc-test.cc new file mode 100644 index 0000000..388919d --- /dev/null +++ b/be/src/kudu/rpc/exactly_once_rpc-test.cc @@ -0,0 +1,589 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/retriable_rpc.h" +#include "kudu/rpc/rpc-test-base.h" +#include "kudu/rpc/rpc.h" +#include "kudu/util/pb_util.h" + +DECLARE_int64(remember_clients_ttl_ms); +DECLARE_int64(remember_responses_ttl_ms); +DECLARE_int64(result_tracker_gc_interval_ms); + +using std::atomic_int; +using std::shared_ptr; +using std::unique_ptr; + +namespace kudu { +namespace rpc { + +namespace { + +const char* kClientId = "test-client"; + +void AddRequestId(RpcController* controller, + const std::string& client_id, + ResultTracker::SequenceNumber sequence_number, + int64_t attempt_no) { + unique_ptr<RequestIdPB> request_id(new RequestIdPB()); + request_id->set_client_id(client_id); + request_id->set_seq_no(sequence_number); + request_id->set_attempt_no(attempt_no); + request_id->set_first_incomplete_seq_no(sequence_number); + controller->SetRequestIdPB(std::move(request_id)); +} + +class TestServerPicker : public ServerPicker<CalculatorServiceProxy> { + public: + explicit TestServerPicker(CalculatorServiceProxy* proxy) : proxy_(proxy) {} + + void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override { + callback.Run(Status::OK(), proxy_); + } + + void MarkServerFailed(CalculatorServiceProxy*, const Status&) override {} + void MarkReplicaNotLeader(CalculatorServiceProxy*) override {} + void MarkResourceNotFound(CalculatorServiceProxy*) override {} + + private: + CalculatorServiceProxy* proxy_; +}; + +} // anonymous namespace + +class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy, + ExactlyOnceRequestPB, + ExactlyOnceResponsePB> { + public: + CalculatorServiceRpc(const scoped_refptr<TestServerPicker>& server_picker, + const scoped_refptr<RequestTracker>& request_tracker, + const MonoTime& deadline, + shared_ptr<Messenger> messenger, + int value, + CountDownLatch* latch, + int server_sleep = 0) + : RetriableRpc(server_picker, request_tracker, deadline, std::move(messenger)), + latch_(latch) { + req_.set_value_to_add(value); + req_.set_randomly_fail(true); + req_.set_sleep_for_ms(server_sleep); + } + + void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override { + 
server->AddExactlyOnceAsync(req_, + &resp_, + mutable_retrier()->mutable_controller(), + callback); + } + + RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override { + // We shouldn't get errors from the server/rpc system since we set a high timeout. + CHECK_OK(rpc_cb_status); + + if (!mutable_retrier()->controller().status().ok()) { + CHECK(mutable_retrier()->controller().status().IsRemoteError()); + if (mutable_retrier()->controller().error_response()->code() + == ErrorStatusPB::ERROR_REQUEST_STALE) { + return { RetriableRpcStatus::NON_RETRIABLE_ERROR, + mutable_retrier()->controller().status() }; + } + return { RetriableRpcStatus::SERVICE_UNAVAILABLE, + mutable_retrier()->controller().status() }; + } + + // If the controller is not finished we're in the ReplicaFoundCb() callback. + // Return ok to proceed with the call to the server. + if (!mutable_retrier()->mutable_controller()->finished()) { + return { RetriableRpcStatus::OK, Status::OK() }; + } + + // If we've received a response in the past, all following responses must + // match. + if (!successful_response_.IsInitialized()) { + successful_response_.CopyFrom(resp_); + } else { + CHECK_EQ(SecureDebugString(successful_response_), + SecureDebugString(resp_)); + } + + if (sometimes_retry_successful_) { + // Still report errors, with some probability. This will cause requests to + // be retried. Since the requests were originally successful we should get + // the same reply back. + int random = rand() % 4; + switch (random) { + case 0: + return { RetriableRpcStatus::SERVICE_UNAVAILABLE, + Status::RemoteError("") }; + case 1: + return { RetriableRpcStatus::RESOURCE_NOT_FOUND, + Status::RemoteError("") }; + case 2: + return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE, + Status::RemoteError("") }; + case 3: + return { RetriableRpcStatus::OK, Status::OK() }; + default: LOG(FATAL) << "Unexpected value"; + } + } + return { RetriableRpcStatus::OK, Status::OK() }; + } + + void Finish(const Status& status) override { + CHECK_OK(status); + latch_->CountDown(); + delete this; + } + + std::string ToString() const override { return "test-rpc"; } + CountDownLatch* latch_; + ExactlyOnceResponsePB successful_response_; + bool sometimes_retry_successful_ = true; +}; + +class ExactlyOnceRpcTest : public RpcTestBase { + public: + void SetUp() override { + RpcTestBase::SetUp(); + SeedRandom(); + } + + void StartServer() { + // Set up server. + StartTestServerWithGeneratedCode(&server_addr_); + client_messenger_ = CreateMessenger("Client"); + proxy_.reset(new CalculatorServiceProxy(client_messenger_, server_addr_)); + test_picker_.reset(new TestServerPicker(proxy_.get())); + request_tracker_.reset(new RequestTracker(kClientId)); + attempt_nos_ = 0; + } + + // An exactly once adder that uses RetriableRpc to perform the requests. 
+ struct RetriableRpcExactlyOnceAdder { + RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker, + const scoped_refptr<RequestTracker>& request_tracker, + shared_ptr<Messenger> messenger, + int value, + int server_sleep = 0) : latch_(1) { + MonoTime now = MonoTime::Now(); + now.AddDelta(MonoDelta::FromMilliseconds(10000)); + rpc_ = new CalculatorServiceRpc(server_picker, + request_tracker, + now, + std::move(messenger), + value, + &latch_, + server_sleep); + } + + void Start() { + CHECK_OK(kudu::Thread::Create( + "test", + "test", + &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread)); + } + + void SleepAndSend() { + rpc_->SendRpc(); + latch_.Wait(); + } + + CountDownLatch latch_; + scoped_refptr<kudu::Thread> thread; + CalculatorServiceRpc* rpc_; + }; + + // An exactly once adder that sends multiple, simultaneous calls, to the server + // and makes sure that only one of the calls was successful. + struct SimultaneousExactlyOnceAdder { + SimultaneousExactlyOnceAdder(CalculatorServiceProxy* p, + ResultTracker::SequenceNumber sequence_number, + int value, + uint64_t client_sleep, + uint64_t server_sleep, + int64_t attempt_no) + : proxy(p), + client_sleep_for_ms(client_sleep) { + req.set_value_to_add(value); + req.set_sleep_for_ms(server_sleep); + AddRequestId(&controller, kClientId, sequence_number, attempt_no); + } + + void Start() { + CHECK_OK(kudu::Thread::Create( + "test", + "test", + &SimultaneousExactlyOnceAdder::SleepAndSend, this, &thread)); + } + + // Sleeps the preset number of msecs before sending the call. + void SleepAndSend() { + usleep(client_sleep_for_ms * 1000); + controller.set_timeout(MonoDelta::FromSeconds(20)); + CHECK_OK(proxy->AddExactlyOnce(req, &resp, &controller)); + } + + CalculatorServiceProxy* const proxy; + const uint64_t client_sleep_for_ms; + RpcController controller; + ExactlyOnceRequestPB req; + ExactlyOnceResponsePB resp; + scoped_refptr<kudu::Thread> thread; + }; + + + void CheckValueMatches(int expected_value) { + RpcController controller; + ExactlyOnceRequestPB req; + req.set_value_to_add(0); + ExactlyOnceResponsePB resp; + RequestTracker::SequenceNumber seq_no; + CHECK_OK(request_tracker_->NewSeqNo(&seq_no)); + AddRequestId(&controller, kClientId, seq_no, 0); + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller)); + ASSERT_EQ(resp.current_val(), expected_value); + request_tracker_->RpcCompleted(seq_no); + } + + + // This continuously issues calls to the server, that often last longer than + // 'remember_responses_ttl_ms', making sure that we don't get errors back. + void DoLongWritesThread(MonoDelta run_for) { + MonoTime run_until = MonoTime::Now(); + run_until.AddDelta(run_for); + int counter = 0; + while (MonoTime::Now() < run_until) { + unique_ptr<RetriableRpcExactlyOnceAdder> adder(new RetriableRpcExactlyOnceAdder( + test_picker_, request_tracker_, client_messenger_, 1, + rand() % (2 * FLAGS_remember_responses_ttl_ms))); + + // This thread is used in the stress test where we're constantly running GC. + // So, once we get a "success" response, it's likely that the result will be + // GCed on the server side, and thus it's not safe to spuriously retry. 
+ adder->rpc_->sometimes_retry_successful_ = false; + adder->SleepAndSend(); + SleepFor(MonoDelta::FromMilliseconds(rand() % 10)); + counter++; + } + ExactlyOnceResponsePB response; + ResultTracker::SequenceNumber sequence_number; + CHECK_OK(request_tracker_->NewSeqNo(&sequence_number)); + CHECK_OK(MakeAddCall(sequence_number, 0, &response)); + CHECK_EQ(response.current_val(), counter); + request_tracker_->RpcCompleted(sequence_number); + } + + // Stubbornly sends the same request to the server, this should observe three states. + // The request should be successful at first, then its result should be GCed and the + // client should be GCed. + void StubbornlyWriteTheSameRequestThread(ResultTracker::SequenceNumber sequence_number, + MonoDelta run_for) { + MonoTime run_until = MonoTime::Now(); + run_until.AddDelta(run_for); + // Make an initial request, so that we get a response to compare to. + ExactlyOnceResponsePB original_response; + CHECK_OK(MakeAddCall(sequence_number, 0, &original_response)); + + // Now repeat the same request. At first we should get the same response, then the result + // should be GCed and we should get STALE back. Finally the request should succeed again + // but we should get a new response. + bool result_gced = false; + bool client_gced = false; + while (MonoTime::Now() < run_until) { + ExactlyOnceResponsePB response; + Status s = MakeAddCall(sequence_number, 0, &response); + if (s.ok()) { + if (!result_gced) { + CHECK_EQ(SecureDebugString(response), SecureDebugString(original_response)); + } else { + client_gced = true; + CHECK_NE(SecureDebugString(response), SecureDebugString(original_response)); + } + SleepFor(MonoDelta::FromMilliseconds(rand() % 10)); + } else if (s.IsRemoteError()) { + result_gced = true; + SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms * 2)); + } + } + CHECK(result_gced); + CHECK(client_gced); + } + + Status MakeAddCall(ResultTracker::SequenceNumber sequence_number, + int value_to_add, + ExactlyOnceResponsePB* response, + int attempt_no = -1) { + RpcController controller; + ExactlyOnceRequestPB req; + req.set_value_to_add(value_to_add); + if (attempt_no == -1) attempt_no = attempt_nos_.fetch_add(1); + AddRequestId(&controller, kClientId, sequence_number, attempt_no); + Status s = proxy_->AddExactlyOnce(req, response, &controller); + return s; + } + + protected: + Sockaddr server_addr_; + atomic_int attempt_nos_; + shared_ptr<Messenger> client_messenger_; + std::unique_ptr<CalculatorServiceProxy> proxy_; + scoped_refptr<TestServerPicker> test_picker_; + scoped_refptr<RequestTracker> request_tracker_; +}; + +// Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the +// same sequence number as previous requests. +TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) { + StartServer(); + ExactlyOnceResponsePB original_resp; + int mem_consumption = mem_tracker_->consumption(); + { + RpcController controller; + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + + // Assign id 0. + AddRequestId(&controller, kClientId, 0, 0); + + // Send the request the first time. + ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller)); + + // The incremental usage of a new client is the size of the response itself + // plus some fixed overhead for the client-tracking structure. 
+ int expected_incremental_usage = original_resp.SpaceUsed() + 200; + + int mem_consumption_after = mem_tracker_->consumption(); + ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage); + mem_consumption = mem_consumption_after; + } + + // Now repeat the rpc 10 times, using the same sequence number, none of these should be executed + // and they should get the same response back. + for (int i = 0; i < 10; i++) { + RpcController controller; + controller.set_timeout(MonoDelta::FromSeconds(20)); + ExactlyOnceRequestPB req; + req.set_value_to_add(1); + ExactlyOnceResponsePB resp; + AddRequestId(&controller, kClientId, 0, i + 1); + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller)); + ASSERT_EQ(resp.current_val(), 1); + ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros()); + // Sleep to give the MemTracker time to update -- we don't expect any update, + // but if we had a bug here, we'd only see it with this sleep. + SleepFor(MonoDelta::FromMilliseconds(100)); + // We shouldn't have consumed any more memory since the responses were cached. + ASSERT_EQ(mem_consumption, mem_tracker_->consumption()); + } + + // Making a new request, from a new client, should double the memory consumption. + { + RpcController controller; + ExactlyOnceRequestPB req; + ExactlyOnceResponsePB resp; + req.set_value_to_add(1); + + // Assign id 0. + AddRequestId(&controller, "test-client2", 0, 0); + + // Send the first request for this new client. + ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller)); + ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2); + } +} + +// Performs a series of requests in which each single request is attempted multiple times, as +// the server side is instructed to spuriously fail attempts. +// In CalculatorServiceRpc we sure that the same response is returned by all retries and, +// after all the rpcs are done, we make sure that final result is the expected one. +TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithReplicatedRpc) { + StartServer(); + int kNumIterations = 10; + int kNumRpcs = 10; + + if (AllowSlowTests()) { + kNumIterations = 100; + kNumRpcs = 100; + } + + int count = 0; + for (int i = 0; i < kNumIterations; i ++) { + vector<unique_ptr<RetriableRpcExactlyOnceAdder>> adders; + for (int j = 0; j < kNumRpcs; j++) { + unique_ptr<RetriableRpcExactlyOnceAdder> adder( + new RetriableRpcExactlyOnceAdder(test_picker_, request_tracker_, client_messenger_, j)); + adders.push_back(std::move(adder)); + adders[j]->Start(); + count += j; + } + for (int j = 0; j < kNumRpcs; j++) { + CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join()); + } + CheckValueMatches(count); + } +} + +// Performs a series of requests in which each single request is attempted by multiple threads. +// On each iteration, after all the threads complete, we expect that the add operation was +// executed exactly once. +TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) { + StartServer(); + int kNumIterations = 10; + int kNumThreads = 10; + + if (AllowSlowTests()) { + kNumIterations = 100; + kNumThreads = 100; + } + + ResultTracker::SequenceNumber sequence_number = 0; + int memory_consumption_initial = mem_tracker_->consumption(); + int single_response_size = 0; + + // Measure memory consumption for a single response from the same client. 
+ ExactlyOnceResponsePB resp; + ASSERT_OK(MakeAddCall(sequence_number, 1, &resp)); + + for (int i = 1; i <= kNumIterations; i ++) { + vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders; + for (int j = 0; j < kNumThreads; j++) { + unique_ptr<SimultaneousExactlyOnceAdder> adder( + new SimultaneousExactlyOnceAdder(proxy_.get(), + i, // sequence number + 1, // value + rand() % 20, // client_sleep + rand() % 10, // server_sleep + attempt_nos_.fetch_add(1))); // attempt number + adders.push_back(std::move(adder)); + adders[j]->Start(); + } + uint64_t time_micros = 0; + for (int j = 0; j < kNumThreads; j++) { + CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join()); + ASSERT_EQ(adders[j]->resp.current_val(), i + 1); + if (time_micros == 0) { + time_micros = adders[j]->resp.current_time_micros(); + } else { + ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros); + } + } + + // After all adders finished we should have consumed at least the size of one more response. + // The actual size depends on multiple factors, for instance, how many calls were "attached" + // (which is timing dependent), so we can't be more precise than this. + ASSERT_GT(mem_tracker_->consumption(), + memory_consumption_initial + single_response_size * i); + } +} + +TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) { + FLAGS_remember_clients_ttl_ms = 500; + FLAGS_remember_responses_ttl_ms = 100; + + StartServer(); + + // Make a request. + ExactlyOnceResponsePB original; + ResultTracker::SequenceNumber sequence_number = 0; + ASSERT_OK(MakeAddCall(sequence_number, 1, &original)); + + // Making the same request again should return the same response. + ExactlyOnceResponsePB resp; + ASSERT_OK(MakeAddCall(sequence_number, 1, &resp)); + ASSERT_EQ(SecureShortDebugString(original), SecureShortDebugString(resp)); + + // Now sleep for 'remember_responses_ttl_ms' and run GC; we should then + // get a STALE back. + SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_responses_ttl_ms)); + int64_t memory_consumption = mem_tracker_->consumption(); + result_tracker_->GCResults(); + ASSERT_LT(mem_tracker_->consumption(), memory_consumption); + + resp.Clear(); + Status s = MakeAddCall(sequence_number, 1, &resp); + ASSERT_TRUE(s.IsRemoteError()); + ASSERT_STR_CONTAINS(s.ToString(), "is stale"); + + // Sleep again, this time for 'remember_clients_ttl_ms' and run GC again. + // The request should be successful, but its response should be a new one. + SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms)); + memory_consumption = mem_tracker_->consumption(); + result_tracker_->GCResults(); + ASSERT_LT(mem_tracker_->consumption(), memory_consumption); + + resp.Clear(); + ASSERT_OK(MakeAddCall(sequence_number, 1, &resp)); + ASSERT_NE(SecureShortDebugString(resp), SecureShortDebugString(original)); +} + +// This test creates a thread continuously making requests to the server, some lasting longer +// than the GC period, while at the same time running GC, making sure that the corresponding +// CompletionRecords/ClientStates are not deleted from underneath the ongoing requests. +// This also creates a thread that runs GC very frequently and another thread that sends the +// same request over and over and observes the possible states: request is ok, request is stale, and +// request is ok again (because the client was forgotten).
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) { + FLAGS_remember_clients_ttl_ms = 100; + FLAGS_remember_responses_ttl_ms = 10; + FLAGS_result_tracker_gc_interval_ms = 10; + + StartServer(); + + // The write thread runs for a shorter period to make sure client GC has a + // chance to run. + MonoDelta writes_run_for = MonoDelta::FromSeconds(2); + MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3); + if (AllowSlowTests()) { + writes_run_for = MonoDelta::FromSeconds(10); + stubborn_run_for = MonoDelta::FromSeconds(11); + } + + result_tracker_->StartGCThread(); + + // Assign the first sequence number (0) to the 'stubborn writes' thread. + // This thread will keep making RPCs with this sequence number while + // the 'write_thread' will make normal requests with increasing sequence + // numbers. + ResultTracker::SequenceNumber stubborn_req_seq_num; + CHECK_OK(request_tracker_->NewSeqNo(&stubborn_req_seq_num)); + ASSERT_EQ(stubborn_req_seq_num, 0); + + scoped_refptr<kudu::Thread> stubborn_thread; + CHECK_OK(kudu::Thread::Create( + "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread, + this, stubborn_req_seq_num, stubborn_run_for, &stubborn_thread)); + + scoped_refptr<kudu::Thread> write_thread; + CHECK_OK(kudu::Thread::Create( + "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread, + this, writes_run_for, &write_thread)); + + write_thread->Join(); + stubborn_thread->Join(); + + // Within a few seconds, the consumption should be back to zero. + // Really, this should be within 100ms, but we'll give it a bit of + // time to avoid test flakiness. + AssertEventually([&]() { + ASSERT_EQ(0, mem_tracker_->consumption()); + }, MonoDelta::FromSeconds(5)); + NO_PENDING_FATALS(); +} + + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/inbound_call.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/inbound_call.cc b/be/src/kudu/rpc/inbound_call.cc new file mode 100644 index 0000000..aba9977 --- /dev/null +++ b/be/src/kudu/rpc/inbound_call.cc @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/inbound_call.h" + +#include <glog/stl_logging.h> +#include <memory> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/connection.h" +#include "kudu/rpc/rpc_introspection.pb.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/rpc/rpcz_store.h" +#include "kudu/rpc/serialization.h" +#include "kudu/rpc/service_if.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/metrics.h" +#include "kudu/util/trace.h" + +using google::protobuf::FieldDescriptor; +using google::protobuf::io::CodedOutputStream; +using google::protobuf::MessageLite; +using std::unique_ptr; +using std::vector; +using strings::Substitute; + +namespace kudu { +namespace rpc { + +InboundCall::InboundCall(Connection* conn) + : conn_(conn), + trace_(new Trace), + method_info_(nullptr) { + RecordCallReceived(); +} + +InboundCall::~InboundCall() {} + +Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) { + TRACE_EVENT_FLOW_BEGIN0("rpc", "InboundCall", this); + TRACE_EVENT0("rpc", "InboundCall::ParseFrom"); + RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, &serialized_request_)); + + // Adopt the service/method info from the header as soon as it's available. + if (PREDICT_FALSE(!header_.has_remote_method())) { + return Status::Corruption("Non-connection context request header must specify remote_method"); + } + if (PREDICT_FALSE(!header_.remote_method().IsInitialized())) { + return Status::Corruption("remote_method in request header is not initialized", + header_.remote_method().InitializationErrorString()); + } + remote_method_.FromPB(header_.remote_method()); + + if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) { + return Status::Corruption(strings::Substitute( + "Received $0 additional payload slices, expected at most $1", + header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars)); + } + + RETURN_NOT_OK(RpcSidecar::ParseSidecars( + header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_)); + if (header_.sidecar_offsets_size() > 0) { + // Trim the request to just the message. + serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0)); + } + + // Retain the buffer that we have a view into.
+ transfer_.swap(transfer); + return Status::OK(); +} + +void InboundCall::RespondSuccess(const MessageLite& response) { + TRACE_EVENT0("rpc", "InboundCall::RespondSuccess"); + Respond(response, true); +} + +void InboundCall::RespondUnsupportedFeature(const vector<uint32_t>& unsupported_features) { + TRACE_EVENT0("rpc", "InboundCall::RespondUnsupportedFeature"); + ErrorStatusPB err; + err.set_message("unsupported feature flags"); + err.set_code(ErrorStatusPB::ERROR_INVALID_REQUEST); + for (uint32_t feature : unsupported_features) { + err.add_unsupported_feature_flags(feature); + } + + Respond(err, false); +} + +void InboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code, + const Status& status) { + TRACE_EVENT0("rpc", "InboundCall::RespondFailure"); + ErrorStatusPB err; + err.set_message(status.ToString()); + err.set_code(error_code); + + Respond(err, false); +} + +void InboundCall::RespondApplicationError(int error_ext_id, const std::string& message, + const MessageLite& app_error_pb) { + ErrorStatusPB err; + ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err); + Respond(err, false); +} + +void InboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message, + const google::protobuf::MessageLite& app_error_pb, + ErrorStatusPB* err) { + err->set_message(message); + const FieldDescriptor* app_error_field = + err->GetReflection()->FindKnownExtensionByNumber(error_ext_id); + if (app_error_field != nullptr) { + err->GetReflection()->MutableMessage(err, app_error_field)->CheckTypeAndMergeFrom(app_error_pb); + } else { + LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id + << " (message=" << message << ")"; + } +} + +void InboundCall::Respond(const MessageLite& response, + bool is_success) { + TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this); + SerializeResponseBuffer(response, is_success); + + TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this, + "method", remote_method_.method_name()); + TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure"); + RecordHandlingCompleted(); + conn_->rpcz_store()->AddCall(this); + conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this)); +} + +void InboundCall::SerializeResponseBuffer(const MessageLite& response, + bool is_success) { + if (PREDICT_FALSE(!response.IsInitialized())) { + LOG(ERROR) << "Invalid RPC response for " << ToString() + << ": protobuf missing required fields: " + << response.InitializationErrorString(); + // Send it along anyway -- the client will also notice the missing fields + // and produce an error on the other side, but this will at least + // make it clear on both sides of the RPC connection what kind of error + // happened. 
+ } + + uint32_t protobuf_msg_size = response.ByteSize(); + + ResponseHeader resp_hdr; + resp_hdr.set_call_id(header_.call_id()); + resp_hdr.set_is_error(!is_success); + uint32_t absolute_sidecar_offset = protobuf_msg_size; + for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) { + resp_hdr.add_sidecar_offsets(absolute_sidecar_offset); + absolute_sidecar_offset += car->AsSlice().size(); + } + + int additional_size = absolute_sidecar_offset - protobuf_msg_size; + serialization::SerializeMessage(response, &response_msg_buf_, + additional_size, true); + int main_msg_size = additional_size + response_msg_buf_.size(); + serialization::SerializeHeader(resp_hdr, main_msg_size, + &response_hdr_buf_); +} + +void InboundCall::SerializeResponseTo(vector<Slice>* slices) const { + TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo"); + CHECK_GT(response_hdr_buf_.size(), 0); + CHECK_GT(response_msg_buf_.size(), 0); + slices->reserve(slices->size() + 2 + outbound_sidecars_.size()); + slices->push_back(Slice(response_hdr_buf_)); + slices->push_back(Slice(response_msg_buf_)); + for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) { + slices->push_back(car->AsSlice()); + } +} + +Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) { + // Check that the number of sidecars does not exceed the number of payload + // slices that are free (two are used up by the header and main message + // protobufs). + if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) { + return Status::ServiceUnavailable("All available sidecars already used"); + } + outbound_sidecars_.emplace_back(std::move(car)); + *idx = outbound_sidecars_.size() - 1; + return Status::OK(); +} + +string InboundCall::ToString() const { + if (header_.has_request_id()) { + return Substitute("Call $0 from $1 (ReqId={client: $2, seq_no=$3, attempt_no=$4})", + remote_method_.ToString(), + conn_->remote().ToString(), + header_.request_id().client_id(), + header_.request_id().seq_no(), + header_.request_id().attempt_no()); + } + return Substitute("Call $0 from $1 (request call id $2)", + remote_method_.ToString(), + conn_->remote().ToString(), + header_.call_id()); +} + +void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req, + RpcCallInProgressPB* resp) { + resp->mutable_header()->CopyFrom(header_); + if (req.include_traces() && trace_) { + resp->set_trace_buffer(trace_->DumpToString()); + } + resp->set_micros_elapsed((MonoTime::Now() - timing_.time_received) + .ToMicroseconds()); +} + +const RemoteUser& InboundCall::remote_user() const { + return conn_->remote_user(); +} + +const Sockaddr& InboundCall::remote_address() const { + return conn_->remote(); +} + +const scoped_refptr<Connection>& InboundCall::connection() const { + return conn_; +} + +Trace* InboundCall::trace() { + return trace_.get(); +} + +void InboundCall::RecordCallReceived() { + TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this); + DCHECK(!timing_.time_received.Initialized()); // Protect against multiple calls. + timing_.time_received = MonoTime::Now(); +} + +void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time) { + DCHECK(incoming_queue_time != nullptr); + DCHECK(!timing_.time_handled.Initialized()); // Protect against multiple calls. 
+ timing_.time_handled = MonoTime::Now(); + incoming_queue_time->Increment( + (timing_.time_handled - timing_.time_received).ToMicroseconds()); +} + +void InboundCall::RecordHandlingCompleted() { + DCHECK(!timing_.time_completed.Initialized()); // Protect against multiple calls. + timing_.time_completed = MonoTime::Now(); + + if (!timing_.time_handled.Initialized()) { + // Sometimes we respond to a call before we begin handling it (e.g. due to queue + // overflow, etc). These cases should not be counted against the histogram. + return; + } + + if (method_info_) { + method_info_->handler_latency_histogram->Increment( + (timing_.time_completed - timing_.time_handled).ToMicroseconds()); + } +} + +bool InboundCall::ClientTimedOut() const { + if (!header_.has_timeout_millis() || header_.timeout_millis() == 0) { + return false; + } + + MonoTime now = MonoTime::Now(); + int total_time = (now - timing_.time_received).ToMilliseconds(); + return total_time > header_.timeout_millis(); +} + +MonoTime InboundCall::GetClientDeadline() const { + if (!header_.has_timeout_millis() || header_.timeout_millis() == 0) { + return MonoTime::Max(); + } + return timing_.time_received + MonoDelta::FromMilliseconds(header_.timeout_millis()); +} + +MonoTime InboundCall::GetTimeReceived() const { + return timing_.time_received; +} + +vector<uint32_t> InboundCall::GetRequiredFeatures() const { + vector<uint32_t> features; + for (uint32_t feature : header_.required_feature_flags()) { + features.push_back(feature); + } + return features; +} + +Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const { + DCHECK(transfer_) << "Sidecars have been discarded"; + if (idx < 0 || idx >= header_.sidecar_offsets_size()) { + return Status::InvalidArgument(strings::Substitute( + "Index $0 does not reference a valid sidecar", idx)); + } + *sidecar = inbound_sidecar_slices_[idx]; + return Status::OK(); +} + +void InboundCall::DiscardTransfer() { + transfer_.reset(); +} + +size_t InboundCall::GetTransferSize() { + if (!transfer_) return 0; + return transfer_->data().size(); +} + +} // namespace rpc +} // namespace kudu
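The offset arithmetic in SerializeResponseBuffer() above is the part of this file that most benefits from a worked example: each sidecar's offset is recorded relative to the start of the serialized response protobuf, and the size written into the response header covers the protobuf plus every sidecar. The following standalone sketch uses plain C++ with made-up sizes and none of the Kudu types, and it ignores the length-prefix bookkeeping that serialization::SerializeMessage() performs; it only illustrates how the offsets and the overall message size relate.

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Hypothetical sizes: a 120-byte response protobuf and two sidecars.
  uint32_t protobuf_msg_size = 120;
  std::vector<uint32_t> sidecar_sizes = {4096, 512};

  // Mirror of the loop in SerializeResponseBuffer(): each offset is absolute
  // within the main message, starting right after the response protobuf.
  std::vector<uint32_t> sidecar_offsets;
  uint32_t absolute_sidecar_offset = protobuf_msg_size;
  for (uint32_t size : sidecar_sizes) {
    sidecar_offsets.push_back(absolute_sidecar_offset);
    absolute_sidecar_offset += size;
  }

  // The bytes beyond the protobuf are the sidecars themselves; the main
  // message size reported to the peer covers both.
  uint32_t additional_size = absolute_sidecar_offset - protobuf_msg_size;
  uint32_t main_msg_size = protobuf_msg_size + additional_size;

  std::cout << "sidecar offsets:";
  for (uint32_t off : sidecar_offsets) std::cout << " " << off;   // 120 4216
  std::cout << "\nmain message size: " << main_msg_size << "\n";  // 4728
  return 0;
}

On the receive side ParseFrom() inverts this layout: the request slice is trimmed to sidecar_offsets(0) bytes and each sidecar is carved out of the retained transfer buffer, which is why GetInboundSidecar() insists the transfer has not been discarded.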

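Tying this back to the exactly-once tests earlier in this commit: the guarantees hinge on the (client id, sequence number, attempt number) triple that the tests attach via AddRequestId() and that InboundCall::ToString() prints above. Retries of one logical operation reuse the sequence number but get a fresh attempt number, which is what lets the server-side ResultTracker replay the cached response instead of re-executing the write. A minimal, self-contained sketch of that client-side bookkeeping follows; the struct and counters are hypothetical illustrations, not the Kudu API.

#include <atomic>
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical stand-in for the request id carried in the RPC request header.
struct RequestId {
  std::string client_id;
  int64_t seq_no;      // identifies the logical operation
  int64_t attempt_no;  // identifies one particular send of that operation
};

int main() {
  std::atomic<int64_t> next_seq_no{0};
  std::atomic<int64_t> next_attempt_no{0};

  // First send of a new logical "add" operation.
  RequestId first{"test-client", next_seq_no.fetch_add(1), next_attempt_no.fetch_add(1)};

  // A retry after a timeout: same seq_no, new attempt_no, so the server can
  // detect the duplicate and return the response it already computed.
  RequestId retry{first.client_id, first.seq_no, next_attempt_no.fetch_add(1)};

  std::cout << "first: seq=" << first.seq_no << " attempt=" << first.attempt_no << "\n"
            << "retry: seq=" << retry.seq_no << " attempt=" << retry.attempt_no << "\n";
  return 0;
}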