This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 59bb9259ef44d7c9d3f3eb1355d500d3ba75f513 Author: ttttttz <[email protected]> AuthorDate: Sun Jun 25 13:42:32 2023 +0800 IMPALA-12131: Fix empty partition map in non-partitioned table when file metadata loading fails When inserting into non-partitioned tables, the catalog update request could fail due to file not found exceptions. At that point we have already reset (cleared) the partition map, so it remains empty after the failure, which is an illegal state and will cause failures in later operations. Currently, users have to manually invalidate the metadata of the table to recover. We can improve this by applying all the updates only after all the external loading succeeds, so that any failure in loading the file metadata won't leave the table metadata in a partially updated state. Testing: 1. Added a test which simulates a failure in a catalog update request by throwing an exception through the debug action and confirms that subsequent catalog update requests are not affected by the failure. Change-Id: I28e76a73b7905c24eb93b935124d20ea7abe8513 Reviewed-on: http://gerrit.cloudera.org:8080/19878 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/catalog/HdfsTable.java | 12 +++++- .../apache/impala/service/CatalogOpExecutor.java | 10 ++--- .../java/org/apache/impala/util/DebugUtils.java | 4 ++ tests/query_test/test_insert.py | 44 ++++++++++++++++++++++ 4 files changed, 63 insertions(+), 7 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index da83a47ac..7037df6de 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -774,6 +774,12 @@ public class HdfsTable extends Table implements FeFsTable { final Clock clock = Clock.defaultClock(); long startTime = clock.getTick(); + if (DebugUtils.hasDebugAction(debugActions, 
DebugUtils.LOAD_FILE_METADATA_THROW_EXCEPTION)) { + throw new CatalogException("Threw a catalog exception due to the debug action " + + "during loading file metadata."); + } + //TODO: maybe it'd be better to load the valid txn list in the context of a // transaction to have consistent valid write ids and valid transaction ids. // Currently tables are loaded when they are first referenced and stay in catalog @@ -1372,11 +1378,10 @@ public class HdfsTable extends Table implements FeFsTable { if (LOG.isTraceEnabled()) { LOG.trace("update unpartitioned table: " + getFullName()); } + // Step 1: fetch external metadata HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values()); - resetPartitions(); org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); Preconditions.checkNotNull(msTbl); - setPrototypePartition(msTbl.getSd()); HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(), /*msPartition=*/null, new FsPermissionCache()); // Copy over the FDs from the old partition to the new one, so that @@ -1389,6 +1394,9 @@ public class HdfsTable extends Table implements FeFsTable { partBuilder.setPrevId(oldPartition.getId()); long fileMdLoadTime = loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder), /*isRefresh=*/true, debugAction); + // Step 2: update internal fields + resetPartitions(); + setPrototypePartition(msTbl.getSd()); setUnpartitionedTableStats(partBuilder); addPartition(partBuilder.build()); return fileMdLoadTime; diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 1d7ac7bbf..35057b9f9 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -1500,7 +1500,7 @@ public class CatalogOpExecutor { boolean reloadFileMetadata, boolean reloadTableSchema, Set<String> partitionsToUpdate, String reason) throws 
CatalogException { loadTableMetadata(tbl, newCatalogVersion, reloadFileMetadata, reloadTableSchema, - partitionsToUpdate, null, reason); + partitionsToUpdate, null, reason, null); } /** @@ -1511,7 +1511,7 @@ public class CatalogOpExecutor { private void loadTableMetadata(Table tbl, long newCatalogVersion, boolean reloadFileMetadata, boolean reloadTableSchema, Set<String> partitionsToUpdate, @Nullable Map<String, Long> partitionToEventId, - String reason) + String reason, @Nullable String debugAction) throws CatalogException { Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { @@ -1519,8 +1519,8 @@ public class CatalogOpExecutor { getMetaStoreTable(msClient, tbl); if (tbl instanceof HdfsTable) { ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl, - reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, null, - partitionToEventId, reason); + reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, + debugAction, partitionToEventId, reason); } else { tbl.load(true, msClient.getHiveClient(), msTbl, reason); } @@ -6856,7 +6856,7 @@ public class CatalogOpExecutor { } loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata, - partitionToEventId, "INSERT"); + partitionToEventId, "INSERT", update.getDebug_action()); addTableToCatalogUpdate(table, update.header.want_minimal_response, response.result); } finally { diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 97cd2b522..45c5eede7 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -54,6 +54,10 @@ public class DebugUtils { // CatalogOpExecutor#updateCatalog() finishes. public static final String INSERT_FINISH_DELAY = "catalogd_insert_finish_delay"; + // debug action label for throwing an exception during loadFileMetadataForPartitions. 
+ public static final String LOAD_FILE_METADATA_THROW_EXCEPTION = + "catalogd_load_file_metadata_throw_exception"; + // debug action label to abort the transaction in updateCatalog. public static final String UPDATE_CATALOG_ABORT_INSERT_TXN = "catalogd_update_catalog_abort_txn"; diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index b48e6ed09..d70af89da 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -491,3 +491,47 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite): assert num_instances_per_host == expected_num_instances_per_host, \ result.runtime_profile self.client.clear_configuration() + + +class TestInsertNonPartitionedTable(ImpalaTestSuite): + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestInsertNonPartitionedTable, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'text' + and v.get_value('table_format').compression_codec == 'none') + cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) + + @classmethod + def setup_class(cls): + super(TestInsertNonPartitionedTable, cls).setup_class() + + def test_insert_load_file_fail(self, vector, unique_database): + """Tests metadata won't be corrupted after file metadata loading fails + in non-partitioned tables.""" + table_name = '{0}.{1}'.format(unique_database, 'test_unpartition_tbl') + self.client.execute('create table {0}(f0 int)' + .format(table_name)) + self.client.execute('insert overwrite table {0} select 0' + .format(table_name)) + result = self.client.execute("select f0 from {0}".format(table_name)) + assert result.data == ["0"] + + exec_options = vector.get_value('exec_option') + exec_options['debug_action'] = 'catalogd_load_file_metadata_throw_exception' + try: + self.execute_query("insert overwrite table {0} select 1" + .format(table_name), exec_options) + assert False, 
"Expected query to fail." + except Exception as e: + assert "Failed to load metadata for table:" in str(e) + + exec_options['debug_action'] = '' + self.execute_query("insert overwrite table {0} select 2" + .format(table_name), exec_options) + result = self.client.execute("select f0 from {0}".format(table_name)) + assert result.data == ["2"]
