// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_RPC_REMOTE_METHOD_H_
#define KUDU_RPC_REMOTE_METHOD_H_

#include <string>

namespace kudu {
namespace rpc {

class RemoteMethodPB;

// Simple class that acts as a container for a fully qualified remote RPC name
// and converts to/from RemoteMethodPB.
// This class is also copyable and assignable for convenience reasons.
class RemoteMethod {
 public:
  RemoteMethod() {}

  // Both parameters are taken by value so the (out-of-line) definition can
  // move from them. NOTE(review): the previous declaration marked
  // 'method_name' as a const value parameter, which would prevent moving
  // from it; top-level const on a by-value parameter is not part of the
  // function type, so this is a declaration-only, caller-compatible fix.
  RemoteMethod(std::string service_name, std::string method_name);

  // Accessors return by const reference to avoid copying a std::string on
  // every call; callers that previously received a copy still compile
  // unchanged.
  const std::string& service_name() const { return service_name_; }
  const std::string& method_name() const { return method_name_; }

  // Encode/decode to/from 'pb'.
  void FromPB(const RemoteMethodPB& pb);
  void ToPB(RemoteMethodPB* pb) const;

  // Returns a human-readable "service.method" style representation.
  std::string ToString() const;

 private:
  std::string service_name_;
  std::string method_name_;
};

} // namespace rpc
} // namespace kudu

#endif // KUDU_RPC_REMOTE_METHOD_H_
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_user.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/remote_user.cc b/be/src/kudu/rpc/remote_user.cc new file mode 100644 index 0000000..50e3fcd --- /dev/null +++ b/be/src/kudu/rpc/remote_user.cc @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/rpc/remote_user.h" + +#include <boost/optional.hpp> +#include <string> + +#include "kudu/gutil/strings/substitute.h" + +using std::string; + +namespace kudu { +namespace rpc { + +string RemoteUser::ToString() const { + string ret; + strings::SubstituteAndAppend(&ret, "{username='$0'", username_); + if (principal_) { + strings::SubstituteAndAppend(&ret, ", principal='$0'", *principal_); + } + ret.append("}"); + return ret; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_user.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/remote_user.h b/be/src/kudu/rpc/remote_user.h new file mode 100644 index 0000000..7dc0590 --- /dev/null +++ b/be/src/kudu/rpc/remote_user.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <string> + +#include <boost/optional.hpp> + +namespace kudu { +namespace rpc { + +// Server-side view of the remote authenticated user. +// +// This class may be read by multiple threads concurrently after +// its initialization during RPC negotiation. 
+class RemoteUser { + public: + // The method by which the remote user authenticated. + enum Method { + // No authentication (authentication was not required by the server + // and the user provided a username but it was not validated in any way) + UNAUTHENTICATED, + // Kerberos-authenticated. + KERBEROS, + // Authenticated by a Kudu authentication token. + AUTHN_TOKEN, + // Authenticated by a client certificate. + CLIENT_CERT + }; + + Method authenticated_by() const { + return authenticated_by_; + } + + const std::string& username() const { return username_; } + + boost::optional<std::string> principal() const { + return principal_; + } + + void SetAuthenticatedByKerberos(std::string username, + std::string principal) { + authenticated_by_ = KERBEROS; + username_ = std::move(username); + principal_ = std::move(principal); + } + + void SetUnauthenticated(std::string username) { + authenticated_by_ = UNAUTHENTICATED; + username_ = std::move(username); + principal_ = boost::none; + } + + void SetAuthenticatedByClientCert(std::string username, + boost::optional<std::string> principal) { + authenticated_by_ = CLIENT_CERT; + username_ = std::move(username); + principal_ = std::move(principal); + } + + void SetAuthenticatedByToken(std::string username) { + authenticated_by_ = AUTHN_TOKEN; + username_ = std::move(username); + principal_ = boost::none; + } + + // Returns a string representation of the object. + std::string ToString() const; + + private: + // The real username of the remote user. In the case of a Kerberos + // principal, this has already been mapped to a local username. + // TODO(todd): actually do the above mapping. + std::string username_; + + // The full principal of the remote user. This is only set in the + // case of a strong-authenticated user. 
+ boost::optional<std::string> principal_; + + Method authenticated_by_ = UNAUTHENTICATED; +}; + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/request_tracker-test.cc b/be/src/kudu/rpc/request_tracker-test.cc new file mode 100644 index 0000000..89ea8a2 --- /dev/null +++ b/be/src/kudu/rpc/request_tracker-test.cc @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> +#include <vector> + +#include "kudu/rpc/request_tracker.h" +#include "kudu/util/test_util.h" + +using std::vector; + +namespace kudu { +namespace rpc { + +TEST(RequestTrackerTest, TestSequenceNumberGeneration) { + const int MAX = 10; + + scoped_refptr<RequestTracker> tracker_(new RequestTracker("test_client")); + + // A new tracker should have no incomplete RPCs + RequestTracker::SequenceNumber seq_no = tracker_->FirstIncomplete(); + ASSERT_EQ(seq_no, RequestTracker::NO_SEQ_NO); + + vector<RequestTracker::SequenceNumber> generated_seq_nos; + + // Generate MAX in flight RPCs, making sure they are correctly returned. 
+ for (int i = 0; i < MAX; i++) { + ASSERT_OK(tracker_->NewSeqNo(&seq_no)); + generated_seq_nos.push_back(seq_no); + } + + // Now we should get a first incomplete. + ASSERT_EQ(generated_seq_nos[0], tracker_->FirstIncomplete()); + + // Marking 'first_incomplete' as done, should advance the first incomplete. + tracker_->RpcCompleted(tracker_->FirstIncomplete()); + + ASSERT_EQ(generated_seq_nos[1], tracker_->FirstIncomplete()); + + // Marking a 'middle' rpc, should not advance 'first_incomplete'. + tracker_->RpcCompleted(generated_seq_nos[5]); + ASSERT_EQ(generated_seq_nos[1], tracker_->FirstIncomplete()); + + // Marking half the rpc as complete should advance FirstIncomplete. + // Note that this also tests that RequestTracker::RpcCompleted() is idempotent, i.e. that + // marking the same sequence number as complete twice is a no-op. + for (int i = 0; i < MAX / 2; i++) { + tracker_->RpcCompleted(generated_seq_nos[i]); + } + + ASSERT_EQ(generated_seq_nos[6], tracker_->FirstIncomplete()); + + for (int i = MAX / 2; i <= MAX; i++) { + ASSERT_OK(tracker_->NewSeqNo(&seq_no)); + generated_seq_nos.push_back(seq_no); + } + + // Marking them all as completed should cause RequestTracker::FirstIncomplete() to return + // Status::NotFound() again. + for (auto seq_no : generated_seq_nos) { + tracker_->RpcCompleted(seq_no); + } + + ASSERT_EQ(tracker_->FirstIncomplete(), RequestTracker::NO_SEQ_NO); +} + +} // namespace rpc +} // namespace kudu + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/request_tracker.cc b/be/src/kudu/rpc/request_tracker.cc new file mode 100644 index 0000000..1958664 --- /dev/null +++ b/be/src/kudu/rpc/request_tracker.cc @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/request_tracker.h" + +#include <mutex> + +#include "kudu/gutil/map-util.h" + +namespace kudu { +namespace rpc { + +const RequestTracker::SequenceNumber RequestTracker::NO_SEQ_NO = -1; + +RequestTracker::RequestTracker(const string& client_id) + : client_id_(client_id), + next_(0) {} + +Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) { + std::lock_guard<simple_spinlock> l(lock_); + *seq_no = next_; + InsertOrDie(&incomplete_rpcs_, *seq_no); + next_++; + return Status::OK(); +} + +RequestTracker::SequenceNumber RequestTracker::FirstIncomplete() { + std::lock_guard<simple_spinlock> l(lock_); + if (incomplete_rpcs_.empty()) return NO_SEQ_NO; + return *incomplete_rpcs_.begin(); +} + +void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) { + std::lock_guard<simple_spinlock> l(lock_); + incomplete_rpcs_.erase(seq_no); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/request_tracker.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/request_tracker.h b/be/src/kudu/rpc/request_tracker.h new file mode 100644 index 0000000..99f8d6c --- /dev/null +++ b/be/src/kudu/rpc/request_tracker.h @@ -0,0 +1,85 @@ +// Licensed 
to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <set> +#include <string> + +#include "kudu/util/locks.h" +#include "kudu/util/status.h" + +namespace kudu { +namespace rpc { + +// RequestTracker implementation, inspired by: +// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al. +// +// This generates sequence numbers for retriable RPCs and tracks the ongoing ones. +// The main point of this is to enable exactly-once semantics, i.e. making sure that +// an RPC is only executed once, by uniquely identifying each RPC that is sent to +// the server. +// +// Note that the sequence numbers here are different from RPC 'call ids'. A call id +// uniquely identifies a call _to a server_. All calls have a call id that is +// assigned incrementally. Sequence numbers, on the other hand, uniquely identify +// the RPC operation itself. That is, if an RPC is retried on another server it will +// have a different call id, but the same sequence number. +// +// By keeping track of the RPCs that are in-flight and which ones are completed +// we can determine the first incomplete RPC. 
When this information is sent +// to the server it can use it to garbage collect RPC results that it might be +// saving for future retries, since it now knows there won't be any. +// +// This class is thread safe. +class RequestTracker : public RefCountedThreadSafe<RequestTracker> { + public: + typedef int64_t SequenceNumber; + static const RequestTracker::SequenceNumber NO_SEQ_NO; + explicit RequestTracker(const std::string& client_id); + + // Creates a new, unique, sequence number. + // Sequence numbers are assigned in increasing integer order. + // Returns Status::OK() and sets 'seq_no' if it was able to generate a sequence number + // or returns Status::ServiceUnavailable() if too many RPCs are in-flight, in which case + // the caller should try again later. + Status NewSeqNo(SequenceNumber* seq_no); + + // Returns the sequence number of the first incomplete RPC. + // If there is no incomplete RPC returns NO_SEQ_NO. + SequenceNumber FirstIncomplete(); + + // Marks the rpc with 'seq_no' as completed. + void RpcCompleted(const SequenceNumber& seq_no); + + // Returns the client id for this request tracker. + const std::string& client_id() { return client_id_; } + private: + // The client id for this request tracker. + const std::string client_id_; + + // Lock that protects all non-const fields. + simple_spinlock lock_; + + // The next sequence number. + SequenceNumber next_; + + // The (ordered) set of incomplete RPCs. 
+ std::set<SequenceNumber> incomplete_rpcs_; +}; + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/response_callback.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/response_callback.h b/be/src/kudu/rpc/response_callback.h new file mode 100644 index 0000000..8c4fc03 --- /dev/null +++ b/be/src/kudu/rpc/response_callback.h @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_RESPONSE_CALLBACK_H +#define KUDU_RPC_RESPONSE_CALLBACK_H + +#include <boost/function.hpp> + +namespace kudu { +namespace rpc { + +typedef boost::function<void()> ResponseCallback; + +} +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/result_tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/result_tracker.cc b/be/src/kudu/rpc/result_tracker.cc new file mode 100644 index 0000000..11ff8d2 --- /dev/null +++ b/be/src/kudu/rpc/result_tracker.cc @@ -0,0 +1,582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/result_tracker.h" + +#include <algorithm> +#include <limits> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/rpc/rpc_context.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/pb_util.h" +#include "kudu/util/trace.h" + +DEFINE_int64(remember_clients_ttl_ms, 3600 * 1000 /* 1 hour */, + "Maximum amount of time, in milliseconds, the server \"remembers\" a client for the " + "purpose of caching its responses. After this period without hearing from it, the " + "client is no longer remembered and the memory occupied by its responses is reclaimed. " + "Retries of requests older than 'remember_clients_ttl_ms' are treated as new " + "ones."); +TAG_FLAG(remember_clients_ttl_ms, advanced); + +DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */, + "Maximum amount of time, in milliseconds, the server \"remembers\" a response to a " + "specific request for a client. 
After this period has elapsed, the response may have " + "been garbage collected and the client might get a response indicating the request is " + "STALE."); +TAG_FLAG(remember_responses_ttl_ms, advanced); + +DEFINE_int64(result_tracker_gc_interval_ms, 1000, + "Interval at which the result tracker will look for entries to GC."); +TAG_FLAG(result_tracker_gc_interval_ms, hidden); + +namespace kudu { +namespace rpc { + +using google::protobuf::Message; +using kudu::MemTracker; +using rpc::InboundCall; +using std::move; +using std::lock_guard; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using strings::Substitute; +using strings::SubstituteAndAppend; + +// This tracks the size changes of anything that has a memory_footprint() method. +// It must be instantiated before the updates, and it makes sure that the MemTracker +// is updated on scope exit. +template <class T> +struct ScopedMemTrackerUpdater { + ScopedMemTrackerUpdater(MemTracker* tracker_, const T* tracked_) + : tracker(tracker_), + tracked(tracked_), + memory_before(tracked->memory_footprint()), + cancelled(false) { + } + + ~ScopedMemTrackerUpdater() { + if (cancelled) return; + tracker->Release(memory_before - tracked->memory_footprint()); + } + + void Cancel() { + cancelled = true; + } + + MemTracker* tracker; + const T* tracked; + int64_t memory_before; + bool cancelled; +}; + +ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker) + : mem_tracker_(std::move(mem_tracker)), + clients_(ClientStateMap::key_compare(), + ClientStateMapAllocator(mem_tracker_)), + gc_thread_stop_latch_(1) {} + +ResultTracker::~ResultTracker() { + if (gc_thread_) { + gc_thread_stop_latch_.CountDown(); + gc_thread_->Join(); + } + + lock_guard<simple_spinlock> l(lock_); + // Release all the memory for the stuff we'll delete on destruction. 
+ for (auto& client_state : clients_) { + client_state.second->GCCompletionRecords( + mem_tracker_, [] (SequenceNumber, CompletionRecord*){ return true; }); + mem_tracker_->Release(client_state.second->memory_footprint()); + } +} + +ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id, + Message* response, + RpcContext* context) { + lock_guard<simple_spinlock> l(lock_); + return TrackRpcUnlocked(request_id, response, context); +} + +ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& request_id, + Message* response, + RpcContext* context) { + ClientState* client_state = ComputeIfAbsent( + &clients_, + request_id.client_id(), + [&]{ + unique_ptr<ClientState> client_state(new ClientState(mem_tracker_)); + mem_tracker_->Consume(client_state->memory_footprint()); + client_state->stale_before_seq_no = request_id.first_incomplete_seq_no(); + return client_state; + })->get(); + + client_state->last_heard_from = MonoTime::Now(); + + // If the arriving request is older than our per-client GC watermark, report its + // staleness to the client. + if (PREDICT_FALSE(request_id.seq_no() < client_state->stale_before_seq_no)) { + if (context) { + context->call_->RespondFailure( + ErrorStatusPB::ERROR_REQUEST_STALE, + Status::Incomplete(Substitute("Request with id { $0 } is stale.", + SecureShortDebugString(request_id)))); + delete context; + } + return RpcState::STALE; + } + + // GC records according to the client's first incomplete watermark. 
+ client_state->GCCompletionRecords( + mem_tracker_, + [&] (SequenceNumber seq_no, CompletionRecord* completion_record) { + return completion_record->state != RpcState::IN_PROGRESS && + seq_no < request_id.first_incomplete_seq_no(); + }); + + auto result = ComputeIfAbsentReturnAbsense( + &client_state->completion_records, + request_id.seq_no(), + [&]{ + unique_ptr<CompletionRecord> completion_record(new CompletionRecord( + RpcState::IN_PROGRESS, request_id.attempt_no())); + mem_tracker_->Consume(completion_record->memory_footprint()); + return completion_record; + }); + + CompletionRecord* completion_record = result.first->get(); + ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record); + + if (PREDICT_TRUE(result.second)) { + // When a follower is applying an operation it doesn't have a response yet, and it won't + // have a context, so only set them if they exist. + if (response != nullptr) { + completion_record->ongoing_rpcs.push_back({response, + DCHECK_NOTNULL(context), + request_id.attempt_no()}); + } + return RpcState::NEW; + } + + completion_record->last_updated = MonoTime::Now(); + switch (completion_record->state) { + case RpcState::COMPLETED: { + // If the RPC is COMPLETED and the request originates from a client (context, response are + // non-null) copy the response and reply immediately. If there is no context/response + // do nothing. + if (context != nullptr) { + DCHECK_NOTNULL(response)->CopyFrom(*completion_record->response); + context->call_->RespondSuccess(*response); + delete context; + } + return RpcState::COMPLETED; + } + case RpcState::IN_PROGRESS: { + // If the RPC is IN_PROGRESS check if there is a context and, if so, attach it + // so that the rpc gets the same response when the original one completes. 
+ if (context != nullptr) { + completion_record->ongoing_rpcs.push_back({DCHECK_NOTNULL(response), + context, + NO_HANDLER}); + } + return RpcState::IN_PROGRESS; + } + default: + LOG(FATAL) << "Wrong state: " << completion_record->state; + // dummy return to avoid warnings + return RpcState::STALE; + } +} + +ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB& request_id) { + lock_guard<simple_spinlock> l(lock_); + RpcState state = TrackRpcUnlocked(request_id, nullptr, nullptr); + + if (state != RpcState::IN_PROGRESS) return state; + + CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id); + ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record); + + // ... if we did find a CompletionRecord change the driver and return true. + completion_record->driver_attempt_no = request_id.attempt_no(); + completion_record->ongoing_rpcs.push_back({nullptr, + nullptr, + request_id.attempt_no()}); + + // Since we changed the driver of the RPC, return NEW, so that the caller knows + // to store the result. + return RpcState::NEW; +} + +bool ResultTracker::IsCurrentDriver(const RequestIdPB& request_id) { + lock_guard<simple_spinlock> l(lock_); + CompletionRecord* completion_record = FindCompletionRecordOrNullUnlocked(request_id); + + // If we couldn't find the CompletionRecord, someone might have called FailAndRespond() so + // just return false. + if (completion_record == nullptr) return false; + + // ... if we did find a CompletionRecord return true if we're the driver or false + // otherwise. 
+ return completion_record->driver_attempt_no == request_id.attempt_no(); +} + +void ResultTracker::LogAndTraceAndRespondSuccess(RpcContext* context, + const Message& msg) { + InboundCall* call = context->call_; + VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC success " + "response for " << call->ToString() << ":" << std::endl << SecureDebugString(msg); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "response", pb_util::PbTracer::TracePb(msg), + "trace", context->trace()->DumpToString()); + call->RespondSuccess(msg); + delete context; +} + +void ResultTracker::LogAndTraceFailure(RpcContext* context, + const Message& msg) { + InboundCall* call = context->call_; + VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC failure " + "response for " << call->ToString() << ": " << SecureDebugString(msg); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "response", pb_util::PbTracer::TracePb(msg), + "trace", context->trace()->DumpToString()); +} + +void ResultTracker::LogAndTraceFailure(RpcContext* context, + ErrorStatusPB_RpcErrorCodePB err, + const Status& status) { + InboundCall* call = context->call_; + VLOG(1) << this << " " << call->remote_method().service_name() << ": Sending RPC failure " + "response for " << call->ToString() << ": " << status.ToString(); + TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, + "status", status.ToString(), + "trace", context->trace()->DumpToString()); +} + +ResultTracker::CompletionRecord* ResultTracker::FindCompletionRecordOrDieUnlocked( + const RequestIdPB& request_id) { + ClientState* client_state = DCHECK_NOTNULL(FindPointeeOrNull(clients_, request_id.client_id())); + return DCHECK_NOTNULL(FindPointeeOrNull(client_state->completion_records, request_id.seq_no())); +} + +pair<ResultTracker::ClientState*, ResultTracker::CompletionRecord*> +ResultTracker::FindClientStateAndCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) { + ClientState* client_state = 
FindPointeeOrNull(clients_, request_id.client_id()); + CompletionRecord* completion_record = nullptr; + if (client_state != nullptr) { + completion_record = FindPointeeOrNull(client_state->completion_records, request_id.seq_no()); + } + return make_pair(client_state, completion_record); +} + +ResultTracker::CompletionRecord* +ResultTracker::FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) { + return FindClientStateAndCompletionRecordOrNullUnlocked(request_id).second; +} + +void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id, + const Message* response) { + vector<OnGoingRpcInfo> to_respond; + { + lock_guard<simple_spinlock> l(lock_); + + CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id); + ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record); + + CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no()) + << "Called RecordCompletionAndRespond() from an executor identified with an " + << "attempt number that was not marked as the driver for the RPC. RequestId: " + << SecureShortDebugString(request_id) << "\nTracker state:\n " << ToStringUnlocked(); + DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS); + completion_record->response.reset(DCHECK_NOTNULL(response)->New()); + completion_record->response->CopyFrom(*response); + completion_record->state = RpcState::COMPLETED; + completion_record->last_updated = MonoTime::Now(); + + CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no()); + + int64_t handler_attempt_no = request_id.attempt_no(); + + // Go through the ongoing RPCs and reply to each one. 
+ for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin(); + orpc_iter != completion_record->ongoing_rpcs.rend();) { + + const OnGoingRpcInfo& ongoing_rpc = *orpc_iter; + if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) { + if (ongoing_rpc.context != nullptr) { + to_respond.push_back(ongoing_rpc); + } + ++orpc_iter; + orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator( + completion_record->ongoing_rpcs.erase(orpc_iter.base())); + } else { + ++orpc_iter; + } + } + } + + // Respond outside of holding the lock. This reduces lock contention and also + // means that we will have fully updated our memory tracking before responding, + // which makes testing easier. + for (auto& ongoing_rpc : to_respond) { + if (PREDICT_FALSE(ongoing_rpc.response != response)) { + ongoing_rpc.response->CopyFrom(*response); + } + LogAndTraceAndRespondSuccess(ongoing_rpc.context, *ongoing_rpc.response); + } +} + +void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id, + HandleOngoingRpcFunc func) { + vector<OnGoingRpcInfo> to_handle; + { + lock_guard<simple_spinlock> l(lock_); + auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id); + if (PREDICT_FALSE(state_and_record.first == nullptr)) { + LOG(FATAL) << "Couldn't find ClientState for request: " << SecureShortDebugString(request_id) + << ". \nTracker state:\n" << ToStringUnlocked(); + } + + CompletionRecord* completion_record = state_and_record.second; + + // It is possible for this method to be called for an RPC that was never actually + // tracked (though RecordCompletionAndRespond() can't). One such case is when a + // follower transaction fails on the TransactionManager, for some reason, before it + // was tracked. The CompletionCallback still calls this method. In this case, do + // nothing. 
+ if (completion_record == nullptr) { + return; + } + + ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record); + completion_record->last_updated = MonoTime::Now(); + + int64_t seq_no = request_id.seq_no(); + int64_t handler_attempt_no = request_id.attempt_no(); + + // If we're copying from a client originated response we need to take care to reply + // to that call last, otherwise we'll lose 'response', before we go through all the + // CompletionRecords. + for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin(); + orpc_iter != completion_record->ongoing_rpcs.rend();) { + + const OnGoingRpcInfo& ongoing_rpc = *orpc_iter; + if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) { + to_handle.push_back(ongoing_rpc); + ++orpc_iter; + orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator( + completion_record->ongoing_rpcs.erase(orpc_iter.base())); + } else { + ++orpc_iter; + } + } + + // If we're the last ones trying this and the state is not completed, + // delete the completion record. + if (completion_record->ongoing_rpcs.size() == 0 + && completion_record->state != RpcState::COMPLETED) { + cr_updater.Cancel(); + unique_ptr<CompletionRecord> completion_record = + EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no); + mem_tracker_->Release(completion_record->memory_footprint()); + } + } + + // Wait until outside the lock to do the heavy-weight work. + for (auto& ongoing_rpc : to_handle) { + if (ongoing_rpc.context != nullptr) { + func(ongoing_rpc); + delete ongoing_rpc.context; + } + } +} + +void ResultTracker::FailAndRespond(const RequestIdPB& request_id, Message* response) { + auto func = [&](const OnGoingRpcInfo& ongoing_rpc) { + // In the common case RPCs are just executed once so, in that case, avoid an extra + // copy of the response. 
+ if (PREDICT_FALSE(ongoing_rpc.response != response)) { + ongoing_rpc.response->CopyFrom(*response); + } + LogAndTraceFailure(ongoing_rpc.context, *response); + ongoing_rpc.context->call_->RespondSuccess(*response); + }; + FailAndRespondInternal(request_id, func); +} + +void ResultTracker::FailAndRespond(const RequestIdPB& request_id, + ErrorStatusPB_RpcErrorCodePB err, const Status& status) { + auto func = [&](const OnGoingRpcInfo& ongoing_rpc) { + LogAndTraceFailure(ongoing_rpc.context, err, status); + ongoing_rpc.context->call_->RespondFailure(err, status); + }; + FailAndRespondInternal(request_id, func); +} + +void ResultTracker::FailAndRespond(const RequestIdPB& request_id, + int error_ext_id, const string& message, + const Message& app_error_pb) { + auto func = [&](const OnGoingRpcInfo& ongoing_rpc) { + LogAndTraceFailure(ongoing_rpc.context, app_error_pb); + ongoing_rpc.context->call_->RespondApplicationError(error_ext_id, message, app_error_pb); + }; + FailAndRespondInternal(request_id, func); +} + +void ResultTracker::StartGCThread() { + CHECK(!gc_thread_); + CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread, + this, &gc_thread_)); +} + +void ResultTracker::RunGCThread() { + while (!gc_thread_stop_latch_.WaitFor(MonoDelta::FromMilliseconds( + FLAGS_result_tracker_gc_interval_ms))) { + GCResults(); + } +} + +void ResultTracker::GCResults() { + lock_guard<simple_spinlock> l(lock_); + MonoTime now = MonoTime::Now(); + // Calculate the instants before which we'll start GCing ClientStates and CompletionRecords. + MonoTime time_to_gc_clients_from = now; + time_to_gc_clients_from.AddDelta( + MonoDelta::FromMilliseconds(-FLAGS_remember_clients_ttl_ms)); + MonoTime time_to_gc_responses_from = now; + time_to_gc_responses_from.AddDelta( + MonoDelta::FromMilliseconds(-FLAGS_remember_responses_ttl_ms)); + + // Now go through the ClientStates. 
If we haven't heard from a client in a while + // GC it and all its completion records (making sure there isn't actually one in progress first). + // If we've heard from a client recently, but some of its responses are old, GC those responses. + for (auto iter = clients_.begin(); iter != clients_.end();) { + auto& client_state = iter->second; + if (client_state->last_heard_from < time_to_gc_clients_from) { + // Client should be GCed. + bool ongoing_request = false; + client_state->GCCompletionRecords( + mem_tracker_, + [&] (SequenceNumber, CompletionRecord* completion_record) { + if (PREDICT_FALSE(completion_record->state == RpcState::IN_PROGRESS)) { + ongoing_request = true; + return false; + } + return true; + }); + // Don't delete the client state if there is still a request in execution. + if (PREDICT_FALSE(ongoing_request)) { + ++iter; + continue; + } + mem_tracker_->Release(client_state->memory_footprint()); + iter = clients_.erase(iter); + } else { + // Client can't be GCed, but its calls might be GCable. + iter->second->GCCompletionRecords( + mem_tracker_, + [&] (SequenceNumber, CompletionRecord* completion_record) { + return completion_record->state != RpcState::IN_PROGRESS && + completion_record->last_updated < time_to_gc_responses_from; + }); + ++iter; + } + } +} + +string ResultTracker::ToString() { + lock_guard<simple_spinlock> l(lock_); + return ToStringUnlocked(); +} + +string ResultTracker::ToStringUnlocked() const { + string result = Substitute("ResultTracker[this: $0, Num. 
Client States: $1, Client States:\n", + this, clients_.size()); + for (auto& cs : clients_) { + SubstituteAndAppend(&result, Substitute("\n\tClient: $0, $1", cs.first, cs.second->ToString())); + } + result.append("]"); + return result; +} + +template<class MustGcRecordFunc> +void ResultTracker::ClientState::GCCompletionRecords( + const shared_ptr<kudu::MemTracker>& mem_tracker, + MustGcRecordFunc must_gc_record_func) { + ScopedMemTrackerUpdater<ClientState> updater(mem_tracker.get(), this); + for (auto iter = completion_records.begin(); iter != completion_records.end();) { + if (must_gc_record_func(iter->first, iter->second.get())) { + mem_tracker->Release(iter->second->memory_footprint()); + SequenceNumber deleted_seq_no = iter->first; + iter = completion_records.erase(iter); + // Each time we GC a response, update 'stale_before_seq_no'. + // This will allow to answer clients that their responses are stale if we get + // a request with a sequence number lower than or equal to this one. + stale_before_seq_no = std::max(deleted_seq_no + 1, stale_before_seq_no); + continue; + } + // Since we store completion records in order, if we found one that shouldn't be GCed, + // don't GC anything after it. + return; + } +} + +string ResultTracker::ClientState::ToString() const { + auto since_last_heard = + MonoTime::Now().GetDeltaSince(last_heard_from); + string result = Substitute("Client State[Last heard from: $0s ago, " + "$1 CompletionRecords:", + since_last_heard.ToString(), + completion_records.size()); + for (auto& completion_record : completion_records) { + SubstituteAndAppend(&result, Substitute("\n\tCompletion Record: $0, $1", + completion_record.first, + completion_record.second->ToString())); + } + result.append("\t]"); + return result; +} + +string ResultTracker::CompletionRecord::ToString() const { + string result = Substitute("Completion Record[State: $0, Driver: $1, " + "Cached response: $2, $3 OngoingRpcs:", + state, + driver_attempt_no, + response ? 
SecureShortDebugString(*response) : "None", + ongoing_rpcs.size()); + for (auto& orpc : ongoing_rpcs) { + SubstituteAndAppend(&result, Substitute("\n\t$0", orpc.ToString())); + } + result.append("\t\t]"); + return result; +} + +string ResultTracker::OnGoingRpcInfo::ToString() const { + return Substitute("OngoingRpc[Handler: $0, Context: $1, Response: $2]", + handler_attempt_no, context, + response ? SecureShortDebugString(*response) : "NULL"); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/result_tracker.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/result_tracker.h b/be/src/kudu/rpc/result_tracker.h new file mode 100644 index 0000000..f629d7a --- /dev/null +++ b/be/src/kudu/rpc/result_tracker.h @@ -0,0 +1,399 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#pragma once + +#include <functional> +#include <map> +#include <string> +#include <utility> +#include <vector> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/rpc/request_tracker.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/locks.h" +#include "kudu/util/malloc.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/monotime.h" +#include "kudu/util/thread.h" + +namespace google { +namespace protobuf { +class Message; +} // protobuf +} // google + +namespace kudu { +namespace rpc { +class RpcContext; + +// A ResultTracker for RPC results. +// +// The ResultTracker is responsible for tracking the results of RPCs and making sure that +// client calls with the same client ID and sequence number (first attempt and subsequent retries) +// are executed exactly once. +// +// In most cases, the use of ResultTracker is internal to the RPC system: RPCs are tracked when +// they first arrive, before service methods are called, and calls to ResultTracker to store +// responses are performed internally by RpcContext. The exception is when an RPC is replicated +// across multiple servers, such as with writes, in which case direct interaction with the result +// tracker is required so as to cache responses on replicas which did not receive the RPC directly +// from the client. +// +// Throughout this header and elsewhere we use the following terms: +// +// RPC - The operation that a client or another server wants to execute on this server. The client +// might attempt one RPC many times, for instance if failures or timeouts happen. +// Attempt - Each individual attempt of an RPC on the server. +// Handler - A thread executing an attempt. Usually there is only one handler that executes the +// first attempt of an RPC and, when it completes, replies to its own attempt and to all +// other attempts that might have arrived after it started. 
+// Driver - Only important in cases where there might be multiple handlers (e.g. in replicated +// RPCs). In these cases there might be two handlers executing the same RPC, corresponding +// to different attempts. Since the RPC must be executed exactly once, only one of the +// handlers must be selected as the "driver" and actually perform the operation. +// +// If a client wishes to track the result of a given RPC it must send on the RPC header +// a RequestId with the following information: +// +// Client ID - Uniquely identifies a single client. All the RPCs originating from the same +// client must have the same ID. +// Sequence number - Uniquely identifies a single RPC, even across retries to multiple servers, for +// replicated RPCs. All retries of the same RPC must have the same sequence +// number. +// Attempt number - Uniquely identifies each retry of the same RPC. All retries of the same RPC +// must have different attempt numbers. +// +// When a call first arrives from the client the RPC subsystem will call TrackRpc() which +// will return the state of the RPC in the form of an RpcState enum. +// +// If the ResultTracker returns NEW, this signals that it's the first time the server has heard +// of the RPC and that the corresponding server function should be executed. +// +// If anything other than NEW is returned it means that the call has either previously completed or +// is in the process of being executed. In this case the caller should _not_ execute the function +// corresponding to the RPC. The ResultTracker itself will take care of responding to the client +// appropriately. If the RPC was already completed, the ResultTracker replies to the client +// immediately. If the RPC is still ongoing, the attempt gets "attached" to the ongoing one and will +// receive the same response when its handler finishes. 
+// +// If handling of the RPC is successful, RecordCompletionAndRespond() must be called +// to register successful completion, in which case all pending or future RPCs with the same +// sequence number, from the same client, will receive the same response. +// +// On the other hand, if execution of the server function is not successful then one of +// the FailAndRespond() methods should be called, causing all _pending_ attempts to receive the same +// error. However this error is not stored, any future attempt with the same sequence number and +// same client ID will be given a new chance to execute, as if it had never been tried before. +// This gives the client a chance to either retry (if the failure reason is transient) or give up. +// +// ============================================================================ +// RPCs with multiple handlers +// ============================================================================ +// +// Some RPC results are tracked by a single server, i.e. they correspond to the modification of an +// unreplicated resource and are unpersisted. For those no additional care needs to be taken, the +// first attempt will be the only handler, and subsequent attempts will receive the response when +// that first attempt is done. +// However some RPCs are replicated across servers, using consensus, and thus can have multiple +// handlers executing different attempts at the same time, e.g. one handler from a client +// originating retry, and one from a previous leader originating update. +// +// In this case we need to make sure that the following invariants are enforced: +// - Only one handler can actually record a response, the "driver" handler. +// - Only one handler must respond to "attached" attempts. +// - Each handler replies to their own RPCs, to avoid races. That is, a live handler should +// not mutate another live handler's response/context. 
+// +// This is achieved by naming one handler the "driver" of the RPC and making sure that only +// the driver can successfully complete it, i.e. call RecordCompletionAndRespond(). +// +// In order to make sure there is only one driver, there must be an _external_ serialization +// point, before the final response is produced, after which only one of the handlers will +// be marked as the driver. For instance, for writes, this serialization point is in +// TransactionDriver, in a synchronized block where a logic such as this one happens (here +// in pseudo-ish code): +// +// { +// lock_guard<simple_spinlock> l(lock_); +// if (follower_transaction) { +// result_tracker_->TrackRpcOrChangeDriver(request_id); +// continue_with_transaction(); +// } else if (client_transaction) { +// bool is_still_driver = result_tracker_->IsCurrentDriver(request_id); +// if (is_still_driver) continue_with_transaction(); +// else abort_transaction(); +// } +// } +// +// This class is thread safe. +class ResultTracker : public RefCountedThreadSafe<ResultTracker> { + public: + typedef rpc::RequestTracker::SequenceNumber SequenceNumber; + static const int NO_HANDLER = -1; + // Enum returned by TrackRpc that reflects the state of the RPC. + enum RpcState { + // The RPC is new. + NEW, + // The RPC has previously completed and the same response has been sent + // to the client. + COMPLETED, + // The RPC is currently in-progress and, when it completes, the same response + // will be sent to the client. + IN_PROGRESS, + // The RPC's state is stale, meaning it's older than our per-client garbage + // collection watermark and we do not recall the original response. + STALE + }; + + explicit ResultTracker(std::shared_ptr<kudu::MemTracker> mem_tracker); + ~ResultTracker(); + + // Tracks the RPC and returns its current state. + // + // If the RpcState == NEW the caller is supposed to actually start executing the RPC. + // The caller still owns the passed 'response' and 'context'. 
+ // + // If the RpcState is anything else all remaining actions will be taken care of internally, + // i.e. the caller no longer needs to execute the RPC and this takes ownership of the passed + // 'response' and 'context'. + RpcState TrackRpc(const RequestIdPB& request_id, + google::protobuf::Message* response, + RpcContext* context); + + // Used to track RPC attempts which originate from other replicas, and which may race with + // client originated ones. + // Tracks the RPC if it is untracked or changes the current driver of this RPC, i.e. sets the + // attempt number in 'request_id' as the driver of the RPC, if it is tracked and IN_PROGRESS. + RpcState TrackRpcOrChangeDriver(const RequestIdPB& request_id); + + // Checks if the attempt at an RPC identified by 'request_id' is the current driver of the + // RPC. That is, if the attempt number in 'request_id' corresponds to the attempt marked + // as the driver of this RPC, either by initially getting NEW from TrackRpc() or by + // explicit driver change with ChangeDriver(). + bool IsCurrentDriver(const RequestIdPB& request_id); + + // Records the completion of a successful operation. + // This will respond to all RPCs from the same client with the same sequence_number. + // The response will be stored so that any future retries of this RPC get the same response. + // + // Requires that TrackRpc() was called before with the same 'client_id' and + // 'sequence_number'. + // Requires that the attempt identified by 'request_id' is the current driver + // of the RPC. + void RecordCompletionAndRespond(const RequestIdPB& request_id, + const google::protobuf::Message* response); + + // Responds to all RPCs identified by 'client_id' and 'sequence_number' with the same response, + // but doesn't actually store the response. + // This should be called when the RPC failed validation or if some transient error occurred. 
+ // Based on the response the client can then decide whether to retry the RPC (which will + // be treated as a new one) or to give up. + // + // Requires that TrackRpc() was called before with the same 'client_id' and + // 'sequence_number'. + // Requires that the attempt identified by 'request_id' is the current driver + // of the RPC. + void FailAndRespond(const RequestIdPB& request_id, + google::protobuf::Message* response); + + // Overload to match other types of RpcContext::Respond*Failure() + void FailAndRespond(const RequestIdPB& request_id, + ErrorStatusPB_RpcErrorCodePB err, const Status& status); + + // Overload to match other types of RpcContext::Respond*Failure() + void FailAndRespond(const RequestIdPB& request_id, + int error_ext_id, const std::string& message, + const google::protobuf::Message& app_error_pb); + + // Start a background thread which periodically runs GCResults(). + // This thread is automatically stopped in the destructor. + // + // Must be called at most once. + void StartGCThread(); + + // Runs time-based garbage collection on the results this result tracker is caching. + // When garbage collection runs, it goes through all ClientStates and: + // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no + // requests are in progress, GCs the ClientState and all its CompletionRecords. + // - If a ClientState is newer than the 'remember_clients_ttl_ms' flag, goes + // through all CompletionRecords and: + // - If the CompletionRecord is older than the 'remember_responses_ttl_secs' flag, + // GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark. + // + // Typically this is invoked from an internal thread started by 'StartGCThread()'. + void GCResults(); + + string ToString(); + + private: + // Information about client originated ongoing RPCs. + // The lifecycle of 'response' and 'context' is managed by the RPC layer. 
+ struct OnGoingRpcInfo { + google::protobuf::Message* response; + RpcContext* context; + int64_t handler_attempt_no; + + std::string ToString() const; + }; + // A completion record for an IN_PROGRESS or COMPLETED RPC. + struct CompletionRecord { + CompletionRecord(RpcState state, int64_t driver_attempt_no) + : state(state), + driver_attempt_no(driver_attempt_no), + last_updated(MonoTime::Now()) { + } + + // The current state of the RPC. + RpcState state; + + // The attempt number that is/was "driving" this RPC. + int64_t driver_attempt_no; + + // The timestamp of the last CompletionRecord update. + MonoTime last_updated; + + // The cached response, if this RPC is in COMPLETED state. + std::unique_ptr<google::protobuf::Message> response; + + // The set of ongoing RPCs that correspond to this record. + std::vector<OnGoingRpcInfo> ongoing_rpcs; + + std::string ToString() const; + + // Calculates the memory footprint of this struct. + int64_t memory_footprint() const { + return kudu_malloc_usable_size(this) + + (ongoing_rpcs.capacity() > 0 ? kudu_malloc_usable_size(ongoing_rpcs.data()) : 0) + + (response.get() != nullptr ? response->SpaceUsed() : 0); + } + }; + + // The state corresponding to a single client. + struct ClientState { + typedef MemTrackerAllocator< + std::pair<const SequenceNumber, + std::unique_ptr<CompletionRecord>>> CompletionRecordMapAllocator; + typedef std::map<SequenceNumber, + std::unique_ptr<CompletionRecord>, + std::less<SequenceNumber>, + CompletionRecordMapAllocator> CompletionRecordMap; + + explicit ClientState(std::shared_ptr<MemTracker> mem_tracker) + : stale_before_seq_no(0), + completion_records(CompletionRecordMap::key_compare(), + CompletionRecordMapAllocator(std::move(mem_tracker))) {} + + // The last time we've heard from this client. + MonoTime last_heard_from; + + // The sequence number of the first response we remember for this client. + // All sequence numbers before this one are considered STALE. 
+ SequenceNumber stale_before_seq_no; + + // The (un gc'd) CompletionRecords for this client. + CompletionRecordMap completion_records; + + // Garbage collects this client's CompletionRecords for which MustGcRecordFunc returns + // true. We use a lambda here so that we can have a single method that GCs and releases + // the memory for CompletionRecords based on different policies. + // + // 'func' should have the following signature: + // bool MyFunction(SequenceNumber seq_no, CompletionRecord* record); + // + template<class MustGcRecordFunc> + void GCCompletionRecords(const std::shared_ptr<kudu::MemTracker>& mem_tracker, + MustGcRecordFunc func); + + std::string ToString() const; + + // Calculates the memory footprint of this struct. + // This calculation is shallow and doesn't account for the memory the nested data + // structures occupy. + int64_t memory_footprint() const { + return kudu_malloc_usable_size(this); + } + }; + + RpcState TrackRpcUnlocked(const RequestIdPB& request_id, + google::protobuf::Message* response, + RpcContext* context); + + typedef std::function<void (const OnGoingRpcInfo&)> HandleOngoingRpcFunc; + + // Helper method to handle the multiple overloads of FailAndRespond. Takes a lambda + // that knows what to do with OnGoingRpcInfo in each individual case. + void FailAndRespondInternal(const rpc::RequestIdPB& request_id, + HandleOngoingRpcFunc func); + + CompletionRecord* FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id); + CompletionRecord* FindCompletionRecordOrDieUnlocked(const RequestIdPB& request_id); + std::pair<ClientState*, CompletionRecord*> FindClientStateAndCompletionRecordOrNullUnlocked( + const RequestIdPB& request_id); + + // A handler must handle an RPC attempt if: + // 1 - It's its own attempt. I.e. it has the same attempt number of the handler. + // 2 - It's the driver of the RPC and the attempt has no handler (was attached). 
+ bool MustHandleRpc(int64_t handler_attempt_no, + CompletionRecord* completion_record, + const OnGoingRpcInfo& ongoing_rpc) { + if (PREDICT_TRUE(ongoing_rpc.handler_attempt_no == handler_attempt_no)) { + return true; + } + if (completion_record->driver_attempt_no == handler_attempt_no) { + return ongoing_rpc.handler_attempt_no == NO_HANDLER; + } + return false; + } + + void LogAndTraceAndRespondSuccess(RpcContext* context, const google::protobuf::Message& msg); + void LogAndTraceFailure(RpcContext* context, const google::protobuf::Message& msg); + void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err, + const Status& status); + + std::string ToStringUnlocked() const; + + void RunGCThread(); + + // The memory tracker that tracks this ResultTracker's memory consumption. + std::shared_ptr<kudu::MemTracker> mem_tracker_; + + // Lock that protects access to 'clients_' and to the state contained in each + // ClientState. + // TODO consider a per-ClientState lock if we find this too coarse grained. + simple_spinlock lock_; + + typedef MemTrackerAllocator<std::pair<const std::string, + std::unique_ptr<ClientState>>> ClientStateMapAllocator; + typedef std::map<std::string, + std::unique_ptr<ClientState>, + std::less<std::string>, + ClientStateMapAllocator> ClientStateMap; + + ClientStateMap clients_; + + // The thread which runs GC, and a latch to stop it. 
+ scoped_refptr<Thread> gc_thread_; + CountDownLatch gc_thread_stop_latch_; + + DISALLOW_COPY_AND_ASSIGN(ResultTracker); +}; + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/retriable_rpc.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/retriable_rpc.h b/be/src/kudu/rpc/retriable_rpc.h new file mode 100644 index 0000000..c896027 --- /dev/null +++ b/be/src/kudu/rpc/retriable_rpc.h @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <memory> +#include <string> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/request_tracker.h" +#include "kudu/rpc/rpc.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/messenger.h" +#include "kudu/util/monotime.h" + +namespace kudu { +namespace rpc { + +namespace internal { +typedef rpc::RequestTracker::SequenceNumber SequenceNumber; +} + +// A base class for retriable RPCs that handles replica picking and retry logic. 
+// +// The 'Server' template parameter refers to the type of the server that will be looked up +// and passed to the derived classes on Try(). For instance in the case of WriteRpc it's +// RemoteTabletServer. +// +// TODO(unknown): merge RpcRetrier into this class? Can't be done right now as the retrier is used +// independently elsewhere, but likely possible when all replicated RPCs have a ReplicaPicker. +// +// TODO(unknown): allow to target replicas other than the leader, if needed. +// +// TODO(unknown): once we have retry handling on all the RPCs merge this with rpc::Rpc. +template <class Server, class RequestPB, class ResponsePB> +class RetriableRpc : public Rpc { + public: + RetriableRpc(const scoped_refptr<ServerPicker<Server>>& server_picker, + const scoped_refptr<RequestTracker>& request_tracker, + const MonoTime& deadline, + std::shared_ptr<Messenger> messenger) + : Rpc(deadline, std::move(messenger)), + server_picker_(server_picker), + request_tracker_(request_tracker), + sequence_number_(RequestTracker::NO_SEQ_NO), + num_attempts_(0) {} + + virtual ~RetriableRpc() { + DCHECK_EQ(sequence_number_, RequestTracker::NO_SEQ_NO); + } + + // Performs server lookup/initialization. + // If/when the server is looked up and initialized successfully RetriableRpc will call + // Try() to actually send the request. + void SendRpc() override; + + // The callback to call upon retrieving (or failing to retrieve) a new authn + // token. This is the callback that subclasses should call in their custom + // implementation of the GetNewAuthnTokenAndRetry() method. + void GetNewAuthnTokenAndRetryCb(const Status& status); + + protected: + // Subclasses implement this method to actually try the RPC. + // The server has been looked up and is ready to be used. 
+ virtual void Try(Server* replica, const ResponseCallback& callback) = 0; + + // Subclasses implement this method to analyze 'status', the controller status or + // the response and return a RetriableRpcStatus which will then be used + // to decide how to proceed (retry or give up). + virtual RetriableRpcStatus AnalyzeResponse(const Status& status) = 0; + + // Subclasses implement this method to perform cleanup and/or final steps. + // After this is called the RPC will be no longer retried. + virtual void Finish(const Status& status) = 0; + + // Returns 'true' if the RPC is to scheduled for retry with a new authn token, + // 'false' otherwise. For RPCs performed in the context of providing token + // for authentication it's necessary to implement this method. The default + // implementation returns 'false' meaning the calls returning + // INVALID_AUTHENTICATION_TOKEN RPC status are not retried. + virtual bool GetNewAuthnTokenAndRetry() { + return false; + } + + // Request body. + RequestPB req_; + + // Response body. + ResponsePB resp_; + + private: + friend class CalculatorServiceRpc; + + // Decides whether to retry the RPC, based on the result of AnalyzeResponse() + // and retries if that is the case. + // Returns true if the RPC was retried or false otherwise. + bool RetryIfNeeded(const RetriableRpcStatus& result, Server* server); + + // Called when the replica has been looked up. + void ReplicaFoundCb(const Status& status, Server* server); + + // Called after the RPC was performed. + void SendRpcCb(const Status& status) override; + + // Performs final cleanup, after the RPC is done (independently of success). + void FinishInternal(); + + scoped_refptr<ServerPicker<Server>> server_picker_; + scoped_refptr<RequestTracker> request_tracker_; + std::shared_ptr<Messenger> messenger_; + + // The sequence number for this RPC. 
  internal::SequenceNumber sequence_number_;

  // The number of times this RPC has been attempted.
  int32 num_attempts_;

  // Keeps track of the replica the RPCs were sent to.
  // TODO Remove this and pass the used replica around. For now we need to keep this as
  // the retrier calls the SendRpcCb directly and doesn't know the replica that was
  // being written to.
  Server* current_;
};

// Starts (or restarts) the RPC: makes sure a sequence number is assigned and
// kicks off an asynchronous leader lookup. ReplicaFoundCb() is invoked with
// the lookup result.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpc() {
  // Lazily allocate a sequence number on the first attempt; retries reuse the
  // same number (it is only released in FinishInternal()).
  if (sequence_number_ == RequestTracker::NO_SEQ_NO) {
    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number_));
  }
  server_picker_->PickLeader(Bind(&RetriableRpc::ReplicaFoundCb,
                                  Unretained(this)),
                             retrier().deadline());
}

// Callback invoked once the attempt to acquire a new authn token completes.
// On success the RPC is re-sent immediately with a reset controller; on
// failure it falls back to the normal delayed-retry path.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::GetNewAuthnTokenAndRetryCb(
    const Status& status) {
  if (status.ok()) {
    // Perform the RPC call with the newly fetched authn token.
    mutable_retrier()->mutable_controller()->Reset();
    SendRpc();
  } else {
    // Back to the retry sequence, hoping for better conditions after some time.
    VLOG(1) << "Failed to get new authn token: " << status.ToString();
    mutable_retrier()->DelayedRetry(this, status);
  }
}

// Examines 'result' (produced by AnalyzeResponse()) and either schedules a
// retry (returning true) or decides the RPC must not be retried (returning
// false). 'server' may be nullptr when the failure happened before a replica
// was picked (e.g. a failed master lookup).
template <class Server, class RequestPB, class ResponsePB>
bool RetriableRpc<Server, RequestPB, ResponsePB>::RetryIfNeeded(
    const RetriableRpcStatus& result, Server* server) {
  // Handle the cases where we retry.
  switch (result.result) {
    case RetriableRpcStatus::SERVICE_UNAVAILABLE:
      // For writes, always retry the request on the same server in case of the
      // SERVICE_UNAVAILABLE error.
      break;

    case RetriableRpcStatus::SERVER_NOT_ACCESSIBLE:
      // TODO(KUDU-1745): not checking for null here results in a crash, since in the case
      // of a failed master lookup we have no tablet server corresponding to the error.
      //
      // But, with the null check, we end up with a relatively tight retry loop
      // in this scenario whereas we should be backing off. Need to improve
      // test coverage here to understand why the back-off is not taking effect.
      if (server != nullptr) {
        VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
        // Mark the server as failed. As for details on the only existing
        // implementation of ServerPicker::MarkServerFailed(), see the note on
        // the MetaCacheServerPicker::MarkServerFailed() method.
        server_picker_->MarkServerFailed(server, result.status);
      }
      break;

    case RetriableRpcStatus::RESOURCE_NOT_FOUND:
      // The TabletServer was not part of the config serving the tablet.
      // We mark our tablet cache as stale, forcing a master lookup on the
      // next attempt.
      //
      // TODO(KUDU-1314): Don't backoff the first time we hit this error.
      server_picker_->MarkResourceNotFound(server);
      break;

    case RetriableRpcStatus::REPLICA_NOT_LEADER:
      // The TabletServer was not the leader of the quorum.
      server_picker_->MarkReplicaNotLeader(server);
      break;

    case RetriableRpcStatus::INVALID_AUTHENTICATION_TOKEN: {
      // This is a special case for retry: first it's necessary to get a new
      // authn token and then retry the operation with the new token.
      if (GetNewAuthnTokenAndRetry()) {
        // The RPC will be retried.
        resp_.Clear();
        return true;
      }
      // Do not retry.
      return false;
    }

    case RetriableRpcStatus::NON_RETRIABLE_ERROR:
      if (server != nullptr && result.status.IsTimedOut()) {
        // For the NON_RETRIABLE_ERROR result in case of TimedOut status,
        // mark the server as failed. As for details on the only existing
        // implementation of ServerPicker::MarkServerFailed(), see the note on
        // the MetaCacheServerPicker::MarkServerFailed() method.
        VLOG(1) << "Failing " << ToString() << " to a new target: " << result.status.ToString();
        server_picker_->MarkServerFailed(server, result.status);
      }
      // Do not retry in the case of non-retriable error.
      return false;

    default:
      // For the OK case we should not retry.
      DCHECK(result.result == RetriableRpcStatus::OK);
      return false;
  }
  // Falling out of the switch means a retry is warranted: clear any stale
  // response data, forget the current replica, and schedule a delayed retry.
  resp_.Clear();
  current_ = nullptr;
  mutable_retrier()->DelayedRetry(this, result.status);
  return true;
}

// Releases this RPC's sequence number with the RequestTracker. Called exactly
// once, when the RPC will no longer be retried.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::FinishInternal() {
  // Mark the RPC as completed and set the sequence number to NO_SEQ_NO to make
  // sure we're in the appropriate state before destruction.
  request_tracker_->RpcCompleted(sequence_number_);
  sequence_number_ = RequestTracker::NO_SEQ_NO;
}

// Callback for the leader lookup started in SendRpc(). Either retries,
// finishes with a non-retriable error, or attaches a RequestIdPB and
// performs the actual call via Try().
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::ReplicaFoundCb(const Status& status,
                                                                 Server* server) {
  // NOTE: 'server' here may be nullptr in the case that status is not OK!
  RetriableRpcStatus result = AnalyzeResponse(status);
  if (RetryIfNeeded(result, server)) return;

  if (result.result == RetriableRpcStatus::NON_RETRIABLE_ERROR) {
    FinishInternal();
    Finish(result.status);
    return;
  }

  // We successfully found a replica, so prepare the RequestIdPB before we send out the call.
  std::unique_ptr<RequestIdPB> request_id(new RequestIdPB());
  request_id->set_client_id(request_tracker_->client_id());
  request_id->set_seq_no(sequence_number_);
  request_id->set_first_incomplete_seq_no(request_tracker_->FirstIncomplete());
  // Each attempt of the same logical RPC gets a distinct attempt number.
  request_id->set_attempt_no(num_attempts_++);

  mutable_retrier()->mutable_controller()->SetRequestIdPB(std::move(request_id));

  DCHECK_EQ(result.result, RetriableRpcStatus::OK);
  current_ = server;
  Try(server, boost::bind(&RetriableRpc::SendRpcCb, this, Status::OK()));
}

// Callback invoked after the RPC attempt completes (success or failure).
// Retries if warranted; otherwise finalizes with a status that names the
// server the attempt was sent to.
template <class Server, class RequestPB, class ResponsePB>
void RetriableRpc<Server, RequestPB, ResponsePB>::SendRpcCb(const Status& status) {
  RetriableRpcStatus result = AnalyzeResponse(status);
  if (RetryIfNeeded(result, current_)) return;

  FinishInternal();

  // From here on out the RPC has either succeeded or suffered a non-retriable
  // failure.
  Status final_status = result.status;
  if (!final_status.ok()) {
    string error_string;
    if (current_) {
      error_string = strings::Substitute("Failed to write to server: $0", current_->ToString());
    } else {
      error_string = "Failed to write to server: (no server available)";
    }
    final_status = final_status.CloneAndPrepend(error_string);
  }
  Finish(final_status);
}

} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/rpc-bench.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/rpc-bench.cc b/be/src/kudu/rpc/rpc-bench.cc new file mode 100644 index 0000000..d569ea1 --- /dev/null +++ b/be/src/kudu/rpc/rpc-bench.cc @@ -0,0 +1,260 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.
// The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <functional>
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <thread>

#include "kudu/gutil/atomicops.h"
#include "kudu/rpc/rpc-test-base.h"
#include "kudu/rpc/rtest.proxy.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/test_util.h"

using std::bind;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;

DEFINE_int32(client_threads, 16,
             "Number of client threads. For the synchronous benchmark, each thread has "
             "a single outstanding synchronous request at a time. For the async "
             "benchmark, this determines the number of client reactors.");

DEFINE_int32(async_call_concurrency, 60,
             "Number of concurrent requests that will be outstanding at a time for the "
             "async benchmark. The requests are multiplexed across the number of "
             "reactors specified by the 'client_threads' flag.");

DEFINE_int32(worker_threads, 1,
             "Number of server worker threads");

DEFINE_int32(server_reactors, 4,
             "Number of server reactor threads");

DEFINE_int32(run_seconds, 1, "Seconds to run the test");

DECLARE_bool(rpc_encrypt_loopback_connections);
DEFINE_bool(enable_encryption, false, "Whether to enable TLS encryption for rpc-bench");

namespace kudu {
namespace rpc {

// Benchmark fixture: starts an in-process test RPC server running the
// generated CalculatorService, and holds the state shared with the client
// workloads defined below.
class RpcBench : public RpcTestBase {
 public:
  RpcBench()
      : should_run_(true),
        stop_(0)
  {}

  void SetUp() override {
    // In slow-test mode, run the benchmark longer for steadier numbers.
    OverrideFlagForSlowTests("run_seconds", "10");

    n_worker_threads_ = FLAGS_worker_threads;
    n_server_reactor_threads_ = FLAGS_server_reactors;

    // Set up server.
    FLAGS_rpc_encrypt_loopback_connections = FLAGS_enable_encryption;
    StartTestServerWithGeneratedCode(&server_addr_, FLAGS_enable_encryption);
  }

  // Logs throughput and per-request CPU cost for a completed run.
  // 'sync' selects which client-side configuration lines are printed.
  void SummarizePerf(CpuTimes elapsed, int total_reqs, bool sync) {
    float reqs_per_second = static_cast<float>(total_reqs / elapsed.wall_seconds());
    float user_cpu_micros_per_req = static_cast<float>(elapsed.user / 1000.0 / total_reqs);
    float sys_cpu_micros_per_req = static_cast<float>(elapsed.system / 1000.0 / total_reqs);
    float csw_per_req = static_cast<float>(elapsed.context_switches) / total_reqs;

    LOG(INFO) << "Mode: " << (sync ? "Sync" : "Async");
    if (sync) {
      LOG(INFO) << "Client threads: " << FLAGS_client_threads;
    } else {
      LOG(INFO) << "Client reactors: " << FLAGS_client_threads;
      LOG(INFO) << "Call concurrency: " << FLAGS_async_call_concurrency;
    }

    LOG(INFO) << "Worker threads: " << FLAGS_worker_threads;
    LOG(INFO) << "Server reactors: " << FLAGS_server_reactors;
    LOG(INFO) << "Encryption: " << FLAGS_enable_encryption;
    LOG(INFO) << "----------------------------------";
    LOG(INFO) << "Reqs/sec: " << reqs_per_second;
    LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us";
    LOG(INFO) << "Sys CPU per req: " << sys_cpu_micros_per_req << "us";
    LOG(INFO) << "Ctx Sw. per req: " << csw_per_req;

  }

 protected:
  friend class ClientThread;
  friend class ClientAsyncWorkload;

  Sockaddr server_addr_;
  // Run flag read by the client workloads (Acquire_Load) and cleared
  // (Release_Store) by the test body to stop the benchmark.
  Atomic32 should_run_;
  // Used by the async benchmark to wait until every workload has observed
  // the stop flag and finished its in-flight call.
  CountDownLatch stop_;
};

// Synchronous-benchmark worker: one OS thread issuing back-to-back blocking
// Add() RPCs until the fixture's should_run_ flag is cleared.
class ClientThread {
 public:
  explicit ClientThread(RpcBench *bench)
    : bench_(bench),
      request_count_(0) {
  }

  void Start() {
    thread_.reset(new thread(&ClientThread::Run, this));
  }

  void Join() {
    thread_->join();
  }

  void Run() {
    // Each worker gets its own messenger (i.e. its own client reactor).
    shared_ptr<Messenger> client_messenger = bench_->CreateMessenger("Client");

    CalculatorServiceProxy p(client_messenger, bench_->server_addr_);

    AddRequestPB req;
    AddResponsePB resp;
    while (Acquire_Load(&bench_->should_run_)) {
      req.set_x(request_count_);
      req.set_y(request_count_);
      RpcController controller;
      controller.set_timeout(MonoDelta::FromSeconds(10));
      CHECK_OK(p.Add(req, &resp, &controller));
      // Sanity-check the arithmetic result on every call.
      CHECK_EQ(req.x() + req.y(), resp.result());
      request_count_++;
    }
  }

  unique_ptr<thread> thread_;
  RpcBench *bench_;
  // Number of completed requests; read by the test after Join().
  int request_count_;
};


// Test making successful RPC calls.
// Synchronous benchmark: FLAGS_client_threads workers each issue blocking
// calls for FLAGS_run_seconds, then aggregate and report throughput.
TEST_F(RpcBench, BenchmarkCalls) {
  Stopwatch sw(Stopwatch::ALL_THREADS);
  sw.start();

  vector<unique_ptr<ClientThread>> threads;
  for (int i = 0; i < FLAGS_client_threads; i++) {
    threads.emplace_back(new ClientThread(this));
    threads.back()->Start();
  }

  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
  // Signal all workers to stop after finishing their current call.
  Release_Store(&should_run_, false);

  int total_reqs = 0;

  for (auto& thr : threads) {
    thr->Join();
    total_reqs += thr->request_count_;
  }
  sw.stop();

  SummarizePerf(sw.elapsed(), total_reqs, true);
}

// Async-benchmark worker: keeps exactly one async Add() call in flight,
// re-issuing the next call from the completion callback of the previous one.
// When the fixture's should_run_ flag is cleared, counts down bench_->stop_.
class ClientAsyncWorkload {
 public:
  ClientAsyncWorkload(RpcBench *bench, shared_ptr<Messenger> messenger)
    : bench_(bench),
      messenger_(std::move(messenger)),
      request_count_(0) {
    controller_.set_timeout(MonoDelta::FromSeconds(10));
    proxy_.reset(new CalculatorServiceProxy(messenger_, bench_->server_addr_));
  }

  void CallOneRpc() {
    // Verify the previous call (if any) before reusing controller/response.
    if (request_count_ > 0) {
      CHECK_OK(controller_.status());
      CHECK_EQ(req_.x() + req_.y(), resp_.result());
    }
    if (!Acquire_Load(&bench_->should_run_)) {
      // No more calls: report this workload as stopped.
      bench_->stop_.CountDown();
      return;
    }
    // The controller must be reset before each reuse.
    controller_.Reset();
    req_.set_x(request_count_);
    req_.set_y(request_count_);
    request_count_++;
    proxy_->AddAsync(req_,
                     &resp_,
                     &controller_,
                     bind(&ClientAsyncWorkload::CallOneRpc, this));
  }

  void Start() {
    CallOneRpc();
  }

  RpcBench *bench_;
  shared_ptr<Messenger> messenger_;
  unique_ptr<CalculatorServiceProxy> proxy_;
  // Number of requests issued; read by the test after stop_ is counted down.
  uint32_t request_count_;
  RpcController controller_;
  AddRequestPB req_;
  AddResponsePB resp_;
};

// Asynchronous benchmark: FLAGS_async_call_concurrency workloads multiplexed
// over FLAGS_client_threads messengers (reactors), each keeping one call in
// flight for FLAGS_run_seconds.
TEST_F(RpcBench, BenchmarkCallsAsync) {
  int threads = FLAGS_client_threads;
  int concurrency = FLAGS_async_call_concurrency;

  vector<shared_ptr<Messenger>> messengers;
  for (int i = 0; i < threads; i++) {
    messengers.push_back(CreateMessenger("Client"));
  }

  vector<unique_ptr<ClientAsyncWorkload>> workloads;
  for (int i = 0; i < concurrency; i++) {
    // Round-robin the workloads across the available messengers.
    workloads.emplace_back(
        new ClientAsyncWorkload(this, messengers[i % threads]));
  }

  // Each workload counts this latch down once it observes the stop flag.
  stop_.Reset(concurrency);

  Stopwatch sw(Stopwatch::ALL_THREADS);
  sw.start();

  for (int i = 0; i < concurrency; i++) {
    workloads[i]->Start();
  }

  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
  Release_Store(&should_run_, false);

  sw.stop();

  // Wait until every workload has drained its in-flight call.
  stop_.Wait();
  int total_reqs = 0;
  for (int i = 0; i < concurrency; i++) {
    total_reqs += workloads[i]->request_count_;
  }

  SummarizePerf(sw.elapsed(), total_reqs, false);
}

} // namespace rpc
} // namespace kudu
