This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c0c44a8ac [rpc] introduce rpc_listen_socket_rx_queue_size metric
c0c44a8ac is described below

commit c0c44a8acd8d6366987af687d5665f751249a95a
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Tue Jan 16 14:19:47 2024 -0800

    [rpc] introduce rpc_listen_socket_rx_queue_size metric
    
    This patch introduces a new 'rpc_listen_socket_rx_queue_size'
    histogram metric for AcceptorPool.  The metric allows for tracking
    the size of the listening RPC socket's RX queue (i.e. its accept
    backlog).  The new metric shows meaningful numbers only on Linux
    since it's based on DiagnosticSocket, which is implemented only
    on Linux as of now.
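    
    For reference, the same queue can be inspected by hand on Linux with
    the 'ss' utility from iproute2 (the port below is only an example,
    a Kudu master's default RPC port):
    
      # For a socket in the LISTEN state, Recv-Q shows the current length
      # of the accept queue and Send-Q shows the configured backlog
      # (capped by the net.core.somaxconn limit).
      ss -ltn sport = :7051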
    
    The new metric is sampled by each acceptor thread when accepting
    an RPC connection.  The sampling frequency can be tuned (or the
    sampling disabled completely, if necessary) via the
    --rpc_listen_socket_stats_every_log2 flag.
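    
    For example, with the default --rpc_listen_socket_stats_every_log2=3,
    each acceptor thread samples the queue size on every 8th accepted
    connection.  A sketch of the power-of-two mask arithmetic the patch
    uses for that (N is the flag's value, 'counter' is per-thread):
    
      // With N = 3: mask = (1 << 3) - 1 = 7, so sampling happens when
      // the low N bits of the counter are all ones, i.e. on every 8th
      // accepted connection.
      const uint64_t mask = (1ULL << N) - 1;
      if ((counter & mask) == mask) {
        // query the diagnostic socket and record the RX queue size
      }
      ++counter;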
    
    I added basic test scenarios to cover the newly introduced
    functionality.
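    
    For reference, the new scenarios can be run individually with a gtest
    filter (assuming a standard Kudu build tree; the exact binary path may
    differ):
    
      ./bin/rpc-test --gtest_filter='*TxRxQueue*'
    
    On non-Linux platforms these scenarios are compiled out by the
    '#if defined(__linux__)' guard.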
    
    In addition, an extra performance test scenario has been added to
    rpc-bench.  The new scenario measures the extra latency introduced
    by capturing diagnostic snapshots on the listening RPC socket.  Some
    results are below (3 measurements at each setting), and based on these
    I think that setting --rpc_listen_socket_stats_every_log2=3 by default
    makes sense: it adds about 1.2us of latency on average per connection
    request (~3.6us average dispatch time vs. ~2.5us with the sampling
    disabled).  Since the numbers are in microseconds, the overall latency
    of handling RPC requests and the sustainable RPC rate do not seem to be
    adversely affected by this patch.  If anybody finds otherwise, they can
    always set --rpc_listen_socket_stats_every_log2=-1 for their Kudu
    cluster if they don't care about the listening socket's backlog stats.
    
    The results below were captured by running the following command
    for N in { -1, 0, 3, 5 }:
      ./rpc-bench --gtest_filter='*RpcAcceptorBench*' \
          --client_threads=2 \
          --rpc_listen_socket_stats_every_log2=N
    
      -----------------------------------------------------------------------
    
      collecting diagnostics on the listening RPC socket ... is disabled
      Dispatched 99851 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 48 average 2.4426495478262611
    
      Dispatched 98651 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 54 average 2.4904663916229941
    
      Dispatched 99200 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 53 average 2.4747076612903225
    
      -----------------------------------------------------------------------
    
      collecting diagnostics on the listening RPC socket ... every 1 connection(s)
      Dispatched 65383 connection requests in 1 seconds
      Request dispatching time (us): min 6 max 208 average 11.071424201639545
    
      Dispatched 65162 connection requests in 1 seconds
      Request dispatching time (us): min 6 max 392 average 11.258256092320913
    
      Dispatched 65428 connection requests in 1 seconds
      Request dispatching time (us): min 6 max 290 average 11.209445208619899
    
      -----------------------------------------------------------------------
    
      collecting diagnostics on the listening RPC socket ... every 8 connection(s)
      Dispatched 99902 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 148 average 3.628295729815219
    
      Dispatched 98139 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 101 average 3.6546429044518489
    
      Dispatched 101681 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 98 average 3.549552030369489
    
      -----------------------------------------------------------------------
    
      collecting diagnostics on the listening RPC socket ... every 32 connection(s)
      Dispatched 100832 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 114 average 2.727457553157727
    
      Dispatched 100214 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 71 average 2.8103658171512964
    
      Dispatched 99106 connection requests in 1 seconds
      Request dispatching time (us): min 0 max 52 average 2.7968841442495913
    
    Change-Id: I83580659bac39d9171f1ee0d0e88676ed0d50b99
    Reviewed-on: http://gerrit.cloudera.org:8080/20908
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Yingchun Lai <laiyingc...@apache.org>
---
 src/kudu/rpc/acceptor_pool.cc          |  81 ++++++++++++-
 src/kudu/rpc/acceptor_pool.h           |   1 +
 src/kudu/rpc/negotiation.cc            |   9 +-
 src/kudu/rpc/rpc-bench.cc              | 103 +++++++++++++++-
 src/kudu/rpc/rpc-test-base.h           |  14 ++-
 src/kudu/rpc/rpc-test.cc               | 215 ++++++++++++++++++++++++++++++++-
 src/kudu/util/CMakeLists.txt           |   5 +-
 src/kudu/util/net/diagnostic_socket.cc |  35 +++++-
 src/kudu/util/net/diagnostic_socket.h  |   4 +
 src/kudu/util/net/socket.cc            |  22 ++++
 src/kudu/util/net/socket.h             |  24 ++++
 11 files changed, 500 insertions(+), 13 deletions(-)

diff --git a/src/kudu/rpc/acceptor_pool.cc b/src/kudu/rpc/acceptor_pool.cc
index c20807a93..fca0f1e85 100644
--- a/src/kudu/rpc/acceptor_pool.cc
+++ b/src/kudu/rpc/acceptor_pool.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/net/diagnostic_socket.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -68,6 +69,14 @@ METRIC_DEFINE_histogram(server, acceptor_dispatch_times,
                         kudu::MetricLevel::kInfo,
                         1000000, 2);
 
+METRIC_DEFINE_histogram(server, rpc_listen_socket_rx_queue_size,
+                        "Listening RPC Socket Backlog",
+                        kudu::MetricUnit::kEntries,
+                        "A histogram of the pending connections queue size for 
"
+                        "the listening RPC socket that this acceptor pool 
serves.",
+                        kudu::MetricLevel::kInfo,
+                        1000000, 2);
+
 DEFINE_int32(rpc_acceptor_listen_backlog,
              kudu::rpc::AcceptorPool::kDefaultListenBacklog,
              "Socket backlog parameter used when listening for RPC 
connections. "
@@ -81,7 +90,16 @@ DEFINE_int32(rpc_acceptor_listen_backlog,
              "the server ride over bursts of new inbound connection 
requests.");
 TAG_FLAG(rpc_acceptor_listen_backlog, advanced);
 
+DEFINE_int32(rpc_listen_socket_stats_every_log2, 3,
+             "Listening RPC socket's statistics sampling frequency. With "
+             "--rpc_listen_socket_stats_every_log2=N, the statistics are "
+             "sampled every 2^N connection request by each acceptor thread. "
+             "Set this flag to -1 to disable statistics collection on "
+             "the listening RPC socket.");
+TAG_FLAG(rpc_listen_socket_stats_every_log2, advanced);
+
 namespace {
+
 bool ValidateListenBacklog(const char* flagname, int value) {
   if (value >= -1) {
     return true;
@@ -92,8 +110,21 @@ bool ValidateListenBacklog(const char* flagname, int value) {
       "capped at the system-wide limit", value, flagname);
   return false;
 }
+
+bool ValidateStatsCollectionFrequency(const char* flagname, int value) {
+  if (value < 64) {
+    return true;
+  }
+  LOG(ERROR) << Substitute("$0: invalid setting for $1; must be less than 64",
+                           value, flagname);
+  return false;
+}
+
 } // anonymous namespace
+
 DEFINE_validator(rpc_acceptor_listen_backlog, &ValidateListenBacklog);
+DEFINE_validator(rpc_listen_socket_stats_every_log2,
+                 &ValidateStatsCollectionFrequency);
 
 
 namespace kudu {
@@ -114,6 +145,8 @@ AcceptorPool::AcceptorPool(Messenger* messenger,
       : METRIC_rpc_connections_accepted_unix_domain_socket;
   rpc_connections_accepted_ = connections_accepted.Instantiate(metric_entity);
   dispatch_times_ = METRIC_acceptor_dispatch_times.Instantiate(metric_entity);
+  listen_socket_queue_size_ =
+      METRIC_rpc_listen_socket_rx_queue_size.Instantiate(metric_entity);
 }
 
 AcceptorPool::~AcceptorPool() {
@@ -190,6 +223,34 @@ int64_t AcceptorPool::num_rpc_connections_accepted() const {
 void AcceptorPool::RunThread() {
  const int64_t kCyclesPerSecond = static_cast<int64_t>(base::CyclesPerSecond());
 
+  // Fetch and keep the information on the listening socket's address to avoid
+  // re-fetching it every time a new connection is accepted.
+  // The diagnostic socket is needed to fetch information on the RX queue size.
+  Sockaddr cur_addr;
+  WARN_NOT_OK(socket_.GetSocketAddress(&cur_addr),
+              "unable to get address info on RPC socket");
+  const auto& cur_addr_str = cur_addr.ToString();
+
+  DiagnosticSocket ds;
+  const int ds_query_freq_log2 = FLAGS_rpc_listen_socket_stats_every_log2;
+  const bool ds_query_enabled = (ds_query_freq_log2 >= 0);
+  const uint64_t ds_query_freq_mask =
+      ds_query_enabled ? (1ULL << ds_query_freq_log2) - 1 : 0;
+  if (ds_query_enabled) {
+    if (const auto s = ds.Init(); s.ok()) {
+      LOG(INFO) << Substitute(
+          "collecting diagnostics on the listening RPC socket $0 "
+          "every $1 connection(s)", cur_addr_str, ds_query_freq_mask + 1);
+    } else {
+      WARN_NOT_OK(s, "unable to open diagnostic socket");
+    }
+  } else {
+    LOG(INFO) << Substitute(
+        "collecting diagnostics on the listening RPC socket $0 is disabled",
+        cur_addr_str);
+  }
+
+  uint64_t counter = 0;
   while (true) {
     Socket new_sock;
     Sockaddr remote;
@@ -197,11 +258,29 @@ void AcceptorPool::RunThread() {
                           socket_.GetFd(), bind_address_.ToString());
    const auto s = socket_.Accept(&new_sock, &remote, Socket::FLAG_NONBLOCKING);
     const auto accepted_at = CycleClock::Now();
+
+    if (ds_query_enabled && ds.IsInitialized() &&
+        (counter & ds_query_freq_mask) == ds_query_freq_mask) {
+      VLOG(2) << "getting stats on the listening socket";
+      // Right after removing an element from the pending connections queue
+      // (a.k.a. listen backlog), collect information on the number
+      // of connections in the queue still waiting to be accepted.
+      DiagnosticSocket::TcpSocketInfo info;
+      if (auto s = ds.Query(socket_, &info); PREDICT_TRUE(s.ok())) {
+        listen_socket_queue_size_->Increment(info.rx_queue_size);
+      } else if (!closing_) {
+        KLOG_EVERY_N_SECS(WARNING, 60)
+            << Substitute("unable to collect diagnostics on RPC socket $0: $1",
+                          cur_addr_str, s.ToString());
+      }
+    }
+    ++counter;
+
     const auto dispatch_times_recorder = MakeScopedCleanup([&]() {
       // The timings are captured for both success and failure paths, so the
       // 'dispatch_times_' histogram accounts for all the connection attempts
       // that lead to successfully extracting an item from the queue of pending
-      // connections for the listened RPC socket. Meanwhile, the
+      // connections for the listening RPC socket. Meanwhile, the
       // 'rpc_connection_accepted_' counter accounts only for connections that
       // were successfully dispatched to the messenger for further processing.
       dispatch_times_->Increment(
diff --git a/src/kudu/rpc/acceptor_pool.h b/src/kudu/rpc/acceptor_pool.h
index 50e15e201..440c81903 100644
--- a/src/kudu/rpc/acceptor_pool.h
+++ b/src/kudu/rpc/acceptor_pool.h
@@ -84,6 +84,7 @@ class AcceptorPool {
   // Metrics.
   scoped_refptr<Counter> rpc_connections_accepted_;
   scoped_refptr<Histogram> dispatch_times_;
+  scoped_refptr<Histogram> listen_socket_queue_size_;
 
   DISALLOW_COPY_AND_ASSIGN(AcceptorPool);
 };
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 2c0775a5f..1796d8170 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -72,6 +72,12 @@ DEFINE_bool(rpc_encrypt_loopback_connections, false,
             "an attacker.");
 TAG_FLAG(rpc_encrypt_loopback_connections, advanced);
 
+DEFINE_bool(rpc_suppress_negotiation_trace, false,
+            "Whether to suppress all negotiation traces: do not dump trace "
+            "of a connection negotiation into the log, even for a failed one. "
+            "For testing only!");
+TAG_FLAG(rpc_suppress_negotiation_trace, unsafe);
+
 using kudu::security::RpcAuthentication;
 using kudu::security::RpcEncryption;
 using std::string;
@@ -327,7 +333,8 @@ void Negotiation::RunNegotiation(const scoped_refptr<Connection>& conn,
       (s.IsNetworkError() && s.posix_code() == ECONNREFUSED) ||
       s.IsNotAuthorized());
 
-  if (is_bad || FLAGS_rpc_trace_negotiation) {
+  if ((is_bad || FLAGS_rpc_trace_negotiation) &&
+      PREDICT_TRUE(!FLAGS_rpc_suppress_negotiation_trace)) {
     string msg = Trace::CurrentTrace()->DumpToString();
     if (is_bad) {
       LOG(WARNING) << "Failed RPC negotiation. Trace:\n" << msg;
diff --git a/src/kudu/rpc/rpc-bench.cc b/src/kudu/rpc/rpc-bench.cc
index dca665e6b..6b0d4bfd5 100644
--- a/src/kudu/rpc/rpc-bench.cc
+++ b/src/kudu/rpc/rpc-bench.cc
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -29,8 +32,10 @@
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/rpc/messenger.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h" // IWYU pragma: keep
 #include "kudu/rpc/rpc-test-base.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rtest.pb.h"
@@ -40,16 +45,19 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::atomic;
 using std::shared_ptr;
 using std::string;
 using std::thread;
 using std::unique_ptr;
 using std::vector;
+using strings::Substitute;
 
 DEFINE_int32(client_threads, 16,
              "Number of client threads. For the synchronous benchmark, each 
thread has "
@@ -64,14 +72,20 @@ DEFINE_int32(async_call_concurrency, 60,
 DEFINE_int32(worker_threads, 1,
              "Number of server worker threads");
 
+DEFINE_int32(acceptor_threads, 1,
+             "Number of threads in the messenger's acceptor pool");
+
 DEFINE_int32(server_reactors, 4,
              "Number of server reactor threads");
 
+DEFINE_bool(enable_encryption, false, "Whether to enable TLS encryption for rpc-bench");
+
 DEFINE_int32(run_seconds, 1, "Seconds to run the test");
 
 DECLARE_bool(rpc_encrypt_loopback_connections);
-DEFINE_bool(enable_encryption, false, "Whether to enable TLS encryption for rpc-bench");
+DECLARE_bool(rpc_suppress_negotiation_trace);
 
+METRIC_DECLARE_histogram(acceptor_dispatch_times);
 METRIC_DECLARE_histogram(reactor_load_percent);
 METRIC_DECLARE_histogram(reactor_active_latency_us);
 
@@ -286,6 +300,91 @@ TEST_F(RpcBench, BenchmarkCallsAsync) {
   SummarizePerf(sw.elapsed(), total_reqs, false);
 }
 
+class RpcAcceptorBench : public RpcTestBase {
+ protected:
+  RpcAcceptorBench()
+      : should_run_(true),
+        server_addr_(Sockaddr::Wildcard()) {
+    // Instantiate as many acceptor pool threads as requested.
+    n_acceptor_pool_threads_ = FLAGS_acceptor_threads;
+
+    // This test connects to the RPC socket and immediately closes the
+    // connection once it has been established. However, the server side passes
+    // the accepted connection to the connection negotiation handlers.
+    // To make the server side able to accept as many connections as
+    // possible, make sure the acceptor isn't going to receive the
+    // Status::ServiceUnavailable status because the negotiation thread pool is
+    // at capacity and unable to accept one more connection negotiation task.
+    // The connection negotiation tasks are very fast because they end up with
+    // an error when trying to operate on a connection that has already been
+    // closed at the client side.
+    n_negotiation_threads_ = 2 * FLAGS_client_threads;
+
+    // Suppress connection negotiation tracing to avoid flooding the output
+    // with the traces of failed RPC connection negotiation attempts. In this
+    // test, all the opened TCP connections are closed immediately by the
+    // client side.
+    FLAGS_rpc_suppress_negotiation_trace = true;
+  }
+
+  void SetUp() override {
+    NO_FATALS(RpcTestBase::SetUp());
+    ASSERT_OK(StartTestServer(&server_addr_));
+  }
+
+  atomic<bool> should_run_;
+  Sockaddr server_addr_;
+};
+
+TEST_F(RpcAcceptorBench, MeasureAcceptorDispatchTimes) {
+  const size_t threads_num = FLAGS_client_threads;
+
+  thread threads[threads_num];
+  Status status[threads_num];
+
+  for (auto i = 0; i < threads_num; ++i) {
+    auto* my_status = &status[i];
+    threads[i] = thread([this, my_status]() {
+      while (should_run_) {
+        Socket socket;
+        if (auto s = socket.Init(server_addr_.family(), /*flags=*/0);
+            PREDICT_FALSE(!s.ok())) {
+          *my_status = s;
+          return;
+        }
+        if (auto s = socket.SetLinger(true); PREDICT_FALSE(!s.ok())) {
+          *my_status = s;
+          return;
+        }
+        if (auto s = socket.Connect(server_addr_); PREDICT_FALSE(!s.ok())) {
+          *my_status = s;
+          return;
+        }
+      }
+    });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(FLAGS_run_seconds));
+  should_run_ = false;
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  for (auto i = 0; i < threads_num; ++i) {
+    SCOPED_TRACE(Substitute("thread idx $0", i));
+    ASSERT_OK(status[i]);
+  }
+
+  scoped_refptr<Histogram> t =
+      METRIC_acceptor_dispatch_times.Instantiate(server_messenger_->metric_entity());
+  LOG(INFO) << Substitute("Dispatched $0 connection requests in $1 seconds",
+                          t->TotalCount(), FLAGS_run_seconds);
+  LOG(INFO) << Substitute(
+      "Request dispatching time (us): min $0 max $1 average $2",
+      t->MinValueForTests(), t->MaxValueForTests(), t->MeanValueForTests());
+}
+
 } // namespace rpc
 } // namespace kudu
 
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 9f8efa11a..ae3a83bac 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -420,6 +420,7 @@ class RpcTestBase : public KuduTest {
  public:
   RpcTestBase()
       : n_acceptor_pool_threads_(2),
+        n_negotiation_threads_(4),
         n_server_reactor_threads_(3),
         n_worker_threads_(3),
         keepalive_time_ms_(1000),
@@ -470,9 +471,13 @@ class RpcTestBase : public KuduTest {
     }
     bld.set_metric_entity(metric_entity_);
     bld.set_rpc_negotiation_timeout_ms(rpc_negotiation_timeout_ms_);
+    bld.set_min_negotiation_threads(n_negotiation_threads_);
+    bld.set_max_negotiation_threads(n_negotiation_threads_);
+
     std::string hostname;
     RETURN_NOT_OK(GetFQDN(&hostname));
     bld.set_hostname(hostname);
+
     return bld.Build(messenger);
   }
 
@@ -613,13 +618,15 @@ static void DoTestSidecar(Proxy* p, int size1, int size2) {
 
   // Start a simple socket listening on a local port, returning the address.
   // This isn't an RPC server -- just a plain socket which can be helpful for testing.
-  static Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) {
+  static Status StartFakeServer(Socket* listen_sock,
+                                Sockaddr* listen_addr,
+                                int listen_backlog = 1) {
     Sockaddr bind_addr = Sockaddr::Wildcard();
     bind_addr.set_port(0);
     RETURN_NOT_OK(listen_sock->Init(bind_addr.family(), 0));
-    RETURN_NOT_OK(listen_sock->BindAndListen(bind_addr, 1));
+    RETURN_NOT_OK(listen_sock->BindAndListen(bind_addr, listen_backlog));
     RETURN_NOT_OK(listen_sock->GetSocketAddress(listen_addr));
-    LOG(INFO) << "Bound to: " << listen_addr->ToString();
+    VLOG(1) << "Bound to: " << listen_addr->ToString();
     return Status::OK();
   }
 
@@ -670,6 +677,7 @@ static void DoTestSidecar(Proxy* p, int size1, int size2) {
 
  protected:
   int n_acceptor_pool_threads_;
+  int n_negotiation_threads_;
   int n_server_reactor_threads_;
   int n_worker_threads_;
   int keepalive_time_ms_;
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index afaf9b23c..f90130aaf 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -63,6 +63,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/diagnostic_socket.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/net/socket_info.pb.h"
@@ -86,8 +87,11 @@ METRIC_DECLARE_counter(timed_out_on_response_kudu_rpc_test_CalculatorService_Sle
 METRIC_DECLARE_histogram(acceptor_dispatch_times);
 METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
+METRIC_DECLARE_histogram(rpc_listen_socket_rx_queue_size);
 
 DECLARE_bool(rpc_reopen_outbound_connections);
+DECLARE_bool(rpc_suppress_negotiation_trace);
+DECLARE_int32(rpc_listen_socket_stats_every_log2);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
 DECLARE_int32(tcp_keepalive_probe_period_s);
 DECLARE_int32(tcp_keepalive_retry_period_s);
@@ -1438,7 +1442,7 @@ TEST_P(TestRpc, AcceptorDispatchingTimesMetric) {
 
   {
     Socket socket;
-    ASSERT_OK(socket.Init(server_addr.family(), 0));
+    ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0));
     ASSERT_OK(socket.Connect(server_addr));
   }
 
@@ -1842,5 +1846,214 @@ TEST_P(TestRpc, TestCallId) {
   }
 }
 
+#if defined(__linux__)
+// A test to verify collecting information on the RX queue size of a listening
+// socket using the DiagnosticSocket wrapper.
+class TestRpcSocketTxRxQueue : public TestRpc {
+ protected:
+  TestRpcSocketTxRxQueue() = default;
+
+  Status RunAndGetSocketInfo(int listen_backlog,
+                             size_t num_clients,
+                             DiagnosticSocket::TcpSocketInfo* info) {
+    // Limit the backlog for the socket being listened to.
+    Sockaddr s_addr = bind_addr();
+    Socket s_sock;
+    RETURN_NOT_OK(StartFakeServer(&s_sock, &s_addr, listen_backlog));
+
+    vector<shared_ptr<Messenger>> c_messengers(num_clients);
+    vector<unique_ptr<Proxy>> proxies(num_clients);
+    vector<AddRequestPB> requests(num_clients);
+    vector<AddResponsePB> responses(num_clients);
+    vector<RpcController> ctls(num_clients);
+
+    for (auto i = 0; i < num_clients; ++i) {
+      RETURN_NOT_OK(CreateMessenger("client" + std::to_string(i), &c_messengers[i]));
+      proxies[i].reset(new Proxy(c_messengers[i],
+                                 s_addr,
+                                 kRemoteHostName,
+                                 GenericCalculatorService::static_service_name()));
+      requests[i].set_x(2 * i);
+      requests[i].set_y(2 * i + 1);
+      proxies[i]->AsyncRequest(GenericCalculatorService::kAddMethodName,
+                               requests[i],
+                               &responses[i],
+                               &ctls[i],
+                               []() {});
+    }
+
+    // Let the messengers send connect() requests.
+    // TODO(aserbin): find a more reliable way to track this.
+    SleepFor(MonoDelta::FromMilliseconds(250));
+
+    DiagnosticSocket ds;
+    RETURN_NOT_OK(ds.Init());
+
+    DiagnosticSocket::TcpSocketInfo result;
+    RETURN_NOT_OK(ds.Query(s_sock, &result));
+    *info = result;
+
+    // Close the socket explicitly so the connecting clients receive RST
+    // on the connection and fail their connection negotiation attempts fast.
+    return s_sock.Close();
+  }
+};
+// All the tests run without SSL on TCP sockets: TestRpcSocketTxRxQueue inherits
+// from TestRpc, and the latter is parameterized. Running with SSL doesn't make
+// much sense since the outcome would be the same in this context: all the
+// action happens at the TCP level, and RPC connection negotiation doesn't
+// happen.
+INSTANTIATE_TEST_SUITE_P(Parameters, TestRpcSocketTxRxQueue,
+                         testing::Combine(testing::Values(false),
+                                          testing::Values(false)));
+
+// This test scenario verifies the reported socket's stats when there is more
+// than enough space in the listening socket's RX queue to accommodate all the
+// incoming requests.
+TEST_P(TestRpcSocketTxRxQueue, UnderCapacity) {
+  constexpr int kListenBacklog = 16;
+  constexpr size_t kClientsNum = 5;
+
+  DiagnosticSocket::TcpSocketInfo info;
+  ASSERT_OK(RunAndGetSocketInfo(kListenBacklog, kClientsNum, &info));
+
+  // Since the fake server isn't handling incoming requests at all and isn't
+  // even accepting the corresponding TCP connections, all the connection
+  // requests end up in the RX queue.
+  ASSERT_EQ(kClientsNum, info.rx_queue_size);
+
+  // The TX queue size for a listening socket is set to the size of the backlog
+  // as specified by the second parameter of the listen() system call, capped
+  // by the system-wide limit in /proc/sys/net/core/somaxconn.
+  ASSERT_EQ(kListenBacklog, info.tx_queue_size);
+}
+
+// This scenario is similar to the TestRpcSocketTxRxQueue.UnderCapacity scenario
+// above, but in this case the listening socket's backlog length equals
+// the number of pending client TCP connections.
+TEST_P(TestRpcSocketTxRxQueue, AtCapacity) {
+  constexpr int kListenBacklog = 8;
+  constexpr size_t kClientsNum = 8;
+
+  DiagnosticSocket::TcpSocketInfo info;
+  ASSERT_OK(RunAndGetSocketInfo(kListenBacklog, kClientsNum, &info));
+
+  ASSERT_EQ(kClientsNum, info.rx_queue_size);
+  ASSERT_EQ(kListenBacklog, info.tx_queue_size);
+}
+
+// This scenario is similar to the couple of scenarios above, but here there
+// isn't enough space in the socket's RX queue to accommodate all the pending
+// TCP connections.
+TEST_P(TestRpcSocketTxRxQueue, OverCapacity) {
+  constexpr int kListenBacklog = 5;
+  constexpr size_t kClientsNum = 16;
+
+  DiagnosticSocket::TcpSocketInfo info;
+  ASSERT_OK(RunAndGetSocketInfo(kListenBacklog, kClientsNum, &info));
+
+  // Even if there are many more connection requests than the backlog of the
+  // listening socket can accommodate, the size of the socket's RX queue
+  // reflects only the requests that fit into the backlog plus one extra.
+  // The rest of the incoming TCP packets do not affect the size of the RX
+  // queue as seen via the sock_diag netlink facility. The same behavior can
+  // also be observed via /proc/self/net/tcp and the 'netstat' and 'ss'
+  // system utilities.
+  //
+  // On Linux, with the default setting of the net.ipv4.tcp_abort_on_overflow
+  // sysctl variable (see [1] for more details), the server's TCP stack just
+  // drops overflow packets, so the client's TCP stack retries sending the
+  // initial SYN packet to re-attempt the TCP connection. That's exactly what
+  // the following paragraph from [2] refers to:
+  //
+  //   The backlog argument defines the maximum length to which the
+  //   queue of pending connections for sockfd may grow.  If a
+  //   connection request arrives when the queue is full, the client may
+  //   receive an error with an indication of ECONNREFUSED or, if the
+  //   underlying protocol supports retransmission, the request may be
+  //   ignored so that a later reattempt at connection succeeds.
+  //
+  // [1] https://sysctl-explorer.net/net/ipv4/tcp_abort_on_overflow/
+  // [2] https://man7.org/linux/man-pages/man2/listen.2.html
+  //
+  ASSERT_EQ(kListenBacklog + 1, info.rx_queue_size);
+  ASSERT_EQ(kListenBacklog, info.tx_queue_size);
+}
+
+// Basic verification for the numbers reported by the
+// 'rpc_listen_socket_rx_queue_size' histogram metric.
+TEST_P(TestRpcSocketTxRxQueue, AcceptorRxQueueSizeMetric) {
+  // Capture listening socket's metrics upon every accepted connection.
+  FLAGS_rpc_listen_socket_stats_every_log2 = 0;
+
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  {
+    Socket socket;
+    ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0));
+    ASSERT_OK(socket.Connect(server_addr));
+  }
+
+  const auto& metric_entity = server_messenger_->metric_entity();
+  scoped_refptr<Histogram> rx_queue_size =
+      METRIC_rpc_listen_socket_rx_queue_size.Instantiate(metric_entity);
+
+  // Using ASSERT_EVENTUALLY below because of relaxed memory ordering when
+  // fetching metrics' values. Eventually, the metrics report readings that
+  // are consistent with the expected numbers.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(1, rx_queue_size->TotalCount());
+    // The metric had been sampled after the only pending connection was
+    // accepted, so the metric's maximum value should be 0.
+    ASSERT_GE(rx_queue_size->MaxValueForTests(), 0);
+  });
+}
+
+TEST_P(TestRpcSocketTxRxQueue, DisableAcceptorRxQueueSampling) {
+  n_acceptor_pool_threads_ = 1;
+
+  // Disable listening RPC socket's statistics sampling.
+  FLAGS_rpc_listen_socket_stats_every_log2 = -1;
+
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  for (auto i = 0; i < 100; ++i) {
+    Socket socket;
+    ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0));
+    ASSERT_OK(socket.Connect(server_addr));
+  }
+
+  const auto& metric_entity = server_messenger_->metric_entity();
+  scoped_refptr<Histogram> rx_queue_size =
+      METRIC_rpc_listen_socket_rx_queue_size.Instantiate(metric_entity);
+  ASSERT_EQ(0, rx_queue_size->TotalCount());
+}
+
+TEST_P(TestRpcSocketTxRxQueue, CustomAcceptorRxQueueSamplingFrequency) {
+  n_acceptor_pool_threads_ = 1;
+
+  // Sample the listening socket's stats on every 8th request.
+  FLAGS_rpc_listen_socket_stats_every_log2 = 3;
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  for (auto i = 0; i < 16; ++i) {
+    Socket socket;
+    ASSERT_OK(socket.Init(server_addr.family(), /*flags=*/0));
+    ASSERT_OK(socket.Connect(server_addr));
+  }
+
+  const auto& metric_entity = server_messenger_->metric_entity();
+  scoped_refptr<Histogram> rx_queue_size =
+      METRIC_rpc_listen_socket_rx_queue_size.Instantiate(metric_entity);
+  // Using ASSERT_EVENTUALLY below because of relaxed memory ordering when
+  // fetching metrics' values. Eventually, the metrics report readings that
+  // are consistent with the expected numbers.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(2, rx_queue_size->TotalCount());
+  });
+}
+#endif
+
 } // namespace rpc
 } // namespace kudu
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 5a4940131..a921d9a68 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -222,6 +222,7 @@ set(UTIL_SRCS
   minidump.cc
   monotime.cc
   mutex.cc
+  net/diagnostic_socket.cc
   net/dns_resolver.cc
   net/net_util.cc
   net/sockaddr.cc
@@ -267,10 +268,6 @@ set(UTIL_SRCS
   zlib.cc
 )
 
-if(NOT APPLE)
-  set(UTIL_SRCS ${UTIL_SRCS} net/diagnostic_socket.cc)
-endif()
-
 if(NOT NO_TESTS)
   set(UTIL_SRCS ${UTIL_SRCS} test_graph.cc)
 endif()
diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc
index 6a2f6a7a6..17ac931e8 100644
--- a/src/kudu/util/net/diagnostic_socket.cc
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -17,10 +17,12 @@
 
 #include "kudu/util/net/diagnostic_socket.h"
 
+#if defined(__linux__)
 #include <linux/inet_diag.h>
 #include <linux/netlink.h>
 #include <linux/sock_diag.h>
 #include <linux/types.h>
+#endif
 
 #include <netinet/in.h>
 #include <sys/socket.h>
@@ -63,6 +65,13 @@ const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildca
   return kSocketStateWildcard;
 }
 
+#if !defined(__linux__)
+namespace {
+  constexpr const char* const kNotSupportedMsg =
+      "DiagnosticSocket functionality is currently supported on Linux only";
+} // anonymous namespace
+#endif // #if !defined(__linux__)
+
 DiagnosticSocket::DiagnosticSocket()
     : fd_(-1) {
 }
@@ -72,6 +81,9 @@ DiagnosticSocket::~DiagnosticSocket() {
 }
 
 Status DiagnosticSocket::Init() {
+#if !defined(__linux__)
+  return Status::NotSupported(kNotSupportedMsg);
+#else
   auto fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
   if (fd < 0) {
     int err = errno;
@@ -81,6 +93,7 @@ Status DiagnosticSocket::Init() {
   fd_ = fd;
 
   return Status::OK();
+#endif // #if !defined(__linux__) ... #else ...
 }
 
 Status DiagnosticSocket::Close() {
@@ -104,6 +117,9 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
+#if !defined(__linux__)
+  return Status::NotSupported(kNotSupportedMsg);
+#else
   uint32_t socket_states_bitmask = 0;
   for (auto state : socket_states) {
     socket_states_bitmask |= (1U << state);
@@ -115,12 +131,16 @@ Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
   RETURN_NOT_OK(ReceiveResponse(&result));
   *info = std::move(result);
   return Status::OK();
+#endif // #if !defined(__linux__) ... #else ...
 }
 
 Status DiagnosticSocket::Query(const Socket& socket, TcpSocketInfo* info) {
   DCHECK_GE(fd_, 0) << "requires calling Init() first";
   DCHECK(info);
 
+#if !defined(__linux__)
+  return Status::NotSupported(kNotSupportedMsg);
+#else
   RETURN_NOT_OK(SendRequest(socket));
   vector<TcpSocketInfo> result;
   RETURN_NOT_OK(ReceiveResponse(&result));
@@ -133,12 +153,16 @@ Status DiagnosticSocket::Query(const Socket& socket, TcpSocketInfo* info) {
 
   *info = result.front();
   return Status::OK();
+#endif // #if !defined(__linux__) ... #else ...
 }
 
 // Send query about the specified socket.
 Status DiagnosticSocket::SendRequest(const Socket& socket) const {
   DCHECK_GE(fd_, 0);
 
+#if !defined(__linux__)
+  return Status::NotSupported(kNotSupportedMsg);
+#else
   static constexpr const char* const kNonIpErrMsg =
       "netlink diagnostics is currently supported only on IPv4 TCP sockets";
 
@@ -166,6 +190,7 @@ Status DiagnosticSocket::SendRequest(const Socket& socket) const {
   const uint32_t socket_state_bitmask =
       dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED);
   return SendRequest(src_addr, dst_addr, socket_state_bitmask);
+#endif // #if !defined(__linux__) ... #else ...
 }
 
 Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
@@ -173,6 +198,9 @@ Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
                                      uint32_t socket_states_bitmask) const {
   DCHECK_GE(fd_, 0);
 
+#if !defined(__linux__)
+  return Status::NotSupported(kNotSupportedMsg);
+#else
   const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
   const auto src_port = socket_src_addr.port();
   const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr;
@@ -226,14 +254,18 @@ Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
   RETRY_ON_EINTR(rc, ::sendmsg(fd_, &msg, 0));
   if (rc < 0) {
     int err = errno;
-    return Status::NetworkError("semdmsg() failed", ErrnoToString(err), err);
+    return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
   }
   return Status::OK();
+#endif // #if !defined(__linux__) ... #else ...
 }
 
 Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
   DCHECK_GE(fd_, 0);
 
+#if !defined(__linux__)
+  return Status::NotSupported(kNotSupportedMsg);
+#else
   uint8_t buf[8192];
   struct iovec iov = {
     .iov_base = buf,
@@ -323,6 +355,7 @@ Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
     }
   }
   return Status::OK();
+#endif // #if !defined(__linux__) ... #else ...
 }
 
 } // namespace kudu
diff --git a/src/kudu/util/net/diagnostic_socket.h b/src/kudu/util/net/diagnostic_socket.h
index bee3ba1b1..20877284f 100644
--- a/src/kudu/util/net/diagnostic_socket.h
+++ b/src/kudu/util/net/diagnostic_socket.h
@@ -86,6 +86,10 @@ class DiagnosticSocket final {
   // from the kernel using the netlink facility via the API of this class.
   Status Init() WARN_UNUSED_RESULT;
 
+  // Whether this wrapper has been initialized, i.e. the underlying netlink
+  // socket has been successfully opened.
+  bool IsInitialized() const { return fd_ >= 0; }
+
   // Close the Socket, checking for errors.
   Status Close();
 
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index f68b4e635..7245f9b2a 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -404,6 +404,28 @@ Status Socket::SetCloseOnExec() {
   return Status::OK();
 }
 
+Status Socket::SetLinger(bool enable, int linger_timeout_sec) {
+#if defined(__APPLE__)
+  #ifdef SO_LINGER_SEC
+    struct linger arg = { enable ? 1 : 0, linger_timeout_sec };
+    RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_LINGER_SEC, arg),
+                          "failed to set SO_LINGER_SEC");
+    return Status::OK();
+  #else
+    return Status::NotSupported("failed to set SO_LINGER_SEC: protocol not 
available");
+  #endif
+#else
+  #ifdef SO_LINGER
+    struct linger arg = { enable ? 1 : 0, linger_timeout_sec };
+    RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_LINGER, arg),
+                          "failed to set SO_LINGER");
+    return Status::OK();
+  #else
+    return Status::NotSupported("failed to set SO_LINGER: protocol not 
available");
+  #endif
+#endif // #if defined(__APPLE__) ... #else ...
+}
+
 Status Socket::SetSendTimeout(const MonoDelta& timeout) {
   return SetTimeout(SO_SNDTIMEO, "SO_SNDTIMEO", timeout);
 }
diff --git a/src/kudu/util/net/socket.h b/src/kudu/util/net/socket.h
index 10125eb97..1ab3c74ba 100644
--- a/src/kudu/util/net/socket.h
+++ b/src/kudu/util/net/socket.h
@@ -22,10 +22,16 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/util/status.h"
 
+#include <gtest/gtest_prod.h>
+
 struct iovec;
 
 namespace kudu {
 
+namespace rpc {
+class RpcAcceptorBench_MeasureAcceptorDispatchTimes_Test;
+}
+
 class SocketStatsPB;
 class TransportDetailsPB;
 
@@ -166,12 +172,30 @@ class Socket {
   virtual Status GetTransportDetails(TransportDetailsPB* pb) const;
 
  private:
+  FRIEND_TEST(rpc::RpcAcceptorBench, MeasureAcceptorDispatchTimes);
+
   // Called internally from SetSend/RecvTimeout().
   Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);
 
   // Called internally during socket setup.
   Status SetCloseOnExec();
 
+  // Set SO_LINGER (SO_LINGER_SEC on macOS): turn on/off the "linger on close"
+  // behavior for this socket according to the 'enable' parameter, setting
+  // the linger timeout (in seconds) to 'linger_timeout_sec'.
+  //
+  // Enabling the "lingering on close" behavior with zero linger timeout allows
+  // for short-circuiting the standard way of closing TCP sockets. For such a
+  // socket, the TCP stack discards any data from the socket's buffer and sends
+  // an RST packet to the peer immediately upon calling Close(). With that, the
+  // socket skips being in the TIME_WAIT state for the necessary period of time.
+  //
+  // The "short-circuiting on close" might be useful in various performance
+  // tests involving a lot of socket churn. To avoid unexpected complications,
+  // don't use it in code that's part of a system running in a production
+  // environment.
+  Status SetLinger(bool enable, int linger_timeout_sec = 0);
+
   // Bind the socket to a local address before making an outbound connection,
   // based on the value of FLAGS_local_ip_for_outbound_sockets.
   Status BindForOutgoingConnection();

