This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6831076983bedc0a8ba9ff199709ff93e6af2702 Author: Riza Suminto <[email protected]> AuthorDate: Wed May 14 14:25:41 2025 -0700 IMPALA-14072: Fix NPE in Catalogd during rename. test_rename_drop fails with an NPE after IMPALA-14042. This is because CatalogServiceCatalog.renameTable() returns null when it cannot find the database of oldTableName. This patch changes renameTable() to return Pair.create(null, null) for that scenario. Refactor test_rename_drop slightly to ensure that invalidating the renamed table and dropping it are successful. Testing: - Add checkNotNull precondition in CatalogOpExecutor.alterTableOrViewRename() - Increase catalogd_table_rename_delay to 6s to ensure that the DROP query happens in Catalogd before renameTable() is called. Manually observed that no NPE is shown anymore. Change-Id: I7a421a71cf3703290645e85180de8e9d5e86368a Reviewed-on: http://gerrit.cloudera.org:8080/22899 Reviewed-by: Quanlong Huang <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/CatalogServiceCatalog.java | 2 +- .../apache/impala/service/CatalogOpExecutor.java | 1 + tests/custom_cluster/test_concurrent_rename.py | 24 ++++++++++++++++++---- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index f1373cd04..0e40679dc 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -2796,7 +2796,7 @@ public class CatalogServiceCatalog extends Catalog { TTableName oldTableName, TTableName newTableName) { // Remove the old table name from the cache and add the new table. 
Db db = getDb(oldTableName.getDb_name()); - if (db == null) return null; + if (db == null) return Pair.create(null, null); versionLock_.writeLock().lock(); try { Table oldTable = diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index b5acac0ad..1ce5bd01a 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -5774,6 +5774,7 @@ public class CatalogOpExecutor { // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP. Pair<Table, Table> result = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift()); + Preconditions.checkNotNull(result); if (renamedTable != null) { org.apache.hadoop.hive.metastore.api.Table tblBefore = renamedTable.second.first; addToDeleteEventLog(renamedTable.first, DeleteEventLog diff --git a/tests/custom_cluster/test_concurrent_rename.py b/tests/custom_cluster/test_concurrent_rename.py index 100be5bd3..4f910c71c 100644 --- a/tests/custom_cluster/test_concurrent_rename.py +++ b/tests/custom_cluster/test_concurrent_rename.py @@ -39,7 +39,7 @@ class TestConcurrentRename(CustomClusterTestSuite): new_vector = deepcopy(vector) new_vector.get_value('exec_option')['debug_action'] = \ - "catalogd_table_rename_delay:SLEEP@5000" + "catalogd_table_rename_delay:SLEEP@6000" with self.create_impala_client_from_vector(new_vector) as alter_client, \ self.create_impala_client() as reset_client: version_after_create = catalogd.service.get_catalog_version() @@ -52,9 +52,12 @@ class TestConcurrentRename(CustomClusterTestSuite): while (time.time() - start_time < 10.0 and catalogd.service.get_catalog_version() <= version_after_create): time.sleep(0.05) - reset_handle = reset_client.execute_async("invalidate metadata {}".format(name)) - self.execute_query("invalidate metadata {}2".format(name)) - self.execute_query("drop table {}2".format(name)) + invalidate_1 = 
"invalidate metadata {}".format(name) + invalidate_2 = "invalidate metadata {}2".format(name) + drop_query = "drop table {}2".format(name) + reset_handle = reset_client.execute_async(invalidate_1) + inv_success = self.execute_query_until_success(invalidate_2) + drop_success = self.execute_query_until_success(drop_query) try: alter_client.wait_for_finished_timeout(alter_handle, timeout=10) @@ -66,6 +69,8 @@ class TestConcurrentRename(CustomClusterTestSuite): finally: reset_client.wait_for_finished_timeout(reset_handle, timeout=10) reset_client.close_query(reset_handle) + assert inv_success, "Expect '{}' to succeed, but failed.".format(invalidate_2) + assert drop_success, "Expect '{}' to succeed, but failed.".format(drop_query) def test_rename_invalidate(self, vector, unique_database): name = "{}.tbl".format(unique_database) @@ -83,3 +88,14 @@ class TestConcurrentRename(CustomClusterTestSuite): alter_client.wait_for_finished_timeout(alter_handle, timeout=10) alter_client.close_query(alter_handle) + + def execute_query_until_success(self, query, max_retry=3): + success = False + while not success and max_retry > 0: + try: + result = self.execute_query(query) + success = result.success + except Exception: + # ignore + max_retry -= 1 + return success
