This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d41d325b4154f9526991b6fb568b59fa1ffe5501 Author: Riza Suminto <riza.sumi...@cloudera.com> AuthorDate: Mon Jul 14 08:56:02 2025 -0700 IMPALA-14220: CatalogServer::IsActive must not hold catalog_lock_ When catalogd HA is enabled, catalogd will check whether it is the active one, via CatalogServer::IsActive, before serving each request. Calling CatalogServer::IsActive requires obtaining catalog_lock_, which can contend with long catalog operations such as GatherCatalogUpdatesThread. Checking current catalog active status does not need to obtain catalog_lock_. Instead, it is sufficient to change is_active_ field from bool to AtomicBool. This patch applies that change. With this change, CatalogServiceThriftIf::AcceptRequest can return faster. Other CatalogServiceThriftIf methods that previously blocked on the AcceptRequest method can proceed faster, but might still contend over Catalog's versionLock_ in JVM later. Testing: Run and pass test_catalogd_ha.py. Change-Id: I15fb925f1eb4ea5d213075b66a676d2bc9b9e9f1 Reviewed-on: http://gerrit.cloudera.org:8080/23168 Reviewed-by: Abhishek Rawat <ara...@cloudera.com> Reviewed-by: Wenzhe Zhou <wz...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/catalog/catalog-server.cc | 17 ++++++++--------- be/src/catalog/catalog-server.h | 4 ++-- be/src/catalog/workload-management-init.cc | 2 +- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 44352901d..4b522b343 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -622,7 +622,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics) thrift_iface_(new CatalogServiceThriftIf(this)), thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics), - is_active_(!FLAGS_enable_catalogd_ha), + is_active_{!FLAGS_enable_catalogd_ha}, is_ha_determined_(!FLAGS_enable_catalogd_ha), topic_updates_ready_(false), 
last_sent_catalog_version_(0L), @@ -718,7 +718,7 @@ Status CatalogServer::Start() { // Notify the thread to start for the first time. { lock_guard<mutex> l(catalog_lock_); - if (is_active_) catalog_update_cv_.NotifyOne(); + if (is_active_.Load()) catalog_update_cv_.NotifyOne(); } return Status::OK(); } @@ -762,7 +762,7 @@ void CatalogServer::UpdateCatalogTopicCallback( // Return if unable to acquire the catalog_lock_, or this instance is not active, // or if the topic update data is not yet ready for processing. This indicates the // catalog_update_gathering_thread_ is still building a topic update. - if (!l || !is_active_ || !topic_updates_ready_) return; + if (!l || !is_active_.Load() || !topic_updates_ready_) return; const TTopicDelta& delta = topic->second; @@ -816,8 +816,8 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply, bool is_matching = (catalogd_registration.address.hostname == FLAGS_hostname && catalogd_registration.address.port == FLAGS_catalog_service_port); if (is_matching) { - if (!is_active_) { - is_active_ = true; + if (!is_active_.Load()) { + is_active_.Store(true); active_status_metric_->SetValue(true); num_ha_active_status_change_metric_->Increment(1); // Reset last_sent_catalog_version_ when the catalogd become active. This will @@ -843,8 +843,8 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply, LOG(INFO) << "This catalogd instance is changed to active status"; } } else { - if (is_active_) { - is_active_ = false; + if (is_active_.Load()) { + is_active_.Store(false); active_status_metric_->SetValue(false); num_ha_active_status_change_metric_->Increment(1); LOG(INFO) << "This catalogd instance is changed to inactive status. 
" @@ -897,8 +897,7 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply, } bool CatalogServer::IsActive() { - lock_guard<mutex> l(catalog_lock_); - return is_active_; + return is_active_.Load(); } [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() { diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h index 128531a9e..786fc00d2 100644 --- a/be/src/catalog/catalog-server.h +++ b/be/src/catalog/catalog-server.h @@ -225,7 +225,7 @@ class CatalogServer { std::mutex catalog_lock_; /// Set to true if this catalog instance is active. - bool is_active_; + AtomicBool is_active_; /// Set to true after active catalog has been determined. Will be true if catalog ha /// is not enabled. @@ -289,7 +289,7 @@ class CatalogServer { void UpdateActiveCatalogd(bool is_registration_reply, int64_t active_catalogd_version, const TCatalogRegistration& catalogd_registration); - /// Returns the active status of the catalogd. + /// Returns the current active status of the catalogd. bool IsActive(); /// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog diff --git a/be/src/catalog/workload-management-init.cc b/be/src/catalog/workload-management-init.cc index a7eae391b..8810cff57 100644 --- a/be/src/catalog/workload-management-init.cc +++ b/be/src/catalog/workload-management-init.cc @@ -504,7 +504,7 @@ inline bool CatalogServer::IsCatalogInitialized() { // incremented on the active catalogd. // The second expression evaluates to true when the the standby catalogd determines that // it is the standby. - return last_sent_catalog_version_ > 0 || (is_ha_determined_ && !is_active_); + return last_sent_catalog_version_ > 0 || (is_ha_determined_ && !is_active_.Load()); } // CatalogServer::IsCatalogInitialized bool CatalogServer::WaitForCatalogReady() {