This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 79a89c1d7 [rpc] fix rare race in GetPendingConnectionsNum
79a89c1d7 is described below

commit 79a89c1d70d4a8b4da4bda7735f6ac2f19b2626f
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Thu Mar 28 18:51:27 2024 -0700

    [rpc] fix rare race in GetPendingConnectionsNum
    
    A Kudu server might start its shutdown sequence while another thread
    is collecting the server's metrics. If that happens, a data race might
    manifest itself while fetching the 'rpc_pending_connections' metric.
    Running one of the tests under TSAN reproduced such a race with
    the report below.
    
    This patch addresses the data race issue.
    
    In addition, I took the liberty of optimizing the instantiation
    and initialization of DiagnosticSocket instances used to retrieve the
    information on number of pending RPC connections, so now the diagnostic
    sockets are instantiated and initialized once per AcceptorPool instance.
    
    This is a follow-up to c0c44a8acd8d6366987af687d5665f751249a95a.
    
      WARNING: ThreadSanitizer: data race
        Read of size 8 at 0x7b4c00002f78 by thread T63 (mutexes: write 
M558018781209703984):
          #0 std::__1::vector<std::__1::shared_ptr<kudu::rpc::AcceptorPool>, 
std::__1::allocator<std::__1::shared_ptr<kudu::rpc::AcceptorPool> > >::begin() 
thirdparty/installed/tsan/include/c++/v1/vector:1520:30 (libkrpc.so+0x1642b9)
          #1 kudu::rpc::Messenger::GetPendingConnectionsNum() 
src/kudu/rpc/messenger.cc:171:22 (libkrpc.so+0x15f6fb)
          ...
          #14 kudu::MetricRegistry::WriteAsJson(kudu::JsonWriter*, 
kudu::MetricJsonOptions const&) const src/kudu/util/metrics.cc:566:7 
(libkudu_util.so+0x3ab82c)
          ...
          #17 kudu::server::DiagnosticsLog::Start()::$_0::operator()() const 
src/kudu/server/diagnostics_log.cc:145:46 (libserver_process.so+0x118361)
          ...
    
        Previous write of size 8 at 0x7b4c00002f78 by main thread (mutexes: 
write M4638925457023032):
          #0 memset sanitizer_common/sanitizer_common_interceptors.inc:780:3 
(kudu+0x454d16)
          #1 memset sanitizer_common/sanitizer_common_interceptors.inc:778:1 
(kudu+0x454d16)
          #2 std::__1::vector<std::__1::shared_ptr<kudu::rpc::AcceptorPool>, 
std::__1::allocator<std::__1::shared_ptr<kudu::rpc::AcceptorPool> > 
>::__move_assign(std::__1::vector<std::__1::shared_ptr<kudu::rpc::AcceptorPool>,
 std::__1::allocator<std::__1::shared_ptr<kudu::rpc::AcceptorPool> > >&, 
std::__1::integral_constant<bool, true>) 
thirdparty/installed/tsan/include/c++/v1/vector:1392:18 (libkrpc.so+0x16a840)
          ...
          #4 
kudu::rpc::Messenger::ShutdownInternal(kudu::rpc::Messenger::ShutdownMode) 
src/kudu/rpc/messenger.cc:213:23 (libkrpc.so+0x15f509)
          ...
    
    Change-Id: I6aaf3373944eac86664ac62db3b7e6151c874539
    Reviewed-on: http://gerrit.cloudera.org:8080/21224
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com>
---
 src/kudu/rpc/acceptor_pool.cc          | 10 ++++++----
 src/kudu/rpc/acceptor_pool.h           |  2 ++
 src/kudu/rpc/messenger.cc              | 35 +++++++++++++++++++++++++---------
 src/kudu/util/net/diagnostic_socket.cc |  5 +++--
 src/kudu/util/net/diagnostic_socket.h  |  4 ++--
 5 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index a5038526e..63748604d 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -155,6 +155,9 @@ AcceptorPool::~AcceptorPool() {
 
 Status AcceptorPool::Start(int num_threads) {
   RETURN_NOT_OK(socket_.Listen(listen_backlog_));
+#if defined(__linux__)
+  WARN_NOT_OK(diag_socket_.Init(), "could not initialize diagnostic socket");
+#endif
 
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<Thread> new_thread;
@@ -197,6 +200,8 @@ void AcceptorPool::Shutdown() {
   }
   threads_.clear();
 
+  WARN_NOT_OK(diag_socket_.Close(), "error closing diagnostic socket");
+
   // Close the socket: keeping the descriptor open and, possibly, receiving 
late
   // not-to-be-read messages from the peer does not make much sense. The
   // Socket::Close() method is called upon destruction of the aggregated 
socket_
@@ -221,11 +226,8 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const 
{
 }
 
 Status AcceptorPool::GetPendingConnectionsNum(uint32_t* result) const {
-  DiagnosticSocket ds;
-  RETURN_NOT_OK(ds.Init());
-
   DiagnosticSocket::TcpSocketInfo info;
-  RETURN_NOT_OK(ds.Query(socket_, &info));
+  RETURN_NOT_OK(diag_socket_.Query(socket_, &info));
   *result = info.rx_queue_size;
 
   return Status::OK();
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 91ebe3b56..3ffcfe5e7 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -22,6 +22,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/util/net/diagnostic_socket.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/status.h"
@@ -83,6 +84,7 @@ class AcceptorPool {
   const Sockaddr bind_address_;
   const int listen_backlog_;
   std::vector<scoped_refptr<Thread>> threads_;
+  DiagnosticSocket diag_socket_;
 
   std::atomic<bool> closing_;
 
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 663a27cb3..24da3f0e8 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -166,11 +166,23 @@ void Messenger::AllExternalReferencesDropped() {
 }
 
 int32_t Messenger::GetPendingConnectionsNum() {
+  // This method might be called when the messenger is shutting down;
+  // making a copy of acceptor_pools_ is necessary to avoid data races.
+  decltype(acceptor_pools_) acceptor_pools;
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    if (state_ == kClosing) {
+      return -1;
+    }
+    acceptor_pools.reserve(acceptor_pools_.size());
+    acceptor_pools = acceptor_pools_;
+  }
+
   auto pool_reports_num = 0;
   int32_t total_count = 0;
-  for (const auto& p : acceptor_pools_) {
+  for (const auto& p : acceptor_pools) {
     uint32_t count;
-    if (auto s = p->GetPendingConnectionsNum(&count); !s.ok()) {
+    if (auto s = p->GetPendingConnectionsNum(&count); PREDICT_FALSE(!s.ok())) {
       KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
           "$0: no data on pending connections for acceptor pool at $1",
           s.ToString(), p->bind_address().ToString()) << THROTTLE_MSG;
@@ -255,14 +267,19 @@ Status Messenger::AddAcceptorPool(const Sockaddr& 
accept_addr,
     acceptor_pools_.emplace_back(std::make_shared<AcceptorPool>(
         this, &sock, addr, acceptor_listen_backlog_));
     *pool = acceptor_pools_.back();
-  }
 
-  // 'rpc_pending_connections' metric is instantiated only when a messenger
-  // contains at least one acceptor pool. So, this metric is instantiated
-  // only for a server-side messenger.
-  METRIC_rpc_pending_connections.InstantiateFunctionGauge(
-      metric_entity_, [this]() { return this->GetPendingConnectionsNum(); })->
-      AutoDetachToLastValue(&metric_detacher_);
+#if defined(__linux__)
+    if (acceptor_pools_.size() == 1) {
+      // 'rpc_pending_connections' metric is instantiated when the messenger
+      // contains exactly one acceptor pool: this metric makes sense
+      // only for server-side messengers, and it's enough to instantiate the
+      // metric only once.
+      METRIC_rpc_pending_connections.InstantiateFunctionGauge(
+          metric_entity_, [this]() { return this->GetPendingConnectionsNum(); 
})->
+          AutoDetachToLastValue(&metric_detacher_);
+    }
+#endif // #if defined(__linux__) ...
+  }
 
   return Status::OK();
 }
diff --git a/src/kudu/util/net/diagnostic_socket.cc 
b/src/kudu/util/net/diagnostic_socket.cc
index 17ac931e8..6c3ad3260 100644
--- a/src/kudu/util/net/diagnostic_socket.cc
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -113,7 +113,7 @@ Status DiagnosticSocket::Close() {
 Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
                                const Sockaddr& socket_dst_addr,
                                const vector<SocketState>& socket_states,
-                               vector<TcpSocketInfo>* info) {
+                               vector<TcpSocketInfo>* info) const {
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
@@ -134,7 +134,8 @@ Status DiagnosticSocket::Query(const Sockaddr& 
socket_src_addr,
 #endif // #if !defined(__linux__) ... #else ...
 }
 
-Status DiagnosticSocket::Query(const Socket& socket, TcpSocketInfo* info) {
+Status DiagnosticSocket::Query(const Socket& socket,
+                               TcpSocketInfo* info) const {
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
diff --git a/src/kudu/util/net/diagnostic_socket.h 
b/src/kudu/util/net/diagnostic_socket.h
index 20877284f..84ff6f293 100644
--- a/src/kudu/util/net/diagnostic_socket.h
+++ b/src/kudu/util/net/diagnostic_socket.h
@@ -99,12 +99,12 @@ class DiagnosticSocket final {
   Status Query(const Sockaddr& socket_src_addr,
                const Sockaddr& socket_dst_addr,
                const std::vector<SocketState>& socket_states,
-               std::vector<TcpSocketInfo>* info);
+               std::vector<TcpSocketInfo>* info) const;
 
   // Get diagnostic information on the specified socket. This is a handy
   // shortcut to the Query() method above for a single active socket in the
   // SS_ESTABLISHED or SS_LISTEN.
-  Status Query(const Socket& socket, TcpSocketInfo* info);
+  Status Query(const Socket& socket, TcpSocketInfo* info) const;
 
  private:
   // Build and send netlink request, writing it into the diagnostic socket.

Reply via email to