This is an automated email from the ASF dual-hosted git repository. okumin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new ac3ea055053 HIVE-28952: TableFetcher to return Table objects instead of names (#6020) ac3ea055053 is described below commit ac3ea0550537510cf72ca796b5d8e5cb30494b45 Author: Neeraj Khatri <108523406+neer...@users.noreply.github.com> AuthorDate: Fri Aug 15 11:52:41 2025 +0530 HIVE-28952: TableFetcher to return Table objects instead of names (#6020) --- .../mr/hive/compaction/IcebergTableOptimizer.java | 6 +-- .../metastore/task/IcebergHouseKeeperService.java | 32 ++++---------- .../task/TestIcebergHouseKeeperService.java | 9 ++-- .../hadoop/hive/metastore/utils/TableFetcher.java | 50 +++++++++++++++++----- .../hive/metastore/PartitionManagementTask.java | 2 +- 5 files changed, 57 insertions(+), 42 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java index e7a4afa6bb4..c04e8c013cd 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java @@ -88,7 +88,7 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac Set<String> skipDBs, Set<String> skipTables) { Set<CompactionInfo> compactionTargets = Sets.newHashSet(); - getTables().stream() + getTableNames().stream() .filter(table -> !skipDBs.contains(table.getDb())) .filter(table -> !skipTables.contains(table.getNotEmptyDbTable())) .map(table -> { @@ -126,9 +126,9 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac return compactionTargets; } - private List<org.apache.hadoop.hive.common.TableName> getTables() { + private List<org.apache.hadoop.hive.common.TableName> getTableNames() { try { - return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables(); + return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTableNames(); } catch (Exception e) { throw new RuntimeMetaException(e, "Error getting table names"); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 0478d9c5c0f..aa732e27eaf 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; -import org.apache.hadoop.hive.metastore.api.GetTableRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -37,7 +36,6 @@ import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.Table; import org.apache.iceberg.mr.hive.IcebergTableUtil; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,35 +77,21 @@ public void run() { private void expireTables(String catalogName, String dbPattern, String tablePattern) { try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { - // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly, - // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually - List<TableName> tables = IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); - + int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX); + List<org.apache.hadoop.hive.metastore.api.Table> tables = + IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(maxBatchSize); LOG.debug("{} candidate tables found", tables.size()); - - for (TableName table : tables) { - try { - expireSnapshotsForTable(getIcebergTable(table, msc)); - } catch (Exception e) { - LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}", - catalogName, dbPattern, tablePattern, e); - } + for (org.apache.hadoop.hive.metastore.api.Table table : tables) { + expireSnapshotsForTable(getIcebergTable(table)); } } catch (Exception e) { throw new RuntimeException("Error while getting tables from metastore", e); } } - private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) { - return tableCache.get(tableName, key -> { - LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName); - GetTableRequest request = new GetTableRequest(tableName.getDb(), tableName.getTable()); - try { - return IcebergTableUtil.getTable(conf, msc.getTable(request)); - } catch (TException e) { - throw new RuntimeException(e); - } - }); + private Table getIcebergTable(org.apache.hadoop.hive.metastore.api.Table table) { + TableName tableName = TableName.fromString(table.getTableName(), table.getCatName(), table.getDbName()); + return tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, table)); } /** diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java index ca248e9b1a0..879d59b8de1 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java @@ -23,11 +23,11 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetTableRequest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.TableFetcher; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; @@ -69,8 +69,11 @@ public void testIcebergTableFetched() throws Exception { TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*"); - List<TableName> tables = tableFetcher.getTables(); - Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0)); + int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX); + List<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize); + Assert.assertEquals("hive", tables.get(0).getCatName()); + Assert.assertEquals("default", tables.get(0).getDbName()); + Assert.assertEquals("iceberg_table", tables.get(0).getTableName()); } @Test diff --git a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java index 0e10635bc74..8b3c08aa72b 100644 --- a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java +++ b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java @@ -20,9 +20,11 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableIterable; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +92,7 @@ private void buildTableFilter(String tablePattern, List<String> conditions) { this.tableFilter = String.join(" and ", conditions); } - public List<TableName> getTables() throws Exception { + public List<TableName> getTableNames() throws Exception { List<TableName> candidates = new ArrayList<>(); // if tableTypes is empty, then a list with single empty string has to specified to scan no tables. @@ -102,21 +104,47 @@ public List<TableName> getTables() throws Exception { List<String> databases = client.getDatabases(catalogName, dbPattern); for (String db : databases) { - Database database = client.getDatabase(catalogName, db); - if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) { - LOG.debug("Skipping table under database: {}", db); - continue; - } - if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) { - LOG.info("Skipping table that belongs to database {} being failed over.", db); - continue; - } - List<String> tablesNames = client.listTableNamesByFilter(catalogName, db, tableFilter, -1); + List<String> tablesNames = getTableNamesForDatabase(catalogName, db); tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db))); } return candidates; } + public List<Table> getTables(int maxBatchSize) throws Exception { + List<Table> candidates = new ArrayList<>(); + + // if tableTypes is empty, then a list with single empty string has to specified to scan no tables. + if (tableTypes.isEmpty()) { + LOG.info("Table fetcher returns empty list as no table types specified"); + return candidates; + } + + List<String> databases = client.getDatabases(catalogName, dbPattern); + + for (String db : databases) { + List<String> tablesNames = getTableNamesForDatabase(catalogName, db); + for (Table table : new TableIterable(client, db, tablesNames, maxBatchSize)) { + candidates.add(table); + } + } + return candidates; + } + + private List<String> getTableNamesForDatabase(String catalogName, String dbName) throws Exception { + List<String> tableNames = new ArrayList<>(); + Database database = client.getDatabase(catalogName, dbName); + if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) { + LOG.debug("Skipping table under database: {}", dbName); + return tableNames; + } + if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) { + LOG.info("Skipping table that belongs to database {} being failed over.", dbName); + return tableNames; + } + tableNames = client.listTableNamesByFilter(catalogName, dbName, tableFilter, -1); + return tableNames; + } + public static class Builder { private final IMetaStoreClient client; private final String catalogName; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java index 0749985392f..fa9d5e2e9dd 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java @@ -101,7 +101,7 @@ public void run() { .tableCondition( hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "discover__partitions like \"true\" ") .build() - .getTables(); + .getTableNames(); if (candidates.isEmpty()) { LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);