This is an automated email from the ASF dual-hosted git repository. difin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 0f6868b55db HIVE-29028: Iceberg: Addendum: Retrieve tables without session (#5944) 0f6868b55db is described below commit 0f6868b55dba877c0cca897fed1c47deb7bab4c3 Author: Dmitriy Fingerman <dmitriy.finger...@gmail.com> AuthorDate: Thu Jul 10 23:22:32 2025 -0400 HIVE-29028: Iceberg: Addendum: Retrieve tables without session (#5944) --- .../apache/iceberg/mr/hive/IcebergTableUtil.java | 11 +++++ .../mr/hive/compaction/IcebergTableOptimizer.java | 47 ++++++---------------- .../metastore/task/IcebergHouseKeeperService.java | 14 +------ .../task/TestIcebergHouseKeeperService.java | 3 +- 4 files changed, 25 insertions(+), 50 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 2e4e5f0a094..d52775ead8f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -39,11 +39,13 @@ import org.apache.hadoop.hive.common.type.TimestampTZ; import org.apache.hadoop.hive.common.type.TimestampTZUtil; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.utils.TableFetcher; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -572,4 +574,13 @@ public static boolean hasUndergonePartitionEvolution(Table table) { .map(ManifestFile::partitionSpecId) .anyMatch(id -> id != table.spec().specId()); } + + public static TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern, + String tablePattern) { + return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes( + "EXTERNAL_TABLE") + .tableCondition( + hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type like \"ICEBERG\" ") + .build(); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java index ab785844461..76badb57204 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java @@ -38,13 +38,11 @@ import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache; import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer; -import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName; import org.apache.iceberg.hive.RuntimeMetaException; import org.apache.iceberg.mr.hive.IcebergTableUtil; import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.thrift.TException; public class IcebergTableOptimizer extends TableOptimizer { private HiveMetaStoreClient client; @@ -71,34 +69,21 @@ public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache m * @param skipTables A {@link Set} of fully qualified table names to explicitly skip during the scan. * @return A {@link Set} of {@link CompactionInfo} objects representing tables and/or partitions * identified as eligible for compaction. - * @throws MetaException If an unrecoverable error occurs during Metastore communication or * during {@link SessionState} initialization. */ @Override public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions, - Set<String> skipDBs, Set<String> skipTables) throws MetaException { + Set<String> skipDBs, Set<String> skipTables) { Set<CompactionInfo> compactionTargets = Sets.newHashSet(); - try { - SessionState sessionState = SessionState.get(); - if (sessionState == null) { - sessionState = new SessionState(conf); - SessionState.start(sessionState); - } - } catch (Exception e) { - throw new MetaException(String.format("Error while finding compaction targets for Iceberg tables: %s", - e.getMessage())); - } - getDatabases().stream() - .filter(dbName -> !skipDBs.contains(dbName)) - .flatMap(dbName -> getTables(dbName).stream() - .map(tableName -> TableName.getDbTable(dbName, tableName))) - .filter(qualifiedTableName -> !skipTables.contains(qualifiedTableName)) - .map(qualifiedTableName -> Pair.of(qualifiedTableName, resolveMetastoreTable(qualifiedTableName))) + getTables().stream() + .filter(table -> !skipDBs.contains(table.getDb())) + .filter(table -> !skipTables.contains(table.getNotEmptyDbTable())) + .map(table -> Pair.of(table, resolveMetastoreTable(table.getNotEmptyDbTable()))) .filter(tablePair -> MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters())) .filter(tablePair -> { long currentSnapshotId = Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id")); - Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey()); + Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey().getNotEmptyDbTable()); return cachedSnapshotId == null || cachedSnapshotId != currentSnapshotId; }) .forEach(tablePair -> { @@ -116,25 +101,17 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac currentCompactions, skipDBs, skipTables); } - snapshotIdCache.put(tablePair.getKey(), icebergTable.currentSnapshot().snapshotId()); + snapshotIdCache.put(tablePair.getKey().getNotEmptyDbTable(), icebergTable.currentSnapshot().snapshotId()); }); return compactionTargets; } - private List<String> getDatabases() { - try { - return client.getAllDatabases(); - } catch (TException e) { - throw new RuntimeMetaException(e, "Error getting database names"); - } - } - - private List<String> getTables(String dbName) { + private List<org.apache.hadoop.hive.common.TableName> getTables() { try { - return client.getAllTables(dbName); - } catch (TException e) { - throw new RuntimeMetaException(e, "Error getting table names of %s database", dbName); + return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables(); + } catch (Exception e) { + throw new RuntimeMetaException(e, "Error getting table names"); } } @@ -157,7 +134,7 @@ private Table resolveMetastoreTable(String qualifiedTableName) { } public void init() throws MetaException { - client = new HiveMetaStoreClient(conf); + client = new HiveMetaStoreClient(new HiveConf()); snapshotIdCache = Maps.newConcurrentMap(); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 8c2e010c238..0478d9c5c0f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -30,16 +30,13 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetTableRequest; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.metastore.utils.TableFetcher; import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.Table; import org.apache.iceberg.mr.hive.IcebergTableUtil; -import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +81,7 @@ private void expireTables(String catalogName, String dbPattern, String tablePatt try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) { // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly, // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually - List<TableName> tables = getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); + List<TableName> tables = IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(); LOG.debug("{} candidate tables found", tables.size()); @@ -101,15 +98,6 @@ private void expireTables(String catalogName, String dbPattern, String tablePatt } } - @VisibleForTesting - TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern, String tablePattern) { - return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes( - "EXTERNAL_TABLE") - .tableCondition( - hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type like \"ICEBERG\" ") - .build(); - } - private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) { return tableCache.get(tableName, key -> { LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java index cd77beac66f..ca248e9b1a0 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java @@ -67,8 +67,7 @@ public static void afterClass() { public void testIcebergTableFetched() throws Exception { createIcebergTable("iceberg_table"); - IcebergHouseKeeperService service = new IcebergHouseKeeperService(); - TableFetcher tableFetcher = service.getTableFetcher(db.getMSC(), null, "default", "*"); + TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*"); List<TableName> tables = tableFetcher.getTables(); Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0));