This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new a7e0a36dd [rpc] modernize AcceptorPool's code
a7e0a36dd is described below
commit a7e0a36dda2fd2c0545fa8f00f4bc1711059224d
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Dec 13 10:35:19 2023 -0800
[rpc] modernize AcceptorPool's code
Since I'm modifying the AcceptorPool implementation to add a new metric,
I went ahead and modernized the code a bit first, to logically separate
these mostly style-related updates from the follow-up modifications.
This patch:
* replaces Atomic32 field with std::atomic<bool>
* updates the code to make it conform to the project style guide
* makes other minor improvements
There are no functional modifications in this changelist.
Change-Id: I065510f846642098bc0182bc517324b88d175f0c
Reviewed-on: http://gerrit.cloudera.org:8080/20789
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Abhishek Chennaka <[email protected]>
---
src/kudu/rpc/acceptor_pool.cc | 73 +++++++++++++++++++++----------------------
src/kudu/rpc/acceptor_pool.h | 12 +++----
2 files changed, 42 insertions(+), 43 deletions(-)
diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index 84fbd1872..a753f34f8 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -20,12 +20,14 @@
#include <functional>
#include <ostream>
#include <string>
+#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
@@ -37,16 +39,6 @@
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
-namespace google {
-namespace protobuf {
-
-class Message;
-
-}
-}
-
-using google::protobuf::Message;
-using std::string;
METRIC_DEFINE_counter(server, rpc_connections_accepted,
"RPC Connections Accepted",
@@ -69,11 +61,15 @@ DEFINE_int32(rpc_acceptor_listen_backlog, 128,
"new inbound connection requests.");
TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
+using std::string;
+using strings::Substitute;
+
namespace kudu {
namespace rpc {
-AcceptorPool::AcceptorPool(Messenger* messenger, Socket* socket,
- Sockaddr bind_address)
+AcceptorPool::AcceptorPool(Messenger* messenger,
+ Socket* socket,
+ const Sockaddr& bind_address)
: messenger_(messenger),
socket_(socket->Release()),
bind_address_(bind_address),
@@ -92,22 +88,23 @@ Status AcceptorPool::Start(int num_threads) {
RETURN_NOT_OK(socket_.Listen(FLAGS_rpc_acceptor_listen_backlog));
for (int i = 0; i < num_threads; i++) {
- scoped_refptr<kudu::Thread> new_thread;
- Status s = kudu::Thread::Create("acceptor pool", "acceptor",
- [this]() { this->RunThread(); },
&new_thread);
- if (!s.ok()) {
+ scoped_refptr<Thread> new_thread;
+ Status s = Thread::Create("acceptor pool", "acceptor",
+ [this]() { this->RunThread(); }, &new_thread);
+ if (PREDICT_FALSE(!s.ok())) {
Shutdown();
return s;
}
- threads_.push_back(new_thread);
+ threads_.emplace_back(std::move(new_thread));
}
return Status::OK();
}
void AcceptorPool::Shutdown() {
- if (Acquire_CompareAndSwap(&closing_, false, true) != false) {
- VLOG(2) << "Acceptor Pool on " << bind_address_.ToString()
- << " already shut down";
+ bool is_shut_down = false;
+ if (!closing_.compare_exchange_strong(is_shut_down, true)) {
+ VLOG(2) << Substitute("AcceptorPool on $0 already shut down",
+ bind_address_.ToString());
return;
}
@@ -115,18 +112,18 @@ void AcceptorPool::Shutdown() {
// Closing the socket will break us out of accept() if we're in it, and
// prevent future accepts.
WARN_NOT_OK(socket_.Shutdown(true, true),
- strings::Substitute("Could not shut down acceptor socket on $0",
- bind_address_.ToString()));
+ Substitute("Could not shut down acceptor socket on $0",
+ bind_address_.ToString()));
#else
// Calling shutdown on an accepting (non-connected) socket is illegal on most
// platforms (but not Linux). Instead, the accepting threads are interrupted
// forcefully.
- for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+ for (const auto& thread : threads_) {
pthread_cancel(thread.get()->pthread_id());
}
#endif
- for (const scoped_refptr<kudu::Thread>& thread : threads_) {
+ for (const auto& thread : threads_) {
CHECK_OK(ThreadJoiner(thread.get()).Join());
}
threads_.clear();
@@ -158,30 +155,32 @@ void AcceptorPool::RunThread() {
while (true) {
Socket new_sock;
Sockaddr remote;
- VLOG(2) << "calling accept() on socket " << socket_.GetFd()
- << " listening on " << bind_address_.ToString();
- Status s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
- if (!s.ok()) {
- if (Release_Load(&closing_)) {
+ VLOG(2) << Substitute("calling accept() on socket $0 listening on $1",
+ socket_.GetFd(), bind_address_.ToString());
+ const auto s = socket_.Accept(&new_sock, &remote,
Socket::FLAG_NONBLOCKING);
+ if (PREDICT_FALSE(!s.ok())) {
+ if (closing_) {
break;
}
- KLOG_EVERY_N_SECS(WARNING, 1) << "AcceptorPool: accept failed: " <<
s.ToString()
- << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS(WARNING, 1)
+ << Substitute("AcceptorPool: accept() failed: $0", s.ToString())
+ << THROTTLE_MSG;
continue;
}
if (remote.is_ip()) {
- s = new_sock.SetNoDelay(true);
- if (!s.ok()) {
- KLOG_EVERY_N_SECS(WARNING, 1) << "Acceptor with remote = " <<
remote.ToString()
- << " failed to set TCP_NODELAY on a
newly accepted socket: "
- << s.ToString() << THROTTLE_MSG;
+ if (auto s = new_sock.SetNoDelay(true); PREDICT_FALSE(!s.ok())) {
+ KLOG_EVERY_N_SECS(WARNING, 1)
+ << Substitute("unable to set TCP_NODELAY on newly accepted "
+ "connection from $0: $1",
+ remote.ToString(), s.ToString())
+ << THROTTLE_MSG;
continue;
}
}
rpc_connections_accepted_->Increment();
messenger_->RegisterInboundSocket(&new_sock, remote);
}
- VLOG(1) << "AcceptorPool shutting down.";
+ VLOG(1) << "AcceptorPool shutting down";
}
} // namespace rpc
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 6e5ae3587..aa00df66f 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -16,10 +16,10 @@
// under the License.
#pragma once
+#include <atomic>
#include <cstdint>
#include <vector>
-#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/net/sockaddr.h"
@@ -43,7 +43,7 @@ class AcceptorPool {
// Create a new acceptor pool. Calls socket::Release to take ownership of
the
// socket.
// 'socket' must be already bound, but should not yet be listening.
- AcceptorPool(Messenger *messenger, Socket *socket, Sockaddr bind_address);
+ AcceptorPool(Messenger* messenger, Socket* socket, const Sockaddr&
bind_address);
~AcceptorPool();
// Start listening and accepting connections.
@@ -65,14 +65,14 @@ class AcceptorPool {
private:
void RunThread();
- Messenger *messenger_;
+ Messenger* messenger_;
Socket socket_;
- Sockaddr bind_address_;
- std::vector<scoped_refptr<kudu::Thread> > threads_;
+ const Sockaddr bind_address_;
+ std::vector<scoped_refptr<Thread>> threads_;
scoped_refptr<Counter> rpc_connections_accepted_;
- Atomic32 closing_;
+ std::atomic<bool> closing_;
DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
};