This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d41d325b4154f9526991b6fb568b59fa1ffe5501
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Mon Jul 14 08:56:02 2025 -0700

    IMPALA-14220: CatalogServer::IsActive must not hold catalog_lock_
    
    When catalogd HA is enabled, catalogd will check whether it is the
    active one, via CatalogServer::IsActive, before serving each request.
    Calling CatalogServer::IsActive requires obtaining catalog_lock_, which
    can contend with long-running catalog operations such as
    GatherCatalogUpdatesThread.
    
    Checking the current catalogd active status does not need to obtain
    catalog_lock_. Instead, it is sufficient to change the is_active_ field
    from bool to AtomicBool. This patch applies that change.
    
    With this change, CatalogServiceThriftIf::AcceptRequest can return
    faster. Other CatalogServiceThriftIf methods that previously blocked in
    AcceptRequest can proceed faster, but might still contend over the
    Catalog's versionLock_ in the JVM later.
    
    Testing:
    Run and pass test_catalogd_ha.py.
    
    Change-Id: I15fb925f1eb4ea5d213075b66a676d2bc9b9e9f1
    Reviewed-on: http://gerrit.cloudera.org:8080/23168
    Reviewed-by: Abhishek Rawat <ara...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/catalog/catalog-server.cc           | 17 ++++++++---------
 be/src/catalog/catalog-server.h            |  4 ++--
 be/src/catalog/workload-management-init.cc |  2 +-
 3 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 44352901d..4b522b343 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -622,7 +622,7 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
     thrift_iface_(new CatalogServiceThriftIf(this)),
     thrift_serializer_(FLAGS_compact_catalog_topic),
     metrics_(metrics),
-    is_active_(!FLAGS_enable_catalogd_ha),
+    is_active_{!FLAGS_enable_catalogd_ha},
     is_ha_determined_(!FLAGS_enable_catalogd_ha),
     topic_updates_ready_(false),
     last_sent_catalog_version_(0L),
@@ -718,7 +718,7 @@ Status CatalogServer::Start() {
   // Notify the thread to start for the first time.
   {
     lock_guard<mutex> l(catalog_lock_);
-    if (is_active_) catalog_update_cv_.NotifyOne();
+    if (is_active_.Load()) catalog_update_cv_.NotifyOne();
   }
   return Status::OK();
 }
@@ -762,7 +762,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
   // Return if unable to acquire the catalog_lock_, or this instance is not 
active,
   // or if the topic update data is not yet ready for processing. This 
indicates the
   // catalog_update_gathering_thread_ is still building a topic update.
-  if (!l || !is_active_ || !topic_updates_ready_) return;
+  if (!l || !is_active_.Load() || !topic_updates_ready_) return;
 
   const TTopicDelta& delta = topic->second;
 
@@ -816,8 +816,8 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
   bool is_matching = (catalogd_registration.address.hostname == FLAGS_hostname
       && catalogd_registration.address.port == FLAGS_catalog_service_port);
   if (is_matching) {
-    if (!is_active_) {
-      is_active_ = true;
+    if (!is_active_.Load()) {
+      is_active_.Store(true);
       active_status_metric_->SetValue(true);
       num_ha_active_status_change_metric_->Increment(1);
       // Reset last_sent_catalog_version_ when the catalogd become active. 
This will
@@ -843,8 +843,8 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
       LOG(INFO) << "This catalogd instance is changed to active status";
     }
   } else {
-    if (is_active_) {
-      is_active_ = false;
+    if (is_active_.Load()) {
+      is_active_.Store(false);
       active_status_metric_->SetValue(false);
       num_ha_active_status_change_metric_->Increment(1);
       LOG(INFO) << "This catalogd instance is changed to inactive status. "
@@ -897,8 +897,7 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
 }
 
 bool CatalogServer::IsActive() {
-  lock_guard<mutex> l(catalog_lock_);
-  return is_active_;
+  return is_active_.Load();
 }
 
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 128531a9e..786fc00d2 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -225,7 +225,7 @@ class CatalogServer {
   std::mutex catalog_lock_;
 
   /// Set to true if this catalog instance is active.
-  bool is_active_;
+  AtomicBool is_active_;
 
   /// Set to true after active catalog has been determined. Will be true if 
catalog ha
   /// is not enabled.
@@ -289,7 +289,7 @@ class CatalogServer {
   void UpdateActiveCatalogd(bool is_registration_reply, int64_t 
active_catalogd_version,
       const TCatalogRegistration& catalogd_registration);
 
-  /// Returns the active status of the catalogd.
+  /// Returns the current active status of the catalogd.
   bool IsActive();
 
   /// Executed by the catalog_update_gathering_thread_. Calls into JniCatalog
diff --git a/be/src/catalog/workload-management-init.cc 
b/be/src/catalog/workload-management-init.cc
index a7eae391b..8810cff57 100644
--- a/be/src/catalog/workload-management-init.cc
+++ b/be/src/catalog/workload-management-init.cc
@@ -504,7 +504,7 @@ inline bool CatalogServer::IsCatalogInitialized() {
   // incremented on the active catalogd.
   // The second expression evaluates to true when the the standby catalogd 
determines that
   // it is the standby.
-  return last_sent_catalog_version_ > 0 || (is_ha_determined_ && !is_active_);
+  return last_sent_catalog_version_ > 0 || (is_ha_determined_ && 
!is_active_.Load());
 } // CatalogServer::IsCatalogInitialized
 
 bool CatalogServer::WaitForCatalogReady() {

Reply via email to