This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new fc40fcda3 KUDU-3633 shutdown DnsResolver in ServerBase::ShutdownImpl()
fc40fcda3 is described below

commit fc40fcda30a93baabf50299a68af6023a44b369d
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jan 31 12:31:53 2025 -0800

    KUDU-3633 shutdown DnsResolver in ServerBase::ShutdownImpl()
    
    The thread pool of the DNS resolver should be shut down along with the
    messenger in ServerBase to prevent retrying RPCs that failed as
    collateral damage of the shutdown in progress.  Those RPCs might be
    retried by invoking rpc::Proxy::RefreshDnsAndEnqueueRequest(), etc.
    
    On a related note, I also added a guard to protect ThreadPool::tokens_
    in the destructor of the ThreadPool class, as is done elsewhere.  I also
    snuck in an update to call DCHECK() in a loop only when the DCHECK_IS_ON()
    macro evaluates to 'true'.
    
    This addresses flakiness reported in at least one of the RemoteKsckTest
    scenarios (e.g., TestFilterOnNotabletTable in [1]).  One of the related
    TSAN reports looked like the one below:
    
    RemoteKsckTest.TestFilterOnNotabletTable: WARNING: ThreadSanitizer: data race
      Read of size 8 at 0x7b54001e5118 by main thread:
        #0 std::__1::__hash_table<kudu::ThreadPoolToken*, ...>::size() const
        #1 std::__1::unordered_set<kudu::ThreadPoolToken*, ...>::size() const
        #2 kudu::ThreadPool::~ThreadPool()
        ...
        #6 kudu::kserver::KuduServer::~KuduServer()
        #7 kudu::tserver::TabletServer::~TabletServer()
        ...
    
      Previous write of size 8 at 0x7b54001e5118 by thread T262 ...:
        #0 std::__1::__hash_table<kudu::ThreadPoolToken*, ...>::remove(...)
        ...
        #4 kudu::ThreadPool::ReleaseToken(...)
        #5 kudu::ThreadPoolToken::~ThreadPoolToken()
        ...
        #24 kudu::consensus::LeaderElection::~LeaderElection()
        ...
        #35 kudu::rpc::Proxy::RefreshDnsAndEnqueueRequest(...)
        ...
        #41 kudu::DnsResolver::RefreshAddressesAsync()
        ...
    
      Thread T262 'dns-resolver [w' (tid=29102, running) created by thread T182 at:
        #0 pthread_create
        #1 kudu::Thread::StartThread(...)
        #2 kudu::Thread::Create(...)
        #3 kudu::ThreadPool::CreateThread()
        #4 kudu::ThreadPool::DoSubmit(..., kudu::ThreadPoolToken*)
        #5 kudu::ThreadPool::Submit(...)
        #6 kudu::DnsResolver::RefreshAddressesAsync(..)
        #7 kudu::rpc::Proxy::RefreshDnsAndEnqueueRequest(...)
        #8 kudu::rpc::Proxy::AsyncRequest(...)
        ...
        #15 kudu::rpc::OutboundCall::CallCallback()
        #16 kudu::rpc::OutboundCall::SetFailed()
        #17 kudu::rpc::Connection::Shutdown()
        #18 kudu::rpc::ReactorThread::ShutdownInternal()
        ...
        #25 kudu::rpc::ReactorThread::RunThread()
        ...
    
    [1] http://dist-test.cloudera.org:8080/test_drilldown?test_name=ksck_remote-test
    
    Change-Id: I525f1078a349dbd2926938bb4fcc3e80888dfbb4
    Reviewed-on: http://gerrit.cloudera.org:8080/22434
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/server/server_base.cc    |  1 +
 src/kudu/util/net/dns_resolver.cc |  8 ++++++--
 src/kudu/util/net/dns_resolver.h  |  8 ++++++++
 src/kudu/util/threadpool.cc       | 15 +++++++++++----
 4 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 1f770587b..6ba567a29 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -1222,6 +1222,7 @@ void ServerBase::ShutdownImpl() {
     web_server_->Stop();
   }
   rpc_server_->Shutdown();
+  dns_resolver_->Shutdown();
   if (messenger_) {
     messenger_->Shutdown();
   }
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index d1fb9df30..3c57dec43 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -67,6 +67,10 @@ DnsResolver::DnsResolver(int max_threads_num,
 }
 
 DnsResolver::~DnsResolver() {
+  Shutdown();
+}
+
+void DnsResolver::Shutdown() {
   pool_->Shutdown();
 }
 
@@ -87,7 +91,7 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
   const auto s = pool_->Submit([=]() {
     this->DoResolutionCb(hostport, addresses, cb);
   });
-  if (!s.ok()) {
+  if (PREDICT_FALSE(!s.ok())) {
     cb(s);
   }
 }
@@ -107,7 +111,7 @@ void DnsResolver::RefreshAddressesAsync(const HostPort& hostport,
     }
     this->DoResolutionCb(hostport, addresses, cb);
   });
-  if (!s.ok()) {
+  if (PREDICT_FALSE(!s.ok())) {
     cb(s);
   }
 }
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index 3031a6be6..6cdeb1b6b 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -52,6 +52,14 @@ class DnsResolver {
                        MonoDelta cache_ttl = MonoDelta::FromSeconds(60));
   ~DnsResolver();
 
+  // Shutdown the resolver's thread pool. After shutting down the thread pool,
+  // RefreshAddressesAsync() returns Status::ServiceUnavailable(),
+  // while ResolveAddressesAsync() returns cached not-yet-expired entry
+  // from the cache (if present) or Status::ServiceUnavailable() otherwise.
+  // The behavior of ResolveAddresses() isn't affected by the status of the
+  // resolver's thread pool.
+  void Shutdown();
+
   // Synchronously resolve addresses corresponding to the specified host:port
   // pair in 'hostport'. Note that a host may resolve to more than one IP
   // address.
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index d22cda2c8..6c0895f53 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -387,10 +387,15 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
 }
 
 ThreadPool::~ThreadPool() {
-  // There should only be one live token: the one used in tokenless submission.
-  CHECK_EQ(1, tokens_.size()) << Substitute(
-      "Threadpool $0 destroyed with $1 allocated tokens",
-      name_, tokens_.size());
+#if DCHECK_IS_ON()
+  {
+    // There should only be one live token: the one used in tokenless submission.
+    std::lock_guard guard(lock_);
+    DCHECK_EQ(1, tokens_.size()) << Substitute(
+        "Threadpool $0 destroyed with $1 allocated tokens",
+        name_, tokens_.size());
+  }
+#endif
   Shutdown();
 }
 
@@ -472,11 +477,13 @@ void ThreadPool::Shutdown() {
     no_threads_cond_.Wait();
   }
 
+#if DCHECK_IS_ON()
   // All the threads have exited. Check the state of each token.
   for (auto* t : tokens_) {
     DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
            t->state() == ThreadPoolToken::State::QUIESCED);
   }
+#endif
 
   // Finally release the queued tasks, outside the lock.
   lock.unlock();

Reply via email to