This is an automated email from the ASF dual-hosted git repository. mgreber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 7acd71e9a5a33a92ab55122c316c89a8be61b3e1 Author: Alexey Serbin <[email protected]> AuthorDate: Fri Sep 6 11:18:05 2024 -0700 [rpc] micro-cleanup on InboundCall and ServicePool I was troubleshooting an issue and went over the related code to clean it up a bit: * don't crash on errors in ServicePool::Init(), but return the status to be handled at the upper level * ServicePool::RejectTooBusy() first sends corresponding response before logging about the rejected request * fix const-correctness of a few methods in InboundCall, ServicePool * remove unnecessary mutex primitive in ServicePool Change-Id: I255754da8e66263a00aa28e9061102d4ec421105 Reviewed-on: http://gerrit.cloudera.org:8080/21759 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Marton Greber <[email protected]> Reviewed-by: Zoltan Chovan <[email protected]> --- src/kudu/rpc/inbound_call.cc | 6 ++++-- src/kudu/rpc/inbound_call.h | 4 ++-- src/kudu/rpc/rpcz_store.cc | 6 +++--- src/kudu/rpc/rpcz_store.h | 2 +- src/kudu/rpc/service_pool.cc | 26 ++++++++++++-------------- src/kudu/rpc/service_pool.h | 5 ++--- 6 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc index b204eae3e..18ac13582 100644 --- a/src/kudu/rpc/inbound_call.cc +++ b/src/kudu/rpc/inbound_call.cc @@ -361,8 +361,10 @@ void InboundCall::DiscardTransfer() { transfer_.reset(); } -size_t InboundCall::GetTransferSize() { - if (!transfer_) return 0; +size_t InboundCall::GetTransferSize() const { + if (PREDICT_FALSE(!transfer_)) { + return 0; + } return transfer_->data().size(); } diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h index d0dd75e5f..65af6e3be 100644 --- a/src/kudu/rpc/inbound_call.h +++ b/src/kudu/rpc/inbound_call.h @@ -174,7 +174,7 @@ class InboundCall { // Return the method associated with this call. This is set just before // the call is enqueued onto the service queue, and therefore may be // 'nullptr' for much of the lifecycle of a call. - RpcMethodInfo* method_info() { + const RpcMethodInfo* method_info() const { return method_info_.get(); } @@ -221,7 +221,7 @@ class InboundCall { // Returns the size of the transfer buffer that backs this call. If the transfer does // not exist (e.g. GetTransferSize() is called after DiscardTransfer()), returns 0. - size_t GetTransferSize(); + size_t GetTransferSize() const; private: friend class RpczStore; diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc index 52a91f317..6e46b3492 100644 --- a/src/kudu/rpc/rpcz_store.cc +++ b/src/kudu/rpc/rpcz_store.cc @@ -127,7 +127,7 @@ class MethodSampler { }; MethodSampler* RpczStore::SamplerForCall(InboundCall* call) { - auto* method_info = call->method_info(); + const auto* method_info = call->method_info(); if (PREDICT_FALSE(!method_info)) { return nullptr; } @@ -229,9 +229,9 @@ void RpczStore::AddCall(InboundCall* call) { sampler->SampleCall(call); } -void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req, +void RpczStore::DumpPB(const DumpRpczStoreRequestPB& /* req */, DumpRpczStoreResponsePB* resp) { - vector<pair<RpcMethodInfo*, MethodSampler*>> samplers; + vector<pair<const RpcMethodInfo*, MethodSampler*>> samplers; { shared_lock<rw_spinlock> l(samplers_lock_.get_lock()); for (const auto& [mi, ms] : method_samplers_) { diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h index 563f51754..099f481dc 100644 --- a/src/kudu/rpc/rpcz_store.h +++ b/src/kudu/rpc/rpcz_store.h @@ -65,7 +65,7 @@ class RpczStore final { percpu_rwlock samplers_lock_; // Protected by samplers_lock_. - std::unordered_map<RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_; + std::unordered_map<const RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_; DISALLOW_COPY_AND_ASSIGN(RpczStore); }; diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc index 1d7a4a07b..ad5fb03cd 100644 --- a/src/kudu/rpc/service_pool.cc +++ b/src/kudu/rpc/service_pool.cc @@ -89,7 +89,7 @@ ServicePool::~ServicePool() { Status ServicePool::Init(int num_threads) { for (int i = 0; i < num_threads; i++) { scoped_refptr<kudu::Thread> new_thread; - CHECK_OK(kudu::Thread::Create( + RETURN_NOT_OK(kudu::Thread::Create( Substitute("service pool $0", service_->service_name()), "rpc worker", [this]() { this->RunThread(); }, &new_thread)); @@ -101,11 +101,10 @@ Status ServicePool::Init(int num_threads) { void ServicePool::Shutdown() { service_queue_.Shutdown(); - std::lock_guard lock(shutdown_lock_); - if (closing_) { + bool is_shut_down = false; + if (!closing_.compare_exchange_strong(is_shut_down, true)) { return; } - closing_ = true; // TODO(mpercy): Use a proper thread pool implementation. for (scoped_refptr<kudu::Thread>& thread : threads_) { CHECK_OK(ThreadJoiner(thread.get()).Join()); @@ -122,21 +121,20 @@ void ServicePool::Shutdown() { } void ServicePool::RejectTooBusy(InboundCall* c) { - string err_msg = - Substitute("$0 request on $1 from $2 dropped due to backpressure. " - "The service queue is full; it has $3 items.", - c->remote_method().method_name(), - service_->service_name(), - c->remote_address().ToString(), - service_queue_.max_size()); rpcs_queue_overflow_->Increment(); - auto* minfo = c->method_info(); - if (minfo) { + if (const auto* minfo = c->method_info(); minfo != nullptr) { minfo->queue_overflow_rejections->Increment(); } - KLOG_EVERY_N_SECS(WARNING, 1) << err_msg << THROTTLE_MSG; + const string err_msg = Substitute( + "$0 request on $1 from $2 dropped due to backpressure: " + "service queue is full with $3 items", + c->remote_method().method_name(), + service_->service_name(), + c->remote_address().ToString(), + service_queue_.max_size()); c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, Status::ServiceUnavailable(err_msg)); + KLOG_EVERY_N_SECS(WARNING, 1) << err_msg << THROTTLE_MSG; DLOG(INFO) << err_msg << " Contents of service queue:\n" << service_queue_.ToString(); diff --git a/src/kudu/rpc/service_pool.h b/src/kudu/rpc/service_pool.h index 62f68cef1..daaeb822c 100644 --- a/src/kudu/rpc/service_pool.h +++ b/src/kudu/rpc/service_pool.h @@ -16,6 +16,7 @@ // under the License. #pragma once +#include <atomic> #include <cstddef> #include <functional> #include <memory> @@ -27,7 +28,6 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/rpc/rpc_service.h" #include "kudu/rpc/service_queue.h" -#include "kudu/util/mutex.h" #include "kudu/util/status.h" namespace kudu { @@ -99,8 +99,7 @@ class ServicePool : public RpcService { scoped_refptr<Counter> rpcs_timed_out_in_queue_; scoped_refptr<Counter> rpcs_queue_overflow_; - Mutex shutdown_lock_; - bool closing_; + std::atomic<bool> closing_; std::function<void(void)> too_busy_hook_;
