This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 742ea4f5152 HIVE-29205: Iceberg: Upgrade iceberg version to 1.10.0 (#6227)
742ea4f5152 is described below

commit 742ea4f5152f1fa910c015c03ba904da1bb4a9e6
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue Dec 9 14:58:48 2025 +0100

    HIVE-29205: Iceberg: Upgrade iceberg version to 1.10.0 (#6227)
---
 .../iceberg/hive/HMSTablePropertyHelper.java       | 104 ++--
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |   2 +-
 .../apache/iceberg/hive/HiveOperationsBase.java    | 121 +++--
 .../apache/iceberg/hive/HiveTableOperations.java   |  70 ++-
 .../apache/iceberg/hive/HiveViewOperations.java    | 175 +++----
 .../org/apache/iceberg/hive/HiveTableTest.java     |   2 +-
 ...veTableBaseTest.java => HiveTableTestBase.java} |   2 +-
 .../org/apache/iceberg/hive/TestHiveCatalog.java   |   2 +-
 .../org/apache/iceberg/hive/TestHiveCommits.java   |   2 +-
 ...leTest.java => TestHiveCreateReplaceTable.java} |   2 +-
 .../iceberg/hive/TestHiveTableConcurrency.java     |   2 +-
 .../describe_iceberg_metadata_tables.q.out         |  36 +-
 iceberg/patched-iceberg-api/pom.xml                |   6 -
 .../org/apache/iceberg/util/StructProjection.java  | 225 ---------
 iceberg/patched-iceberg-core/pom.xml               |   4 -
 .../org/apache/iceberg/ManifestFilterManager.java  | 546 ---------------------
 .../java/org/apache/iceberg/PartitionsTable.java   | 333 -------------
 .../org/apache/iceberg/util/StructLikeMap.java     | 187 -------
 .../org/apache/iceberg/util/StructLikeWrapper.java | 103 ----
 iceberg/pom.xml                                    |   2 +-
 pom.xml                                            |   8 +-
 .../metastore-rest-catalog/pom.xml                 |   2 +-
 22 files changed, 291 insertions(+), 1645 deletions(-)

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
index 45253e1dd8d..f7de4dd0719 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
@@ -45,6 +45,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.view.ViewMetadata;
 import org.apache.parquet.Strings;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.slf4j.Logger;
@@ -54,34 +55,34 @@
 
 public class HMSTablePropertyHelper {
   private static final Logger LOG = 
LoggerFactory.getLogger(HMSTablePropertyHelper.class);
-  public static final String HIVE_ICEBERG_STORAGE_HANDLER = 
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
+  public static final String HIVE_ICEBERG_STORAGE_HANDLER =
+      "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
   public static final String PARTITION_SPEC = 
"iceberg.mr.table.partition.spec";
 
+  /**
+   * Provides key translation where necessary between Iceberg and HMS props. 
This translation is
+   * needed because some properties control the same behaviour but are named 
differently in Iceberg
+   * and Hive. Therefore changes to these property pairs should be 
synchronized.
+   *
+   * <p>Example: Deleting data files upon DROP TABLE is enabled using 
gc.enabled=true in Iceberg and
+   * external.table.purge=true in Hive. Hive and Iceberg users are unaware of 
each other's control
+   * flags, therefore inconsistent behaviour can occur from e.g. a Hive user's 
point of view if
+   * external.table.purge=true is set on the HMS table but gc.enabled=false is 
set on the Iceberg
+   * table, resulting in no data file deletion.
+   */
   private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION = 
ImmutableBiMap.of(
-      // gc.enabled in Iceberg and external.table.purge in Hive are meant to 
do the same things
-      // but with different names
-      GC_ENABLED, "external.table.purge", TableProperties.PARQUET_COMPRESSION, 
ParquetOutputFormat.COMPRESSION,
-      TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, 
ParquetOutputFormat.BLOCK_SIZE);
+      GC_ENABLED, "external.table.purge",
+      TableProperties.PARQUET_COMPRESSION, ParquetOutputFormat.COMPRESSION,
+      TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, 
ParquetOutputFormat.BLOCK_SIZE
+  );
 
   private HMSTablePropertyHelper() {
   }
 
-  /**
-   * Provides key translation where necessary between Iceberg and HMS props. 
This translation is needed because some
-   * properties control the same behaviour but are named differently in 
Iceberg and Hive. Therefore, changes to these
-   * property pairs should be synchronized.
-   *
-   * Example: Deleting data files upon DROP TABLE is enabled using 
gc.enabled=true in Iceberg and
-   * external.table.purge=true in Hive. Hive and Iceberg users are unaware of 
each other's control flags, therefore
-   * inconsistent behaviour can occur from e.g. a Hive user's point of view if 
external.table.purge=true is set on the
-   * HMS table but gc.enabled=false is set on the Iceberg table, resulting in 
no data file deletion.
-   *
-   * @param hmsProp The HMS property that should be translated to Iceberg 
property
-   * @return Iceberg property equivalent to the hmsProp. If no such 
translation exists, the original hmsProp is returned
-   */
-  public static String translateToIcebergProp(String hmsProp) {
+  static String translateToIcebergProp(String hmsProp) {
     return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
   }
+
   /** Updates the HMS Table properties based on the Iceberg Table metadata. */
   public static void updateHmsTableForIcebergTable(
       String newMetadataLocation,
@@ -136,6 +137,33 @@ public static void updateHmsTableForIcebergTable(
     tbl.setParameters(parameters);
   }
 
+  /** Updates the HMS Table properties based on the Iceberg View metadata. */
+  public static void updateHmsTableForIcebergView(
+      String newMetadataLocation,
+      Table tbl,
+      ViewMetadata metadata,
+      Set<String> obsoleteProps,
+      long maxHiveTablePropertySize,
+      String currentLocation) {
+    Map<String, String> parameters =
+        Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
+
+    // push all Iceberg view properties into HMS
+    metadata.properties().entrySet().stream()
+        .filter(entry -> 
!entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
+        .forEach(entry -> parameters.put(entry.getKey(), entry.getValue()));
+    setCommonParameters(
+        newMetadataLocation,
+        metadata.uuid(),
+        obsoleteProps,
+        currentLocation,
+        parameters,
+        HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+        metadata.schema(),
+        maxHiveTablePropertySize);
+    tbl.setParameters(parameters);
+  }
+
   public static SortOrder getSortOrder(Properties props, Schema schema) {
     String sortOrderJsonString = 
props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
     return Strings.isNullOrEmpty(sortOrderJsonString) ? SortOrder.unsorted() : 
SortOrderParser.fromJson(schema,
@@ -168,8 +196,7 @@ private static void setCommonParameters(
     setSchema(schema, parameters, maxHiveTablePropertySize);
   }
 
-  @VisibleForTesting
-  static void setStorageHandler(Map<String, String> parameters, boolean 
hiveEngineEnabled) {
+  private static void setStorageHandler(Map<String, String> parameters, 
boolean hiveEngineEnabled) {
     // If needed, set the 'storage_handler' property to enable query from Hive
     if (hiveEngineEnabled) {
       parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, 
HIVE_ICEBERG_STORAGE_HANDLER);
@@ -179,7 +206,8 @@ static void setStorageHandler(Map<String, String> 
parameters, boolean hiveEngine
   }
 
   @VisibleForTesting
-  static void setSnapshotStats(TableMetadata metadata, Map<String, String> 
parameters, long maxHiveTablePropertySize) {
+  static void setSnapshotStats(
+      TableMetadata metadata, Map<String, String> parameters, long 
maxHiveTablePropertySize) {
     parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
     parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
     parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
@@ -188,17 +216,15 @@ static void setSnapshotStats(TableMetadata metadata, 
Map<String, String> paramet
     if (exposeInHmsProperties(maxHiveTablePropertySize) && currentSnapshot != 
null) {
       parameters.put(TableProperties.CURRENT_SNAPSHOT_ID, 
String.valueOf(currentSnapshot.snapshotId()));
       parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, 
String.valueOf(currentSnapshot.timestampMillis()));
+
       setSnapshotSummary(parameters, currentSnapshot, 
maxHiveTablePropertySize);
     }
-
     parameters.put(TableProperties.SNAPSHOT_COUNT, 
String.valueOf(metadata.snapshots().size()));
   }
 
   @VisibleForTesting
   static void setSnapshotSummary(
-      Map<String, String> parameters,
-      Snapshot currentSnapshot,
-      long maxHiveTablePropertySize) {
+      Map<String, String> parameters, Snapshot currentSnapshot, long 
maxHiveTablePropertySize) {
     try {
       String summary = 
JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
       if (summary.length() <= maxHiveTablePropertySize) {
@@ -208,14 +234,17 @@ static void setSnapshotSummary(
             currentSnapshot.snapshotId(), maxHiveTablePropertySize);
       }
     } catch (JsonProcessingException e) {
-      LOG.warn("Failed to convert current snapshot({}) summary to a json 
string", currentSnapshot.snapshotId(), e);
+      LOG.warn("Failed to convert current snapshot({}) summary to a json 
string",
+          currentSnapshot.snapshotId(), e);
     }
   }
 
   @VisibleForTesting
-  static void setPartitionSpec(TableMetadata metadata, Map<String, String> 
parameters, long maxHiveTablePropertySize) {
+  static void setPartitionSpec(
+      TableMetadata metadata, Map<String, String> parameters, long 
maxHiveTablePropertySize) {
     parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
-    if (exposeInHmsProperties(maxHiveTablePropertySize) && metadata.spec() != 
null && metadata.spec().isPartitioned()) {
+    if (exposeInHmsProperties(maxHiveTablePropertySize) &&
+        metadata.spec() != null && metadata.spec().isPartitioned()) {
       String spec = PartitionSpecParser.toJson(metadata.spec());
       setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec, 
maxHiveTablePropertySize);
     }
@@ -232,17 +261,19 @@ public static PartitionSpec getPartitionSpec(Map<String, 
String> props, Schema s
   }
 
   @VisibleForTesting
-  static void setSortOrder(TableMetadata metadata, Map<String, String> 
parameters, long maxHiveTablePropertySize) {
+  static void setSortOrder(
+      TableMetadata metadata, Map<String, String> parameters, long 
maxHiveTablePropertySize) {
     parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
     if (exposeInHmsProperties(maxHiveTablePropertySize) &&
-        metadata.sortOrder() != null &&
-        metadata.sortOrder().isSorted()) {
+        metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
       String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
       setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder, 
maxHiveTablePropertySize);
     }
   }
 
-  public static void setSchema(Schema schema, Map<String, String> parameters, 
long maxHiveTablePropertySize) {
+  @VisibleForTesting
+  static void setSchema(
+      Schema schema, Map<String, String> parameters, long 
maxHiveTablePropertySize) {
     parameters.remove(TableProperties.CURRENT_SCHEMA);
     if (exposeInHmsProperties(maxHiveTablePropertySize) && schema != null) {
       String jsonSchema = SchemaParser.toJson(schema);
@@ -251,13 +282,12 @@ public static void setSchema(Schema schema, Map<String, 
String> parameters, long
   }
 
   private static void setField(
-      Map<String, String> parameters,
-      String key, String value,
-      long maxHiveTablePropertySize) {
+      Map<String, String> parameters, String key, String value, long 
maxHiveTablePropertySize) {
     if (value.length() <= maxHiveTablePropertySize) {
       parameters.put(key, value);
     } else {
-      LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key, 
maxHiveTablePropertySize);
+      LOG.warn("Not exposing {} in HMS since it exceeds {} characters",
+          key, maxHiveTablePropertySize);
     }
   }
 
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 72280449ad5..d7193d4510a 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -494,7 +494,7 @@ public void createNamespace(Namespace namespace, 
Map<String, String> meta) {
 
   @Override
   public List<Namespace> listNamespaces(Namespace namespace) {
-    if (!isValidateNamespace(namespace) && !namespace.isEmpty()) {
+    if (!namespace.isEmpty() && (!isValidateNamespace(namespace) || 
!namespaceExists(namespace))) {
       throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
     }
     if (!namespace.isEmpty()) {
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
index d69652d80b2..937939b4816 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
@@ -33,6 +33,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
 import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -42,12 +44,11 @@
 import org.slf4j.LoggerFactory;
 
 /** All the HMS operations like table,view,materialized_view should implement 
this. */
-public interface HiveOperationsBase {
+interface HiveOperationsBase {
 
   Logger LOG = LoggerFactory.getLogger(HiveOperationsBase.class);
-  String HIVE_ICEBERG_STORAGE_HANDLER = 
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
-
-  // The max size is based on HMS backend database. For Hive versions below 
2.3, the max table parameter size is 4000
+  // The max size is based on HMS backend database. For Hive versions below 
2.3, the max table
+  // parameter size is 4000
   // characters, see https://issues.apache.org/jira/browse/HIVE-12274
   // set to 0 to not expose Iceberg metadata in HMS Table properties.
   String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
@@ -56,8 +57,6 @@ public interface HiveOperationsBase {
   String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value";
   String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view";
 
-  TableType tableType();
-
   enum ContentType {
     TABLE("Table"),
     VIEW("View");
@@ -73,6 +72,8 @@ public String value() {
     }
   }
 
+  TableType tableType();
+
   ClientPool<IMetaStoreClient, TException> metaClients();
 
   long maxHiveTablePropertySize();
@@ -92,51 +93,74 @@ default Table loadHmsTable() throws TException, 
InterruptedException {
 
   default Map<String, String> hmsEnvContext(String metadataLocation) {
     return metadataLocation == null ? ImmutableMap.of() :
-      ImmutableMap.of(
-        NO_LOCK_EXPECTED_KEY,
-        BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
-        NO_LOCK_EXPECTED_VALUE,
-        metadataLocation);
+        ImmutableMap.of(
+            NO_LOCK_EXPECTED_KEY,
+            BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+            NO_LOCK_EXPECTED_VALUE,
+            metadataLocation);
+  }
+
+  private static boolean isValidIcebergView(Table table) {
+    String tableType = 
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+    return 
TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) &&
+        ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableType);
+  }
+
+  private static boolean isValidIcebergTable(Table table) {
+    String tableType = 
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+    return 
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableType);
   }
 
   static void validateTableIsIceberg(Table table, String fullName) {
     String tableType = 
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
     NoSuchIcebergTableException.check(
-        tableType != null && 
tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE),
-        "Not an iceberg table: %s (type=%s)", fullName, tableType);
+        isValidIcebergTable(table), "Not an iceberg table: %s (type=%s)", 
fullName, tableType);
   }
 
   static void validateTableIsIcebergView(Table table, String fullName) {
     String tableTypeProp = 
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
     NoSuchIcebergViewException.check(
-            
TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) &&
-                    ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
-            "Not an iceberg view: %s (type=%s) (tableType=%s)",
-            fullName,
-            tableTypeProp,
-            table.getTableType());
+        isValidIcebergView(table),
+        "Not an iceberg view: %s (type=%s) (tableType=%s)",
+        fullName,
+        tableTypeProp,
+        table.getTableType());
+  }
+
+  static void validateIcebergTableNotLoadedAsIcebergView(Table table, String 
fullName) {
+    if (!isValidIcebergView(table) && isValidIcebergTable(table)) {
+      throw new NoSuchViewException("View does not exist: %s", fullName);
+    }
+  }
+
+  static void validateIcebergViewNotLoadedAsIcebergTable(Table table, String 
fullName) {
+    if (!isValidIcebergTable(table) && isValidIcebergView(table)) {
+      throw new NoSuchTableException("Table does not exist: %s", fullName);
+    }
   }
 
   default void persistTable(Table hmsTable, boolean updateHiveTable, String 
metadataLocation)
       throws TException, InterruptedException {
     if (updateHiveTable) {
-      metaClients().run(
-          client -> {
-            MetastoreUtil.alterTable(
-                client, database(), table(), hmsTable, 
hmsEnvContext(metadataLocation));
-            return null;
-          });
+      metaClients()
+          .run(
+              client -> {
+                MetastoreUtil.alterTable(
+                    client, database(), table(), hmsTable, 
hmsEnvContext(metadataLocation));
+                return null;
+              });
     } else {
-      metaClients().run(
-          client -> {
-            client.createTable(hmsTable);
-            return null;
-          });
+      metaClients()
+          .run(
+              client -> {
+                client.createTable(hmsTable);
+                return null;
+              });
     }
   }
 
   static StorageDescriptor storageDescriptor(
-          Schema schema, String location, boolean hiveEngineEnabled) {
+      Schema schema, String location, boolean hiveEngineEnabled) {
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
     storageDescriptor.setCols(HiveSchemaUtil.convert(schema));
     storageDescriptor.setLocation(location);
@@ -169,10 +193,10 @@ static void cleanupMetadata(FileIO io, String 
commitStatus, String metadataLocat
   }
 
   static void cleanupMetadataAndUnlock(
-          FileIO io,
-          BaseMetastoreOperations.CommitStatus commitStatus,
-          String metadataLocation,
-          HiveLock lock) {
+      FileIO io,
+      BaseMetastoreOperations.CommitStatus commitStatus,
+      String metadataLocation,
+      HiveLock lock) {
     try {
       cleanupMetadata(io, commitStatus.name(), metadataLocation);
     } finally {
@@ -184,24 +208,27 @@ default Table newHmsTable(String hmsTableOwner) {
     Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be 
null");
     final long currentTimeMillis = System.currentTimeMillis();
 
-    Table newTable = new Table(table(),
-        database(),
-        hmsTableOwner,
-        (int) currentTimeMillis / 1000,
-        (int) currentTimeMillis / 1000,
-        Integer.MAX_VALUE,
-        null,
-        Collections.emptyList(),
-        Maps.newHashMap(),
-        null,
-        null,
-        tableType().name());
+    Table newTable =
+        new Table(
+            table(),
+            database(),
+            hmsTableOwner,
+            (int) currentTimeMillis / 1000,
+            (int) currentTimeMillis / 1000,
+            Integer.MAX_VALUE,
+            null,
+            Collections.emptyList(),
+            Maps.newHashMap(),
+            null,
+            null,
+            tableType().name());
 
     if (tableType().equals(TableType.EXTERNAL_TABLE)) {
       newTable
           .getParameters()
           .put("EXTERNAL", "TRUE"); // using the external table type also 
requires this
     }
+
     return newTable;
   }
 }
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 2eab6f28065..a0f0e779d9b 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -72,8 +72,12 @@ public class HiveTableOperations extends 
BaseMetastoreTableOperations
   private final ClientPool<IMetaStoreClient, TException> metaClients;
 
   protected HiveTableOperations(
-        Configuration conf, ClientPool<IMetaStoreClient, TException> 
metaClients, FileIO fileIO,
-        String catalogName, String database, String table) {
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      FileIO fileIO,
+      String catalogName,
+      String database,
+      String table) {
     this.conf = conf;
     this.metaClients = metaClients;
     this.fileIO = fileIO;
@@ -104,6 +108,10 @@ protected void doRefresh() {
     String metadataLocation = null;
     try {
       Table table = metaClients.run(client -> client.getTable(database, 
tableName));
+
+      // Check if we are trying to load an Iceberg View as a Table
+      HiveOperationsBase.validateIcebergViewNotLoadedAsIcebergTable(table, 
fullName);
+      // Check if it is a valid Iceberg Table
       HiveOperationsBase.validateTableIsIceberg(table, fullName);
 
       metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
@@ -135,7 +143,7 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
     boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, 
false);
 
     BaseMetastoreOperations.CommitStatus commitStatus =
-            BaseMetastoreOperations.CommitStatus.FAILURE;
+        BaseMetastoreOperations.CommitStatus.FAILURE;
     boolean updateHiveTable = false;
 
     HiveLock lock = lockObject(base);
@@ -149,7 +157,7 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
         if (newTable && 
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != 
null) {
           if 
(TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) {
             throw new AlreadyExistsException(
-                    "View with same name already exists: %s.%s", database, 
tableName);
+                "View with same name already exists: %s.%s", database, 
tableName);
           }
           throw new AlreadyExistsException("Table already exists: %s.%s", 
database, tableName);
         }
@@ -157,11 +165,14 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
         updateHiveTable = true;
         LOG.debug("Committing existing table: {}", fullName);
       } else {
-        tbl = newHmsTable(metadata.property(HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser()));
+        tbl = newHmsTable(
+            metadata.property(HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser())
+        );
         LOG.debug("Committing new table: {}", fullName);
       }
 
-      tbl.setSd(HiveOperationsBase.storageDescriptor(
+      tbl.setSd(
+          HiveOperationsBase.storageDescriptor(
               metadata.schema(),
               metadata.location(),
               hiveEngineEnabled)); // set to pickup any schema changes
@@ -183,6 +194,7 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
                 .filter(key -> !metadata.properties().containsKey(key))
                 .collect(Collectors.toSet());
       }
+
       HMSTablePropertyHelper.updateHmsTableForIcebergTable(
           newMetadataLocation,
           tbl,
@@ -198,6 +210,7 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
       }
 
       lock.ensureActive();
+
       try {
         persistTable(
             tbl, updateHiveTable, hiveLockEnabled(base, conf) ? null : 
baseMetadataLocation);
@@ -208,9 +221,9 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
         commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
         throw new CommitStateUnknownException(
             "Failed to heartbeat for hive lock while " +
-              "committing changes. This can lead to a concurrent commit 
attempt be able to overwrite this commit. " +
-              "Please check the commit history. If you are running into this 
issue, try reducing " +
-              "iceberg.hive.lock-heartbeat-interval-ms.",
+                "committing changes. This can lead to a concurrent commit 
attempt be able to overwrite this commit. " +
+                "Please check the commit history. If you are running into this 
issue, try reducing " +
+                "iceberg.hive.lock-heartbeat-interval-ms.",
             le);
       } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
         throw new AlreadyExistsException(e, "Table already exists: %s.%s", 
database, tableName);
@@ -225,16 +238,16 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
         if (e.getMessage() != null && e.getMessage().contains("Table/View 
'HIVE_LOCKS' does not exist")) {
           throw new RuntimeException(
               "Failed to acquire locks from metastore because the underlying 
metastore " +
-                "table 'HIVE_LOCKS' does not exist. This can occur when using 
an embedded metastore which does not " +
-                "support transactions. To fix this use an alternative 
metastore.",
+                  "table 'HIVE_LOCKS' does not exist. This can occur when 
using an embedded metastore which does not " +
+                  "support transactions. To fix this use an alternative 
metastore.",
               e);
         }
 
         commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
         if (e.getMessage() != null && e.getMessage().contains(
-                "The table has been modified. The parameter value for key '" +
-                        HiveTableOperations.METADATA_LOCATION_PROP +
-                        "' is")) {
+            "The table has been modified. The parameter value for key '" +
+                HiveTableOperations.METADATA_LOCATION_PROP +
+                "' is")) {
           // It's possible the HMS client incorrectly retries a successful 
operation, due to network
           // issue for example, and triggers this exception. So we need 
double-check to make sure
           // this is really a concurrent modification. Hitting this exception 
means no pending
@@ -242,16 +255,17 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
           commitStatus = checkCommitStatusStrict(newMetadataLocation, 
metadata);
           if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
             throw new CommitFailedException(
-                    e, "The table %s.%s has been modified concurrently", 
database, tableName);
+                e, "The table %s.%s has been modified concurrently", database, 
tableName);
           }
         } else {
           LOG.error(
-                  "Cannot tell if commit to {}.{} succeeded, attempting to 
reconnect and check.",
-                  database,
-                  tableName,
-                  e);
+              "Cannot tell if commit to {}.{} succeeded, attempting to 
reconnect and check.",
+              database,
+              tableName,
+              e);
           commitStatus = checkCommitStatus(newMetadataLocation, metadata);
         }
+
         switch (commitStatus) {
           case SUCCESS:
             break;
@@ -276,7 +290,8 @@ protected void doCommit(TableMetadata base, TableMetadata 
metadata) {
       HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, 
newMetadataLocation, lock);
     }
 
-    LOG.info("Committed to table {} with the new metadata location {}", 
fullName, newMetadataLocation);
+    LOG.info(
+        "Committed to table {} with the new metadata location {}", fullName, 
newMetadataLocation);
   }
 
   @Override
@@ -306,14 +321,17 @@ public ClientPool<IMetaStoreClient, TException> 
metaClients() {
 
   /**
    * Returns if the hive engine related values should be enabled on the table, 
or not.
-   * <p>
-   * The decision is made like this:
+   *
+   * <p>The decision is made like this:
+   *
    * <ol>
-   * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
-   * <li>If the table property is not set then check the hive-site.xml 
property value
-   * {@link ConfigProperties#ENGINE_HIVE_ENABLED}
-   * <li>If none of the above is enabled then use the default value {@link 
TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+   *   <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+   *   <li>If the table property is not set then check the hive-site.xml 
property value {@link
+   *       ConfigProperties#ENGINE_HIVE_ENABLED}
+   *   <li>If none of the above is enabled then use the default value {@link
+   *       TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
    * </ol>
+   *
    * @param metadata Table metadata to use
    * @param conf The hive configuration to use
    * @return if the hive engine related values should be enabled or not
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
index 583998709fb..c0c959974a6 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
@@ -20,10 +20,7 @@
 package org.apache.iceberg.hive;
 
 import java.util.Collections;
-import java.util.Locale;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
@@ -71,11 +68,11 @@ final class HiveViewOperations extends BaseViewOperations 
implements HiveOperati
   private final String catalogName;
 
   HiveViewOperations(
-          Configuration conf,
-          ClientPool<IMetaStoreClient, TException> metaClients,
-          FileIO fileIO,
-          String catalogName,
-          TableIdentifier viewIdentifier) {
+      Configuration conf,
+      ClientPool<IMetaStoreClient, TException> metaClients,
+      FileIO fileIO,
+      String catalogName,
+      TableIdentifier viewIdentifier) {
     this.conf = conf;
     this.catalogName = catalogName;
     this.metaClients = metaClients;
@@ -84,7 +81,7 @@ final class HiveViewOperations extends BaseViewOperations 
implements HiveOperati
     this.database = viewIdentifier.namespace().level(0);
     this.viewName = viewIdentifier.name();
     this.maxHiveTablePropertySize =
-            conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, 
HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+        conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, 
HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
   }
 
   @Override
@@ -94,10 +91,14 @@ public void doRefresh() {
 
     try {
       table = metaClients.run(client -> client.getTable(database, viewName));
+
+      // Check if we are trying to load an Iceberg Table as a View
+      HiveOperationsBase.validateIcebergTableNotLoadedAsIcebergView(table, 
fullName);
+      // Check if it is a valid Iceberg View
       HiveOperationsBase.validateTableIsIcebergView(table, fullName);
 
       metadataLocation =
-              
table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+          
table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
 
     } catch (NoSuchObjectException e) {
       if (currentMetadataLocation() != null) {
@@ -105,7 +106,7 @@ public void doRefresh() {
       }
     } catch (TException e) {
       String errMsg =
-              String.format("Failed to get view info from metastore %s.%s", 
database, viewName);
+          String.format("Failed to get view info from metastore %s.%s", 
database, viewName);
       throw new RuntimeException(errMsg, e);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -136,11 +137,11 @@ public void doCommit(ViewMetadata base, ViewMetadata 
metadata) {
         // concurrent commit
         if (newView && 
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) != 
null) {
           throw new AlreadyExistsException(
-                  "%s already exists: %s.%s",
-                  TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(
-                          tbl.getTableType()) ? ContentType.VIEW.value() : 
ContentType.TABLE.value(),
-                  database,
-                  viewName);
+              "%s already exists: %s.%s",
+              TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(
+                  tbl.getTableType()) ? ContentType.VIEW.value() : 
ContentType.TABLE.value(),
+              database,
+              viewName);
         }
 
         updateHiveView = true;
@@ -151,32 +152,36 @@ public void doCommit(ViewMetadata base, ViewMetadata 
metadata) {
       }
 
       tbl.setSd(
-              HiveOperationsBase.storageDescriptor(
-                      metadata.schema(),
-                      metadata.location(),
-                      hiveEngineEnabled)); // set to pick up any schema changes
+          HiveOperationsBase.storageDescriptor(
+              metadata.schema(),
+              metadata.location(),
+              hiveEngineEnabled)); // set to pick up any schema changes
 
       String metadataLocation =
-              
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+          
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
       String baseMetadataLocation = base != null ? base.metadataFileLocation() 
: null;
       if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
         throw new CommitFailedException(
-                "Cannot commit: Base metadata location '%s' is not same as the 
current view metadata location " +
-                        "'%s' for %s.%s",
-                baseMetadataLocation, metadataLocation, database, viewName);
+            "Cannot commit: Base metadata location '%s' is not same as the 
current view metadata location " +
+                "'%s' for %s.%s",
+            baseMetadataLocation, metadataLocation, database, viewName);
       }
 
       // get Iceberg props that have been removed
       Set<String> removedProps = emptySet();
       if (base != null) {
         removedProps =
-                base.properties().keySet().stream()
-                        .filter(key -> !metadata.properties().containsKey(key))
-                        .collect(Collectors.toSet());
+            base.properties().keySet().stream()
+                .filter(key -> !metadata.properties().containsKey(key))
+                .collect(Collectors.toSet());
       }
-
-      setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps);
-
+      HMSTablePropertyHelper.updateHmsTableForIcebergView(
+          newMetadataLocation,
+          tbl,
+          metadata,
+          removedProps,
+          maxHiveTablePropertySize,
+          currentMetadataLocation());
       lock.ensureActive();
 
       try {
@@ -187,12 +192,12 @@ public void doCommit(ViewMetadata base, ViewMetadata 
metadata) {
       } catch (LockException le) {
         commitStatus = CommitStatus.UNKNOWN;
         throw new CommitStateUnknownException(
-                "Failed to heartbeat for hive lock while " +
-                        "committing changes. This can lead to a concurrent 
commit attempt be able to overwrite " +
-                        "this commit. " +
-                        "Please check the commit history. If you are running 
into this issue, try reducing " +
-                        "iceberg.hive.lock-heartbeat-interval-ms.",
-                le);
+            "Failed to heartbeat for hive lock while " +
+                "committing changes. This can lead to a concurrent commit 
attempt be able to overwrite " +
+                "this commit. " +
+                "Please check the commit history. If you are running into this 
issue, try reducing " +
+                "iceberg.hive.lock-heartbeat-interval-ms.",
+            le);
       } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
         throw new AlreadyExistsException(e, "View already exists: %s.%s", 
database, viewName);
 
@@ -204,34 +209,34 @@ public void doCommit(ViewMetadata base, ViewMetadata 
metadata) {
 
       } catch (Throwable e) {
         if (e.getMessage() != null &&
-                e.getMessage().contains(
-                        "The table has been modified. The parameter value for 
key '" +
-                                
BaseMetastoreTableOperations.METADATA_LOCATION_PROP +
-                                "' is")) {
+            e.getMessage().contains(
+                "The table has been modified. The parameter value for key '" +
+                    BaseMetastoreTableOperations.METADATA_LOCATION_PROP +
+                    "' is")) {
           throw new CommitFailedException(
-                  e, "The view %s.%s has been modified concurrently", 
database, viewName);
+              e, "The view %s.%s has been modified concurrently", database, 
viewName);
         }
 
         if (e.getMessage() != null && e.getMessage().contains("Table/View 
'HIVE_LOCKS' does not exist")) {
           throw new RuntimeException(
-                  "Failed to acquire locks from metastore because the 
underlying metastore " +
-                          "view 'HIVE_LOCKS' does not exist. This can occur 
when using an embedded metastore " +
-                          "which does not support transactions. To fix this 
use an alternative metastore.",
-                  e);
+              "Failed to acquire locks from metastore because the underlying 
metastore " +
+                  "view 'HIVE_LOCKS' does not exist. This can occur when using 
an embedded metastore " +
+                  "which does not support transactions. To fix this use an 
alternative metastore.",
+              e);
         }
 
         LOG.error(
-                "Cannot tell if commit to {}.{} succeeded, attempting to 
reconnect and check.",
-                database,
-                viewName,
-                e);
+            "Cannot tell if commit to {}.{} succeeded, attempting to reconnect 
and check.",
+            database,
+            viewName,
+            e);
         commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
         commitStatus =
-                checkCommitStatus(
-                        viewName,
-                        newMetadataLocation,
-                        metadata.properties(),
-                        () -> 
checkCurrentMetadataLocation(newMetadataLocation));
+            checkCommitStatus(
+                viewName,
+                newMetadataLocation,
+                metadata.properties(),
+                () -> checkCurrentMetadataLocation(newMetadataLocation));
         switch (commitStatus) {
           case SUCCESS:
             break;
@@ -243,7 +248,7 @@ public void doCommit(ViewMetadata base, ViewMetadata 
metadata) {
       }
     } catch (TException e) {
       throw new RuntimeException(
-              String.format("Metastore operation failed for %s.%s", database, 
viewName), e);
+          String.format("Metastore operation failed for %s.%s", database, 
viewName), e);
 
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -257,7 +262,7 @@ public void doCommit(ViewMetadata base, ViewMetadata 
metadata) {
     }
 
     LOG.info(
-            "Committed to view {} with the new metadata location {}", 
fullName, newMetadataLocation);
+        "Committed to view {} with the new metadata location {}", fullName, 
newMetadataLocation);
   }
 
   /**
@@ -271,36 +276,6 @@ private boolean checkCurrentMetadataLocation(String 
newMetadataLocation) {
     return newMetadataLocation.equals(metadata.metadataFileLocation());
   }
 
-  private void setHmsTableParameters(
-          String newMetadataLocation, Table tbl, ViewMetadata metadata, 
Set<String> obsoleteProps) {
-    Map<String, String> parameters =
-            
Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
-
-    // push all Iceberg view properties into HMS
-    metadata.properties().entrySet().stream()
-            .filter(entry -> 
!entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
-            .forEach(entry -> parameters.put(entry.getKey(), 
entry.getValue()));
-    if (metadata.uuid() != null) {
-      parameters.put("uuid", metadata.uuid());
-    }
-
-    // remove any props from HMS that are no longer present in Iceberg view 
props
-    obsoleteProps.forEach(parameters::remove);
-
-    parameters.put(
-            BaseMetastoreTableOperations.TABLE_TYPE_PROP,
-            ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
-    parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, 
newMetadataLocation);
-
-    if (currentMetadataLocation() != null && 
!currentMetadataLocation().isEmpty()) {
-      parameters.put(
-              BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP, 
currentMetadataLocation());
-    }
-
-//    setSchema(metadata.schema(), parameters);
-    tbl.setParameters(parameters);
-  }
-
   private static boolean hiveLockEnabled(Configuration conf) {
     return conf.getBoolean(ConfigProperties.LOCK_HIVE_ENABLED, true);
   }
@@ -308,23 +283,23 @@ private static boolean hiveLockEnabled(Configuration 
conf) {
   private Table newHMSView(ViewMetadata metadata) {
     final long currentTimeMillis = System.currentTimeMillis();
     String hmsTableOwner =
-            PropertyUtil.propertyAsString(
-                    metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser());
+        PropertyUtil.propertyAsString(
+            metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser());
     String sqlQuery = sqlFor(metadata);
 
     return new Table(
-            table(),
-            database(),
-            hmsTableOwner,
-            (int) currentTimeMillis / 1000,
-            (int) currentTimeMillis / 1000,
-            Integer.MAX_VALUE,
-            null,
-            Collections.emptyList(),
-            Maps.newHashMap(),
-            sqlQuery,
-            sqlQuery,
-            tableType().name());
+        table(),
+        database(),
+        hmsTableOwner,
+        (int) (currentTimeMillis / 1000), // divide before narrowing to int
+        (int) (currentTimeMillis / 1000), // divide before narrowing to int
+        Integer.MAX_VALUE,
+        null,
+        Collections.emptyList(),
+        Maps.newHashMap(),
+        sqlQuery,
+        sqlQuery,
+        tableType().name());
   }
 
   private String sqlFor(ViewMetadata metadata) {
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 70ce12a8310..7bf66012030 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -85,7 +85,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-public class HiveTableTest extends HiveTableBaseTest {
+public class HiveTableTest extends HiveTableTestBase {
   static final String NON_DEFAULT_DATABASE =  "nondefault";
 
   @TempDir
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTestBase.java
similarity index 99%
rename from 
iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
rename to 
iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTestBase.java
index 321c1e59716..46ee5ee0cf5 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTestBase.java
@@ -45,7 +45,7 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 
-public class HiveTableBaseTest {
+public class HiveTableTestBase {
 
   static final String TABLE_NAME =  "tbl";
   static final String DB_NAME = "hivedb";
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index b59828466d6..36f08e58633 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -1076,6 +1076,7 @@ public void testSnapshotStatsTableProperties() throws 
Exception {
 
   @Test
   public void testSetSnapshotSummary() throws Exception {
+    final long maxHiveTablePropertySize = 4000;
     Snapshot snapshot = mock(Snapshot.class);
     Map<String, String> summary = Maps.newHashMap();
     when(snapshot.summary()).thenReturn(summary);
@@ -1086,7 +1087,6 @@ public void testSetSnapshotSummary() throws Exception {
     }
     
assertThat(JsonUtil.mapper().writeValueAsString(summary).length()).isLessThan(4000);
     Map<String, String> parameters = Maps.newHashMap();
-    final long maxHiveTablePropertySize = 4000;
     HMSTablePropertyHelper.setSnapshotSummary(parameters, snapshot, 
maxHiveTablePropertySize);
     assertThat(parameters).as("The snapshot summary must be in 
parameters").hasSize(1);
 
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 7bd2c882314..343f37dcb42 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -48,7 +48,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-public class TestHiveCommits extends HiveTableBaseTest {
+public class TestHiveCommits extends HiveTableTestBase {
 
   @Test
   public void testSuppressUnlockExceptions() {
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java
similarity index 99%
rename from 
iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
rename to 
iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java
index 2c5246e8f9d..994ca189971 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java
@@ -52,7 +52,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-public class HiveCreateReplaceTableTest {
+public class TestHiveCreateReplaceTable {
 
   private static final String DB_NAME = "hivedb";
   private static final String TABLE_NAME = "tbl";
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
index b2e41bfd446..48f60ec716e 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
@@ -40,7 +40,7 @@
 import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class TestHiveTableConcurrency extends HiveTableBaseTest {
+public class TestHiveTableConcurrency extends HiveTableTestBase {
 
   @Test
   public synchronized void testConcurrentFastAppends() {
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
index 388cdf35b3b..2259eff2916 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
@@ -58,7 +58,7 @@ key_metadata          binary                  Encryption key 
metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -171,7 +171,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -210,7 +210,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -237,7 +237,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -287,7 +287,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -314,7 +314,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -342,7 +342,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -462,7 +462,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -503,7 +503,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -531,7 +531,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -584,7 +584,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -612,7 +612,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -639,7 +639,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -752,7 +752,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -792,7 +792,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -820,7 +820,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -873,7 +873,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
@@ -901,7 +901,7 @@ key_metadata                binary                  
Encryption key metadata blob
 split_offsets          array<bigint>           Splittable offsets  
 equality_ids           array<int>              Equality comparison field IDs
 sort_order_id          int                     Sort order ID       
-first_row_id           bigint                  Starting row ID to assign to 
new rows
+first_row_id           bigint                  The first row ID assigned to 
the first row in the data file
 referenced_data_file   string                  Fully qualified location (URI 
with FS scheme) of a data file that all deletes reference
 content_offset         bigint                  The offset in the file where 
the content starts
 content_size_in_bytes  bigint                  The length of referenced 
content stored in the file
diff --git a/iceberg/patched-iceberg-api/pom.xml 
b/iceberg/patched-iceberg-api/pom.xml
index 19ec82fcb85..fbdd0b72322 100644
--- a/iceberg/patched-iceberg-api/pom.xml
+++ b/iceberg/patched-iceberg-api/pom.xml
@@ -36,11 +36,6 @@
       <version>${iceberg.version}</version>
       <optional>true</optional>
     </dependency>
-    <dependency>
-      <groupId>org.apache.iceberg</groupId>
-      <artifactId>iceberg-bundled-guava</artifactId>
-      <version>${iceberg.version}</version>
-    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -64,7 +59,6 @@
                   <overWrite>true</overWrite>
                   
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                   <excludes>
-                      **/StructProjection.class
                   </excludes>
                 </artifactItem>
               </artifactItems>
diff --git 
a/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
 
b/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
deleted file mode 100644
index 15f5d965206..00000000000
--- 
a/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.util;
-
-import java.util.List;
-import java.util.Set;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.ListType;
-import org.apache.iceberg.types.Types.MapType;
-import org.apache.iceberg.types.Types.StructType;
-
-public class StructProjection implements StructLike {
-  /**
-   * Creates a projecting wrapper for {@link StructLike} rows.
-   *
-   * <p>This projection does not work with repeated types like lists and maps.
-   *
-   * @param schema schema of rows wrapped by this projection
-   * @param ids field ids from the row schema to project
-   * @return a wrapper to project rows
-   */
-  public static StructProjection create(Schema schema, Set<Integer> ids) {
-    StructType structType = schema.asStruct();
-    return new StructProjection(structType, TypeUtil.project(structType, ids));
-  }
-
-  /**
-   * Creates a projecting wrapper for {@link StructLike} rows.
-   *
-   * <p>This projection does not work with repeated types like lists and maps.
-   *
-   * @param dataSchema schema of rows wrapped by this projection
-   * @param projectedSchema result schema of the projected rows
-   * @return a wrapper to project rows
-   */
-  public static StructProjection create(Schema dataSchema, Schema 
projectedSchema) {
-    return new StructProjection(dataSchema.asStruct(), 
projectedSchema.asStruct());
-  }
-
-  /**
-   * Creates a projecting wrapper for {@link StructLike} rows.
-   *
-   * <p>This projection does not work with repeated types like lists and maps.
-   *
-   * @param structType type of rows wrapped by this projection
-   * @param projectedStructType result type of the projected rows
-   * @return a wrapper to project rows
-   */
-  public static StructProjection create(StructType structType, StructType 
projectedStructType) {
-    return new StructProjection(structType, projectedStructType);
-  }
-
-  /**
-   * Creates a projecting wrapper for {@link StructLike} rows.
-   *
-   * <p>This projection allows missing fields and does not work with repeated 
types like lists and
-   * maps.
-   *
-   * @param structType type of rows wrapped by this projection
-   * @param projectedStructType result type of the projected rows
-   * @return a wrapper to project rows
-   */
-  public static StructProjection createAllowMissing(
-      StructType structType, StructType projectedStructType) {
-    return new StructProjection(structType, projectedStructType, true);
-  }
-
-  private final StructType type;
-  private final int[] positionMap;
-  private final StructProjection[] nestedProjections;
-  private StructLike struct;
-
-  private StructProjection(
-      StructType type, int[] positionMap, StructProjection[] 
nestedProjections) {
-    this.type = type;
-    this.positionMap = positionMap;
-    this.nestedProjections = nestedProjections;
-  }
-
-  private StructProjection(StructType structType, StructType projection) {
-    this(structType, projection, false);
-  }
-
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
-  private StructProjection(StructType structType, StructType projection, 
boolean allowMissing) {
-    this.type = projection;
-    this.positionMap = new int[projection.fields().size()];
-    this.nestedProjections = new StructProjection[projection.fields().size()];
-
-    // set up the projection positions and any nested projections that are 
needed
-    List<Types.NestedField> dataFields = structType.fields();
-    for (int pos = 0; pos < positionMap.length; pos += 1) {
-      Types.NestedField projectedField = projection.fields().get(pos);
-
-      boolean found = false;
-      for (int i = 0; !found && i < dataFields.size(); i += 1) {
-        Types.NestedField dataField = dataFields.get(i);
-        if (projectedField.fieldId() == dataField.fieldId()) {
-          found = true;
-          positionMap[pos] = i;
-          switch (projectedField.type().typeId()) {
-            case STRUCT:
-              nestedProjections[pos] =
-                  new StructProjection(
-                  dataField.type().asStructType(), 
projectedField.type().asStructType());
-              break;
-            case MAP:
-              MapType projectedMap = projectedField.type().asMapType();
-              MapType originalMap = dataField.type().asMapType();
-
-              boolean keyProjectable =
-                  !projectedMap.keyType().isNestedType() ||
-                      projectedMap.keyType().equals(originalMap.keyType());
-              boolean valueProjectable =
-                  !projectedMap.valueType().isNestedType() ||
-                      projectedMap.valueType().equals(originalMap.valueType());
-              Preconditions.checkArgument(
-                  keyProjectable && valueProjectable,
-                  "Cannot project a partial map key or value struct. Trying to 
project %s out of %s",
-                  projectedField,
-                  dataField);
-
-              nestedProjections[pos] = null;
-              break;
-            case LIST:
-              ListType projectedList = projectedField.type().asListType();
-              ListType originalList = dataField.type().asListType();
-
-              boolean elementProjectable =
-                  !projectedList.elementType().isNestedType() ||
-                      
projectedList.elementType().equals(originalList.elementType());
-              Preconditions.checkArgument(
-                  elementProjectable,
-                  "Cannot project a partial list element struct. Trying to 
project %s out of %s",
-                  projectedField,
-                  dataField);
-
-              nestedProjections[pos] = null;
-              break;
-            default:
-              nestedProjections[pos] = null;
-          }
-        }
-      }
-
-      if (!found && projectedField.isOptional() && allowMissing) {
-        positionMap[pos] = -1;
-        nestedProjections[pos] = null;
-      } else if (!found) {
-        throw new IllegalArgumentException(
-          String.format("Cannot find field %s in %s", projectedField, 
structType));
-      }
-    }
-  }
-
-  public int projectedFields() {
-    return (int) Ints.asList(positionMap).stream().filter(val -> val != 
-1).count();
-  }
-
-  public StructProjection wrap(StructLike newStruct) {
-    this.struct = newStruct;
-    return this;
-  }
-
-  public StructProjection copyFor(StructLike newStruct) {
-    return new StructProjection(type, positionMap, 
nestedProjections).wrap(newStruct);
-  }
-
-  @Override
-  public int size() {
-    return type.fields().size();
-  }
-
-  @Override
-  public <T> T get(int pos, Class<T> javaClass) {
-    // struct can be null if wrap is not called first before the get call
-    // or if a null struct is wrapped.
-    if (struct == null) {
-      return null;
-    }
-
-    int structPos = positionMap[pos];
-    if (nestedProjections[pos] != null) {
-      StructLike nestedStruct = struct.get(structPos, StructLike.class);
-      if (nestedStruct == null) {
-        return null;
-      }
-
-      return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
-    }
-
-    if (structPos != -1) {
-      return struct.get(structPos, javaClass);
-    } else {
-      return null;
-    }
-  }
-
-  @Override
-  public <T> void set(int pos, T value) {
-    throw new UnsupportedOperationException("Cannot set fields in a 
TypeProjection");
-  }
-}
diff --git a/iceberg/patched-iceberg-core/pom.xml 
b/iceberg/patched-iceberg-core/pom.xml
index 6311294f51f..d266b814a45 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -94,10 +94,6 @@
                   <excludes>
                       **/HadoopInputFile.class
                       **/HadoopTableOperations.class
-                      **/StructLikeMap.class
-                      **/StructLikeWrapper.class
-                      org.apache.iceberg.PartitionsTable.class
-                      org.apache.iceberg.ManifestFilterManager.class
                   </excludes>
                 </artifactItem>
               </artifactItems>
diff --git 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
deleted file mode 100644
index 716f8a8a0a8..00000000000
--- 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.expressions.StrictMetricsEvaluator;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.CharSequenceSet;
-import org.apache.iceberg.util.ManifestFileUtil;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.PartitionSet;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.Tasks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * This patch effectively reverts the change introduced in Iceberg version 
1.9.1
- * (related to #11131, "Core: Optimize MergingSnapshotProducer to use 
referenced manifests to determine if manifest
- * needs to be rewritten"). The original change caused orphan delete files to 
remain in manifests,
- * which led to incorrect Iceberg table statistics and inaccurate row counts 
when tables were
- * queried via Hive. This patch aims to correct the handling of orphan 
positional delete files during manifest
- * filtering to ensure accurate table state.
- *
- */
-
-abstract class ManifestFilterManager<F extends ContentFile<F>> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(ManifestFilterManager.class);
-  private static final Joiner COMMA = Joiner.on(",");
-
-  protected static class DeleteException extends ValidationException {
-    private final String partition;
-
-    private DeleteException(String partition) {
-      super("Operation would delete existing data");
-      this.partition = partition;
-    }
-
-    public String partition() {
-      return partition;
-    }
-  }
-
-  private final Map<Integer, PartitionSpec> specsById;
-  private final PartitionSet deleteFilePartitions;
-  private final Set<F> deleteFiles = newFileSet();
-  private final PartitionSet dropPartitions;
-  private final CharSequenceSet deletePaths = CharSequenceSet.empty();
-  private Expression deleteExpression = Expressions.alwaysFalse();
-  private long minSequenceNumber = 0;
-  private boolean failAnyDelete = false;
-  private boolean failMissingDeletePaths = false;
-  private int duplicateDeleteCount = 0;
-  private boolean caseSensitive = true;
-
-  // cache filtered manifests to avoid extra work when commits fail.
-  private final Map<ManifestFile, ManifestFile> filteredManifests = 
Maps.newConcurrentMap();
-
-  // tracking where files were deleted to validate retries quickly
-  private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
-      Maps.newConcurrentMap();
-
-  private final Supplier<ExecutorService> workerPoolSupplier;
-
-  protected ManifestFilterManager(
-      Map<Integer, PartitionSpec> specsById, Supplier<ExecutorService> 
executorSupplier) {
-    this.specsById = specsById;
-    this.deleteFilePartitions = PartitionSet.create(specsById);
-    this.dropPartitions = PartitionSet.create(specsById);
-    this.workerPoolSupplier = executorSupplier;
-  }
-
-  protected abstract void deleteFile(String location);
-
-  protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
-
-  protected abstract ManifestReader<F> newManifestReader(ManifestFile 
manifest);
-
-  protected abstract Set<F> newFileSet();
-
-  protected void failAnyDelete() {
-    this.failAnyDelete = true;
-  }
-
-  protected void failMissingDeletePaths() {
-    this.failMissingDeletePaths = true;
-  }
-
-  /**
-   * Add a filter to match files to delete. A file will be deleted if all of 
the rows it contains
-   * match this or any other filter passed to this method.
-   *
-   * @param expr an expression to match rows.
-   */
-  protected void deleteByRowFilter(Expression expr) {
-    Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
-    invalidateFilteredCache();
-    this.deleteExpression = Expressions.or(deleteExpression, expr);
-  }
-
-  /** Add a partition tuple to drop from the table during the delete phase. */
-  protected void dropPartition(int specId, StructLike partition) {
-    Preconditions.checkNotNull(partition, "Cannot delete files in invalid 
partition: null");
-    invalidateFilteredCache();
-    dropPartitions.add(specId, partition);
-  }
-
-  /**
-   * Set the sequence number used to remove old delete files.
-   *
-   * <p>Delete files with a sequence number older than the given value will be 
removed. By setting
-   * this to the sequence number of the oldest data file in the table, this 
will continuously remove
-   * delete files that are no longer needed because deletes cannot match any 
existing rows in the
-   * table.
-   *
-   * @param sequenceNumber a sequence number used to remove old delete files
-   */
-  protected void dropDeleteFilesOlderThan(long sequenceNumber) {
-    Preconditions.checkArgument(
-        sequenceNumber >= 0, "Invalid minimum data sequence number: %s", 
sequenceNumber);
-    this.minSequenceNumber = sequenceNumber;
-  }
-
-  void caseSensitive(boolean newCaseSensitive) {
-    this.caseSensitive = newCaseSensitive;
-  }
-
-  /** Add a specific path to be deleted in the new snapshot. */
-  void delete(F file) {
-    Preconditions.checkNotNull(file, "Cannot delete file: null");
-    invalidateFilteredCache();
-    deleteFiles.add(file);
-    deleteFilePartitions.add(file.specId(), file.partition());
-  }
-
-  /** Add a specific path to be deleted in the new snapshot. */
-  void delete(CharSequence path) {
-    Preconditions.checkNotNull(path, "Cannot delete file path: null");
-    invalidateFilteredCache();
-    deletePaths.add(path);
-  }
-
-  boolean containsDeletes() {
-    return !deletePaths.isEmpty() || !deleteFiles.isEmpty() || 
deleteExpression != Expressions.alwaysFalse() ||
-        !dropPartitions.isEmpty();
-  }
-
-  /**
-   * Filter deleted files out of a list of manifests.
-   *
-   * @param tableSchema the current table schema
-   * @param manifests a list of manifests to be filtered
-   * @return an array of filtered manifests
-   */
-  List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> 
manifests) {
-    if (manifests == null || manifests.isEmpty()) {
-      validateRequiredDeletes();
-      return ImmutableList.of();
-    }
-
-    ManifestFile[] filtered = new ManifestFile[manifests.size()];
-    // open all of the manifest files in parallel, use index to avoid 
reordering
-    Tasks.range(filtered.length)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .executeWith(workerPoolSupplier.get())
-        .run(
-            index -> {
-              ManifestFile manifest = filterManifest(tableSchema, 
manifests.get(index));
-              filtered[index] = manifest;
-            });
-
-    validateRequiredDeletes(filtered);
-
-    return Arrays.asList(filtered);
-  }
-
-  /**
-   * Creates a snapshot summary builder with the files deleted from the set of 
filtered manifests.
-   *
-   * @param manifests a set of filtered manifests
-   */
-  SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
-    SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
-
-    for (ManifestFile manifest : manifests) {
-      PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
-      Iterable<F> manifestDeletes = 
filteredManifestToDeletedFiles.get(manifest);
-      if (manifestDeletes != null) {
-        for (F file : manifestDeletes) {
-          summaryBuilder.deletedFile(manifestSpec, file);
-        }
-      }
-    }
-
-    summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount);
-
-    return summaryBuilder;
-  }
-
-  /**
-   * Throws a {@link ValidationException} if any deleted file was not present 
in a filtered
-   * manifest.
-   *
-   * @param manifests a set of filtered manifests
-   */
-  @SuppressWarnings("CollectionUndefinedEquality")
-  private void validateRequiredDeletes(ManifestFile... manifests) {
-    if (failMissingDeletePaths) {
-      Set<F> deletedFiles = deletedFiles(manifests);
-      ValidationException.check(
-          deletedFiles.containsAll(deleteFiles),
-          "Missing required files to delete: %s",
-          COMMA.join(
-              deleteFiles.stream()
-                  .filter(f -> !deletedFiles.contains(f))
-                  .map(ContentFile::location)
-                  .collect(Collectors.toList())));
-
-      CharSequenceSet deletedFilePaths =
-          deletedFiles.stream()
-              .map(ContentFile::location)
-              .collect(Collectors.toCollection(CharSequenceSet::empty));
-
-      ValidationException.check(
-          deletedFilePaths.containsAll(deletePaths),
-          "Missing required files to delete: %s",
-          COMMA.join(Iterables.filter(deletePaths, path -> 
!deletedFilePaths.contains(path))));
-    }
-  }
-
-  private Set<F> deletedFiles(ManifestFile[] manifests) {
-    Set<F> deletedFiles = newFileSet();
-
-    if (manifests != null) {
-      for (ManifestFile manifest : manifests) {
-        Iterable<F> manifestDeletes = 
filteredManifestToDeletedFiles.get(manifest);
-        if (manifestDeletes != null) {
-          for (F file : manifestDeletes) {
-            deletedFiles.add(file);
-          }
-        }
-      }
-    }
-
-    return deletedFiles;
-  }
-
-  /**
-   * Deletes filtered manifests that were created by this class, but are not 
in the committed
-   * manifest set.
-   *
-   * @param committed the set of manifest files that were committed
-   */
-  void cleanUncommitted(Set<ManifestFile> committed) {
-    // iterate over a copy of entries to avoid concurrent modification
-    List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
-        Lists.newArrayList(filteredManifests.entrySet());
-
-    for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
-      // remove any new filtered manifests that aren't in the committed list
-      ManifestFile manifest = entry.getKey();
-      ManifestFile filtered = entry.getValue();
-      if (!committed.contains(filtered)) {
-        // only delete if the filtered copy was created
-        if (!manifest.equals(filtered)) {
-          deleteFile(filtered.path());
-        }
-
-        // remove the entry from the cache
-        filteredManifests.remove(manifest);
-      }
-    }
-  }
-
-  private void invalidateFilteredCache() {
-    cleanUncommitted(SnapshotProducer.EMPTY_SET);
-  }
-
-  /**
-   * @return a ManifestReader that is a filtered version of the input manifest.
-   */
-  private ManifestFile filterManifest(Schema tableSchema, ManifestFile 
manifest) {
-    ManifestFile cached = filteredManifests.get(manifest);
-    if (cached != null) {
-      return cached;
-    }
-
-    boolean hasLiveFiles = manifest.hasAddedFiles() || 
manifest.hasExistingFiles();
-    if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
-      filteredManifests.put(manifest, manifest);
-      return manifest;
-    }
-
-    try (ManifestReader<F> reader = newManifestReader(manifest)) {
-      PartitionSpec spec = reader.spec();
-      PartitionAndMetricsEvaluator evaluator =
-          new PartitionAndMetricsEvaluator(tableSchema, spec, 
deleteExpression);
-
-      // this assumes that the manifest doesn't have files to remove and 
streams through the
-      // manifest without copying data. if a manifest does have a file to 
remove, this will break
-      // out of the loop and move on to filtering the manifest.
-      boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
-      if (!hasDeletedFiles) {
-        filteredManifests.put(manifest, manifest);
-        return manifest;
-      }
-
-      return filterManifestWithDeletedFiles(evaluator, manifest, reader);
-
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to close manifest: %s", 
manifest);
-    }
-  }
-
-  private boolean canContainDeletedFiles(ManifestFile manifest) {
-    boolean canContainExpressionDeletes;
-    if (deleteExpression != null && deleteExpression != 
Expressions.alwaysFalse()) {
-      ManifestEvaluator manifestEvaluator =
-          ManifestEvaluator.forRowFilter(
-              deleteExpression, specsById.get(manifest.partitionSpecId()), 
caseSensitive);
-      canContainExpressionDeletes = manifestEvaluator.eval(manifest);
-    } else {
-      canContainExpressionDeletes = false;
-    }
-
-    boolean canContainDroppedPartitions;
-    if (!dropPartitions.isEmpty()) {
-      canContainDroppedPartitions =
-          ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
-    } else {
-      canContainDroppedPartitions = false;
-    }
-
-    boolean canContainDroppedFiles;
-    if (!deletePaths.isEmpty()) {
-      canContainDroppedFiles = true;
-    } else if (!deleteFiles.isEmpty()) {
-      // because there were no path-only deletes, the set of deleted file 
partitions is valid
-      canContainDroppedFiles =
-          ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, 
specsById);
-    } else {
-      canContainDroppedFiles = false;
-    }
-
-    boolean canContainDropBySeq =
-        manifest.content() == ManifestContent.DELETES && 
manifest.minSequenceNumber() < minSequenceNumber;
-
-    return canContainExpressionDeletes || canContainDroppedPartitions || 
canContainDroppedFiles || canContainDropBySeq;
-  }
-
-  @SuppressWarnings({"CollectionUndefinedEquality", 
"checkstyle:CyclomaticComplexity"})
-  private boolean manifestHasDeletedFiles(
-      PartitionAndMetricsEvaluator evaluator, ManifestReader<F> reader) {
-    boolean isDelete = reader.isDeleteManifestReader();
-
-    for (ManifestEntry<F> entry : reader.liveEntries()) {
-      F file = entry.file();
-      boolean markedForDelete =
-          deletePaths.contains(file.location()) || deleteFiles.contains(file) 
||
-              dropPartitions.contains(file.specId(), file.partition()) ||
-              isDelete && entry.isLive() && entry.dataSequenceNumber() > 0 &&
-                  entry.dataSequenceNumber() < minSequenceNumber;
-
-      if (markedForDelete || evaluator.rowsMightMatch(file)) {
-        boolean allRowsMatch = markedForDelete || 
evaluator.rowsMustMatch(file);
-        ValidationException.check(
-            allRowsMatch || isDelete, // ignore delete files where some 
records may not match the expression
-            "Cannot delete file where some, but not all, rows match filter %s: 
%s",
-            this.deleteExpression,
-            file.location());
-
-        if (allRowsMatch) {
-          if (failAnyDelete) {
-            throw new 
DeleteException(reader.spec().partitionToPath(file.partition()));
-          }
-
-          // as soon as a deleted file is detected, stop scanning
-          return true;
-        }
-      }
-    }
-
-    return false;
-  }
-
-  @SuppressWarnings({"CollectionUndefinedEquality", 
"checkstyle:CyclomaticComplexity"})
-  private ManifestFile filterManifestWithDeletedFiles(
-      PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, 
ManifestReader<F> reader) {
-    boolean isDelete = reader.isDeleteManifestReader();
-    // when this point is reached, there is at least one file that will be 
deleted in the
-    // manifest. produce a copy of the manifest with all deleted files removed.
-    Set<F> deletedFiles = newFileSet();
-
-    try {
-      ManifestWriter<F> writer = newManifestWriter(reader.spec());
-      try {
-        reader
-            .liveEntries()
-            .forEach(
-                entry -> {
-                  F file = entry.file();
-                  boolean markedForDelete =
-                      deletePaths.contains(file.location()) || 
deleteFiles.contains(file) ||
-                          dropPartitions.contains(file.specId(), 
file.partition()) ||
-                          isDelete && entry.isLive() && 
entry.dataSequenceNumber() > 0 &&
-                              entry.dataSequenceNumber() < minSequenceNumber;
-                  if (markedForDelete || evaluator.rowsMightMatch(file)) {
-                    boolean allRowsMatch = markedForDelete || 
evaluator.rowsMustMatch(file);
-                    ValidationException.check(
-                        allRowsMatch || isDelete, // ignore delete files where 
some records may not match
-                        // the expression
-                        "Cannot delete file where some, but not all, rows 
match filter %s: %s",
-                        this.deleteExpression,
-                        file.location());
-
-                    if (allRowsMatch) {
-                      writer.delete(entry);
-
-                      if (deletedFiles.contains(file)) {
-                        LOG.warn(
-                            "Deleting a duplicate path from manifest {}: {}",
-                            manifest.path(),
-                            file.location());
-                        duplicateDeleteCount += 1;
-                      } else {
-                        // only add the file to deletes if it is a new delete
-                        // this keeps the snapshot summary accurate for 
non-duplicate data
-                        deletedFiles.add(file.copyWithoutStats());
-                      }
-                    } else {
-                      writer.existing(entry);
-                    }
-
-                  } else {
-                    writer.existing(entry);
-                  }
-                });
-      } finally {
-        writer.close();
-      }
-
-      // return the filtered manifest as a reader
-      ManifestFile filtered = writer.toManifestFile();
-
-      // update caches
-      filteredManifests.put(manifest, filtered);
-      filteredManifestToDeletedFiles.put(filtered, deletedFiles);
-
-      return filtered;
-
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to close manifest writer");
-    }
-  }
-
-  // an evaluator that checks whether rows in a file may/must match a given 
expression
-  // this class first partially evaluates the provided expression using the 
partition tuple
-  // and then checks the remaining part of the expression using metrics 
evaluators
-  private class PartitionAndMetricsEvaluator {
-    private final Schema tableSchema;
-    private final ResidualEvaluator residualEvaluator;
-    private final StructLikeMap<Pair<InclusiveMetricsEvaluator, 
StrictMetricsEvaluator>>
-        metricsEvaluators;
-
-    PartitionAndMetricsEvaluator(Schema tableSchema, PartitionSpec spec, 
Expression expr) {
-      this.tableSchema = tableSchema;
-      this.residualEvaluator = ResidualEvaluator.of(spec, expr, caseSensitive);
-      this.metricsEvaluators = StructLikeMap.create(spec.partitionType());
-    }
-
-    boolean rowsMightMatch(F file) {
-      Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> evaluators = 
metricsEvaluators(file);
-      InclusiveMetricsEvaluator inclusiveMetricsEvaluator = evaluators.first();
-      return inclusiveMetricsEvaluator.eval(file);
-    }
-
-    boolean rowsMustMatch(F file) {
-      Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> evaluators = 
metricsEvaluators(file);
-      StrictMetricsEvaluator strictMetricsEvaluator = evaluators.second();
-      return strictMetricsEvaluator.eval(file);
-    }
-
-    private Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> 
metricsEvaluators(F file) {
-      // ResidualEvaluator removes predicates in the expression using 
strict/inclusive projections
-      // if strict projection returns true -> the pred would return true -> 
replace the pred with
-      // true
-      // if inclusive projection returns false -> the pred would return false 
-> replace the pred
-      // with false
-      // otherwise, keep the original predicate and proceed to other 
predicates in the expression
-      // in other words, ResidualEvaluator returns a part of the expression 
that needs to be
-      // evaluated
-      // for rows in the given partition using metrics
-      PartitionData partition = (PartitionData) file.partition();
-      if (!metricsEvaluators.containsKey(partition)) {
-        Expression residual = residualEvaluator.residualFor(partition);
-        InclusiveMetricsEvaluator inclusive =
-            new InclusiveMetricsEvaluator(tableSchema, residual, 
caseSensitive);
-        StrictMetricsEvaluator strict =
-            new StrictMetricsEvaluator(tableSchema, residual, caseSensitive);
-
-        metricsEvaluators.put(
-            partition.copy(), // The partition may be a re-used container so a 
copy is required
-            Pair.of(inclusive, strict));
-      }
-      return metricsEvaluators.get(partition);
-    }
-  }
-}
diff --git 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
 
b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
deleted file mode 100644
index a1f31e28443..00000000000
--- 
a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.io.CloseableIterable;
-import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ParallelIterable;
-import org.apache.iceberg.util.PartitionUtil;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.StructProjection;
-
-/** A {@link Table} implementation that exposes a table's partitions as rows. 
*/
-public class PartitionsTable extends BaseMetadataTable {
-
-  private final Schema schema;
-
-  private final boolean unpartitionedTable;
-
-  PartitionsTable(Table table) {
-    this(table, table.name() + ".partitions");
-  }
-
-  PartitionsTable(Table table, String name) {
-    super(table, name);
-
-    this.schema =
-        new Schema(
-            Types.NestedField.required(1, "partition", 
Partitioning.partitionType(table)),
-            Types.NestedField.required(4, "spec_id", Types.IntegerType.get()),
-            Types.NestedField.required(
-                2, "record_count", Types.LongType.get(), "Count of records in 
data files"),
-            Types.NestedField.required(
-                3, "file_count", Types.IntegerType.get(), "Count of data 
files"),
-            Types.NestedField.required(
-                11,
-                "total_data_file_size_in_bytes",
-                Types.LongType.get(),
-                "Total size in bytes of data files"),
-            Types.NestedField.required(
-                5,
-                "position_delete_record_count",
-                Types.LongType.get(),
-                "Count of records in position delete files"),
-            Types.NestedField.required(
-                6,
-                "position_delete_file_count",
-                Types.IntegerType.get(),
-                "Count of position delete files"),
-            Types.NestedField.required(
-                7,
-                "equality_delete_record_count",
-                Types.LongType.get(),
-                "Count of records in equality delete files"),
-            Types.NestedField.required(
-                8,
-                "equality_delete_file_count",
-                Types.IntegerType.get(),
-                "Count of equality delete files"),
-            Types.NestedField.optional(
-                9,
-                "last_updated_at",
-                Types.TimestampType.withZone(),
-                "Commit time of snapshot that last updated this partition"),
-            Types.NestedField.optional(
-                10,
-                "last_updated_snapshot_id",
-                Types.LongType.get(),
-                "Id of snapshot that last updated this partition"));
-    this.unpartitionedTable = 
Partitioning.partitionType(table).fields().isEmpty();
-  }
-
-  @Override
-  public TableScan newScan() {
-    return new PartitionsScan(table());
-  }
-
-  @Override
-  public Schema schema() {
-    if (unpartitionedTable) {
-      return schema.select(
-          "record_count",
-          "file_count",
-          "total_data_file_size_in_bytes",
-          "position_delete_record_count",
-          "position_delete_file_count",
-          "equality_delete_record_count",
-          "equality_delete_file_count",
-          "last_updated_at",
-          "last_updated_snapshot_id");
-    }
-    return schema;
-  }
-
-  @Override
-  MetadataTableType metadataTableType() {
-    return MetadataTableType.PARTITIONS;
-  }
-
-  private DataTask task(StaticTableScan scan) {
-    Iterable<Partition> partitions = partitions(table(), scan);
-    if (unpartitionedTable) {
-      // the table is unpartitioned, partitions contains only the root 
partition
-      return StaticDataTask.of(
-          
io().newInputFile(table().operations().current().metadataFileLocation()),
-          schema(),
-          scan.schema(),
-          partitions,
-          root ->
-              StaticDataTask.Row.of(
-                  root.dataRecordCount,
-                  root.dataFileCount,
-                  root.dataFileSizeInBytes,
-                  root.posDeleteRecordCount,
-                  root.posDeleteFileCount,
-                  root.eqDeleteRecordCount,
-                  root.eqDeleteFileCount,
-                  root.lastUpdatedAt,
-                  root.lastUpdatedSnapshotId));
-    } else {
-      return StaticDataTask.of(
-          
io().newInputFile(table().operations().current().metadataFileLocation()),
-          schema(),
-          scan.schema(),
-          partitions,
-          PartitionsTable::convertPartition);
-    }
-  }
-
-  private static StaticDataTask.Row convertPartition(Partition partition) {
-    return StaticDataTask.Row.of(
-        partition.partitionData,
-        partition.specId,
-        partition.dataRecordCount,
-        partition.dataFileCount,
-        partition.dataFileSizeInBytes,
-        partition.posDeleteRecordCount,
-        partition.posDeleteFileCount,
-        partition.eqDeleteRecordCount,
-        partition.eqDeleteFileCount,
-        partition.lastUpdatedAt,
-        partition.lastUpdatedSnapshotId);
-  }
-
-  private static Iterable<Partition> partitions(Table table, StaticTableScan 
scan) {
-    Types.StructType partitionType = Partitioning.partitionType(table);
-
-    StructLikeMap<Partition> partitions =
-        StructLikeMap.create(partitionType, new 
PartitionComparator(partitionType));
-
-    try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries = 
planEntries(scan)) {
-      for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
-        Snapshot snapshot = table.snapshot(entry.snapshotId());
-        ContentFile<?> file = entry.file();
-        StructLike key =
-            PartitionUtil.coercePartition(
-                partitionType, table.specs().get(file.specId()), 
file.partition());
-        partitions
-            .computeIfAbsent(key, () -> new Partition(key, partitionType))
-            .update(file, snapshot);
-      }
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-
-    return partitions.values();
-  }
-
-  @VisibleForTesting
-  static CloseableIterable<ManifestEntry<?>> planEntries(StaticTableScan scan) 
{
-    Table table = scan.table();
-
-    CloseableIterable<ManifestFile> filteredManifests =
-        filteredManifests(scan, table, 
scan.snapshot().allManifests(table.io()));
-
-    Iterable<CloseableIterable<ManifestEntry<?>>> tasks =
-        CloseableIterable.transform(filteredManifests, manifest -> 
readEntries(manifest, scan));
-
-    return new ParallelIterable<>(tasks, scan.planExecutor());
-  }
-
-  private static CloseableIterable<ManifestEntry<?>> readEntries(
-      ManifestFile manifest, StaticTableScan scan) {
-    Table table = scan.table();
-    return CloseableIterable.transform(
-        ManifestFiles.open(manifest, table.io(), table.specs())
-            .caseSensitive(scan.isCaseSensitive())
-            .select(BaseScan.scanColumns(manifest.content())) // don't select 
stats columns
-            .liveEntries(),
-        t ->
-            (ManifestEntry<? extends ContentFile<?>>)
-                // defensive copy of manifest entry without stats columns
-                t.copyWithoutStats());
-  }
-
-  private static CloseableIterable<ManifestFile> filteredManifests(
-      StaticTableScan scan, Table table, List<ManifestFile> manifestFilesList) 
{
-    CloseableIterable<ManifestFile> manifestFiles =
-        CloseableIterable.withNoopClose(manifestFilesList);
-
-    LoadingCache<Integer, ManifestEvaluator> evalCache =
-        Caffeine.newBuilder()
-            .build(
-                specId -> {
-                  PartitionSpec spec = table.specs().get(specId);
-                  PartitionSpec transformedSpec = 
transformSpec(scan.tableSchema(), spec);
-                  return ManifestEvaluator.forRowFilter(
-                      scan.filter(), transformedSpec, scan.isCaseSensitive());
-                });
-
-    return CloseableIterable.filter(
-        manifestFiles, manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
-  }
-
-  private class PartitionsScan extends StaticTableScan {
-    PartitionsScan(Table table) {
-      super(
-          table,
-          PartitionsTable.this.schema(),
-          MetadataTableType.PARTITIONS,
-          PartitionsTable.this::task);
-    }
-  }
-
-  private static class PartitionComparator implements Comparator<StructLike> {
-    private Comparator<StructLike> comparator;
-
-    private PartitionComparator(Types.StructType struct) {
-      this.comparator = Comparators.forType(struct);
-    }
-
-    @Override
-    public int compare(StructLike o1, StructLike o2) {
-      if (o1 instanceof StructProjection && o2 instanceof StructProjection) {
-        int cmp =
-            Integer.compare(
-                ((StructProjection) o1).projectedFields(),
-                ((StructProjection) o2).projectedFields());
-        if (cmp != 0) {
-          return cmp;
-        }
-      }
-      return comparator.compare(o1, o2);
-    }
-  }
-
-  static class Partition {
-    private final PartitionData partitionData;
-    private int specId;
-    private long dataRecordCount;
-    private int dataFileCount;
-    private long dataFileSizeInBytes;
-    private long posDeleteRecordCount;
-    private int posDeleteFileCount;
-    private long eqDeleteRecordCount;
-    private int eqDeleteFileCount;
-    private Long lastUpdatedAt;
-    private Long lastUpdatedSnapshotId;
-
-    Partition(StructLike key, Types.StructType keyType) {
-      this.partitionData = toPartitionData(key, keyType);
-      this.specId = 0;
-      this.dataRecordCount = 0L;
-      this.dataFileCount = 0;
-      this.dataFileSizeInBytes = 0L;
-      this.posDeleteRecordCount = 0L;
-      this.posDeleteFileCount = 0;
-      this.eqDeleteRecordCount = 0L;
-      this.eqDeleteFileCount = 0;
-    }
-
-    void update(ContentFile<?> file, Snapshot snapshot) {
-      if (snapshot != null) {
-        long snapshotCommitTime = snapshot.timestampMillis() * 1000;
-        if (this.lastUpdatedAt == null || snapshotCommitTime > 
this.lastUpdatedAt) {
-          this.specId = file.specId();
-
-          this.lastUpdatedAt = snapshotCommitTime;
-          this.lastUpdatedSnapshotId = snapshot.snapshotId();
-        }
-      }
-      switch (file.content()) {
-        case DATA:
-          this.dataRecordCount += file.recordCount();
-          this.dataFileCount += 1;
-          this.dataFileSizeInBytes += file.fileSizeInBytes();
-          break;
-        case POSITION_DELETES:
-          this.posDeleteRecordCount += file.recordCount();
-          this.posDeleteFileCount += 1;
-          break;
-        case EQUALITY_DELETES:
-          this.eqDeleteRecordCount += file.recordCount();
-          this.eqDeleteFileCount += 1;
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              "Unsupported file content type: " + file.content());
-      }
-    }
-
-    /** Needed because StructProjection is not serializable */
-    private static PartitionData toPartitionData(StructLike key, 
Types.StructType keyType) {
-      PartitionData keyTemplate = new PartitionData(keyType);
-      return keyTemplate.copyFor(key);
-    }
-  }
-}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
deleted file mode 100644
index 0efc9e44680..00000000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.util;
-
-import java.util.AbstractMap;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-
-public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements 
Map<StructLike, T> {
-
-  public static <T> StructLikeMap<T> create(
-      Types.StructType type, Comparator<StructLike> comparator) {
-    return new StructLikeMap<>(type, comparator);
-  }
-
-  public static <T> StructLikeMap<T> create(Types.StructType type) {
-    return create(type, Comparators.forType(type));
-  }
-
-  private final Types.StructType type;
-  private final Map<StructLikeWrapper, T> wrapperMap;
-  private final ThreadLocal<StructLikeWrapper> wrappers;
-
-  private StructLikeMap(Types.StructType type, Comparator<StructLike> 
comparator) {
-    this.type = type;
-    this.wrapperMap = Maps.newHashMap();
-    this.wrappers = ThreadLocal.withInitial(() -> 
StructLikeWrapper.forType(type, comparator));
-  }
-
-  @Override
-  public int size() {
-    return wrapperMap.size();
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return wrapperMap.isEmpty();
-  }
-
-  @Override
-  public boolean containsKey(Object key) {
-    if (key instanceof StructLike || key == null) {
-      StructLikeWrapper wrapper = wrappers.get();
-      boolean result = wrapperMap.containsKey(wrapper.set((StructLike) key));
-      wrapper.set(null); // don't hold a reference to the key.
-      return result;
-    }
-    return false;
-  }
-
-  @Override
-  public boolean containsValue(Object value) {
-    return wrapperMap.containsValue(value);
-  }
-
-  @Override
-  public T get(Object key) {
-    if (key instanceof StructLike || key == null) {
-      StructLikeWrapper wrapper = wrappers.get();
-      T value = wrapperMap.get(wrapper.set((StructLike) key));
-      wrapper.set(null); // don't hold a reference to the key.
-      return value;
-    }
-    return null;
-  }
-
-  @Override
-  public T put(StructLike key, T value) {
-    return wrapperMap.put(wrappers.get().copyFor(key), value);
-  }
-
-  @Override
-  public T remove(Object key) {
-    if (key instanceof StructLike || key == null) {
-      StructLikeWrapper wrapper = wrappers.get();
-      T value = wrapperMap.remove(wrapper.set((StructLike) key));
-      wrapper.set(null); // don't hold a reference to the key.
-      return value;
-    }
-    return null;
-  }
-
-  @Override
-  public void clear() {
-    wrapperMap.clear();
-  }
-
-  @Override
-  public Set<StructLike> keySet() {
-    StructLikeSet keySet = StructLikeSet.create(type);
-    for (StructLikeWrapper wrapper : wrapperMap.keySet()) {
-      keySet.add(wrapper.get());
-    }
-    return keySet;
-  }
-
-  @Override
-  public Collection<T> values() {
-    return wrapperMap.values();
-  }
-
-  @Override
-  public Set<Entry<StructLike, T>> entrySet() {
-    Set<Entry<StructLike, T>> entrySet = Sets.newHashSet();
-    for (Entry<StructLikeWrapper, T> entry : wrapperMap.entrySet()) {
-      entrySet.add(new StructLikeEntry<>(entry));
-    }
-    return entrySet;
-  }
-
-  public T computeIfAbsent(StructLike struct, Supplier<T> valueSupplier) {
-    return wrapperMap.computeIfAbsent(wrappers.get().copyFor(struct), key -> 
valueSupplier.get());
-  }
-
-  private static class StructLikeEntry<R> implements Entry<StructLike, R> {
-
-    private final Entry<StructLikeWrapper, R> inner;
-
-    private StructLikeEntry(Entry<StructLikeWrapper, R> inner) {
-      this.inner = inner;
-    }
-
-    @Override
-    public StructLike getKey() {
-      return inner.getKey().get();
-    }
-
-    @Override
-    public R getValue() {
-      return inner.getValue();
-    }
-
-    @Override
-    public int hashCode() {
-      return inner.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      } else if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      StructLikeEntry<?> that = (StructLikeEntry<?>) o;
-      return inner.equals(that.inner);
-    }
-
-    @Override
-    public R setValue(R value) {
-      throw new UnsupportedOperationException("Does not support setValue.");
-    }
-  }
-
-  public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
-    StructLikeMap<U> result = create(type);
-    wrapperMap.forEach((key, value) -> result.put(key.get(), 
func.apply(value)));
-    return result;
-  }
-}
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
deleted file mode 100644
index 3dbb91a43ce..00000000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.util;
-
-import java.util.Comparator;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.JavaHash;
-import org.apache.iceberg.types.Types;
-
-/** Wrapper to adapt StructLike for use in maps and sets by implementing 
equals and hashCode. */
-public class StructLikeWrapper {
-
-  public static StructLikeWrapper forType(
-      Types.StructType type, Comparator<StructLike> comparator) {
-    return new StructLikeWrapper(comparator, JavaHash.forType(type));
-  }
-
-  public static StructLikeWrapper forType(Types.StructType type) {
-    return forType(type, Comparators.forType(type));
-  }
-
-  private final Comparator<StructLike> comparator;
-  private final JavaHash<StructLike> structHash;
-  private Integer hashCode;
-  private StructLike struct;
-
-  private StructLikeWrapper(Comparator<StructLike> comparator, 
JavaHash<StructLike> structHash) {
-    this.comparator = comparator;
-    this.structHash = structHash;
-    this.hashCode = null;
-  }
-
-  /**
-   * Creates a copy of this wrapper that wraps a struct.
-   *
-   * <p>This is equivalent to {@code new 
StructLikeWrapper(type).set(newStruct)} but is cheaper
-   * because no analysis of the type is necessary.
-   *
-   * @param newStruct a {@link StructLike} row
-   * @return a copy of this wrapper wrapping the give struct
-   */
-  public StructLikeWrapper copyFor(StructLike newStruct) {
-    return new StructLikeWrapper(comparator, structHash).set(newStruct);
-  }
-
-  public StructLikeWrapper set(StructLike newStruct) {
-    this.struct = newStruct;
-    this.hashCode = null;
-    return this;
-  }
-
-  public StructLike get() {
-    return struct;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    } else if (!(other instanceof StructLikeWrapper)) {
-      return false;
-    }
-
-    StructLikeWrapper that = (StructLikeWrapper) other;
-
-    if (this.struct == that.struct) {
-      return true;
-    }
-
-    if (this.struct == null ^ that.struct == null) {
-      return false;
-    }
-
-    return comparator.compare(this.struct, that.struct) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    if (hashCode == null) {
-      this.hashCode = structHash.hash(struct);
-    }
-
-    return hashCode;
-  }
-}
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index 10ce7e12c1c..1c5bfed17c0 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -26,7 +26,7 @@
     <hive.path.to.root>..</hive.path.to.root>
     <path.to.iceberg.root>.</path.to.iceberg.root>
     <!-- Upgrade roaringbit version in parent pom.xml whenever upgrading iceberg version -->
-    <iceberg.version>1.9.1</iceberg.version>
+    <iceberg.version>1.10.0</iceberg.version>
     <kryo-shaded.version>4.0.3</kryo-shaded.version>
     <iceberg.mockito-core.version>5.2.0</iceberg.mockito-core.version>
     <iceberg.avro.version>1.12.0</iceberg.avro.version>
diff --git a/pom.xml b/pom.xml
index 6247b18bf2c..3630ea868e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,8 +151,8 @@
     <httpcomponents.client.version>4.5.13</httpcomponents.client.version>
     <httpcomponents.core.version>4.4.13</httpcomponents.core.version>
     <immutables.version>2.9.2</immutables.version>
-    <httpcomponents5.core.version>5.3.1</httpcomponents5.core.version>
-    <httpcomponents5.client.version>5.3.1</httpcomponents5.client.version>
+    <httpcomponents5.core.version>5.3.4</httpcomponents5.core.version>
+    <httpcomponents5.client.version>5.5</httpcomponents5.client.version>
     <ivy.version>2.5.2</ivy.version>
     <jackson.version>2.16.1</jackson.version>
     <jamon.plugin.version>2.3.4</jamon.plugin.version>
@@ -195,7 +195,7 @@
     <!-- used by druid storage handler -->
     <pac4j-saml.version>4.5.8</pac4j-saml.version>
     <paranamer.version>2.8</paranamer.version>
-    <parquet.version>1.15.2</parquet.version>
+    <parquet.version>1.16.0</parquet.version>
     <pig.version>0.16.0</pig.version>
     <plexus.version>1.5.6</plexus.version>
     <protobuf.version>3.25.5</protobuf.version>
@@ -597,7 +597,7 @@
       <dependency>
         <groupId>org.apache.httpcomponents.client5</groupId>
         <artifactId>httpclient5</artifactId>
-        <version>${httpcomponents5.core.version}</version>
+        <version>${httpcomponents5.client.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.httpcomponents.core5</groupId>
diff --git a/standalone-metastore/metastore-rest-catalog/pom.xml b/standalone-metastore/metastore-rest-catalog/pom.xml
index c1468e4f0c1..896723d2276 100644
--- a/standalone-metastore/metastore-rest-catalog/pom.xml
+++ b/standalone-metastore/metastore-rest-catalog/pom.xml
@@ -23,7 +23,7 @@
     <standalone.metastore.path.to.root>..</standalone.metastore.path.to.root>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <log4j2.debug>false</log4j2.debug>
-    <iceberg.version>1.9.1</iceberg.version>
+    <iceberg.version>1.10.0</iceberg.version>
   </properties>
   <dependencies>
     <dependency>

Reply via email to