This is an automated email from the ASF dual-hosted git repository.

achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1749ec2b975d9ca02f65040193389298e54d33f6
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Feb 21 12:48:15 2024 -0800

    [rpc] a bit of clean-up on LifoServiceQueue
    
    I was looking at the code in service queue/pool while troubleshooting
    one issue, and I noticed there is some room for a minor clean-up such
    as removing unused methods, etc.  This patch takes care of that.
    
    Change-Id: Ibcb0dec8ad165773cb5d50869b979d70f9bf4a79
    Reviewed-on: http://gerrit.cloudera.org:8080/21049
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Mahesh Reddy <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/rpc/service_pool.cc  | 11 +++----
 src/kudu/rpc/service_queue.cc | 29 +++++++----------
 src/kudu/rpc/service_queue.h  | 72 +++++++++++++++++++++++--------------------
 3 files changed, 54 insertions(+), 58 deletions(-)

diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index 00d420c99..848e07d04 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -27,7 +27,6 @@
 
 #include <glog/logging.h>
 
-#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
@@ -170,7 +169,7 @@ Status 
ServicePool::QueueInboundCall(unique_ptr<InboundCall> call) {
 
   // Queue message on service queue
   std::optional<InboundCall*> evicted;
-  auto queue_status = service_queue_.Put(c, &evicted);
+  const auto queue_status = service_queue_.Put(c, &evicted);
   if (queue_status == QUEUE_FULL) {
     RejectTooBusy(c);
     return Status::OK();
@@ -214,14 +213,12 @@ void ServicePool::RunThread() {
       rpcs_timed_out_in_queue_->Increment();
 
       // Respond as a failure, even though the client will probably ignore
-      // the response anyway.
-      incoming->RespondFailure(
+      // the response anyway. Must release the raw pointer since the
+      // RespondFailure() call below ends up taking ownership of the object.
+      incoming.release()->RespondFailure(
         ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
         Status::TimedOut("Call waited in the queue past client deadline"));
 
-      // Must release since RespondFailure above ends up taking ownership
-      // of the object.
-      ignore_result(incoming.release());
       continue;
     }
 
diff --git a/src/kudu/rpc/service_queue.cc b/src/kudu/rpc/service_queue.cc
index c2ab05834..508161c1a 100644
--- a/src/kudu/rpc/service_queue.cc
+++ b/src/kudu/rpc/service_queue.cc
@@ -26,12 +26,12 @@
 namespace kudu {
 namespace rpc {
 
-__thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = 
nullptr;
+thread_local LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = 
nullptr;
 
-LifoServiceQueue::LifoServiceQueue(int max_size)
-   : shutdown_(false),
-     max_queue_size_(max_size) {
-  CHECK_GT(max_queue_size_, 0);
+LifoServiceQueue::LifoServiceQueue(size_t max_size)
+   : max_queue_size_(max_size),
+     shutdown_(false) {
+  DCHECK_GT(max_queue_size_, 0);
 }
 
 LifoServiceQueue::~LifoServiceQueue() {
@@ -40,7 +40,7 @@ LifoServiceQueue::~LifoServiceQueue() {
 }
 
 bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) {
-  auto consumer = tl_consumer_;
+  auto* consumer = tl_consumer_;
   if (PREDICT_FALSE(!consumer)) {
     consumer = tl_consumer_ = new ConsumerState(this);
     std::lock_guard<simple_spinlock> l(lock_);
@@ -59,7 +59,9 @@ bool 
LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) {
       if (PREDICT_FALSE(shutdown_)) {
         return false;
       }
+#if DCHECK_IS_ON()
       consumer->DCheckBoundInstance(this);
+#endif
       waiting_consumers_.push_back(consumer);
     }
     InboundCall* call = consumer->Wait();
@@ -79,11 +81,11 @@ QueueStatus LifoServiceQueue::Put(InboundCall* call,
     return QUEUE_SHUTDOWN;
   }
 
-  DCHECK(!(waiting_consumers_.size() > 0 && queue_.size() > 0));
+  DCHECK(waiting_consumers_.empty() || queue_.empty());
 
   // fast path
-  if (queue_.empty() && waiting_consumers_.size() > 0) {
-    auto consumer = waiting_consumers_[waiting_consumers_.size() - 1];
+  if (queue_.empty() && !waiting_consumers_.empty()) {
+    auto* consumer = waiting_consumers_.back();
     waiting_consumers_.pop_back();
     // Notify condition var(and wake up consumer thread) takes time,
     // so put it out of spinlock scope.
@@ -120,15 +122,6 @@ void LifoServiceQueue::Shutdown() {
   waiting_consumers_.clear();
 }
 
-bool LifoServiceQueue::empty() const {
-  std::lock_guard<simple_spinlock> l(lock_);
-  return queue_.empty();
-}
-
-int LifoServiceQueue::max_size() const {
-  return max_queue_size_;
-}
-
 std::string LifoServiceQueue::ToString() const {
   std::string ret;
 
diff --git a/src/kudu/rpc/service_queue.h b/src/kudu/rpc/service_queue.h
index caee06bf7..ff2df1a67 100644
--- a/src/kudu/rpc/service_queue.h
+++ b/src/kudu/rpc/service_queue.h
@@ -16,13 +16,15 @@
 // under the License.
 #pragma once
 
+#include <cstddef>
 #include <memory>
 #include <optional>
-#include <string>
 #include <set>
+#include <string>
 #include <vector>
 
 #include <glog/logging.h>
+#include <gtest/gtest_prod.h>
 
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/macros.h"
@@ -70,12 +72,15 @@ enum QueueStatus {
 // NOTE: because of the use of thread-local consumer records, once a consumer
 // thread accesses one LifoServiceQueue, it becomes "bound" to that queue and
 // must never access any other instance.
-class LifoServiceQueue {
+class LifoServiceQueue final {
  public:
-  explicit LifoServiceQueue(int max_size);
-
+  explicit LifoServiceQueue(size_t max_size);
   ~LifoServiceQueue();
 
+  size_t max_size() const {
+    return max_queue_size_;
+  }
+
   // Get an element from the queue.  Returns false if we were shut down prior 
to
   // getting the element.
   bool BlockingGet(std::unique_ptr<InboundCall>* out);
@@ -99,34 +104,11 @@ class LifoServiceQueue {
   // returning false.
   void Shutdown();
 
-  bool empty() const;
-
-  int max_size() const;
-
   std::string ToString() const;
 
-  // Return an estimate of the current queue length.
-  int estimated_queue_length() const {
-    ANNOTATE_IGNORE_READS_BEGIN();
-    // The C++ standard says that std::multiset::size must be constant time,
-    // so this method won't try to traverse any actual nodes of the underlying
-    // RB tree. Investigation of the libstdcxx implementation confirms that
-    // size() is a simple field access of the _Rb_tree structure.
-    int ret = queue_.size();
-    ANNOTATE_IGNORE_READS_END();
-    return ret;
-  }
-
-  // Return an estimate of the number of idle threads currently awaiting work.
-  int estimated_idle_worker_count() const {
-    ANNOTATE_IGNORE_READS_BEGIN();
-    // Size of a vector is a simple field access so this is safe.
-    int ret = waiting_consumers_.size();
-    ANNOTATE_IGNORE_READS_END();
-    return ret;
-  }
-
  private:
+  FRIEND_TEST(TestServiceQueue, LifoServiceQueuePerf);
+
   // Comparison function which orders calls by their deadlines.
   static bool DeadlineLess(const InboundCall* a,
                            const InboundCall* b) {
@@ -162,8 +144,8 @@ class LifoServiceQueue {
     }
 
     void Post(InboundCall* call) {
-      DCHECK(call_ == nullptr);
       MutexLock l(lock_);
+      DCHECK(!call_);
       call_ = call;
       should_wake_ = true;
       cond_.Signal();
@@ -171,7 +153,7 @@ class LifoServiceQueue {
 
     InboundCall* Wait() {
       MutexLock l(lock_);
-      while (should_wake_ == false) {
+      while (!should_wake_) {
         cond_.Wait();
       }
       should_wake_ = false;
@@ -195,11 +177,32 @@ class LifoServiceQueue {
     LifoServiceQueue* bound_queue_;
   };
 
-  static __thread ConsumerState* tl_consumer_;
+  // Return an estimate of the current queue length.
+  size_t estimated_queue_length() const {
+    ANNOTATE_IGNORE_READS_BEGIN();
+    // The C++ standard says that std::multiset::size must be constant time,
+    // so this method won't try to traverse any actual nodes of the underlying
+    // RB tree. Investigation of the libstdcxx implementation confirms that
+    // size() is a simple field access of the _Rb_tree structure.
+    auto ret = queue_.size();
+    ANNOTATE_IGNORE_READS_END();
+    return ret;
+  }
+
+  // Return an estimate of the number of idle threads currently awaiting work.
+  size_t estimated_idle_worker_count() const {
+    ANNOTATE_IGNORE_READS_BEGIN();
+    // Size of a vector is a simple field access so this is safe.
+    auto ret = waiting_consumers_.size();
+    ANNOTATE_IGNORE_READS_END();
+    return ret;
+  }
+
+  thread_local static ConsumerState* tl_consumer_;
 
+  const size_t max_queue_size_;
   mutable simple_spinlock lock_;
   bool shutdown_;
-  int max_queue_size_;
 
   // Stack of consumer threads which are currently waiting for work.
   std::vector<ConsumerState*> waiting_consumers_;
@@ -209,6 +212,9 @@ class LifoServiceQueue {
   std::multiset<InboundCall*, DeadlineLessStruct> queue_;
 
   // The total set of consumers who have ever accessed this queue.
+  // This container is necessary to maintain proper lifecycle and ownership
+  // of the corresponding ConsumerState objects while their raw pointers
+  // are used elsewhere in this class (e.g., in 'waiting_consumers_').
   std::vector<std::unique_ptr<ConsumerState>> consumers_;
 
   DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);

Reply via email to