This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d2abbbbe6594fbf313cec01217f620a5f3b20945
Author: ttttttz <[email protected]>
AuthorDate: Wed Nov 8 09:57:04 2023 +0800

    IMPALA-12513: Allow to reset metadata when the CatalogD becomes active
    
    When switching active catalogd, the loaded metadata in the standby
    catalogd may not be the latest. A backend flag should be provided
    to control whether to reset metadata when the catalogd becomes active.
    Adds the following startup flag for catalogd:
    'catalogd_ha_reset_metadata_on_failover'. Default is false. If true,
    reset all metadata when the catalogd becomes active.
    
    Testing:
    - Added a test case to start both catalogds with flag
       'catalogd_ha_reset_metadata_on_failover' as true.
    - Passed core tests
    
    Change-Id: I2b54f36f96e7901499105e51790d8e2300d7fea9
    Reviewed-on: http://gerrit.cloudera.org:8080/20614
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc         | 16 ++++++++++++++
 tests/custom_cluster/test_catalogd_ha.py | 36 ++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 9b907b69f..1d2d678f1 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -152,6 +152,9 @@ DEFINE_bool(enable_skipping_older_events, false, "This 
configuration is used to
 DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log 
records "
     "to retain in catalogd. If -1, the operation log has unbounded size.");
 
+DEFINE_bool(catalogd_ha_reset_metadata_on_failover, false, "If true, reset all 
metadata "
+    "when the catalogd becomes active.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
@@ -542,6 +545,19 @@ void CatalogServer::UpdateActiveCatalogd(bool 
is_registration_reply,
       catalog_->RegenerateServiceId();
       // Clear pending topic updates.
       pending_topic_updates_.clear();
+      if (FLAGS_catalogd_ha_reset_metadata_on_failover) {
+        // Reset all metadata when the catalogd becomes active.
+        TResetMetadataRequest req;
+        TResetMetadataResponse resp;
+        req.__set_header(TCatalogServiceRequestHeader());
+        req.header.__set_want_minimal_response(false);
+        req.__set_is_refresh(false);
+        req.__set_sync_ddl(false);
+        Status status = catalog_->ResetMetadata(req, &resp);
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to reset metadata triggered by catalogd 
failover.";
+        }
+      }
       // Signal the catalog update gathering thread to start.
       topic_updates_ready_ = false;
       catalog_update_cv_.NotifyOne();
diff --git a/tests/custom_cluster/test_catalogd_ha.py 
b/tests/custom_cluster/test_catalogd_ha.py
index 6cc966e61..62575ef66 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -21,6 +21,7 @@ import re
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
+from tests.util.filesystem_utils import get_fs_path
 from time import sleep
 
 LOG = logging.getLogger('catalogd_ha_test')
@@ -389,3 +390,38 @@ class TestCatalogdHA(CustomClusterTestSuite):
 
     # Verify simple queries are ran successfully.
     self.__run_simple_queries()
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+    catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",
+    start_args="--enable_catalogd_ha")
+  def test_metadata_after_failover(self, unique_database):
+    """Verify that the metadata is correct after failover."""
+    catalogds = self.cluster.catalogds()
+    assert(len(catalogds) == 2)
+    catalogd_service_1 = catalogds[0].service
+    catalogd_service_2 = catalogds[1].service
+    assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+    assert(not 
catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    create_func_impala = ("create function {database}.identity_tmp(bigint) "
+                          "returns bigint location '{location}' 
symbol='Identity'")
+    self.client.execute(create_func_impala.format(
+        database=unique_database,
+        location=get_fs_path('/test-warehouse/libTestUdfs.so')))
+    self.execute_query_expect_success(
+        self.client, "select %s.identity_tmp(10)" % unique_database)
+
+    # Kill active catalogd
+    catalogds[0].kill()
+
+    # Wait for long enough for the statestore to detect the failure of active 
catalogd
+    # and assign active role to standby catalogd.
+    catalogd_service_2.wait_for_metric_value(
+        "catalog-server.active-status", expected_value=True, timeout=30)
+    assert(catalogd_service_2.get_metric_value(
+        "catalog-server.ha-number-active-status-change") > 0)
+    assert(catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+    self.execute_query_expect_success(
+        self.client, "select %s.identity_tmp(10)" % unique_database)

Reply via email to