This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6ecb8bfcf4d9e68e0091aea64540e0fb64aeb3e0
Author: wzhou-code <[email protected]>
AuthorDate: Fri Jul 28 09:33:17 2023 -0700

    IMPALA-12323: DDL hang with SYNC_DDL=1 when CatalogD HA enabled
    
    When CatalogD HA is enabled, the standby catalogd does not receive catalog
    topic updates from the statestore and does not apply catalog updates from
    the active catalogd, so its min topic version never advances.
    The function Statestore::GetMinSubscriberTopicVersion() loops through all
    subscribers to find the min topic version. The standby catalogd therefore
    keeps the min topic version from increasing, and the Impala server waits
    indefinitely in ImpalaServer::WaitForCatalogUpdateTopicPropagation().
    
    This patch fixes the issue by skipping the standby catalogd when finding
    the min topic version in Statestore::GetMinSubscriberTopicVersion().
    
    Testing:
     - Added unit-test code for CatalogD HA to run DDL with SYNC_DDL set to 1.
       Verified that the test cases hang without the fix and pass with the
       fix.
     - Passed test_catalogd_ha.py.
    
    Change-Id: Ie559c711078f32171dfb2d2e2fda54773c0927c3
    Reviewed-on: http://gerrit.cloudera.org:8080/20280
    Reviewed-by: Andrew Sherman <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/statestore/statestore.cc          | 6 ++++++
 tests/custom_cluster/test_catalogd_ha.py | 8 +++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 945d3a088..0ec515f73 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -986,6 +986,12 @@ Statestore::TopicEntry::Version 
Statestore::GetMinSubscriberTopicVersion(
   bool found = false;
   // Find the minimum version processed for this topic across all topic 
subscribers.
   for (const SubscriberMap::value_type& subscriber: subscribers_) {
+    if (FLAGS_enable_catalogd_ha && subscriber.second->IsCatalogd()
+        && !catalog_manager_.IsActiveCatalogd(subscriber.second->id())) {
+      // Skip inactive catalogd since it does not apply catalog updates from 
the active
+      // catalogd.
+      continue;
+    }
     auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id);
     if (subscribed_topics->find(topic_id) != subscribed_topics->end()) {
       found = true;
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 3803883f7..97b666f14 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -55,8 +55,10 @@ class TestCatalogdHA(CustomClusterTestSuite):
     _, catalog_service_port = active_catalogd_address.split(":")
     assert(int(catalog_service_port) == 
catalogd_service.get_catalog_service_port())
 
-  def __run_simple_queries(self):
+  def __run_simple_queries(self, sync_ddl=False):
     try:
+      if sync_ddl:
+        self.execute_query_expect_success(self.client, "set SYNC_DDL=1")
       self.execute_query_expect_success(
           self.client, "drop table if exists test_catalogd_ha")
       self.execute_query_expect_success(
@@ -90,6 +92,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_1)
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
+    # Verify simple queries with sync_ddl as 1.
+    self.__run_simple_queries(sync_ddl=True)
 
     # Restart one coordinator. Verify it get active catalogd address from 
statestore.
     self.cluster.impalads[0].restart()
@@ -154,6 +158,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
     self.__verify_impalad_active_catalogd_port(2, catalogd_service_2)
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
+    # Verify simple queries with sync_ddl as 1.
+    self.__run_simple_queries(sync_ddl=True)
 
     end_count_clear_topic_entries = statestore_service.get_metric_value(
         "statestore.num-clear-topic-entries-requests")

Reply via email to