This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new fc40fcda3 KUDU-3633 shutdown DnsResolver in ServerBase::ShutdownImpl()
fc40fcda3 is described below
commit fc40fcda30a93baabf50299a68af6023a44b369d
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jan 31 12:31:53 2025 -0800
KUDU-3633 shutdown DnsResolver in ServerBase::ShutdownImpl()
The thread pool of the DNS resolver should be shut down along with the
messenger in ServerBase to prevent retries of RPCs that failed as
collateral damage of the shutdown in progress. Otherwise, those RPCs might
be retried by invoking rpc::Proxy::RefreshDnsAndEnqueueRequest(), etc.
On a related note, I also added a guard to protect ThreadPool::tokens_
in the destructor of the ThreadPool class, as is done elsewhere in
ThreadPool; the token-count check there is now DCHECK_EQ() instead of
CHECK_EQ(). I also snuck in an update to call DCHECK() in a loop only
when the DCHECK_IS_ON() macro evaluates to 'true'.
This addresses flakiness reported in at least one of the RemoteKsckTest
scenarios (e.g., TestFilterOnNotabletTable in [1]). One of the related
TSAN reports looked like this:
RemoteKsckTest.TestFilterOnNotabletTable: WARNING: ThreadSanitizer: data race
Read of size 8 at 0x7b54001e5118 by main thread:
#0 std::__1::__hash_table<kudu::ThreadPoolToken*, ...>::size() const
#1 std::__1::unordered_set<kudu::ThreadPoolToken*, ...>::size() const
#2 kudu::ThreadPool::~ThreadPool()
...
#6 kudu::kserver::KuduServer::~KuduServer()
#7 kudu::tserver::TabletServer::~TabletServer()
...
Previous write of size 8 at 0x7b54001e5118 by thread T262 ...:
#0 std::__1::__hash_table<kudu::ThreadPoolToken*, ...>::remove(...)
...
#4 kudu::ThreadPool::ReleaseToken(...)
#5 kudu::ThreadPoolToken::~ThreadPoolToken()
...
#24 kudu::consensus::LeaderElection::~LeaderElection()
...
#35 kudu::rpc::Proxy::RefreshDnsAndEnqueueRequest(...)
...
#41 kudu::DnsResolver::RefreshAddressesAsync()
...
Thread T262 'dns-resolver [w' (tid=29102, running) created by thread T182 at:
#0 pthread_create
#1 kudu::Thread::StartThread(...)
#2 kudu::Thread::Create(...)
#3 kudu::ThreadPool::CreateThread()
#4 kudu::ThreadPool::DoSubmit(..., kudu::ThreadPoolToken*)
#5 kudu::ThreadPool::Submit(...)
#6 kudu::DnsResolver::RefreshAddressesAsync(..)
#7 kudu::rpc::Proxy::RefreshDnsAndEnqueueRequest(...)
#8 kudu::rpc::Proxy::AsyncRequest(...)
...
#15 kudu::rpc::OutboundCall::CallCallback()
#16 kudu::rpc::OutboundCall::SetFailed()
#17 kudu::rpc::Connection::Shutdown()
#18 kudu::rpc::ReactorThread::ShutdownInternal()
...
#25 kudu::rpc::ReactorThread::RunThread()
...
[1]
http://dist-test.cloudera.org:8080/test_drilldown?test_name=ksck_remote-test
Change-Id: I525f1078a349dbd2926938bb4fcc3e80888dfbb4
Reviewed-on: http://gerrit.cloudera.org:8080/22434
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/server/server_base.cc | 1 +
src/kudu/util/net/dns_resolver.cc | 8 ++++++--
src/kudu/util/net/dns_resolver.h | 8 ++++++++
src/kudu/util/threadpool.cc | 15 +++++++++++----
4 files changed, 26 insertions(+), 6 deletions(-)
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 1f770587b..6ba567a29 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -1222,6 +1222,7 @@ void ServerBase::ShutdownImpl() {
web_server_->Stop();
}
rpc_server_->Shutdown();
+ dns_resolver_->Shutdown();
if (messenger_) {
messenger_->Shutdown();
}
diff --git a/src/kudu/util/net/dns_resolver.cc b/src/kudu/util/net/dns_resolver.cc
index d1fb9df30..3c57dec43 100644
--- a/src/kudu/util/net/dns_resolver.cc
+++ b/src/kudu/util/net/dns_resolver.cc
@@ -67,6 +67,10 @@ DnsResolver::DnsResolver(int max_threads_num,
}
DnsResolver::~DnsResolver() {
+ Shutdown();
+}
+
+void DnsResolver::Shutdown() {
pool_->Shutdown();
}
@@ -87,7 +91,7 @@ void DnsResolver::ResolveAddressesAsync(const HostPort& hostport,
const auto s = pool_->Submit([=]() {
this->DoResolutionCb(hostport, addresses, cb);
});
- if (!s.ok()) {
+ if (PREDICT_FALSE(!s.ok())) {
cb(s);
}
}
@@ -107,7 +111,7 @@ void DnsResolver::RefreshAddressesAsync(const HostPort& hostport,
}
this->DoResolutionCb(hostport, addresses, cb);
});
- if (!s.ok()) {
+ if (PREDICT_FALSE(!s.ok())) {
cb(s);
}
}
diff --git a/src/kudu/util/net/dns_resolver.h b/src/kudu/util/net/dns_resolver.h
index 3031a6be6..6cdeb1b6b 100644
--- a/src/kudu/util/net/dns_resolver.h
+++ b/src/kudu/util/net/dns_resolver.h
@@ -52,6 +52,14 @@ class DnsResolver {
MonoDelta cache_ttl = MonoDelta::FromSeconds(60));
~DnsResolver();
+ // Shutdown the resolver's thread pool. After shutting down the thread pool,
+ // RefreshAddressesAsync() returns Status::ServiceUnavailable(),
+ // while ResolveAddressesAsync() returns cached not-yet-expired entry
+ // from the cache (if present) or Status::ServiceUnavailable() otherwise.
+ // The behavior of ResolveAddresses() isn't affected by the status of the
+ // resolver's thread pool.
+ void Shutdown();
+
// Synchronously resolve addresses corresponding to the specified host:port
// pair in 'hostport'. Note that a host may resolve to more than one IP
// address.
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index d22cda2c8..6c0895f53 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -387,10 +387,15 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
}
ThreadPool::~ThreadPool() {
- // There should only be one live token: the one used in tokenless submission.
- CHECK_EQ(1, tokens_.size()) << Substitute(
- "Threadpool $0 destroyed with $1 allocated tokens",
- name_, tokens_.size());
+#if DCHECK_IS_ON()
+ {
+ // There should only be one live token: the one used in tokenless
submission.
+ std::lock_guard guard(lock_);
+ DCHECK_EQ(1, tokens_.size()) << Substitute(
+ "Threadpool $0 destroyed with $1 allocated tokens",
+ name_, tokens_.size());
+ }
+#endif
Shutdown();
}
@@ -472,11 +477,13 @@ void ThreadPool::Shutdown() {
no_threads_cond_.Wait();
}
+#if DCHECK_IS_ON()
// All the threads have exited. Check the state of each token.
for (auto* t : tokens_) {
DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
t->state() == ThreadPoolToken::State::QUIESCED);
}
+#endif
// Finally release the queued tasks, outside the lock.
lock.unlock();