http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_if.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_if.cc b/be/src/kudu/rpc/service_if.cc new file mode 100644 index 0000000..39e9ab5 --- /dev/null +++ b/be/src/kudu/rpc/service_if.cc @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/service_if.h" + +#include <memory> +#include <string> +#include <google/protobuf/descriptor.pb.h> + +#include "kudu/gutil/strings/substitute.h" + +#include "kudu/rpc/connection.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/rpc/rpc_context.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/util/flag_tags.h" + +// TODO remove this once we have fully cluster-tested this. +// Despite being on by default, this is left in in case we discover +// any issues in 0.10.0, we'll have an easy workaround to disable the feature. +DEFINE_bool(enable_exactly_once, true, "Whether to enable exactly once semantics."); +TAG_FLAG(enable_exactly_once, hidden); + +using google::protobuf::Message; +using std::string; +using std::unique_ptr; +using strings::Substitute; + +namespace kudu { +namespace rpc { + +ServiceIf::~ServiceIf() { +} + +void ServiceIf::Shutdown() { +} + +bool ServiceIf::SupportsFeature(uint32_t feature) const { + return false; +} + +bool ServiceIf::ParseParam(InboundCall *call, google::protobuf::Message *message) { + Slice param(call->serialized_request()); + if (PREDICT_FALSE(!message->ParseFromArray(param.data(), param.size()))) { + string err = Substitute("invalid parameter for call $0: missing fields: $1", + call->remote_method().ToString(), + message->InitializationErrorString().c_str()); + LOG(WARNING) << err; + call->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST, + Status::InvalidArgument(err)); + return false; + } + return true; +} + +void ServiceIf::RespondBadMethod(InboundCall *call) { + Sockaddr local_addr, remote_addr; + + CHECK_OK(call->connection()->socket()->GetSocketAddress(&local_addr)); + CHECK_OK(call->connection()->socket()->GetPeerAddress(&remote_addr)); + string err = Substitute("Call on service $0 received at $1 from $2 with an " + "invalid method name: $3", + call->remote_method().service_name(), + local_addr.ToString(), + remote_addr.ToString(), + call->remote_method().method_name()); + LOG(WARNING) << err; + call->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD, + Status::InvalidArgument(err)); +} + +GeneratedServiceIf::~GeneratedServiceIf() { +} + + +void GeneratedServiceIf::Handle(InboundCall *call) { + const RpcMethodInfo* method_info = call->method_info(); + if (!method_info) { + RespondBadMethod(call); + return; + } + unique_ptr<Message> req(method_info->req_prototype->New()); + if (PREDICT_FALSE(!ParseParam(call, req.get()))) { + return; + } + Message* resp = method_info->resp_prototype->New(); + + bool track_result = call->header().has_request_id() + && method_info->track_result + && FLAGS_enable_exactly_once; + RpcContext* ctx = new RpcContext(call, + req.release(), + resp, + track_result ? result_tracker_ : nullptr); + if (!method_info->authz_method(ctx->request_pb(), resp, ctx)) { + // The authz_method itself should have responded to the RPC. + return; + } + + if (track_result) { + RequestIdPB request_id(call->header().request_id()); + ResultTracker::RpcState state = ctx->result_tracker()->TrackRpc( + call->header().request_id(), + resp, + ctx); + switch (state) { + case ResultTracker::NEW: + // Fall out of the 'if' statement to the normal path. + break; + case ResultTracker::COMPLETED: + case ResultTracker::IN_PROGRESS: + case ResultTracker::STALE: + // ResultTracker has already responded to the RPC and deleted + // 'ctx'. + return; + default: + LOG(FATAL) << "Unknown state: " << state; + } + } + method_info->func(ctx->request_pb(), resp, ctx); +} + + +RpcMethodInfo* GeneratedServiceIf::LookupMethod(const RemoteMethod& method) { + DCHECK_EQ(method.service_name(), service_name()); + const auto& it = methods_by_name_.find(method.method_name()); + if (PREDICT_FALSE(it == methods_by_name_.end())) { + return nullptr; + } + return it->second.get(); +} + + +} // namespace rpc +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_if.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_if.h b/be/src/kudu/rpc/service_if.h new file mode 100644 index 0000000..a3722c6 --- /dev/null +++ b/be/src/kudu/rpc/service_if.h @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_RPC_SERVICE_IF_H +#define KUDU_RPC_SERVICE_IF_H + +#include <unordered_map> +#include <string> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/metrics.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/rpc/result_tracker.h" + +namespace google { +namespace protobuf { +class Message; +} +} + +namespace kudu { + +class Histogram; + +namespace rpc { + +class InboundCall; +class RemoteMethod; +class RpcContext; +class ServiceIf; + +// Generated services define an instance of this class for each +// method that they implement. The generic server code implemented +// by GeneratedServiceIf look up the RpcMethodInfo in order to handle +// each RPC. +struct RpcMethodInfo : public RefCountedThreadSafe<RpcMethodInfo> { + // Prototype protobufs for requests and responses. + // These are empty protobufs which are cloned in order to provide an + // instance for each request. + std::unique_ptr<google::protobuf::Message> req_prototype; + std::unique_ptr<google::protobuf::Message> resp_prototype; + + scoped_refptr<Histogram> handler_latency_histogram; + + // Whether we should track this method's result, using ResultTracker. + bool track_result; + + // The authorization function for this RPC. If this function + // returns false, the RPC has already been handled (i.e. rejected) + // by the authorization function. + std::function<bool(const google::protobuf::Message* req, + google::protobuf::Message* resp, + RpcContext* ctx)> authz_method; + + // The actual function to be called. + std::function<void(const google::protobuf::Message* req, + google::protobuf::Message* resp, + RpcContext* ctx)> func; +}; + +// Handles incoming messages that initiate an RPC. +class ServiceIf { + public: + virtual ~ServiceIf(); + virtual void Handle(InboundCall* incoming) = 0; + virtual void Shutdown(); + virtual std::string service_name() const = 0; + + // The service should return true if it supports the provided application + // specific feature flag. + virtual bool SupportsFeature(uint32_t feature) const; + + // Look up the method being requested by the remote call. + // + // If this returns nullptr, then certain functionality like + // metrics collection will not be performed for this call. + virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) { + return nullptr; + } + + // Default authorization method, which just allows all RPCs. + // + // See docs/design-docs/rpc.md for details on how to add custom + // authorization checks to a service. + bool AuthorizeAllowAll(const google::protobuf::Message* /*req*/, + google::protobuf::Message* /*resp*/, + RpcContext* /*ctx*/) { + return true; + } + + protected: + bool ParseParam(InboundCall* call, google::protobuf::Message* message); + void RespondBadMethod(InboundCall* call); +}; + + +// Base class for code-generated service classes. +class GeneratedServiceIf : public ServiceIf { + public: + virtual ~GeneratedServiceIf(); + + // Looks up the appropriate method in 'methods_by_name_' and executes + // it on the current thread. + // + // If no such method is found, responds with an error. + void Handle(InboundCall* incoming) override; + + RpcMethodInfo* LookupMethod(const RemoteMethod& method) override; + + protected: + // For each method, stores the relevant information about how to handle the + // call. Methods are inserted by the constructor of the generated subclass. + // After construction, this map is accessed by multiple threads and therefore + // must not be modified. + std::unordered_map<std::string, scoped_refptr<RpcMethodInfo>> methods_by_name_; + + // The result tracker for this service's methods. + scoped_refptr<ResultTracker> result_tracker_; +}; + +} // namespace rpc +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_pool.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_pool.cc b/be/src/kudu/rpc/service_pool.cc new file mode 100644 index 0000000..1a23ca9 --- /dev/null +++ b/be/src/kudu/rpc/service_pool.cc @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/service_pool.h" + +#include <glog/logging.h> +#include <memory> +#include <string> +#include <vector> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/inbound_call.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/service_if.h" +#include "kudu/rpc/service_queue.h" +#include "kudu/util/logging.h" +#include "kudu/util/metrics.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" +#include "kudu/util/trace.h" + +using std::shared_ptr; +using strings::Substitute; + +METRIC_DEFINE_histogram(server, rpc_incoming_queue_time, + "RPC Queue Time", + kudu::MetricUnit::kMicroseconds, + "Number of microseconds incoming RPC requests spend in the worker queue", + 60000000LU, 3); + +METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue, + "RPC Queue Timeouts", + kudu::MetricUnit::kRequests, + "Number of RPCs whose timeout elapsed while waiting " + "in the service queue, and thus were not processed."); + +METRIC_DEFINE_counter(server, rpcs_queue_overflow, + "RPC Queue Overflows", + kudu::MetricUnit::kRequests, + "Number of RPCs dropped because the service queue " + "was full."); + +namespace kudu { +namespace rpc { + +ServicePool::ServicePool(gscoped_ptr<ServiceIf> service, + const scoped_refptr<MetricEntity>& entity, + size_t service_queue_length) + : service_(std::move(service)), + service_queue_(service_queue_length), + incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)), + rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)), + rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)), + closing_(false) { +} + +ServicePool::~ServicePool() { + Shutdown(); +} + +Status ServicePool::Init(int num_threads) { + for (int i = 0; i < num_threads; i++) { + scoped_refptr<kudu::Thread> new_thread; + CHECK_OK(kudu::Thread::Create("service pool", "rpc worker", + &ServicePool::RunThread, this, &new_thread)); + threads_.push_back(new_thread); + } + return Status::OK(); +} + +void ServicePool::Shutdown() { + service_queue_.Shutdown(); + + MutexLock lock(shutdown_lock_); + if (closing_) return; + closing_ = true; + // TODO: Use a proper thread pool implementation. + for (scoped_refptr<kudu::Thread>& thread : threads_) { + CHECK_OK(ThreadJoiner(thread.get()).Join()); + } + + // Now we must drain the service queue. + Status status = Status::ServiceUnavailable("Service is shutting down"); + std::unique_ptr<InboundCall> incoming; + while (service_queue_.BlockingGet(&incoming)) { + incoming.release()->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status); + } + + service_->Shutdown(); +} + +void ServicePool::RejectTooBusy(InboundCall* c) { + string err_msg = + Substitute("$0 request on $1 from $2 dropped due to backpressure. " + "The service queue is full; it has $3 items.", + c->remote_method().method_name(), + service_->service_name(), + c->remote_address().ToString(), + service_queue_.max_size()); + rpcs_queue_overflow_->Increment(); + KLOG_EVERY_N_SECS(WARNING, 1) << err_msg; + c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, + Status::ServiceUnavailable(err_msg)); + DLOG(INFO) << err_msg << " Contents of service queue:\n" + << service_queue_.ToString(); +} + +RpcMethodInfo* ServicePool::LookupMethod(const RemoteMethod& method) { + return service_->LookupMethod(method); +} + +Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) { + InboundCall* c = call.release(); + + vector<uint32_t> unsupported_features; + for (uint32_t feature : c->GetRequiredFeatures()) { + if (!service_->SupportsFeature(feature)) { + unsupported_features.push_back(feature); + } + } + + if (!unsupported_features.empty()) { + c->RespondUnsupportedFeature(unsupported_features); + return Status::NotSupported("call requires unsupported application feature flags", + JoinMapped(unsupported_features, + [] (uint32_t flag) { return std::to_string(flag); }, + ", ")); + } + + TRACE_TO(c->trace(), "Inserting onto call queue"); + + // Queue message on service queue + boost::optional<InboundCall*> evicted; + auto queue_status = service_queue_.Put(c, &evicted); + if (queue_status == QUEUE_FULL) { + RejectTooBusy(c); + return Status::OK(); + } + + if (PREDICT_FALSE(evicted != boost::none)) { + RejectTooBusy(*evicted); + } + + if (PREDICT_TRUE(queue_status == QUEUE_SUCCESS)) { + // NB: do not do anything with 'c' after it is successfully queued -- + // a service thread may have already dequeued it, processed it, and + // responded by this point, in which case the pointer would be invalid. + return Status::OK(); + } + + Status status = Status::OK(); + if (queue_status == QUEUE_SHUTDOWN) { + status = Status::ServiceUnavailable("Service is shutting down"); + c->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status); + } else { + status = Status::RuntimeError(Substitute("Unknown error from BlockingQueue: $0", queue_status)); + c->RespondFailure(ErrorStatusPB::FATAL_UNKNOWN, status); + } + return status; +} + +void ServicePool::RunThread() { + while (true) { + std::unique_ptr<InboundCall> incoming; + if (!service_queue_.BlockingGet(&incoming)) { + VLOG(1) << "ServicePool: messenger shutting down."; + return; + } + + incoming->RecordHandlingStarted(incoming_queue_time_); + ADOPT_TRACE(incoming->trace()); + + if (PREDICT_FALSE(incoming->ClientTimedOut())) { + TRACE_TO(incoming->trace(), "Skipping call since client already timed out"); + rpcs_timed_out_in_queue_->Increment(); + + // Respond as a failure, even though the client will probably ignore + // the response anyway. + incoming->RespondFailure( + ErrorStatusPB::ERROR_SERVER_TOO_BUSY, + Status::TimedOut("Call waited in the queue past client deadline")); + + // Must release since RespondFailure above ends up taking ownership + // of the object. + ignore_result(incoming.release()); + continue; + } + + TRACE_TO(incoming->trace(), "Handling call"); + + // Release the InboundCall pointer -- when the call is responded to, + // it will get deleted at that point. + service_->Handle(incoming.release()); + } +} + +const string ServicePool::service_name() const { + return service_->service_name(); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_pool.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_pool.h b/be/src/kudu/rpc/service_pool.h new file mode 100644 index 0000000..70611c8 --- /dev/null +++ b/be/src/kudu/rpc/service_pool.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_SERVICE_POOL_H +#define KUDU_SERVICE_POOL_H + +#include <string> +#include <vector> + +#include "kudu/gutil/macros.h" +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/rpc/rpc_service.h" +#include "kudu/rpc/service_queue.h" +#include "kudu/util/mutex.h" +#include "kudu/util/thread.h" +#include "kudu/util/status.h" + +namespace kudu { + +class Counter; +class Histogram; +class MetricEntity; +class Socket; + +namespace rpc { + +class Messenger; +class ServiceIf; + +// A pool of threads that handle new incoming RPC calls. +// Also includes a queue that calls get pushed onto for handling by the pool. +class ServicePool : public RpcService { + public: + ServicePool(gscoped_ptr<ServiceIf> service, + const scoped_refptr<MetricEntity>& metric_entity, + size_t service_queue_length); + virtual ~ServicePool(); + + // Start up the thread pool. + virtual Status Init(int num_threads); + + // Shut down the queue and the thread pool. + virtual void Shutdown(); + + RpcMethodInfo* LookupMethod(const RemoteMethod& method) override; + + virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) OVERRIDE; + + const Counter* RpcsTimedOutInQueueMetricForTests() const { + return rpcs_timed_out_in_queue_.get(); + } + + const Histogram* IncomingQueueTimeMetricForTests() const { + return incoming_queue_time_.get(); + } + + const Counter* RpcsQueueOverflowMetric() const { + return rpcs_queue_overflow_.get(); + } + + const std::string service_name() const; + + private: + void RunThread(); + void RejectTooBusy(InboundCall* c); + + gscoped_ptr<ServiceIf> service_; + std::vector<scoped_refptr<kudu::Thread> > threads_; + LifoServiceQueue service_queue_; + scoped_refptr<Histogram> incoming_queue_time_; + scoped_refptr<Counter> rpcs_timed_out_in_queue_; + scoped_refptr<Counter> rpcs_queue_overflow_; + + mutable Mutex shutdown_lock_; + bool closing_; + + DISALLOW_COPY_AND_ASSIGN(ServicePool); +}; + +} // namespace rpc +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_queue-test.cc b/be/src/kudu/rpc/service_queue-test.cc new file mode 100644 index 0000000..0bcbd12 --- /dev/null +++ b/be/src/kudu/rpc/service_queue-test.cc @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include <atomic> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <memory> +#include <string> +#include <thread> +#include <vector> + +#include "kudu/rpc/service_queue.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; + +DEFINE_int32(num_producers, 4, + "Number of producer threads"); + +DEFINE_int32(num_consumers, 20, + "Number of consumer threads"); + +DEFINE_int32(max_queue_size, 50, + "Max queue length"); + +namespace kudu { +namespace rpc { + +static std::atomic<uint32_t> inprogress; + +static std::atomic<uint32_t> total; + +template <typename Queue> +void ProducerThread(Queue* queue) { + int max_inprogress = FLAGS_max_queue_size - FLAGS_num_producers; + while (true) { + while (inprogress > max_inprogress) { + base::subtle::PauseCPU(); + } + inprogress++; + InboundCall* call = new InboundCall(nullptr); + boost::optional<InboundCall*> evicted; + auto status = queue->Put(call, &evicted); + if (status == QUEUE_FULL) { + LOG(INFO) << "queue full: producer exiting"; + delete call; + break; + } + + if (PREDICT_FALSE(evicted != boost::none)) { + LOG(INFO) << "call evicted: producer exiting"; + delete evicted.get(); + break; + } + + if (PREDICT_TRUE(status == QUEUE_SHUTDOWN)) { + delete call; + break; + } + } +} + +template <typename Queue> +void ConsumerThread(Queue* queue) { + unique_ptr<InboundCall> call; + while (queue->BlockingGet(&call)) { + inprogress--; + total++; + call.reset(); + } +} + +TEST(TestServiceQueue, LifoServiceQueuePerf) { + LifoServiceQueue queue(FLAGS_max_queue_size); + vector<std::thread> producers; + vector<std::thread> consumers; + + for (int i = 0; i < FLAGS_num_producers; i++) { + producers.emplace_back(&ProducerThread<LifoServiceQueue>, &queue); + } + + for (int i = 0; i < FLAGS_num_consumers; i++) { + consumers.emplace_back(&ConsumerThread<LifoServiceQueue>, &queue); + } + + int seconds = AllowSlowTests() ? 10 : 1; + uint64_t total_sample = 0; + uint64_t total_queue_len = 0; + uint64_t total_idle_workers = 0; + Stopwatch sw(Stopwatch::ALL_THREADS); + sw.start(); + int32_t before = total; + + for (int i = 0; i < seconds * 50; i++) { + SleepFor(MonoDelta::FromMilliseconds(20)); + total_sample++; + total_queue_len += queue.estimated_queue_length(); + total_idle_workers += queue.estimated_idle_worker_count(); + } + + sw.stop(); + int32_t delta = total - before; + + queue.Shutdown(); + for (int i = 0; i < FLAGS_num_producers; i++) { + producers[i].join(); + } + for (int i = 0; i < FLAGS_num_consumers; i++) { + consumers[i].join(); + } + + float reqs_per_second = static_cast<float>(delta / sw.elapsed().wall_seconds()); + float user_cpu_micros_per_req = static_cast<float>(sw.elapsed().user / 1000.0 / delta); + float sys_cpu_micros_per_req = static_cast<float>(sw.elapsed().system / 1000.0 / delta); + + LOG(INFO) << "Reqs/sec: " << (int32_t)reqs_per_second; + LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us"; + LOG(INFO) << "Sys CPU per req: " << sys_cpu_micros_per_req << "us"; + LOG(INFO) << "Avg rpc queue length: " << total_queue_len / static_cast<double>(total_sample); + LOG(INFO) << "Avg idle workers: " << total_idle_workers / static_cast<double>(total_sample); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_queue.cc b/be/src/kudu/rpc/service_queue.cc new file mode 100644 index 0000000..9b938f6 --- /dev/null +++ b/be/src/kudu/rpc/service_queue.cc @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/service_queue.h" + +#include <mutex> + +#include "kudu/util/logging.h" + +namespace kudu { +namespace rpc { + +__thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = nullptr; + +LifoServiceQueue::LifoServiceQueue(int max_size) + : shutdown_(false), + max_queue_size_(max_size) { + CHECK_GT(max_queue_size_, 0); +} + +LifoServiceQueue::~LifoServiceQueue() { + DCHECK(queue_.empty()) + << "ServiceQueue holds bare pointers at destruction time"; +} + +bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) { + auto consumer = tl_consumer_; + if (PREDICT_FALSE(!consumer)) { + consumer = tl_consumer_ = new ConsumerState(this); + std::lock_guard<simple_spinlock> l(lock_); + consumers_.emplace_back(consumer); + } + + while (true) { + { + std::lock_guard<simple_spinlock> l(lock_); + if (!queue_.empty()) { + auto it = queue_.begin(); + out->reset(*it); + queue_.erase(it); + return true; + } + if (PREDICT_FALSE(shutdown_)) { + return false; + } + consumer->DCheckBoundInstance(this); + waiting_consumers_.push_back(consumer); + } + InboundCall* call = consumer->Wait(); + if (call != nullptr) { + out->reset(call); + return true; + } + // if call == nullptr, this means we are shutting down the queue. + // Loop back around and re-check 'shutdown_'. + } +} + +QueueStatus LifoServiceQueue::Put(InboundCall* call, + boost::optional<InboundCall*>* evicted) { + std::unique_lock<simple_spinlock> l(lock_); + if (PREDICT_FALSE(shutdown_)) { + return QUEUE_SHUTDOWN; + } + + DCHECK(!(waiting_consumers_.size() > 0 && queue_.size() > 0)); + + // fast path + if (queue_.empty() && waiting_consumers_.size() > 0) { + auto consumer = waiting_consumers_[waiting_consumers_.size() - 1]; + waiting_consumers_.pop_back(); + // Notify condition var(and wake up consumer thread) takes time, + // so put it out of spinlock scope. + l.unlock(); + consumer->Post(call); + return QUEUE_SUCCESS; + } + + if (PREDICT_FALSE(queue_.size() >= max_queue_size_)) { + // eviction + DCHECK_EQ(queue_.size(), max_queue_size_); + auto it = queue_.end(); + --it; + if (DeadlineLess(*it, call)) { + return QUEUE_FULL; + } + + *evicted = *it; + queue_.erase(it); + } + + queue_.insert(call); + return QUEUE_SUCCESS; +} + +void LifoServiceQueue::Shutdown() { + std::lock_guard<simple_spinlock> l(lock_); + shutdown_ = true; + + // Post a nullptr to wake up any consumers which are waiting. + for (auto* cs : waiting_consumers_) { + cs->Post(nullptr); + } + waiting_consumers_.clear(); +} + +bool LifoServiceQueue::empty() const { + std::lock_guard<simple_spinlock> l(lock_); + return queue_.empty(); +} + +int LifoServiceQueue::max_size() const { + return max_queue_size_; +} + +std::string LifoServiceQueue::ToString() const { + std::string ret; + + std::lock_guard<simple_spinlock> l(lock_); + for (const auto* t : queue_) { + ret.append(t->ToString()); + ret.append("\n"); + } + return ret; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/service_queue.h b/be/src/kudu/rpc/service_queue.h new file mode 100644 index 0000000..d68576f --- /dev/null +++ b/be/src/kudu/rpc/service_queue.h @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_SERVICE_QUEUE_H +#define KUDU_UTIL_SERVICE_QUEUE_H + +#include <boost/optional.hpp> +#include <memory> +#include <string> +#include <set> +#include <vector> + +#include "kudu/rpc/inbound_call.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/mutex.h" + +namespace kudu { +namespace rpc { + +// Return values for ServiceQueue::Put() +enum QueueStatus { + QUEUE_SUCCESS = 0, + QUEUE_SHUTDOWN = 1, + QUEUE_FULL = 2 +}; + +// Blocking queue used for passing inbound RPC calls to the service handler pool. +// Calls are dequeued in 'earliest-deadline first' order. The queue also maintains a +// bounded number of calls. If the queue overflows, then calls with deadlines farthest +// in the future are evicted. +// +// When calls do not provide deadlines, the RPC layer considers their deadline to +// be infinitely in the future. This means that any call that does have a deadline +// can evict any call that does not have a deadline. This incentivizes clients to +// provide accurate deadlines for their calls. +// +// In order to improve concurrent throughput, this class uses a LIFO design: +// Each consumer thread has its own lock and condition variable. If a +// consumer arrives and there is no work available in the queue, it will not +// wait on the queue lock, but rather push its own 'ConsumerState' object +// to the 'waiting_consumers_' stack. When work arrives, if there are waiting +// consumers, the top consumer is popped from the stack and woken up. +// +// This design has a few advantages over the basic BlockingQueue: +// - the worker who was most recently busy is the one which will be selected for +// new work. This gives an opportunity for the worker to be scheduled again +// without going to sleep, and also keeps CPU cache and allocator caches hot. +// - in the common case that there are enough workers to fully service the incoming +// work rate, the queue implementation itself is never used. Thus, we can +// have a priority queue without paying extra for it in the common case. +// +// NOTE: because of the use of thread-local consumer records, once a consumer +// thread accesses one LifoServiceQueue, it becomes "bound" to that queue and +// must never access any other instance. +class LifoServiceQueue { + public: + explicit LifoServiceQueue(int max_size); + + ~LifoServiceQueue(); + + // Get an element from the queue. Returns false if we were shut down prior to + // getting the element. + bool BlockingGet(std::unique_ptr<InboundCall>* out); + + // Add a new call to the queue. + // Returns: + // - QUEUE_SHUTDOWN if Shutdown() has already been called. + // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any + // RPC already in the queue. + // - QUEUE_SUCCESS if 'call' was enqueued. + // + // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped + // another call out of the queue. In that case, *evicted will be set to the + // call that was bumped. + QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted); + + // Shut down the queue. + // When a blocking queue is shut down, no more elements can be added to it, + // and Put() will return QUEUE_SHUTDOWN. + // Existing elements will drain out of it, and then BlockingGet will start + // returning false. + void Shutdown(); + + bool empty() const; + + int max_size() const; + + std::string ToString() const; + + // Return an estimate of the current queue length. + int estimated_queue_length() const { + ANNOTATE_IGNORE_READS_BEGIN(); + // The C++ standard says that std::multiset::size must be constant time, + // so this method won't try to traverse any actual nodes of the underlying + // RB tree. Investigation of the libstdcxx implementation confirms that + // size() is a simple field access of the _Rb_tree structure. + int ret = queue_.size(); + ANNOTATE_IGNORE_READS_END(); + return ret; + } + + // Return an estimate of the number of idle threads currently awaiting work. + int estimated_idle_worker_count() const { + ANNOTATE_IGNORE_READS_BEGIN(); + // Size of a vector is a simple field access so this is safe. + int ret = waiting_consumers_.size(); + ANNOTATE_IGNORE_READS_END(); + return ret; + } + + private: + // Comparison function which orders calls by their deadlines. + static bool DeadlineLess(const InboundCall* a, + const InboundCall* b) { + auto time_a = a->GetClientDeadline(); + auto time_b = b->GetClientDeadline(); + if (time_a == time_b) { + // If two calls have the same deadline (most likely because neither one specified + // one) then we should order them by arrival order. + time_a = a->GetTimeReceived(); + time_b = b->GetTimeReceived(); + } + return time_a < time_b; + } + + // Struct functor wrapper for DeadlineLess. + struct DeadlineLessStruct { + bool operator()(const InboundCall* a, const InboundCall* b) const { + return DeadlineLess(a, b); + } + }; + + // The thread-local record corresponding to a single consumer thread. + // Threads push this record onto the waiting_consumers_ stack when + // they are awaiting work. Producers pop the top waiting consumer and + // post work using Post(). + class ConsumerState { + public: + explicit ConsumerState(LifoServiceQueue* queue) : + cond_(&lock_), + call_(nullptr), + should_wake_(false), + bound_queue_(queue) { + } + + void Post(InboundCall* call) { + DCHECK(call_ == nullptr); + MutexLock l(lock_); + call_ = call; + should_wake_ = true; + cond_.Signal(); + } + + InboundCall* Wait() { + MutexLock l(lock_); + while (should_wake_ == false) { + cond_.Wait(); + } + should_wake_ = false; + InboundCall* ret = call_; + call_ = nullptr; + return ret; + } + + void DCheckBoundInstance(LifoServiceQueue* q) { + DCHECK_EQ(q, bound_queue_); + } + + private: + Mutex lock_; + ConditionVariable cond_; + InboundCall* call_; + bool should_wake_; + + // For the purpose of assertions, tracks the LifoServiceQueue instance that + // this consumer is reading from. + LifoServiceQueue* bound_queue_; + }; + + static __thread ConsumerState* tl_consumer_; + + mutable simple_spinlock lock_; + bool shutdown_; + int max_queue_size_; + + // Stack of consumer threads which are currently waiting for work. + std::vector<ConsumerState*> waiting_consumers_; + + // The actual queue. Work is only added to the queue when there were no + // consumers available for a "direct hand-off". + std::multiset<InboundCall*, DeadlineLessStruct> queue_; + + // The total set of consumers who have ever accessed this queue. + std::vector<std::unique_ptr<ConsumerState>> consumers_; + + DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue); +}; + +} // namespace rpc +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/transfer.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc new file mode 100644 index 0000000..d24e94d --- /dev/null +++ b/be/src/kudu/rpc/transfer.cc @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/transfer.h" + +#include <stdint.h> + +#include <iostream> +#include <sstream> + +#include <glog/logging.h> + +#include "kudu/gutil/endian.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/rpc/constants.h" +#include "kudu/rpc/messenger.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/net/socket.h" + +DEFINE_int32(rpc_max_message_size, (50 * 1024 * 1024), + "The maximum size of a message that any RPC that the server will accept. " + "Must be at least 1MB."); +TAG_FLAG(rpc_max_message_size, advanced); +TAG_FLAG(rpc_max_message_size, runtime); + +static bool ValidateMaxMessageSize(const char* flagname, int32_t value) { + if (value < 1 * 1024 * 1024) { + LOG(ERROR) << flagname << " must be at least 1MB."; + return false; + } + return true; +} +static bool dummy = google::RegisterFlagValidator( + &FLAGS_rpc_max_message_size, &ValidateMaxMessageSize); + +namespace kudu { +namespace rpc { + +using std::ostringstream; +using std::set; +using std::string; +using strings::Substitute; + +#define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) \ + if (PREDICT_FALSE(!status.ok())) { \ + if (Socket::IsTemporarySocketError(status.posix_code())) { \ + return Status::OK(); /* EAGAIN, etc. */ \ + } \ + return status; \ + } + +TransferCallbacks::~TransferCallbacks() +{} + +InboundTransfer::InboundTransfer() + : total_length_(kMsgLengthPrefixLength), + cur_offset_(0) { + buf_.resize(kMsgLengthPrefixLength); +} + +Status InboundTransfer::ReceiveBuffer(Socket &socket) { + if (cur_offset_ < kMsgLengthPrefixLength) { + // receive int32 length prefix + int32_t rem = kMsgLengthPrefixLength - cur_offset_; + int32_t nread; + Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + if (nread == 0) { + return Status::OK(); + } + DCHECK_GE(nread, 0); + cur_offset_ += nread; + if (cur_offset_ < kMsgLengthPrefixLength) { + // If we still don't have the full length prefix, we can't continue + // reading yet. + return Status::OK(); + } + // Since we only read 'rem' bytes above, we should now have exactly + // the length prefix in our buffer and no more. + DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength); + + // The length prefix doesn't include its own 4 bytes, so we have to + // add that back in. + total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength; + if (total_length_ > FLAGS_rpc_max_message_size) { + return Status::NetworkError(Substitute( + "RPC frame had a length of $0, but we only support messages up to $1 bytes " + "long.", total_length_, FLAGS_rpc_max_message_size)); + } + if (total_length_ <= kMsgLengthPrefixLength) { + return Status::NetworkError(Substitute("RPC frame had invalid length of $0", + total_length_)); + } + buf_.resize(total_length_); + + // Fall through to receive the message body, which is likely to be already + // available on the socket. + } + + // receive message body + int32_t nread; + int32_t rem = total_length_ - cur_offset_; + Status status = socket.Recv(&buf_[cur_offset_], rem, &nread); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + cur_offset_ += nread; + + return Status::OK(); +} + +bool InboundTransfer::TransferStarted() const { + return cur_offset_ != 0; +} + +bool InboundTransfer::TransferFinished() const { + return cur_offset_ == total_length_; +} + +string InboundTransfer::StatusAsString() const { + return Substitute("$0/$1 bytes received", cur_offset_, total_length_); +} + +OutboundTransfer* OutboundTransfer::CreateForCallRequest( + int32_t call_id, + const std::vector<Slice> &payload, + TransferCallbacks *callbacks) { + return new OutboundTransfer(call_id, payload, callbacks); +} + +OutboundTransfer* OutboundTransfer::CreateForCallResponse(const std::vector<Slice> &payload, + TransferCallbacks *callbacks) { + return new OutboundTransfer(kInvalidCallId, payload, callbacks); +} + + +OutboundTransfer::OutboundTransfer(int32_t call_id, + const std::vector<Slice> &payload, + TransferCallbacks *callbacks) + : cur_slice_idx_(0), + cur_offset_in_slice_(0), + callbacks_(callbacks), + call_id_(call_id), + aborted_(false) { + CHECK(!payload.empty()); + + n_payload_slices_ = payload.size(); + CHECK_LE(n_payload_slices_, arraysize(payload_slices_)); + for (int i = 0; i < payload.size(); i++) { + payload_slices_[i] = payload[i]; + } +} + +OutboundTransfer::~OutboundTransfer() { + if (!TransferFinished() && !aborted_) { + callbacks_->NotifyTransferAborted( + Status::RuntimeError("RPC transfer destroyed before it finished sending")); + } +} + +void OutboundTransfer::Abort(const Status &status) { + CHECK(!aborted_) << "Already aborted"; + CHECK(!TransferFinished()) << "Cannot abort a finished transfer"; + callbacks_->NotifyTransferAborted(status); + aborted_ = true; +} + +Status OutboundTransfer::SendBuffer(Socket &socket) { + CHECK_LT(cur_slice_idx_, n_payload_slices_); + + int n_iovecs = n_payload_slices_ - cur_slice_idx_; + struct iovec iovec[n_iovecs]; + { + int offset_in_slice = cur_offset_in_slice_; + for (int i = 0; i < n_iovecs; i++) { + Slice &slice = payload_slices_[cur_slice_idx_ + i]; + iovec[i].iov_base = slice.mutable_data() + offset_in_slice; + iovec[i].iov_len = slice.size() - offset_in_slice; + + offset_in_slice = 0; + } + } + + int32_t written; + Status status = socket.Writev(iovec, n_iovecs, &written); + RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status); + + // Adjust our accounting of current writer position. + for (int i = cur_slice_idx_; i < n_payload_slices_; i++) { + Slice &slice = payload_slices_[i]; + int rem_in_slice = slice.size() - cur_offset_in_slice_; + DCHECK_GE(rem_in_slice, 0); + + if (written >= rem_in_slice) { + // Used up this entire slice, advance to the next slice. + cur_slice_idx_++; + cur_offset_in_slice_ = 0; + written -= rem_in_slice; + } else { + // Partially used up this slice, just advance the offset within it. + cur_offset_in_slice_ += written; + break; + } + } + + if (cur_slice_idx_ == n_payload_slices_) { + callbacks_->NotifyTransferFinished(); + DCHECK_EQ(0, cur_offset_in_slice_); + } else { + DCHECK_LT(cur_slice_idx_, n_payload_slices_); + DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size()); + } + + return Status::OK(); +} + +bool OutboundTransfer::TransferStarted() const { + return cur_offset_in_slice_ != 0 || cur_slice_idx_ != 0; +} + +bool OutboundTransfer::TransferFinished() const { + if (cur_slice_idx_ == n_payload_slices_) { + DCHECK_EQ(0, cur_offset_in_slice_); // sanity check + return true; + } + return false; +} + +string OutboundTransfer::HexDump() const { + if (KUDU_SHOULD_REDACT()) { + return kRedactionMessage; + } + + string ret; + for (int i = 0; i < n_payload_slices_; i++) { + ret.append(payload_slices_[i].ToDebugString()); + } + return ret; +} + +int32_t OutboundTransfer::TotalLength() const { + int32_t ret = 0; + for (int i = 0; i < n_payload_slices_; i++) { + ret += payload_slices_[i].size(); + } + return ret; +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/transfer.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h new file mode 100644 index 0000000..671347a --- /dev/null +++ b/be/src/kudu/rpc/transfer.h @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_RPC_TRANSFER_H +#define KUDU_RPC_TRANSFER_H + +#include <boost/intrusive/list.hpp> +#include <gflags/gflags.h> +#include <set> +#include <stdint.h> +#include <string> +#include <vector> + +#include "kudu/rpc/constants.h" +#include "kudu/util/net/sockaddr.h" +#include "kudu/util/status.h" + +DECLARE_int32(rpc_max_message_size); + +namespace google { +namespace protobuf { +class Message; +} // namespace protobuf +} // namespace google + +namespace kudu { + +class Socket; + +namespace rpc { + +class Messenger; +struct TransferCallbacks; + +class TransferLimits { + public: + enum { + kMaxSidecars = 10, + kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg) + }; + + DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits); +}; + +// This class is used internally by the RPC layer to represent an inbound +// transfer in progress. +// +// Inbound Transfer objects are created by a Connection receiving data. When the +// message is fully received, it is either parsed as a call, or a call response, +// and the InboundTransfer object itself is handed off. +class InboundTransfer { + public: + + InboundTransfer(); + + // read from the socket into our buffer + Status ReceiveBuffer(Socket &socket); + + // Return true if any bytes have yet been sent. + bool TransferStarted() const; + + // Return true if the entire transfer has been sent. + bool TransferFinished() const; + + Slice data() const { + return Slice(buf_); + } + + // Return a string indicating the status of this transfer (number of bytes received, etc) + // suitable for logging. + std::string StatusAsString() const; + + private: + + Status ProcessInboundHeader(); + + faststring buf_; + + int32_t total_length_; + int32_t cur_offset_; + + DISALLOW_COPY_AND_ASSIGN(InboundTransfer); +}; + +// When the connection wants to send data, it creates an OutboundTransfer object +// to encompass it. This sits on a queue within the Connection, so that each time +// the Connection wakes up with a writable socket, it consumes more bytes off +// the next pending transfer in the queue. +// +// Upon completion of the transfer, a callback is triggered. +class OutboundTransfer : public boost::intrusive::list_base_hook<> { + public: + // Factory methods for creating transfers associated with call requests + // or responses. The 'payload' slices will be concatenated and + // written to the socket. When the transfer completes or errors, the + // appropriate method of 'callbacks' is invoked. + // + // Does not take ownership of the callbacks object or the underlying + // memory of the slices. The slices must remain valid until the callback + // is triggered. + // + // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices + // slices. + // ------------------------------------------------------------ + + // Create an outbound transfer for a call request. + static OutboundTransfer* CreateForCallRequest(int32_t call_id, + const std::vector<Slice> &payload, + TransferCallbacks *callbacks); + + // Create an outbound transfer for a call response. + // See above for details. + static OutboundTransfer* CreateForCallResponse(const std::vector<Slice> &payload, + TransferCallbacks *callbacks); + + // Destruct the transfer. A transfer object should never be deallocated + // before it has either (a) finished transferring, or (b) been Abort()ed. + ~OutboundTransfer(); + + // Abort the current transfer, with the given status. + // This triggers TransferCallbacks::NotifyTransferAborted. + void Abort(const Status &status); + + // send from our buffers into the sock + Status SendBuffer(Socket &socket); + + // Return true if any bytes have yet been sent. + bool TransferStarted() const; + + // Return true if the entire transfer has been sent. + bool TransferFinished() const; + + // Return the total number of bytes to be sent (including those already sent) + int32_t TotalLength() const; + + std::string HexDump() const; + + bool is_for_outbound_call() const { + return call_id_ != kInvalidCallId; + } + + // Returns the call ID for a transfer associated with an outbound + // call. Must not be called for call responses. + int32_t call_id() const { + DCHECK_NE(call_id_, kInvalidCallId); + return call_id_; + } + + private: + OutboundTransfer(int32_t call_id, + const std::vector<Slice> &payload, + TransferCallbacks *callbacks); + + // Slices to send. Uses an array here instead of a vector to avoid an expensive + // vector construction (improved performance a couple percent). + Slice payload_slices_[TransferLimits::kMaxPayloadSlices]; + size_t n_payload_slices_; + + // The current slice that is being sent. + int32_t cur_slice_idx_; + // The number of bytes in the above slice which has already been sent. + int32_t cur_offset_in_slice_; + + TransferCallbacks *callbacks_; + + // In the case of outbound calls, the associated call ID. + // In the case of call responses, kInvalidCallId + int32_t call_id_; + + bool aborted_; + + DISALLOW_COPY_AND_ASSIGN(OutboundTransfer); +}; + +// Callbacks made after a transfer completes. +struct TransferCallbacks { + public: + virtual ~TransferCallbacks(); + + // The transfer finished successfully. + virtual void NotifyTransferFinished() = 0; + + // The transfer was aborted (e.g because the connection died or an error occurred). + virtual void NotifyTransferAborted(const Status &status) = 0; +}; + +} // namespace rpc +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/user_credentials.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc new file mode 100644 index 0000000..fdc3ac2 --- /dev/null +++ b/be/src/kudu/rpc/user_credentials.cc @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/rpc/user_credentials.h" + +#include <string> + +#include <boost/functional/hash.hpp> + +#include "kudu/gutil/strings/substitute.h" + +using std::string; + +namespace kudu { +namespace rpc { + +bool UserCredentials::has_real_user() const { + return !real_user_.empty(); +} + +void UserCredentials::set_real_user(const string& real_user) { + real_user_ = real_user; +} + +string UserCredentials::ToString() const { + // Does not print the password. + return strings::Substitute("{real_user=$0}", real_user_); +} + +size_t UserCredentials::HashCode() const { + size_t seed = 0; + if (has_real_user()) { + boost::hash_combine(seed, real_user()); + } + return seed; +} + +bool UserCredentials::Equals(const UserCredentials& other) const { + return real_user() == other.real_user(); +} + +} // namespace rpc +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/user_credentials.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h new file mode 100644 index 0000000..56af70a --- /dev/null +++ b/be/src/kudu/rpc/user_credentials.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <string> + +namespace kudu { +namespace rpc { + +// Client-side user credentials. Currently this is more-or-less a simple wrapper +// around a username string. However, we anticipate moving more credentials such as +// tokens into a per-Proxy structure rather than Messenger-wide, and this will +// be the place to store them. +class UserCredentials { + public: + // Real user. + bool has_real_user() const; + void set_real_user(const std::string& real_user); + const std::string& real_user() const { return real_user_; } + + // Returns a string representation of the object. + std::string ToString() const; + + std::size_t HashCode() const; + bool Equals(const UserCredentials& other) const; + + private: + // Remember to update HashCode() and Equals() when new fields are added. + std::string real_user_; +}; + +} // namespace rpc +} // namespace kudu
