This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new a7e0a36dd [rpc] modernize AcceptorPool's code
a7e0a36dd is described below

commit a7e0a36dda2fd2c0545fa8f00f4bc1711059224d
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Dec 13 10:35:19 2023 -0800

    [rpc] modernize AcceptorPool's code
    
    Since I'm modifying AcceptorPool implementation to add a new metric,
    I went ahead and modernized the code a bit to logically separate
    these mostly style-related updates from follow-up modifications.
    This patch:
      * replaces Atomic32 field with std::atomic<bool>
      * updates the code to make it conform to the project style guide
      * other minor improvements
    
    There are no functional modifications in this changelist.
    
    Change-Id: I065510f846642098bc0182bc517324b88d175f0c
    Reviewed-on: http://gerrit.cloudera.org:8080/20789
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/rpc/acceptor_pool.cc | 73 +++++++++++++++++++++----------------------
 src/kudu/rpc/acceptor_pool.h  | 12 +++----
 2 files changed, 42 insertions(+), 43 deletions(-)

diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 84fbd1872..a753f34f8 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -20,12 +20,14 @@
 #include <functional>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
@@ -37,16 +39,6 @@
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
-namespace google {
-namespace protobuf {
-
-class Message;
-
-}
-}
-
-using google::protobuf::Message;
-using std::string;
 
 METRIC_DEFINE_counter(server, rpc_connections_accepted,
                       "RPC Connections Accepted",
@@ -69,11 +61,15 @@ DEFINE_int32(rpc_acceptor_listen_backlog, 128,
              "new inbound connection requests.");
 TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
 
+using std::string;
+using strings::Substitute;
+
 namespace kudu {
 namespace rpc {
 
-AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
-                           Sockaddr bind_address)
+AcceptorPool::AcceptorPool(Messenger* messenger,
+                           Socket* socket,
+                           const Sockaddr& bind_address)
     : messenger_(messenger),
       socket_(socket->Release()),
       bind_address_(bind_address),
@@ -92,22 +88,23 @@ Status AcceptorPool::Start(int num_threads) {
   RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
 
   for (int i = 0; i < num_threads; i++) {
-    scoped_refptr<kudu::Thread> new_thread;
-    Status s = kudu::Thread::Create("acceptor pool", "acceptor",
-                                    [this]() { this->RunThread(); }, &new_thread);
-    if (!s.ok()) {
+    scoped_refptr<Thread> new_thread;
+    Status s = Thread::Create("acceptor pool", "acceptor",
+                              [this]() { this->RunThread(); }, &new_thread);
+    if (PREDICT_FALSE(!s.ok())) {
       Shutdown();
       return s;
     }
-    threads_.push_back(new_thread);
+    threads_.emplace_back(std::move(new_thread));
   }
   return Status::OK();
 }
 
 void AcceptorPool::Shutdown() {
-  if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
-    VLOG(2) << "Acceptor Pool on " << bind_address_.ToString()
-            << " already shut down";
+  bool is_shut_down = false;
+  if (!closing_.compare_exchange_strong(is_shut_down, true)) {
+    VLOG(2) << Substitute("AcceptorPool on $0 already shut down",
+                          bind_address_.ToString());
     return;
   }
 
@@ -115,18 +112,18 @@ void AcceptorPool::Shutdown() {
   // Closing the socket will break us out of accept() if we're in it, and
   // prevent future accepts.
   WARN_NOT_OK(socket_.Shutdown(true, true),
-              strings::Substitute("Could not shut down acceptor socket on $0",
-                                  bind_address_.ToString()));
+              Substitute("Could not shut down acceptor socket on $0",
+                         bind_address_.ToString()));
 #else
   // Calling shutdown on an accepting (non-connected) socket is illegal on most
   // platforms (but not Linux). Instead, the accepting threads are interrupted
   // forcefully.
-  for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+  for (const auto& thread : threads_) {
     pthread_cancel(thread.get()->pthread_id());
   }
 #endif
 
-  for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+  for (const auto& thread : threads_) {
     CHECK_OK(ThreadJoiner(thread.get()).Join());
   }
   threads_.clear();
@@ -158,30 +155,32 @@ void AcceptorPool::RunThread() {
   while (true) {
     Socket new_sock;
     Sockaddr remote;
-    VLOG(2) << "calling accept() on socket " << socket_.GetFd()
-            << " listening on " << bind_address_.ToString();
-    Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
-    if (!s.ok()) {
-      if (Release_Load(&closing_)) {
+    VLOG(2) << Substitute("calling accept() on socket $0 listening on $1",
+                          socket_.GetFd(), bind_address_.ToString());
+    const auto s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
+    if (PREDICT_FALSE(!s.ok())) {
+      if (closing_) {
         break;
       }
-      KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " << s.ToString()
-                                    << THROTTLE_MSG;
+      KLOG_EVERY_N_SECS(WARNING, 1)
+          << Substitute("AcceptorPool: accept() failed: $0", s.ToString())
+          << THROTTLE_MSG;
       continue;
     }
     if (remote.is_ip()) {
-      s = new_sock.SetNoDelay(true);
-      if (!s.ok()) {
-        KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " << remote.ToString()
-                                      << " failed to set TCP_NODELAY on a newly accepted socket: "
-                                      << s.ToString() << THROTTLE_MSG;
+      if (auto s = new_sock.SetNoDelay(true); PREDICT_FALSE(!s.ok())) {
+        KLOG_EVERY_N_SECS(WARNING, 1)
+            << Substitute("unable to set TCP_NODELAY on newly accepted "
+                          "connection from $0: $1",
+                          remote.ToString(), s.ToString())
+            << THROTTLE_MSG;
         continue;
       }
     }
     rpc_connections_accepted_->Increment();
     messenger_->RegisterInboundSocket(&new_sock, remote);
   }
-  VLOG(1) << "AcceptorPool shutting down.";
+  VLOG(1) << "AcceptorPool shutting down";
 }
 
 } // namespace rpc
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 6e5ae3587..aa00df66f 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -16,10 +16,10 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <vector>
 
-#include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/net/sockaddr.h"
@@ -43,7 +43,7 @@ class AcceptorPool {
   // Create a new acceptor pool.  Calls socket::Release to take ownership of the
   // socket.
   // 'socket' must be already bound, but should not yet be listening.
-  AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address);
+  AcceptorPool(Messenger* messenger, Socket* socket, const Sockaddr& bind_address);
   ~AcceptorPool();
 
   // Start listening and accepting connections.
@@ -65,14 +65,14 @@ class AcceptorPool {
  private:
   void RunThread();
 
-  Messenger *messenger_;
+  Messenger* messenger_;
   Socket socket_;
-  Sockaddr bind_address_;
-  std::vector<scoped_refptr<kudu::Thread> > threads_;
+  const Sockaddr bind_address_;
+  std::vector<scoped_refptr<Thread>> threads_;
 
   scoped_refptr<Counter> rpc_connections_accepted_;
 
-  Atomic32 closing_;
+  std::atomic<bool> closing_;
 
   DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
 };

Reply via email to