This is an automated email from the ASF dual-hosted git repository.

mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7acd71e9a5a33a92ab55122c316c89a8be61b3e1
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Sep 6 11:18:05 2024 -0700

    [rpc] micro-cleanup on InboundCall and ServicePool
    
    I was troubleshooting an issue and went over the related code
    to clean it up a bit:
      * don't crash on errors in ServicePool::Init(), but return the status
        to be handled at the upper level
      * ServicePool::RejectTooBusy() first sends corresponding response
        before logging about the rejected request
      * fix const-correctness of a few methods in InboundCall, ServicePool
      * remove unnecessary mutex primitive in ServicePool
    
    Change-Id: I255754da8e66263a00aa28e9061102d4ec421105
    Reviewed-on: http://gerrit.cloudera.org:8080/21759
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Marton Greber <[email protected]>
    Reviewed-by: Zoltan Chovan <[email protected]>
---
 src/kudu/rpc/inbound_call.cc |  6 ++++--
 src/kudu/rpc/inbound_call.h  |  4 ++--
 src/kudu/rpc/rpcz_store.cc   |  6 +++---
 src/kudu/rpc/rpcz_store.h    |  2 +-
 src/kudu/rpc/service_pool.cc | 26 ++++++++++++--------------
 src/kudu/rpc/service_pool.h  |  5 ++---
 6 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index b204eae3e..18ac13582 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -361,8 +361,10 @@ void InboundCall::DiscardTransfer() {
   transfer_.reset();
 }
 
-size_t InboundCall::GetTransferSize() {
-  if (!transfer_) return 0;
+size_t InboundCall::GetTransferSize() const {
+  if (PREDICT_FALSE(!transfer_)) {
+    return 0;
+  }
   return transfer_->data().size();
 }
 
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index d0dd75e5f..65af6e3be 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -174,7 +174,7 @@ class InboundCall {
   // Return the method associated with this call. This is set just before
   // the call is enqueued onto the service queue, and therefore may be
   // 'nullptr' for much of the lifecycle of a call.
-  RpcMethodInfo* method_info() {
+  const RpcMethodInfo* method_info() const {
     return method_info_.get();
   }
 
@@ -221,7 +221,7 @@ class InboundCall {
 
   // Returns the size of the transfer buffer that backs this call. If the transfer does
   // not exist (e.g. GetTransferSize() is called after DiscardTransfer()), returns 0.
-  size_t GetTransferSize();
+  size_t GetTransferSize() const;
 
  private:
   friend class RpczStore;
diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc
index 52a91f317..6e46b3492 100644
--- a/src/kudu/rpc/rpcz_store.cc
+++ b/src/kudu/rpc/rpcz_store.cc
@@ -127,7 +127,7 @@ class MethodSampler {
 };
 
 MethodSampler* RpczStore::SamplerForCall(InboundCall* call) {
-  auto* method_info = call->method_info();
+  const auto* method_info = call->method_info();
   if (PREDICT_FALSE(!method_info)) {
     return nullptr;
   }
@@ -229,9 +229,9 @@ void RpczStore::AddCall(InboundCall* call) {
   sampler->SampleCall(call);
 }
 
-void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req,
+void RpczStore::DumpPB(const DumpRpczStoreRequestPB& /* req */,
                        DumpRpczStoreResponsePB* resp) {
-  vector<pair<RpcMethodInfo*, MethodSampler*>> samplers;
+  vector<pair<const RpcMethodInfo*, MethodSampler*>> samplers;
   {
     shared_lock<rw_spinlock> l(samplers_lock_.get_lock());
     for (const auto& [mi, ms] : method_samplers_) {
diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h
index 563f51754..099f481dc 100644
--- a/src/kudu/rpc/rpcz_store.h
+++ b/src/kudu/rpc/rpcz_store.h
@@ -65,7 +65,7 @@ class RpczStore final {
   percpu_rwlock samplers_lock_;
 
   // Protected by samplers_lock_.
-  std::unordered_map<RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_;
+  std::unordered_map<const RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_;
 
   DISALLOW_COPY_AND_ASSIGN(RpczStore);
 };
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index 1d7a4a07b..ad5fb03cd 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -89,7 +89,7 @@ ServicePool::~ServicePool() {
 Status ServicePool::Init(int num_threads) {
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<kudu::Thread> new_thread;
-    CHECK_OK(kudu::Thread::Create(
+    RETURN_NOT_OK(kudu::Thread::Create(
         Substitute("service pool $0", service_->service_name()),
         "rpc worker",
         [this]() { this->RunThread(); }, &new_thread));
@@ -101,11 +101,10 @@ Status ServicePool::Init(int num_threads) {
 void ServicePool::Shutdown() {
   service_queue_.Shutdown();
 
-  std::lock_guard lock(shutdown_lock_);
-  if (closing_) {
+  bool is_shut_down = false;
+  if (!closing_.compare_exchange_strong(is_shut_down, true)) {
     return;
   }
-  closing_ = true;
   // TODO(mpercy): Use a proper thread pool implementation.
   for (scoped_refptr<kudu::Thread>& thread : threads_) {
     CHECK_OK(ThreadJoiner(thread.get()).Join());
@@ -122,21 +121,20 @@ void ServicePool::Shutdown() {
 }
 
 void ServicePool::RejectTooBusy(InboundCall* c) {
-  string err_msg =
-      Substitute("$0 request on $1 from $2 dropped due to backpressure. "
-                 "The service queue is full; it has $3 items.",
-                 c->remote_method().method_name(),
-                 service_->service_name(),
-                 c->remote_address().ToString(),
-                 service_queue_.max_size());
   rpcs_queue_overflow_->Increment();
-  auto* minfo = c->method_info();
-  if (minfo) {
+  if (const auto* minfo = c->method_info(); minfo != nullptr) {
     minfo->queue_overflow_rejections->Increment();
   }
-  KLOG_EVERY_N_SECS(WARNING, 1) << err_msg << THROTTLE_MSG;
+  const string err_msg = Substitute(
+      "$0 request on $1 from $2 dropped due to backpressure: "
+      "service queue is full with $3 items",
+      c->remote_method().method_name(),
+      service_->service_name(),
+      c->remote_address().ToString(),
+      service_queue_.max_size());
   c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
                     Status::ServiceUnavailable(err_msg));
+  KLOG_EVERY_N_SECS(WARNING, 1) << err_msg << THROTTLE_MSG;
   DLOG(INFO) << err_msg << " Contents of service queue:\n"
              << service_queue_.ToString();
 
diff --git a/src/kudu/rpc/service_pool.h b/src/kudu/rpc/service_pool.h
index 62f68cef1..daaeb822c 100644
--- a/src/kudu/rpc/service_pool.h
+++ b/src/kudu/rpc/service_pool.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstddef>
 #include <functional>
 #include <memory>
@@ -27,7 +28,6 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/rpc_service.h"
 #include "kudu/rpc/service_queue.h"
-#include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -99,8 +99,7 @@ class ServicePool : public RpcService {
   scoped_refptr<Counter> rpcs_timed_out_in_queue_;
   scoped_refptr<Counter> rpcs_queue_overflow_;
 
-  Mutex shutdown_lock_;
-  bool closing_;
+  std::atomic<bool> closing_;
 
   std::function<void(void)> too_busy_hook_;
 

Reply via email to