This is an automated email from the ASF dual-hosted git repository. achennaka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1749ec2b975d9ca02f65040193389298e54d33f6 Author: Alexey Serbin <[email protected]> AuthorDate: Wed Feb 21 12:48:15 2024 -0800 [rpc] a bit of clean-up on LifoServiceQueue I was looking at the code in service queue/pool while troubleshooting one issue, and I noticed there is some room for a minor clean-up such as removing unused methods, etc. This patch takes care of that. Change-Id: Ibcb0dec8ad165773cb5d50869b979d70f9bf4a79 Reviewed-on: http://gerrit.cloudera.org:8080/21049 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Mahesh Reddy <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> --- src/kudu/rpc/service_pool.cc | 11 +++---- src/kudu/rpc/service_queue.cc | 29 +++++++---------- src/kudu/rpc/service_queue.h | 72 +++++++++++++++++++++++-------------------- 3 files changed, 54 insertions(+), 58 deletions(-) diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc index 00d420c99..848e07d04 100644 --- a/src/kudu/rpc/service_pool.cc +++ b/src/kudu/rpc/service_pool.cc @@ -27,7 +27,6 @@ #include <glog/logging.h> -#include "kudu/gutil/basictypes.h" #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/strings/join.h" @@ -170,7 +169,7 @@ Status ServicePool::QueueInboundCall(unique_ptr<InboundCall> call) { // Queue message on service queue std::optional<InboundCall*> evicted; - auto queue_status = service_queue_.Put(c, &evicted); + const auto queue_status = service_queue_.Put(c, &evicted); if (queue_status == QUEUE_FULL) { RejectTooBusy(c); return Status::OK(); @@ -214,14 +213,12 @@ void ServicePool::RunThread() { rpcs_timed_out_in_queue_->Increment(); // Respond as a failure, even though the client will probably ignore - // the response anyway. - incoming->RespondFailure( + // the response anyway. Must release the raw pointer since the + // RespondFailure() call below ends up taking ownership of the object. + incoming.release()->RespondFailure( ErrorStatusPB::ERROR_SERVER_TOO_BUSY, Status::TimedOut("Call waited in the queue past client deadline")); - // Must release since RespondFailure above ends up taking ownership - // of the object. - ignore_result(incoming.release()); continue; } diff --git a/src/kudu/rpc/service_queue.cc b/src/kudu/rpc/service_queue.cc index c2ab05834..508161c1a 100644 --- a/src/kudu/rpc/service_queue.cc +++ b/src/kudu/rpc/service_queue.cc @@ -26,12 +26,12 @@ namespace kudu { namespace rpc { -__thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = nullptr; +thread_local LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = nullptr; -LifoServiceQueue::LifoServiceQueue(int max_size) - : shutdown_(false), - max_queue_size_(max_size) { - CHECK_GT(max_queue_size_, 0); +LifoServiceQueue::LifoServiceQueue(size_t max_size) + : max_queue_size_(max_size), + shutdown_(false) { + DCHECK_GT(max_queue_size_, 0); } LifoServiceQueue::~LifoServiceQueue() { @@ -40,7 +40,7 @@ LifoServiceQueue::~LifoServiceQueue() { } bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) { - auto consumer = tl_consumer_; + auto* consumer = tl_consumer_; if (PREDICT_FALSE(!consumer)) { consumer = tl_consumer_ = new ConsumerState(this); std::lock_guard<simple_spinlock> l(lock_); @@ -59,7 +59,9 @@ bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) { if (PREDICT_FALSE(shutdown_)) { return false; } +#if DCHECK_IS_ON() consumer->DCheckBoundInstance(this); +#endif waiting_consumers_.push_back(consumer); } InboundCall* call = consumer->Wait(); @@ -79,11 +81,11 @@ QueueStatus LifoServiceQueue::Put(InboundCall* call, return QUEUE_SHUTDOWN; } - DCHECK(!(waiting_consumers_.size() > 0 && queue_.size() > 0)); + DCHECK(waiting_consumers_.empty() || queue_.empty()); // fast path - if (queue_.empty() && waiting_consumers_.size() > 0) { - auto consumer = waiting_consumers_[waiting_consumers_.size() - 1]; + if (queue_.empty() && !waiting_consumers_.empty()) { + auto* consumer = waiting_consumers_.back(); waiting_consumers_.pop_back(); // Notify condition var(and wake up consumer thread) takes time, // so put it out of spinlock scope. @@ -120,15 +122,6 @@ void LifoServiceQueue::Shutdown() { waiting_consumers_.clear(); } -bool LifoServiceQueue::empty() const { - std::lock_guard<simple_spinlock> l(lock_); - return queue_.empty(); -} - -int LifoServiceQueue::max_size() const { - return max_queue_size_; -} - std::string LifoServiceQueue::ToString() const { std::string ret; diff --git a/src/kudu/rpc/service_queue.h b/src/kudu/rpc/service_queue.h index caee06bf7..ff2df1a67 100644 --- a/src/kudu/rpc/service_queue.h +++ b/src/kudu/rpc/service_queue.h @@ -16,13 +16,15 @@ // under the License. #pragma once +#include <cstddef> #include <memory> #include <optional> -#include <string> #include <set> +#include <string> #include <vector> #include <glog/logging.h> +#include <gtest/gtest_prod.h> #include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/macros.h" @@ -70,12 +72,15 @@ enum QueueStatus { // NOTE: because of the use of thread-local consumer records, once a consumer // thread accesses one LifoServiceQueue, it becomes "bound" to that queue and // must never access any other instance. -class LifoServiceQueue { +class LifoServiceQueue final { public: - explicit LifoServiceQueue(int max_size); - + explicit LifoServiceQueue(size_t max_size); ~LifoServiceQueue(); + size_t max_size() const { + return max_queue_size_; + } + // Get an element from the queue. Returns false if we were shut down prior to // getting the element. bool BlockingGet(std::unique_ptr<InboundCall>* out); @@ -99,34 +104,11 @@ class LifoServiceQueue { // returning false. void Shutdown(); - bool empty() const; - - int max_size() const; - std::string ToString() const; - // Return an estimate of the current queue length. - int estimated_queue_length() const { - ANNOTATE_IGNORE_READS_BEGIN(); - // The C++ standard says that std::multiset::size must be constant time, - // so this method won't try to traverse any actual nodes of the underlying - // RB tree. Investigation of the libstdcxx implementation confirms that - // size() is a simple field access of the _Rb_tree structure. - int ret = queue_.size(); - ANNOTATE_IGNORE_READS_END(); - return ret; - } - - // Return an estimate of the number of idle threads currently awaiting work. - int estimated_idle_worker_count() const { - ANNOTATE_IGNORE_READS_BEGIN(); - // Size of a vector is a simple field access so this is safe. - int ret = waiting_consumers_.size(); - ANNOTATE_IGNORE_READS_END(); - return ret; - } - private: + FRIEND_TEST(TestServiceQueue, LifoServiceQueuePerf); + // Comparison function which orders calls by their deadlines. static bool DeadlineLess(const InboundCall* a, const InboundCall* b) { @@ -162,8 +144,8 @@ class LifoServiceQueue { } void Post(InboundCall* call) { - DCHECK(call_ == nullptr); MutexLock l(lock_); + DCHECK(!call_); call_ = call; should_wake_ = true; cond_.Signal(); @@ -171,7 +153,7 @@ class LifoServiceQueue { InboundCall* Wait() { MutexLock l(lock_); - while (should_wake_ == false) { + while (!should_wake_) { cond_.Wait(); } should_wake_ = false; @@ -195,11 +177,32 @@ class LifoServiceQueue { LifoServiceQueue* bound_queue_; }; - static __thread ConsumerState* tl_consumer_; + // Return an estimate of the current queue length. + size_t estimated_queue_length() const { + ANNOTATE_IGNORE_READS_BEGIN(); + // The C++ standard says that std::multiset::size must be constant time, + // so this method won't try to traverse any actual nodes of the underlying + // RB tree. Investigation of the libstdcxx implementation confirms that + // size() is a simple field access of the _Rb_tree structure. + auto ret = queue_.size(); + ANNOTATE_IGNORE_READS_END(); + return ret; + } + + // Return an estimate of the number of idle threads currently awaiting work. + size_t estimated_idle_worker_count() const { + ANNOTATE_IGNORE_READS_BEGIN(); + // Size of a vector is a simple field access so this is safe. + auto ret = waiting_consumers_.size(); + ANNOTATE_IGNORE_READS_END(); + return ret; + } + + thread_local static ConsumerState* tl_consumer_; + const size_t max_queue_size_; mutable simple_spinlock lock_; bool shutdown_; - int max_queue_size_; // Stack of consumer threads which are currently waiting for work. std::vector<ConsumerState*> waiting_consumers_; @@ -209,6 +212,9 @@ class LifoServiceQueue { std::multiset<InboundCall*, DeadlineLessStruct> queue_; // The total set of consumers who have ever accessed this queue. + // This container is necessary to maintain proper lifecycle and ownership + // of the corresponding ConsumerState objects while their raw pointers + // are used elsewhere in this class (e.g., in 'waiting_consumers_'). std::vector<std::unique_ptr<ConsumerState>> consumers_; DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
