This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit af46585801387a41153d4a957336d6f0101d09f7 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Fri Nov 11 16:11:14 2022 +0100 IMPALA-11721: Impala query keep being retried over frequently updated iceberg table Iceberg table loading can fail in local catalog mode if the table gets updated frequently. This is what happens during table loading in local catalog mode: every query starts with its own empty local catalog. Table metadata is fetched in multiple requests via a MetaProvider which is always a CatalogdMetaProvider. CatalogdMetaProvider caches requests and the cache key also includes the table's catalog version. The Iceberg table is loaded by the following requests: 1 CatalogdMetaProvider.loadTable() 2 CatalogdMetaProvider.loadIcebergTable() 3 CatalogdMetaProvider.loadIcebergApiTable() # This actually directly # loads the Iceberg table # via Iceberg API # (no CatalogD involved) 4 CatalogdMetaProvider.loadTableColumnStatistics() 5 CatalogdMetaProvider.loadPartitionList() 6 CatalogdMetaProvider.loadPartitionsByRefs() Steps 1-4 happen during table loading, steps 5-6 happen during planning. We cannot really reorder these invocations, but since CatalogdMetaProvider caches these, only the very first invocations need to reach out to CatalogD and check the table's catalog version. Subsequent invocations, i.e. subsequent queries that use the Iceberg table can use the cached metadata, and there is no need to check the catalog version of the cached metadata since the cache key also includes the catalog version, hence we have corresponding metadata in the cache. This patch resolves the issue by pre-warming the metaprovider's cache before issuing loadIcebergApiTable() so the CatalogdMetaProvider.load*() operations can be served from cache. So what happens when the metaprovider's cache gets invalidated due to concurrent updates to the table and we are still processing the query? No problem, only the top-level TableCacheKey gets invalidated. 
The cache will still be able to answer the fine-grained load requests that are keyed by the now outdated catalog version. E.g. ColStatsCacheKey hashes db name, table name, catalog version, and column name as a key in the cache. Therefore the current query processing can be finished using a consistent state of the metadata. Subsequent queries will use a newer version of the table. Testing: * modified test_insert_stress.py so it won't tolerate inconsistent metadata fetch exceptions (Frontend already tolerates them to some degree) Change-Id: Iac28224b2b6d67725eeb17f3e9d813ba622edb43 Reviewed-on: http://gerrit.cloudera.org:8080/19234 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/local/LocalIcebergTable.java | 42 ++++++++++++++++++---- tests/stress/test_insert_stress.py | 26 ++++---------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java index 1813ce00e..219da5e85 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java @@ -17,6 +17,7 @@ package org.apache.impala.catalog.local; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -33,8 +34,8 @@ import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.IcebergContentFileStore; -import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.TableLoadingException; +import org.apache.impala.catalog.local.MetaProvider.TableMetaRef; import org.apache.impala.thrift.TCompressionCodec; import org.apache.impala.thrift.THdfsPartition; import org.apache.impala.thrift.THdfsTable; @@ 
-87,6 +88,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable { TableParams tableParams = new TableParams(msTable); TPartialTableInfo tableInfo = db.getCatalog().getMetaProvider() .loadIcebergTable(ref); + LocalFsTable fsTable = LocalFsTable.load(db, msTable, ref); + warmupMetaProviderCache(db, msTable, ref, fsTable); org.apache.iceberg.Table icebergApiTable = db.getCatalog().getMetaProvider() .loadIcebergApiTable(ref, tableParams, msTable); List<Column> iceColumns = IcebergSchemaConverter.convertToImpalaSchema( @@ -96,8 +99,11 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable { /*numClusteringCols=*/ 0, db.getName() + "." + msTable.getTableName(), /*isFullAcidSchema=*/false); - return new LocalIcebergTable(db, msTable, ref, colMap, tableInfo, tableParams, - icebergApiTable); + return new LocalIcebergTable(db, msTable, ref, fsTable, colMap, tableInfo, + tableParams, icebergApiTable); + } catch (InconsistentMetadataFetchException e) { + // Just rethrow this so the query can be retried by the Frontend. + throw e; } catch (Exception e) { String fullTableName = msTable.getDbName() + "." + msTable.getTableName(); throw new TableLoadingException( @@ -105,13 +111,37 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable { } } + /** + * Eagerly warmup metaprovider cache before calling loadIcebergApiTable(). So + * later they can be served from cache when really needed. Otherwise, if there are + * frequent updates to the table, CatalogD might refresh the Iceberg table during + * loadIcebergApiTable(). + * If that happens, subsequent load*() calls via the metaprovider would + * fail due to InconsistentMetadataFetchException. 
+ */ + private static void warmupMetaProviderCache(LocalDb db, Table msTable, TableMetaRef ref, + LocalFsTable fsTable) throws Exception { + db.getCatalog().getMetaProvider().loadTableColumnStatistics(ref, + getHmsColumnNames(msTable)); + FeCatalogUtils.loadAllPartitions(fsTable); + } + + private static List<String> getHmsColumnNames(Table msTable) { + List<String> ret = new ArrayList<>(); + for (FieldSchema fs : msTable.getSd().getCols()) { + ret.add(fs.getName()); + } + return ret; + } + private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref, - ColumnMap cmap, TPartialTableInfo tableInfo, TableParams tableParams, - org.apache.iceberg.Table icebergApiTable) throws TableLoadingException { + LocalFsTable fsTable, ColumnMap cmap, TPartialTableInfo tableInfo, + TableParams tableParams, org.apache.iceberg.Table icebergApiTable) + throws TableLoadingException { super(db, msTable, ref, cmap); Preconditions.checkNotNull(tableInfo); - localFsTable_ = LocalFsTable.load(db, msTable, ref); + localFsTable_ = fsTable; tableParams_ = tableParams; fileStore_ = IcebergContentFileStore.fromThrift( tableInfo.getIceberg_table().getContent_files(), diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py index 5caf3db7c..fe43a38c9 100644 --- a/tests/stress/test_insert_stress.py +++ b/tests/stress/test_insert_stress.py @@ -48,16 +48,9 @@ class TestInsertStress(ImpalaTestSuite): try: insert_cnt = 0 while insert_cnt < num_inserts: - try: - impalad_client.execute("insert into table %s values (%i, %i)" % ( - tbl_name, wid, insert_cnt)) - insert_cnt += 1 - except Exception as e: - # It's possible that the Iceberg table is concurrently updated in CatalogD - # during data load in local catalog. 
- if "InconsistentMetadataFetchException" in str(e): - continue - raise e + impalad_client.execute("insert into table %s values (%i, %i)" % ( + tbl_name, wid, insert_cnt)) + insert_cnt += 1 finally: with counter.get_lock(): counter.value += 1 @@ -79,16 +72,9 @@ class TestInsertStress(ImpalaTestSuite): impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) try: while counter.value != writers: - try: - result = impalad_client.execute("select * from %s" % tbl_name) - verify_result_set(result) - time.sleep(random.random()) - except Exception as e: - # It's possible that the Iceberg table is concurrently updated in CatalogD - # during data load in local catalog. - if "InconsistentMetadataFetchException" in str(e): - continue - raise e + result = impalad_client.execute("select * from %s" % tbl_name) + verify_result_set(result) + time.sleep(random.random()) finally: impalad_client.close()
