This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d2abbbbe6594fbf313cec01217f620a5f3b20945 Author: ttttttz <[email protected]> AuthorDate: Wed Nov 8 09:57:04 2023 +0800 IMPALA-12513: Allow to reset metadata when the CatalogD becomes active When switching active catalogd, the loaded metadata in the standby catalogd may not be the latest. A backend flag should be provided to control whether to reset metadata when the catalogd becomes active. Adds the following startup flag for catalogd: 'catalogd_ha_reset_metadata_on_failover'. Default is false. If true, reset all metadata when the catalogd becomes active. Testing: - Added a test case to start both catalogds with flag 'catalogd_ha_reset_metadata_on_failover' as true. - Passed core tests Change-Id: I2b54f36f96e7901499105e51790d8e2300d7fea9 Reviewed-on: http://gerrit.cloudera.org:8080/20614 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 16 ++++++++++++++ tests/custom_cluster/test_catalogd_ha.py | 36 ++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 9b907b69f..1d2d678f1 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -152,6 +152,9 @@ DEFINE_bool(enable_skipping_older_events, false, "This configuration is used to DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log records " "to retain in catalogd. If -1, the operation log has unbounded size."); +DEFINE_bool(catalogd_ha_reset_metadata_on_failover, false, "If true, reset all metadata " + "when the catalogd becomes active."); + DECLARE_string(state_store_host); DECLARE_int32(state_store_port); DECLARE_string(state_store_2_host); @@ -542,6 +545,19 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply, catalog_->RegenerateServiceId(); // Clear pending topic updates. 
pending_topic_updates_.clear(); + if (FLAGS_catalogd_ha_reset_metadata_on_failover) { + // Reset all metadata when the catalogd becomes active. + TResetMetadataRequest req; + TResetMetadataResponse resp; + req.__set_header(TCatalogServiceRequestHeader()); + req.header.__set_want_minimal_response(false); + req.__set_is_refresh(false); + req.__set_sync_ddl(false); + Status status = catalog_->ResetMetadata(req, &resp); + if (!status.ok()) { + LOG(ERROR) << "Failed to reset metadata triggered by catalogd failover."; + } + } // Signal the catalog update gathering thread to start. topic_updates_ready_ = false; catalog_update_cv_.NotifyOne(); diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py index 6cc966e61..62575ef66 100644 --- a/tests/custom_cluster/test_catalogd_ha.py +++ b/tests/custom_cluster/test_catalogd_ha.py @@ -21,6 +21,7 @@ import re from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout +from tests.util.filesystem_utils import get_fs_path from time import sleep LOG = logging.getLogger('catalogd_ha_test') @@ -389,3 +390,38 @@ class TestCatalogdHA(CustomClusterTestSuite): # Verify simple queries are ran successfully. 
self.__run_simple_queries() + + @CustomClusterTestSuite.with_args( + statestored_args="--use_subscriber_id_as_catalogd_priority=true", + catalogd_args="--catalogd_ha_reset_metadata_on_failover=true", + start_args="--enable_catalogd_ha") + def test_metadata_after_failover(self, unique_database): + """Verify that the metadata is correct after failover.""" + catalogds = self.cluster.catalogds() + assert(len(catalogds) == 2) + catalogd_service_1 = catalogds[0].service + catalogd_service_2 = catalogds[1].service + assert(catalogd_service_1.get_metric_value("catalog-server.active-status")) + assert(not catalogd_service_2.get_metric_value("catalog-server.active-status")) + + create_func_impala = ("create function {database}.identity_tmp(bigint) " + "returns bigint location '{location}' symbol='Identity'") + self.client.execute(create_func_impala.format( + database=unique_database, + location=get_fs_path('/test-warehouse/libTestUdfs.so'))) + self.execute_query_expect_success( + self.client, "select %s.identity_tmp(10)" % unique_database) + + # Kill active catalogd + catalogds[0].kill() + + # Wait for long enough for the statestore to detect the failure of active catalogd + # and assign active role to standby catalogd. + catalogd_service_2.wait_for_metric_value( + "catalog-server.active-status", expected_value=True, timeout=30) + assert(catalogd_service_2.get_metric_value( + "catalog-server.ha-number-active-status-change") > 0) + assert(catalogd_service_2.get_metric_value("catalog-server.active-status")) + + self.execute_query_expect_success( + self.client, "select %s.identity_tmp(10)" % unique_database)
