This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 295f74ef12df0b851058efe2c43c9826b617d21f Author: Michael Smith <[email protected]> AuthorDate: Wed Apr 23 15:50:30 2025 -0700 IMPALA-13989: Invalidate table on rename failure Handles the error "Table/view rename succeeded in the Hive Metastore, but failed in Impala's Catalog Server" rather than failing the table rename. This error happens when catalog state catches up to the alter event from our alter_table RPC to HMS before we renameTable explicitly in the catalog. The catalog can update independently due to a concurrent 'invalidate metadata' call. In that case we use the oldTbl definition we already have - updated from the delete log if possible - and fetch the new table definition with invalidateTable to continue, automating most of the work that the error message suggested users do via 'invalidate metadata <tbl>'. Updated the test_concurrent_ddls test to remove handle_rename_failure and ran the tests a dozen times. Adds concurrency tests with simultaneous rename and invalidate metadata that previously would fail. 
Change-Id: Ic2a276b6e5ceb35b7f3ce788cc47052387ae8980 Reviewed-on: http://gerrit.cloudera.org:8080/22807 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../apache/impala/service/CatalogOpExecutor.java | 60 +++++++++++------ tests/custom_cluster/test_concurrent_ddls.py | 16 +---- tests/custom_cluster/test_concurrent_rename.py | 77 ++++++++++++++++++++++ 3 files changed, 117 insertions(+), 36 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index f1ada478a..195605e6e 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -5742,28 +5742,46 @@ public class CatalogOpExecutor { result.second.setCreateEventId(renamedTable.first); } } - if (result.first == null || result.second == null) { - // The rename succeeded in the HMS but failed in the catalog cache. The cache is in - // an inconsistent state, but can likely be fixed by running "invalidate metadata". - throw new ImpalaRuntimeException(String.format( - "Table/view rename succeeded in the Hive Metastore, but failed in Impala's " + - "Catalog Server. Running 'invalidate metadata <tbl>' on the old table name " + - "'%s' and the new table name '%s' may fix the problem." 
, tableName, - newTableName)); - } - // TODO: call addVersionsForInflightEvents using InProgressTableModification object - // that is passed into catalog_.renameTable() - catalog_.addVersionsForInflightEvents( - false, result.second, modification.newVersionNumber()); - if (wantMinimalResult) { - response.result.addToRemoved_catalog_objects(result.first.toInvalidationObject()); - response.result.addToUpdated_catalog_objects(result.second.toInvalidationObject()); + TCatalogObject oldTblDesc = null, newTblDesc = null; + if (result.first == null) { + // The old table object has been removed by a concurrent operation, e.g. INVALIDATE + // METADATA <table>. Fetch the latest delete from deleteLog. + oldTblDesc = wantMinimalResult ? + oldTbl.toInvalidationObject() : oldTbl.toMinimalTCatalogObject(); + long version = catalog_.getDeleteLog().getLatestRemovedVersion(oldTblDesc); + if (version > 0) { + oldTblDesc.setCatalog_version(version); + } else { + LOG.warn("Deletion update on the old table {} not found. Impalad might still " + + "have its metadata until the deletion update arrives from statestore.", + tableName); + } } else { - response.result.addToRemoved_catalog_objects( - result.first.toMinimalTCatalogObject()); - response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject()); - } - response.result.setVersion(result.second.getCatalogVersion()); + oldTblDesc = wantMinimalResult ? + result.first.toInvalidationObject() : result.first.toMinimalTCatalogObject(); + } + if (result.second == null) { + // The rename succeeded in HMS but failed in the catalog cache. The cache is in an + // inconsistent state, so invalidate the new table to reload it. 
+ newTblDesc = catalog_.invalidateTable(newTableName.toThrift(), + new Reference<>(), new Reference<>(), catalogTimeline); + if (newTblDesc == null) { + throw new ImpalaRuntimeException(String.format( + "The new table/view %s was concurrently removed during rename.", + newTableName)); + } + } else { + Preconditions.checkNotNull(result.first); + // TODO: call addVersionsForInflightEvents using InProgressTableModification object + // that is passed into catalog_.renameTable() + catalog_.addVersionsForInflightEvents( + false, result.second, modification.newVersionNumber()); + newTblDesc = wantMinimalResult ? + result.second.toInvalidationObject() : result.second.toTCatalogObject(); + } + response.result.addToRemoved_catalog_objects(oldTblDesc); + response.result.addToUpdated_catalog_objects(newTblDesc); + response.result.setVersion(newTblDesc.getCatalog_version()); addSummary(response, "Renaming was successful."); } diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py index aa19d2018..c94be7977 100644 --- a/tests/custom_cluster/test_concurrent_ddls.py +++ b/tests/custom_cluster/test_concurrent_ddls.py @@ -143,10 +143,7 @@ class TestConcurrentDdls(CustomClusterTestSuite): break except IMPALA_CONNECTION_EXCEPTION as e: err = str(e) - if self.handle_rename_failure(tls.client, tbl_name, err): - # Table was successfully renamed, next case. - break - elif self.is_transient_error(err): + if self.is_transient_error(err): # Retry the query. continue assert self.is_acceptable_error(err, sync_ddl), err @@ -194,17 +191,6 @@ class TestConcurrentDdls(CustomClusterTestSuite): return True return False - def handle_rename_failure(self, client, tbl_name, err): - if "Table/view rename succeeded in the Hive Metastore, " \ - "but failed in Impala's Catalog Server." in err: - # Invalidate the target table so we reload it from HMS. 
- tbl_names = re.findall(r"{}[^']*".format(tbl_name), err) - assert len(tbl_names) == 2 - self.execute_query_expect_success( - client, "invalidate metadata {0}".format(tbl_names[1])) - return True - return False - @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", diff --git a/tests/custom_cluster/test_concurrent_rename.py b/tests/custom_cluster/test_concurrent_rename.py new file mode 100644 index 000000000..e2dfb58ea --- /dev/null +++ b/tests/custom_cluster/test_concurrent_rename.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function +import time + +from copy import deepcopy +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + + +@CustomClusterTestSuite.with_args( + catalogd_args="--hms_event_polling_interval_s=0", + # Requires longer update frequency so invalidate->drop can happen within an update. + # Still shorter than default so tests are quicker. 
+ statestored_args="--statestore_update_frequency_ms=1000", + cluster_size=1) +class TestConcurrentRename(CustomClusterTestSuite): + """Test concurrent rename with invalidate and other DDLs.""" + + def test_rename_drop(self, vector, unique_database): + name = "{}.tbl".format(unique_database) + self.execute_query("create table {} (s string)".format(name)) + self.execute_query("describe {}".format(name)) + + new_vector = deepcopy(vector) + new_vector.get_value('exec_option')['debug_action'] = \ + "catalogd_table_rename_delay:SLEEP@5000" + with self.create_impala_client_from_vector(new_vector) as alter_client, \ + self.create_impala_client() as reset_client: + alter_handle = alter_client.execute_async( + "alter table {0} rename to {0}2".format(name)) + time.sleep(0.1) + reset_handle = reset_client.execute_async("invalidate metadata {}".format(name)) + self.execute_query("invalidate metadata {}2".format(name)) + self.execute_query("drop table {}2".format(name)) + + try: + alter_client.wait_for_finished_timeout(alter_handle, timeout=10) + alter_client.close_query(alter_handle) + assert False, "Expected alter to fail" + except Exception as e: + assert "The new table/view {}2 was concurrently removed during rename."\ + .format(name) in str(e) + finally: + reset_client.wait_for_finished_timeout(reset_handle, timeout=10) + reset_client.close_query(reset_handle) + + def test_rename_invalidate(self, vector, unique_database): + name = "{}.tbl".format(unique_database) + self.execute_query("create table {} (s string)".format(name)) + self.execute_query("describe {}".format(name)) + + new_vector = deepcopy(vector) + new_vector.get_value('exec_option')['debug_action'] = \ + "catalogd_table_rename_delay:SLEEP@5000" + with self.create_impala_client_from_vector(new_vector) as alter_client: + alter_handle = alter_client.execute_async( + "alter table {0} rename to {0}2".format(name)) + time.sleep(0.1) + self.execute_query("invalidate metadata {}".format(name)) + + 
alter_client.wait_for_finished_timeout(alter_handle, timeout=10) + alter_client.close_query(alter_handle)
