This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fd3fb9f1e5 HIVE-28723: Iceberg: Support metadata.json files clean-up 
(#6218)
1fd3fb9f1e5 is described below

commit 1fd3fb9f1e5fa894c3888699caaec67d4c42d661
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Tue Dec 16 10:38:57 2025 -0500

    HIVE-28723: Iceberg: Support metadata.json files clean-up (#6218)
---
 ...logUtils.java => IcebergCatalogProperties.java} |  92 ++--------
 .../iceberg/hive/IcebergTableProperties.java       | 125 +++++++++++++
 .../org/apache/iceberg/hive/MetastoreUtil.java     |   6 +-
 .../iceberg/hive/client/HiveRESTCatalogClient.java |  17 +-
 .../main/java/org/apache/iceberg/mr/Catalogs.java  |   8 +-
 .../iceberg/mr/hive/BaseHiveIcebergMetaHook.java   |  45 ++---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  56 +++---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |   4 +-
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |   4 +-
 .../java/org/apache/iceberg/mr/TestCatalogs.java   |  21 ++-
 .../apache/iceberg/mr/TestIcebergInputFormats.java |   6 +-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  |   1 +
 .../hive/TestHiveIcebergWriteMetadataCleanup.java  | 201 +++++++++++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     |  14 +-
 .../alter_multi_part_table_to_iceberg.q.out        |   6 +
 .../positive/alter_part_table_to_iceberg.q.out     |   5 +
 .../results/positive/alter_table_to_iceberg.q.out  |   3 +
 .../src/test/results/positive/col_stats.q.out      |   2 +
 .../results/positive/create_iceberg_table.q.out    |   1 +
 ...create_iceberg_table_stored_as_fileformat.q.out |   5 +
 .../create_iceberg_table_stored_by_iceberg.q.out   |   1 +
 ...le_stored_by_iceberg_with_serdeproperties.q.out |   1 +
 .../positive/ctas_iceberg_partitioned_orc.q.out    |   1 +
 .../src/test/results/positive/ctlt_iceberg.q.out   |   4 +
 .../test/results/positive/delete_all_iceberg.q.out |   3 +
 .../results/positive/describe_iceberg_table.q.out  |   4 +
 .../positive/iceberg_insert_into_partition.q.out   |   6 +
 .../iceberg_insert_into_partition_transforms.q.out |   6 +
 ...berg_insert_into_partition_with_evolution.q.out |   1 +
 .../iceberg_insert_overwrite_partition.q.out       |   6 +
 ...erg_insert_overwrite_partition_transforms.q.out |   4 +
 .../test/results/positive/iceberg_v2_deletes.q.out |   3 +
 .../positive/iceberg_v3_deletion_vectors.q.out     |   1 +
 .../iceberg_alter_locally_zordered_table.q.out     |   2 +
 .../iceberg_create_locally_ordered_table.q.out     |   2 +
 .../iceberg_create_locally_zordered_table.q.out    |   4 +
 ...berg_major_compaction_partition_evolution.q.out |   2 +
 ...erg_major_compaction_partition_evolution2.q.out |   2 +
 ...or_compaction_partition_evolution_ordered.q.out |   2 +
 ...n_partition_evolution_w_dyn_spec_w_filter.q.out |   2 +
 ...on_partition_evolution_w_id_spec_w_filter.q.out |   2 +
 .../iceberg_major_compaction_partitioned.q.out     |   4 +
 .../iceberg_major_compaction_query_metadata.q.out  |   1 +
 ...iceberg_major_compaction_schema_evolution.q.out |   2 +
 ...iceberg_major_compaction_single_partition.q.out |   3 +
 ...ompaction_single_partition_with_evolution.q.out |   5 +
 ...mpaction_single_partition_with_evolution2.q.out |   4 +
 .../iceberg_major_compaction_unpartitioned.q.out   |   2 +
 ...rg_major_compaction_unpartitioned_ordered.q.out |   2 +
 ...g_major_compaction_unpartitioned_w_filter.q.out |   2 +
 .../llap/iceberg_minor_compaction_bucket.q.out     |   2 +
 ...berg_minor_compaction_partition_evolution.q.out |   3 +
 .../iceberg_minor_compaction_unpartitioned.q.out   |   3 +
 .../llap/iceberg_rest_catalog_gravitino.q.out      |   2 +
 .../positive/llap/iceberg_rest_catalog_hms.q.out   |   2 +
 .../positive/mv_iceberg_partitioned_orc.q.out      |   2 +
 .../positive/mv_iceberg_partitioned_orc2.q.out     |   2 +
 .../src/test/results/positive/row_count.q.out      |   2 +
 .../positive/show_create_iceberg_table.q.out       |  10 +-
 .../positive/show_iceberg_materialized_views.q.out |   4 +
 .../positive/truncate_force_iceberg_table.q.out    |   2 +
 .../results/positive/truncate_iceberg_table.q.out  |   5 +
 .../truncate_partitioned_iceberg_table.q.out       |   2 +
 .../positive/use_basic_stats_from_iceberg.q.out    |   2 +
 .../hive/TestHiveRESTCatalogClientITBase.java      |   7 +-
 ...bergRESTCatalogGravitinoLlapLocalCliDriver.java |   4 +-
 ...estIcebergRESTCatalogHMSLlapLocalCliDriver.java |   4 +-
 67 files changed, 595 insertions(+), 169 deletions(-)

diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java
similarity index 61%
rename from 
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java
rename to 
iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java
index f2fbd540179..424f8e10c35 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/CatalogUtils.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergCatalogProperties.java
@@ -22,21 +22,14 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
-public class CatalogUtils {
-  public static final String NAME = "name";
-  public static final String LOCATION = "location";
+public class IcebergCatalogProperties {
   public static final String CATALOG_NAME = "iceberg.catalog";
   public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
   public static final String CATALOG_WAREHOUSE_TEMPLATE = 
"iceberg.catalog.%s.warehouse";
@@ -45,63 +38,14 @@ public class CatalogUtils {
   public static final String ICEBERG_HADOOP_TABLE_NAME = 
"location_based_table";
   public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
   public static final String NO_CATALOG_TYPE = "no catalog";
-  public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
-      // We don't want to push down the metadata location props to Iceberg 
from HMS,
-      // since the snapshot pointer in HMS would always be one step ahead
-      BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
-      BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
 
-  private CatalogUtils() {
+  private IcebergCatalogProperties() {
 
   }
 
-  /**
-   * Calculates the properties we would like to send to the catalog.
-   * <ul>
-   * <li>The base of the properties is the properties stored at the Hive 
Metastore for the given table
-   * <li>We add the {@link CatalogUtils#LOCATION} as the table location
-   * <li>We add the {@link CatalogUtils#NAME} as
-   * TableIdentifier defined by the database name and table name
-   * <li>We add the serdeProperties of the HMS table
-   * <li>We remove some parameters that we don't want to push down to the 
Iceberg table props
-   * </ul>
-   * @param hmsTable Table for which we are calculating the properties
-   * @return The properties we can provide for Iceberg functions
-   */
-  public static Properties 
getCatalogProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
-    Properties properties = new Properties();
-    properties.putAll(toIcebergProperties(hmsTable.getParameters()));
-
-    if (properties.get(LOCATION) == null && hmsTable.getSd() != null &&
-        hmsTable.getSd().getLocation() != null) {
-      properties.put(LOCATION, hmsTable.getSd().getLocation());
-    }
-
-    if (properties.get(NAME) == null) {
-      properties.put(NAME, TableIdentifier.of(hmsTable.getDbName(),
-          hmsTable.getTableName()).toString());
-    }
-
-    SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
-    if (serdeInfo != null) {
-      properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
-    }
-
-    // Remove HMS table parameters we don't want to propagate to Iceberg
-    PROPERTIES_TO_REMOVE.forEach(properties::remove);
-
-    return properties;
-  }
-
-  private static Properties toIcebergProperties(Map<String, String> 
parameters) {
-    Properties properties = new Properties();
-    parameters.entrySet().stream()
-        .filter(e -> e.getKey() != null && e.getValue() != null)
-        .forEach(e -> {
-          String icebergKey = 
HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
-          properties.put(icebergKey, e.getValue());
-        });
-    return properties;
+  public static Map<String, String> getCatalogProperties(Configuration conf) {
+    String catalogName = getCatalogName(conf);
+    return getCatalogProperties(conf, catalogName);
   }
 
   /**
@@ -112,15 +56,18 @@ private static Properties toIcebergProperties(Map<String, 
String> parameters) {
    */
   public static Map<String, String> getCatalogProperties(Configuration conf, 
String catalogName) {
     Map<String, String> catalogProperties = Maps.newHashMap();
-    String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
+    String namedCatalogPrefix = CATALOG_CONFIG_PREFIX + catalogName + ".";
+    String namedCatalogTablePrefix = CATALOG_CONFIG_PREFIX + catalogName + 
".table-default.";
+
     conf.forEach(config -> {
-      if 
(config.getKey().startsWith(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX)) {
+      if 
(config.getKey().startsWith(IcebergCatalogProperties.CATALOG_DEFAULT_CONFIG_PREFIX))
 {
         catalogProperties.putIfAbsent(
-            
config.getKey().substring(CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
+            
config.getKey().substring(IcebergCatalogProperties.CATALOG_DEFAULT_CONFIG_PREFIX.length()),
             config.getValue());
-      } else if (config.getKey().startsWith(keyPrefix)) {
+      } else if (config.getKey().startsWith(namedCatalogPrefix) &&
+          !config.getKey().startsWith(namedCatalogTablePrefix)) {
         catalogProperties.put(
-            config.getKey().substring(keyPrefix.length() + 1),
+            config.getKey().substring(namedCatalogPrefix.length()),
             config.getValue());
       }
     });
@@ -133,7 +80,7 @@ public static String getCatalogName(Configuration conf) {
   }
 
   public static String getCatalogType(Configuration conf) {
-    return getCatalogType(conf, CatalogUtils.getCatalogName(conf));
+    return getCatalogType(conf, IcebergCatalogProperties.getCatalogName(conf));
   }
 
   public static boolean isHadoopTable(Configuration conf, Properties 
catalogProperties) {
@@ -177,7 +124,7 @@ public static String getCatalogType(Configuration conf, 
String catalogName) {
       }
     } else {
       String catalogType = conf.get(CatalogUtil.ICEBERG_CATALOG_TYPE);
-      if (catalogType != null && catalogType.equals(LOCATION)) {
+      if (catalogType != null && 
catalogType.equals(IcebergTableProperties.LOCATION)) {
         return NO_CATALOG_TYPE;
       } else {
         return catalogType;
@@ -188,7 +135,7 @@ public static String getCatalogType(Configuration conf, 
String catalogName) {
   public static String getCatalogImpl(Configuration conf, String catalogName) {
     return Optional.ofNullable(catalogName)
         .filter(StringUtils::isNotEmpty)
-        .map(name -> String.format(CatalogUtils.CATALOG_IMPL_TEMPLATE, name))
+        .map(name -> 
String.format(IcebergCatalogProperties.CATALOG_IMPL_TEMPLATE, name))
         .map(conf::get)
         .orElse(null);
   }
@@ -196,14 +143,15 @@ public static String getCatalogImpl(Configuration conf, 
String catalogName) {
   public static boolean assertCatalogType(Configuration conf, Properties 
props, String expectedType,
       String expectedImpl) {
     String catalogName = props.getProperty(CATALOG_NAME);
-    String catalogType = Optional.ofNullable(CatalogUtils.getCatalogType(conf, 
catalogName))
-        .orElseGet(() -> CatalogUtils.getCatalogType(conf, 
ICEBERG_DEFAULT_CATALOG_NAME));
+    String catalogType = 
Optional.ofNullable(IcebergCatalogProperties.getCatalogType(conf, catalogName))
+        .orElseGet(() -> IcebergCatalogProperties.getCatalogType(conf, 
ICEBERG_DEFAULT_CATALOG_NAME));
 
     if (catalogType != null) {
       return expectedType.equalsIgnoreCase(catalogType);
     }
 
-    String actualImpl = CatalogUtils.getCatalogProperties(conf, 
catalogName).get(CatalogProperties.CATALOG_IMPL);
+    String actualImpl = IcebergCatalogProperties.getCatalogProperties(conf, 
catalogName)
+        .get(CatalogProperties.CATALOG_IMPL);
 
     // Return true immediately if the strings are equal (this also handles 
both being null).
     if (StringUtils.equals(expectedImpl, actualImpl)) {
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergTableProperties.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergTableProperties.java
new file mode 100644
index 00000000000..45dfff46eb6
--- /dev/null
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergTableProperties.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class IcebergTableProperties {
+  public static final String NAME = "name";
+  public static final String LOCATION = "location";
+  public static final String TABLE_DEFAULT_CONFIG_PREFIX = 
"iceberg.table-default.";
+  public static final Set<String> PROPERTIES_TO_REMOVE = ImmutableSet.of(
+      // We don't want to push down the metadata location props to Iceberg 
from HMS,
+      // since the snapshot pointer in HMS would always be one step ahead
+      BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+      BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP);
+
+  private IcebergTableProperties() {
+
+  }
+
+  /**
+   * Calculates the Iceberg table properties.
+   * <ul>
+   * <li>The base of the properties is the properties stored at the Hive 
Metastore for the given table
+   * <li>We add the {@link IcebergTableProperties#LOCATION} as the table 
location
+   * <li>We add the {@link IcebergTableProperties#NAME} as
+   * TableIdentifier defined by the database name and table name
+   * <li>We add the serdeProperties of the HMS table
+   * <li>We remove some parameters that we don't want to push down to the 
Iceberg table props
+   * </ul>
+   * @param hmsTable Table for which we are calculating the properties
+   * @return The properties we can provide for Iceberg functions
+   */
+  public static Properties 
getTableProperties(org.apache.hadoop.hive.metastore.api.Table hmsTable, 
Configuration conf) {
+    Properties properties = new Properties();
+    overrideIcebergDefaults(properties);
+
+    getTableProperties(conf, IcebergCatalogProperties.getCatalogName(conf))
+        .forEach(properties::setProperty);
+
+    properties.putAll(toIcebergProperties(hmsTable.getParameters()));
+
+    if (hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null) {
+      properties.putIfAbsent(LOCATION, hmsTable.getSd().getLocation());
+    }
+
+    properties.putIfAbsent(NAME, TableIdentifier.of(hmsTable.getDbName(), 
hmsTable.getTableName()).toString());
+
+    SerDeInfo serdeInfo = hmsTable.getSd().getSerdeInfo();
+    if (serdeInfo != null) {
+      properties.putAll(toIcebergProperties(serdeInfo.getParameters()));
+    }
+
+    // Remove HMS table parameters we don't want to propagate to Iceberg
+    PROPERTIES_TO_REMOVE.forEach(properties::remove);
+
+    return properties;
+  }
+
+  private static void overrideIcebergDefaults(Properties properties) {
+    properties.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
"true");
+  }
+
+  private static Properties toIcebergProperties(Map<String, String> 
parameters) {
+    Properties properties = new Properties();
+    parameters.entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> {
+          String icebergKey = 
HMSTablePropertyHelper.translateToIcebergProp(e.getKey());
+          properties.put(icebergKey, e.getValue());
+        });
+    return properties;
+  }
+
+  /**
+   * Collect all the table specific configuration from the global hive 
configuration.
+   * @param conf a Hadoop configuration
+   * @param catalogName name of the catalog
+   * @return complete map of table properties
+   */
+  public static Map<String, String> getTableProperties(Configuration conf, 
String catalogName) {
+    Map<String, String> tableProperties = Maps.newHashMap();
+    String namedCatalogTablePrefix = 
IcebergCatalogProperties.CATALOG_CONFIG_PREFIX + catalogName + 
".table-default.";
+
+    conf.forEach(config -> {
+      if 
(config.getKey().startsWith(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX))
 {
+        tableProperties.putIfAbsent(
+            
config.getKey().substring(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX.length()),
+            config.getValue());
+      } else if (config.getKey().startsWith(namedCatalogTablePrefix)) {
+        tableProperties.put(
+            config.getKey().substring(namedCatalogTablePrefix.length()),
+            config.getValue());
+      }
+    });
+
+    return tableProperties;
+  }
+}
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
index 9c3f10ffea9..95e1e5b3662 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
@@ -140,9 +140,9 @@ public static Table toHiveTable(org.apache.iceberg.Table 
table, Configuration co
         HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
     
HMSTablePropertyHelper.updateHmsTableForIcebergTable(metadata.metadataFileLocation(),
 result, metadata,
         null, true, maxHiveTablePropertySize, null);
-    String catalogType = CatalogUtils.getCatalogType(conf);
-    if (!StringUtils.isEmpty(catalogType) && 
!CatalogUtils.NO_CATALOG_TYPE.equals(catalogType)) {
-      result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtils.getCatalogType(conf));
+    String catalogType = IcebergCatalogProperties.getCatalogType(conf);
+    if (!StringUtils.isEmpty(catalogType) && 
!IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) {
+      result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
IcebergCatalogProperties.getCatalogType(conf));
     }
     result.setSd(getHiveStorageDescriptor(table));
     return result;
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
index d1e3d289d5f..4390d5a0bca 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
@@ -44,9 +44,10 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HMSTablePropertyHelper;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
+import org.apache.iceberg.hive.IcebergTableProperties;
 import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.hive.RuntimeMetaException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -80,7 +81,7 @@ public HiveRESTCatalogClient(Configuration conf) {
   public void reconnect()  {
     close();
     String catName = MetaStoreUtils.getDefaultCatalog(conf);
-    Map<String, String> properties = CatalogUtils.getCatalogProperties(conf, 
CatalogUtils.getCatalogName(conf));
+    Map<String, String> properties = 
IcebergCatalogProperties.getCatalogProperties(conf);
     restCatalog = (RESTCatalog) CatalogUtil.buildIcebergCatalog(catName, 
properties, null);
   }
 
@@ -161,7 +162,7 @@ public Database getDatabase(String catName, String dbName) 
throws NoSuchObjectEx
           Database database = new Database();
           database.setName(String.join(NAMESPACE_SEPARATOR, 
namespace.levels()));
           Map<String, String> namespaceMetadata = 
restCatalog.loadNamespaceMetadata(Namespace.of(dbName));
-          
database.setLocationUri(namespaceMetadata.get(CatalogUtils.LOCATION));
+          
database.setLocationUri(namespaceMetadata.get(IcebergTableProperties.LOCATION));
           database.setCatalogName(restCatalog.name());
           database.setOwnerName(namespaceMetadata.get(DB_OWNER));
           try {
@@ -194,20 +195,20 @@ public void createTable(CreateTableRequest request) 
throws TException {
     if (table.isSetPartitionKeys() && !table.getPartitionKeys().isEmpty()) {
       cols.addAll(table.getPartitionKeys());
     }
-    Properties catalogProperties = CatalogUtils.getCatalogProperties(table);
+    Properties tableProperties = 
IcebergTableProperties.getTableProperties(table, conf);
     Schema schema = HiveSchemaUtil.convert(cols, Collections.emptyMap(), true);
     Map<String, String> envCtxProps = 
Optional.ofNullable(request.getEnvContext())
         .map(EnvironmentContext::getProperties)
         .orElse(Collections.emptyMap());
     org.apache.iceberg.PartitionSpec partitionSpec =
         HMSTablePropertyHelper.getPartitionSpec(envCtxProps, schema);
-    SortOrder sortOrder = 
HMSTablePropertyHelper.getSortOrder(catalogProperties, schema);
+    SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(tableProperties, 
schema);
 
     restCatalog.buildTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()), schema)
         .withPartitionSpec(partitionSpec)
-        .withLocation(catalogProperties.getProperty(CatalogUtils.LOCATION))
+        
.withLocation(tableProperties.getProperty(IcebergTableProperties.LOCATION))
         .withSortOrder(sortOrder)
-        .withProperties(Maps.fromProperties(catalogProperties))
+        .withProperties(Maps.fromProperties(tableProperties))
         .create();
   }
 
@@ -215,7 +216,7 @@ public void createTable(CreateTableRequest request) throws 
TException {
   public void createDatabase(Database db) {
     validateCurrentCatalog(db.getCatalogName());
     Map<String, String> props = ImmutableMap.of(
-        CatalogUtils.LOCATION, db.getLocationUri(),
+        IcebergTableProperties.LOCATION, db.getLocationUri(),
         DB_OWNER, db.getOwnerName(),
         DB_OWNER_TYPE, db.getOwnerType().toString()
     );
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index e431e0323a4..9a416f8277a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -34,8 +34,8 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -187,7 +187,7 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
    * @return true if the Catalog is HiveCatalog
    */
   public static boolean hiveCatalog(Configuration conf, Properties props) {
-    return CatalogUtils.assertCatalogType(conf, props, 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, null);
+    return IcebergCatalogProperties.assertCatalogType(conf, props, 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, null);
   }
 
   /**
@@ -229,13 +229,13 @@ public static void renameTable(Configuration conf, 
Properties props, TableIdenti
   }
 
   static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
-    String catalogType = CatalogUtils.getCatalogType(conf, catalogName);
+    String catalogType = IcebergCatalogProperties.getCatalogType(conf, 
catalogName);
     if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
       return Optional.empty();
     } else {
       String name = catalogName == null ? ICEBERG_DEFAULT_CATALOG_NAME : 
catalogName;
       return Optional.of(CatalogUtil.buildIcebergCatalog(name,
-          CatalogUtils.getCatalogProperties(conf, name), conf));
+          IcebergCatalogProperties.getCatalogProperties(conf, name), conf));
     }
   }
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index 58908b00661..180e233c947 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -60,9 +60,10 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HMSTablePropertyHelper;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
+import org.apache.iceberg.hive.IcebergTableProperties;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -94,7 +95,7 @@ public class BaseHiveIcebergMetaHook implements HiveMetaHook {
 
   protected final Configuration conf;
   protected Table icebergTable = null;
-  protected Properties catalogProperties;
+  protected Properties tableProperties;
   protected boolean createHMSTableInHook = false;
 
   public enum FileFormat {
@@ -127,14 +128,14 @@ public void preCreateTable(CreateTableRequest request) {
     if (hmsTable.isTemporary()) {
       throw new UnsupportedOperationException("Creation of temporary iceberg 
tables is not supported.");
     }
-    this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+    this.tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
 
     // Set the table type even for non HiveCatalog based tables
     hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
         BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
 
-    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
-      if 
(Boolean.parseBoolean(this.catalogProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT)))
 {
+    if (!Catalogs.hiveCatalog(conf, tableProperties)) {
+      if 
(Boolean.parseBoolean(this.tableProperties.getProperty(hive_metastoreConstants.TABLE_IS_CTLT)))
 {
         throw new UnsupportedOperationException("CTLT target table must be a 
HiveCatalog table.");
       }
       // For non-HiveCatalog tables too, we should set the input and output 
format
@@ -144,15 +145,15 @@ public void preCreateTable(CreateTableRequest request) {
 
       // If not using HiveCatalog check for existing table
       try {
-        this.icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, 
true);
+        this.icebergTable = IcebergTableUtil.getTable(conf, tableProperties, 
true);
 
-        if (CatalogUtils.hadoopCatalog(conf, catalogProperties) && 
hmsTable.getSd() != null &&
+        if (IcebergCatalogProperties.hadoopCatalog(conf, tableProperties) && 
hmsTable.getSd() != null &&
                 hmsTable.getSd().getLocation() == null) {
           hmsTable.getSd().setLocation(icebergTable.location());
         }
-        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
+        
Preconditions.checkArgument(tableProperties.getProperty(InputFormatConfig.TABLE_SCHEMA)
 == null,
             "Iceberg table already created - can not use provided schema");
-        
Preconditions.checkArgument(catalogProperties.getProperty(InputFormatConfig.PARTITION_SPEC)
 == null,
+        
Preconditions.checkArgument(tableProperties.getProperty(InputFormatConfig.PARTITION_SPEC)
 == null,
             "Iceberg table already created - can not use provided partition 
specification");
 
         LOG.info("Iceberg table already exists {}", icebergTable);
@@ -171,7 +172,7 @@ public void preCreateTable(CreateTableRequest request) {
             
primaryKeys.stream().map(SQLPrimaryKey::getColumn_name).collect(Collectors.toSet()))
         .orElse(Collections.emptySet());
 
-    Schema schema = schema(catalogProperties, hmsTable, identifierFields, 
request.getDefaultConstraints());
+    Schema schema = schema(tableProperties, hmsTable, identifierFields, 
request.getDefaultConstraints());
     PartitionSpec spec = spec(conf, schema, hmsTable);
 
     // If there are partition keys specified remove them from the HMS table 
and add them to the column list
@@ -180,9 +181,9 @@ public void preCreateTable(CreateTableRequest request) {
       hmsTable.setPartitionKeysIsSet(false);
     }
 
-    catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(schema));
+    tableProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(schema));
     String specString = PartitionSpecParser.toJson(spec);
-    catalogProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
+    tableProperties.put(InputFormatConfig.PARTITION_SPEC, specString);
     validateCatalogConfigsDefined();
 
     if (request.getEnvContext() == null) {
@@ -195,13 +196,13 @@ public void preCreateTable(CreateTableRequest request) {
       createHMSTableInHook = true;
     }
 
-    
assertFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+    
assertFileFormat(tableProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
 
     // Set whether the format is ORC, to be used during vectorization.
     setOrcOnlyFilesParam(hmsTable);
     // Remove hive primary key columns from table request, as iceberg doesn't 
support hive primary key.
     request.setPrimaryKeys(null);
-    setSortOrder(hmsTable, schema, catalogProperties);
+    setSortOrder(hmsTable, schema, tableProperties);
   }
 
   /**
@@ -214,11 +215,11 @@ public void preCreateTable(CreateTableRequest request) {
    *
    */
   private void validateCatalogConfigsDefined() {
-    String catalogName = 
catalogProperties.getProperty(InputFormatConfig.CATALOG_NAME);
+    String catalogName = 
tableProperties.getProperty(InputFormatConfig.CATALOG_NAME);
     if (!StringUtils.isEmpty(catalogName) && 
!Catalogs.ICEBERG_HADOOP_TABLE_NAME.equals(catalogName)) {
 
-      boolean configsExist = 
!StringUtils.isEmpty(CatalogUtils.getCatalogType(conf, catalogName)) ||
-          !StringUtils.isEmpty(CatalogUtils.getCatalogImpl(conf, catalogName));
+      boolean configsExist = 
!StringUtils.isEmpty(IcebergCatalogProperties.getCatalogType(conf, 
catalogName)) ||
+          !StringUtils.isEmpty(IcebergCatalogProperties.getCatalogImpl(conf, 
catalogName));
 
       Preconditions.checkArgument(configsExist, "Catalog type or impl must be 
set for catalog: %s", catalogName);
     }
@@ -343,12 +344,12 @@ private static void assertFileFormat(String format) {
   }
 
   protected void 
setCommonHmsTablePropertiesForIceberg(org.apache.hadoop.hive.metastore.api.Table
 hmsTable) {
-    if (CatalogUtils.isHadoopTable(conf, catalogProperties)) {
+    if (IcebergCatalogProperties.isHadoopTable(conf, tableProperties)) {
       String location = (hmsTable.getSd() != null) ? 
hmsTable.getSd().getLocation() : null;
-      if (location == null && CatalogUtils.hadoopCatalog(conf, 
catalogProperties)) {
+      if (location == null && IcebergCatalogProperties.hadoopCatalog(conf, 
tableProperties)) {
         location = IcebergTableUtil.defaultWarehouseLocation(
             TableIdentifier.of(hmsTable.getDbName(), hmsTable.getTableName()),
-            conf, catalogProperties);
+            conf, tableProperties);
         hmsTable.getSd().setLocation(location);
       }
       Preconditions.checkArgument(location != null, "Table location not set");
@@ -445,9 +446,9 @@ protected void setWriteModeDefaults(Table icebergTbl, 
Map<String, String> newPro
       List<String> writeModeList = ImmutableList.of(
           TableProperties.DELETE_MODE, TableProperties.UPDATE_MODE, 
TableProperties.MERGE_MODE);
       writeModeList.stream()
-          .filter(writeMode -> catalogProperties.get(writeMode) == null)
+          .filter(writeMode -> tableProperties.get(writeMode) == null)
           .forEach(writeMode -> {
-            catalogProperties.put(writeMode, MERGE_ON_READ.modeName());
+            tableProperties.put(writeMode, MERGE_ON_READ.modeName());
             newProps.put(writeMode, MERGE_ON_READ.modeName());
           });
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index fee98b185d4..e4bc7f32cc4 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -109,10 +109,10 @@
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.expressions.UnboundTerm;
 import org.apache.iceberg.hive.CachedClientPool;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HiveLock;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.hive.IcebergTableProperties;
 import org.apache.iceberg.hive.MetastoreLock;
 import org.apache.iceberg.hive.NoLock;
 import org.apache.iceberg.io.CloseableIterable;
@@ -181,22 +181,22 @@ public void 
rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTa
   public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
     if (icebergTable == null) {
 
-      
setFileFormat(catalogProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
+      
setFileFormat(tableProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
 
       String metadataLocation = 
hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
       Table table;
       if (metadataLocation != null) {
-        table = Catalogs.registerTable(conf, catalogProperties, 
metadataLocation);
+        table = Catalogs.registerTable(conf, tableProperties, 
metadataLocation);
       } else {
-        table = Catalogs.createTable(conf, catalogProperties);
+        table = Catalogs.createTable(conf, tableProperties);
       }
 
-      if (!HiveTableUtil.isCtas(catalogProperties)) {
+      if (!HiveTableUtil.isCtas(tableProperties)) {
         return;
       }
 
       // set this in the query state so that we can rollback the table in the 
lifecycle hook in case of failures
-      String tableIdentifier = catalogProperties.getProperty(Catalogs.NAME);
+      String tableIdentifier = tableProperties.getProperty(Catalogs.NAME);
       SessionStateUtil.addResource(conf, InputFormatConfig.CTAS_TABLE_NAME, 
tableIdentifier);
       SessionStateUtil.addResource(conf, tableIdentifier, table);
 
@@ -211,15 +211,15 @@ public void 
preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
 
   @Override
   public void preDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, boolean deleteData) {
-    this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+    this.tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
     this.deleteIcebergTable = hmsTable.getParameters() != null &&
         
"TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
 
-    if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties) && 
deleteData) {
+    if (deleteIcebergTable && Catalogs.hiveCatalog(conf, tableProperties) && 
deleteData) {
       // Store the metadata and the io for deleting the actual table data
       try {
         String metadataLocation = 
hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
-        this.deleteIo = Catalogs.loadTable(conf, catalogProperties).io();
+        this.deleteIo = Catalogs.loadTable(conf, tableProperties).io();
         this.deleteMetadata = TableMetadataParser.read(deleteIo, 
metadataLocation);
       } catch (Exception e) {
         LOG.error("preDropTable: Error during loading Iceberg table or parsing 
its metadata for HMS table: {}.{}. " +
@@ -240,9 +240,9 @@ public void 
rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTabl
   public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, boolean deleteData) {
     if (deleteData && deleteIcebergTable) {
       try {
-        if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
+        if (!Catalogs.hiveCatalog(conf, tableProperties)) {
           LOG.info("Dropping with purge all the data for table {}.{}", 
hmsTable.getDbName(), hmsTable.getTableName());
-          Catalogs.dropTable(conf, catalogProperties);
+          Catalogs.dropTable(conf, tableProperties);
         } else {
           // do nothing if metadata folder has been deleted already (Hive 4 
behaviour for purge=TRUE)
           if (deleteMetadata != null && 
deleteIo.newInputFile(deleteMetadata.location()).exists()) {
@@ -261,10 +261,10 @@ public void 
commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
   @Override
   public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context)
       throws MetaException {
-    catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+    tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
     setupAlterOperationType(hmsTable, context);
     if (AlterTableType.RENAME.equals(currentAlterTableOp)) {
-      catalogProperties.put(Catalogs.NAME, 
TableIdentifier.of(context.getProperties().get(OLD_DB_NAME),
+      tableProperties.put(Catalogs.NAME, 
TableIdentifier.of(context.getProperties().get(OLD_DB_NAME),
           context.getProperties().get(OLD_TABLE_NAME)).toString());
     }
     if (commitLock == null) {
@@ -290,8 +290,8 @@ private HiveLock 
lockObject(org.apache.hadoop.hive.metastore.api.Table hmsTable)
     } else {
       return new MetastoreLock(
           conf,
-          new CachedClientPool(conf, Maps.fromProperties(catalogProperties)),
-          catalogProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(),
+          new CachedClientPool(conf, Maps.fromProperties(tableProperties)),
+          tableProperties.getProperty(Catalogs.NAME), hmsTable.getDbName(),
           hmsTable.getTableName());
     }
   }
@@ -299,7 +299,7 @@ private HiveLock 
lockObject(org.apache.hadoop.hive.metastore.api.Table hmsTable)
   private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context)
       throws MetaException {
     try {
-      icebergTable = IcebergTableUtil.getTable(conf, catalogProperties, true);
+      icebergTable = IcebergTableUtil.getTable(conf, tableProperties, true);
     } catch (NoSuchTableException nte) {
       context.getProperties().put(MIGRATE_HIVE_TO_ICEBERG, "true");
       // If the iceberg table does not exist, then this is an ALTER command 
aimed at migrating the table to iceberg
@@ -315,7 +315,7 @@ private void 
doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
       preAlterTableProperties.tableLocation = sd.getLocation();
       preAlterTableProperties.format = sd.getInputFormat();
       preAlterTableProperties.schema =
-          schema(catalogProperties, hmsTable, Collections.emptySet(), 
Collections.emptyList());
+          schema(tableProperties, hmsTable, Collections.emptySet(), 
Collections.emptyList());
       preAlterTableProperties.partitionKeys = hmsTable.getPartitionKeys();
 
       context.getProperties().put(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE, 
"true");
@@ -491,12 +491,12 @@ public void 
commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
     }
     commitLock.unlock();
     if (isTableMigration) {
-      catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
-      catalogProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(preAlterTableProperties.schema));
-      catalogProperties.put(InputFormatConfig.PARTITION_SPEC, 
PartitionSpecParser.toJson(preAlterTableProperties.spec));
+      tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
+      tableProperties.put(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(preAlterTableProperties.schema));
+      tableProperties.put(InputFormatConfig.PARTITION_SPEC, 
PartitionSpecParser.toJson(preAlterTableProperties.spec));
       setFileFormat(preAlterTableProperties.format);
       HiveTableUtil.importFiles(preAlterTableProperties.tableLocation, 
preAlterTableProperties.format,
-          preAlterTableProperties.partitionSpecProxy, 
preAlterTableProperties.partitionKeys, catalogProperties, conf);
+          preAlterTableProperties.partitionSpecProxy, 
preAlterTableProperties.partitionKeys, tableProperties, conf);
     } else if (currentAlterTableOp != null) {
       switch (currentAlterTableOp) {
         case DROP_COLUMN:
@@ -516,7 +516,7 @@ public void 
commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
           IcebergTableUtil.updateSpec(conf, icebergTable);
           break;
         case RENAME:
-          Catalogs.renameTable(conf, catalogProperties, 
TableIdentifier.of(hmsTable.getDbName(),
+          Catalogs.renameTable(conf, tableProperties, 
TableIdentifier.of(hmsTable.getDbName(),
               hmsTable.getTableName()));
           break;
       }
@@ -543,9 +543,9 @@ public void 
rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab
       LOG.debug("Initiating rollback for table {} at location {}",
           hmsTable.getTableName(), hmsTable.getSd().getLocation());
       context.getProperties().put(INITIALIZE_ROLLBACK_MIGRATION, "true");
-      this.catalogProperties = CatalogUtils.getCatalogProperties(hmsTable);
+      this.tableProperties = 
IcebergTableProperties.getTableProperties(hmsTable, conf);
       try {
-        this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
+        this.icebergTable = Catalogs.loadTable(conf, tableProperties);
       } catch (NoSuchTableException nte) {
         // iceberg table was not yet created, no need to delete the metadata 
dir separately
         return;
@@ -568,8 +568,8 @@ public void 
rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab
   public void preTruncateTable(org.apache.hadoop.hive.metastore.api.Table 
table, EnvironmentContext context,
       List<String> partNames)
       throws MetaException {
-    this.catalogProperties = CatalogUtils.getCatalogProperties(table);
-    this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
+    this.tableProperties = IcebergTableProperties.getTableProperties(table, 
conf);
+    this.icebergTable = Catalogs.loadTable(conf, tableProperties);
     Map<String, PartitionField> partitionFieldMap = 
icebergTable.spec().fields().stream()
         .collect(Collectors.toMap(PartitionField::name, Function.identity()));
     Expression finalExp = CollectionUtils.isEmpty(partNames) ? 
Expressions.alwaysTrue() : Expressions.alwaysFalse();
@@ -648,7 +648,7 @@ private void 
setupAlterOperationType(org.apache.hadoop.hive.metastore.api.Table
                 SUPPORTED_ALTER_OPS);
       }
 
-      if (currentAlterTableOp != AlterTableType.ADDPROPS && 
Catalogs.hiveCatalog(conf, catalogProperties)) {
+      if (currentAlterTableOp != AlterTableType.ADDPROPS && 
Catalogs.hiveCatalog(conf, tableProperties)) {
         context.getProperties().put(SKIP_METASTORE_ALTER, "true");
       }
     }
@@ -662,7 +662,7 @@ private void setFileFormat(String format) {
     String lowerCaseFormat = format.toLowerCase();
     for (FileFormat fileFormat : FileFormat.values()) {
       if (lowerCaseFormat.contains(fileFormat.getLabel())) {
-        catalogProperties.put(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.getLabel());
+        tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, 
fileFormat.getLabel());
       }
     }
   }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index d5c58e63f6e..7b1aabf9915 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -176,9 +176,9 @@
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.expressions.StrictMetricsEvaluator;
 import org.apache.iceberg.hadoop.ConfigProperties;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.HiveTableOperations;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
@@ -267,7 +267,7 @@ public Class<? extends AbstractSerDe> getSerDeClass() {
   public HiveMetaHook getMetaHook() {
     // Make sure to always return a new instance here, as HiveIcebergMetaHook 
might hold state relevant for the
     // operation.
-    String catalogType = CatalogUtils.getCatalogType(conf);
+    String catalogType = IcebergCatalogProperties.getCatalogType(conf);
     if (StringUtils.isEmpty(catalogType) || 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equals(catalogType)) {
       return new HiveIcebergMetaHook(conf);
     } else {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index e0f70b81a53..41e73e9211d 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -90,7 +90,7 @@
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -664,7 +664,7 @@ public static String 
defaultWarehouseLocation(TableIdentifier tableIdentifier,
       Configuration conf, Properties catalogProperties) {
     StringBuilder sb = new StringBuilder();
     String warehouseLocation = conf.get(String.format(
-        CatalogUtils.CATALOG_WAREHOUSE_TEMPLATE, 
catalogProperties.getProperty(CATALOG_NAME))
+        IcebergCatalogProperties.CATALOG_WAREHOUSE_TEMPLATE, 
catalogProperties.getProperty(CATALOG_NAME))
     );
     sb.append(warehouseLocation).append('/');
     for (String level : tableIdentifier.namespace().levels()) {
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index 060ffa8fba8..182c9192079 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -35,8 +35,8 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
@@ -210,7 +210,7 @@ public void testLoadCatalogDefault() {
   @Test
   public void testLoadCatalogHive() {
     String catalogName = "barCatalog";
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
         CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
     Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
     Assert.assertTrue(hiveCatalog.isPresent());
@@ -223,9 +223,9 @@ public void testLoadCatalogHive() {
   @Test
   public void testLoadCatalogHadoop() {
     String catalogName = "barCatalog";
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
         CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
         "/tmp/mylocation");
     Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
     Assert.assertTrue(hadoopCatalog.isPresent());
@@ -239,9 +239,9 @@ public void testLoadCatalogHadoop() {
   @Test
   public void testLoadCatalogCustom() {
     String catalogName = "barCatalog";
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogProperties.CATALOG_IMPL),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogProperties.CATALOG_IMPL),
         CustomHadoopCatalog.class.getName());
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
         "/tmp/mylocation");
     Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, 
catalogName);
     Assert.assertTrue(customHadoopCatalog.isPresent());
@@ -259,7 +259,8 @@ public void testLoadCatalogLocation() {
   @Test
   public void testLoadCatalogUnknown() {
     String catalogName = "barCatalog";
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE), "fooType");
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+        "fooType");
 
     Assertions.assertThatThrownBy(() -> Catalogs.loadCatalog(conf, 
catalogName))
         .isInstanceOf(UnsupportedOperationException.class)
@@ -270,7 +271,7 @@ public void testLoadCatalogUnknown() {
   public void testDefaultCatalogProperties() {
     String catalogProperty = "io.manifest.cache-enabled";
     // Set global property
-    final String defaultCatalogProperty = 
CatalogUtils.CATALOG_DEFAULT_CONFIG_PREFIX + catalogProperty;
+    final String defaultCatalogProperty = 
IcebergCatalogProperties.CATALOG_DEFAULT_CONFIG_PREFIX + catalogProperty;
     conf.setBoolean(defaultCatalogProperty, true);
     HiveCatalog defaultCatalog = (HiveCatalog) Catalogs.loadCatalog(conf, 
null).get();
     Assert.assertEquals("true", 
defaultCatalog.properties().get(catalogProperty));
@@ -302,9 +303,9 @@ public CustomHadoopCatalog(Configuration conf, String 
warehouseLocation) {
   }
 
   private void setCustomCatalogProperties(String catalogName, String 
warehouseLocation) {
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogProperties.WAREHOUSE_LOCATION),
         warehouseLocation);
-    conf.set(CatalogUtils.catalogPropertyConfigKey(catalogName, 
CatalogProperties.CATALOG_IMPL),
+    conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(catalogName, 
CatalogProperties.CATALOG_IMPL),
         CustomHadoopCatalog.class.getName());
     conf.set(InputFormatConfig.CATALOG_NAME, catalogName);
   }
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index 30f7bb08763..5e59633505e 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -57,7 +57,7 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -373,9 +373,9 @@ public void testCustomCatalog() throws IOException {
     String warehouseLocation = 
temp.newFolder("hadoop_catalog").getAbsolutePath();
     conf.set("warehouse.location", warehouseLocation);
     conf.set(InputFormatConfig.CATALOG_NAME, 
Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
-    
conf.set(CatalogUtils.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+    
conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
         CatalogUtil.ICEBERG_CATALOG_TYPE), 
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
-    
conf.set(CatalogUtils.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
+    
conf.set(IcebergCatalogProperties.catalogPropertyConfigKey(Catalogs.ICEBERG_DEFAULT_CATALOG_NAME,
         CatalogProperties.WAREHOUSE_LOCATION), warehouseLocation);
 
     Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location"));
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 09229d3b91b..5e66f795a7e 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -1013,6 +1013,7 @@ public void testIcebergAndHmsTableProperties() throws 
Exception {
     expectedIcebergProperties.put("custom_property", "initial_val");
     expectedIcebergProperties.put("EXTERNAL", "TRUE");
     expectedIcebergProperties.put("storage_handler", 
HiveIcebergStorageHandler.class.getName());
+    
expectedIcebergProperties.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
 "true");
     expectedIcebergProperties.put(serdeConstants.SERIALIZATION_FORMAT, "1");
     expectedIcebergProperties.put(
         TableProperties.PARQUET_COMPRESSION,
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergWriteMetadataCleanup.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergWriteMetadataCleanup.java
new file mode 100644
index 00000000000..6c1b050f124
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergWriteMetadataCleanup.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
+import org.apache.iceberg.hive.IcebergTableProperties;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+
+import static org.apache.iceberg.TableMetadataParser.getFileExtension;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests deletion of old metadata.json files after commit, configured at 
session, catalog and table level.
+ */
+public class TestHiveIcebergWriteMetadataCleanup {
+
+  protected static TestHiveShell shell;
+  protected TestTables testTables;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Rule
+  public Timeout timeout = new Timeout(500_000, TimeUnit.MILLISECONDS);
+
+  @BeforeClass
+  public static void beforeClass() {
+    shell = HiveIcebergStorageHandlerTestUtils.shell();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    testTables = HiveIcebergStorageHandlerTestUtils.testTables(shell, 
TestTables.TestTableType.HIVE_CATALOG, temp);
+    HiveIcebergStorageHandlerTestUtils.init(shell, testTables, temp);
+    HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+  }
+
+  @After
+  public void after() throws Exception {
+    HiveIcebergStorageHandlerTestUtils.close(shell);
+    // Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs 
set the ExecMapper status to done=false
+    // at the beginning and to done=true at the end. However, tez jobs also 
rely on this value to see if they should
+    // proceed, but they do not reset it to done=false at the beginning. 
Therefore, without calling this after each test
+    // case, any tez job that follows a completed mr job will erroneously read 
done=true and will not proceed.
+    ExecMapper.setDone(false);
+  }
+
+  private void insertFirstFiveCustomers() {
+    shell.executeStatement("insert into customers values (0, 'Alice', 
'Brown')");
+    shell.executeStatement("insert into customers values (1, 'Bob', 'Brown')");
+    shell.executeStatement("insert into customers values (2, 'Charlie', 
'Brown')");
+    shell.executeStatement("insert into customers values (3, 'David', 
'Brown')");
+    shell.executeStatement("insert into customers values (4, 'Eve', 'Brown')");
+  }
+
+  private void insertNextFiveCustomers() {
+    shell.executeStatement("insert into customers values (5, 'Frank', 
'Brown')");
+    shell.executeStatement("insert into customers values (6, 'Grace', 
'Brown')");
+    shell.executeStatement("insert into customers values (7, 'Heidi', 
'Brown')");
+    shell.executeStatement("insert into customers values (8, 'Ivan', 
'Brown')");
+    shell.executeStatement("insert into customers values (9, 'Judy', 
'Brown')");
+  }
+
+  @Test
+  public void testWriteMetadataCleanupDisabledByDefault() {
+
+    // Explicitly disable write metadata cleanup via the table-default config
+    
shell.getHiveConf().setBoolean(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX
 +
+        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);
+
+    Table table = testTables.createTable(shell, "customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 
PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);
+
+    insertFirstFiveCustomers();
+    insertNextFiveCustomers();
+
+    assertMetadataFiles(table, 11);
+  }
+
+  @Test
+  public void testWriteMetadataCleanupTableDefaultConfigs() {
+
+    // Set default metadata cleanup configs
+    
shell.getHiveConf().setBoolean(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX
 +
+        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, true);
+    
shell.getHiveConf().setInt(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX +
+        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 5);
+
+    Table table = testTables.createTable(shell, "customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 
PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);
+
+    insertFirstFiveCustomers();
+    insertNextFiveCustomers();
+
+    assertMetadataFiles(table, 6);
+  }
+
+  @Test
+  public void testWriteMetadataCleanupCatalogConfigsOverrideDefaults() {
+
+    // Set default metadata cleanup configs
+    
shell.getHiveConf().setBoolean(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX
 +
+        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);
+    
shell.getHiveConf().setInt(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX +
+        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 5);
+
+    // Override metadata cleanup configs on catalog default level
+    String namedCatalogPrefix = IcebergCatalogProperties.CATALOG_CONFIG_PREFIX 
+ "hive.table-default.";
+
+    shell.getHiveConf().setBoolean(namedCatalogPrefix + 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, true);
+    shell.getHiveConf().setInt(namedCatalogPrefix + 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 3);
+
+    Table table = testTables.createTable(shell, "customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 
PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);
+
+    insertFirstFiveCustomers();
+    insertNextFiveCustomers();
+
+    assertMetadataFiles(table, 4);
+  }
+
+  @Test
+  public void testWriteMetadataCleanupTablePropsOverrideOtherLevels() {
+
+    // Set default metadata cleanup configs
+    
shell.getHiveConf().setBoolean(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX
 +
+        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);
+    
shell.getHiveConf().setInt(IcebergTableProperties.TABLE_DEFAULT_CONFIG_PREFIX +
+        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 1);
+
+    // Override metadata cleanup configs on catalog default level
+    String namedCatalogPrefix = IcebergCatalogProperties.CATALOG_CONFIG_PREFIX 
+ "hive.table-default.";
+
+    shell.getHiveConf().setBoolean(namedCatalogPrefix + 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, false);
+    shell.getHiveConf().setInt(namedCatalogPrefix + 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 2);
+
+    Table table = testTables.createTable(shell, "customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 
PartitionSpec.unpartitioned(), FileFormat.ORC, null, 2);
+
+    // Override metadata cleanup configs in table properties
+    shell.executeStatement(String.format("alter table customers set 
tblproperties('%s'='%s', '%s'='%d')",
+        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true",
+        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 4));
+
+    insertFirstFiveCustomers();
+    insertNextFiveCustomers();
+
+    assertMetadataFiles(table, 5);
+  }
+
+  private void assertMetadataFiles(Table table, int expectedCount) {
+    List<String> metadataFiles =
+        Arrays.stream(new File(table.location().replaceAll("^[a-zA-Z]+:", "") 
+ "/metadata")
+                .listFiles())
+            .map(File::getAbsolutePath)
+            .filter(f -> 
f.endsWith(getFileExtension(TableMetadataParser.Codec.NONE)))
+            .toList();
+    assertThat(metadataFiles).hasSize(expectedCount);
+  }
+}
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 8e885f36ff7..a5e75b3d8ac 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -49,9 +49,9 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.CatalogUtils;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveVersion;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestCatalogs;
@@ -543,9 +543,9 @@ static class CustomCatalogTestTables extends TestTables {
     @Override
     public Map<String, String> properties() {
       return ImmutableMap.of(
-          CatalogUtils.catalogPropertyConfigKey(catalog, 
CatalogProperties.CATALOG_IMPL),
+          IcebergCatalogProperties.catalogPropertyConfigKey(catalog, 
CatalogProperties.CATALOG_IMPL),
           TestCatalogs.CustomHadoopCatalog.class.getName(),
-          CatalogUtils.catalogPropertyConfigKey(catalog, 
CatalogProperties.WAREHOUSE_LOCATION),
+          IcebergCatalogProperties.catalogPropertyConfigKey(catalog, 
CatalogProperties.WAREHOUSE_LOCATION),
           warehouseLocation
       );
     }
@@ -574,9 +574,9 @@ static class HadoopCatalogTestTables extends TestTables {
     @Override
     public Map<String, String> properties() {
       return ImmutableMap.of(
-          CatalogUtils.catalogPropertyConfigKey(catalog, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
+          IcebergCatalogProperties.catalogPropertyConfigKey(catalog, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
           CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
-          CatalogUtils.catalogPropertyConfigKey(catalog, 
CatalogProperties.WAREHOUSE_LOCATION),
+          IcebergCatalogProperties.catalogPropertyConfigKey(catalog, 
CatalogProperties.WAREHOUSE_LOCATION),
           warehouseLocation
       );
     }
@@ -628,8 +628,8 @@ static class HiveTestTables extends TestTables {
 
     @Override
     public Map<String, String> properties() {
-      return ImmutableMap.of(CatalogUtils.catalogPropertyConfigKey(catalog, 
CatalogUtil.ICEBERG_CATALOG_TYPE),
-              CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+      return 
ImmutableMap.of(IcebergCatalogProperties.catalogPropertyConfigKey(catalog,
+              CatalogUtil.ICEBERG_CATALOG_TYPE), 
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
     }
 
     @Override
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/alter_multi_part_table_to_iceberg.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/alter_multi_part_table_to_iceberg.q.out
index c4c1a87209a..118552ac4d5 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/alter_multi_part_table_to_iceberg.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/alter_multi_part_table_to_iceberg.q.out
@@ -227,6 +227,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -497,6 +498,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -767,6 +769,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1103,6 +1106,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1543,6 +1547,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1983,6 +1988,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/alter_part_table_to_iceberg.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/alter_part_table_to_iceberg.q.out
index e8b56f6ba92..2e55bbd42b7 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/alter_part_table_to_iceberg.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/alter_part_table_to_iceberg.q.out
@@ -182,6 +182,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -455,6 +456,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -805,6 +807,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1155,6 +1158,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1451,6 +1455,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/alter_table_to_iceberg.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/alter_table_to_iceberg.q.out
index 011bb08c358..88ad3396dd7 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/alter_table_to_iceberg.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/alter_table_to_iceberg.q.out
@@ -137,6 +137,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -307,6 +308,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -477,6 +479,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git a/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out
index 2ea758790f8..d85fc67aab8 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/col_stats.q.out
@@ -558,6 +558,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -602,6 +603,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table.q.out
index 63784d8bdfd..287c95f18fa 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table.q.out
@@ -45,6 +45,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_as_fileformat.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_as_fileformat.q.out
index 6e50f26a107..a2657304780 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_as_fileformat.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_as_fileformat.q.out
@@ -52,6 +52,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -124,6 +125,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -196,6 +198,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -268,6 +271,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -335,6 +339,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg.q.out
index 63784d8bdfd..287c95f18fa 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg.q.out
@@ -45,6 +45,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg_with_serdeproperties.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg_with_serdeproperties.q.out
index 5e6ad6dbea1..3e6850812ff 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg_with_serdeproperties.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/create_iceberg_table_stored_by_iceberg_with_serdeproperties.q.out
@@ -46,6 +46,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
index 8cbadc4eace..c0a81219df6 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/ctas_iceberg_partitioned_orc.q.out
@@ -316,6 +316,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/ctlt_iceberg.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/ctlt_iceberg.q.out
index 0f11a452824..93b11461492 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/ctlt_iceberg.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/ctlt_iceberg.q.out
@@ -52,6 +52,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: select count(*) from target
 PREHOOK: type: QUERY
@@ -138,6 +139,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: create table emp_like1 like emp_iceberg stored by iceberg
 PREHOOK: type: CREATETABLE
@@ -181,6 +183,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: create table emp (id int) partitioned by (company string)
 PREHOOK: type: CREATETABLE
@@ -253,6 +256,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: create managed table man_table (id int) Stored as orc 
TBLPROPERTIES ('transactional'='true')
 PREHOOK: type: CREATETABLE
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/delete_all_iceberg.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/delete_all_iceberg.q.out
index 45674c39913..58dde9ecad2 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/delete_all_iceberg.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/delete_all_iceberg.q.out
@@ -134,6 +134,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -285,6 +286,7 @@ Table Parameters:
        write.delete.mode       copy-on-write       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -436,6 +438,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_table.q.out
index 832798c31c8..09ef4d1ac96 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_table.q.out
@@ -93,6 +93,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -154,6 +155,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -216,6 +218,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -266,6 +269,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition.q.out
index d53a3dec7d2..000b8b0ba81 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition.q.out
@@ -518,6 +518,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1712,6 +1713,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2763,6 +2765,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -3452,6 +3455,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -4141,6 +4145,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -4830,6 +4835,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out
index cdc52706dda..1cbaf6d004b 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_transforms.q.out
@@ -649,6 +649,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1340,6 +1341,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2031,6 +2033,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2481,6 +2484,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2915,6 +2919,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -3137,6 +3142,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_with_evolution.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_with_evolution.q.out
index 425fdb35cf4..0a724298b07 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_with_evolution.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_into_partition_with_evolution.q.out
@@ -202,6 +202,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition.q.out
index e4339bde104..7aaec8f1924 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition.q.out
@@ -300,6 +300,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1268,6 +1269,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1743,6 +1745,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2206,6 +2209,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2669,6 +2673,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -3132,6 +3137,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition_transforms.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition_transforms.q.out
index 3ddcd27832b..7b98a534905 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition_transforms.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_insert_overwrite_partition_transforms.q.out
@@ -645,6 +645,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1310,6 +1311,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -1979,6 +1981,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -2429,6 +2432,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out
index 118cb04f2ec..a00317017d4 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_v2_deletes.q.out
@@ -39,6 +39,7 @@ TBLPROPERTIES (
   'write.delete.mode'='merge-on-read', 
   'write.format.default'='orc', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: insert into ice01 values (1),(2),(3),(4)
 PREHOOK: type: QUERY
@@ -152,6 +153,7 @@ TBLPROPERTIES (
   'write.delete.mode'='merge-on-read', 
   'write.format.default'='orc', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: delete from ice01 where id=5
 PREHOOK: type: QUERY
@@ -299,6 +301,7 @@ TBLPROPERTIES (
   'write.delete.mode'='merge-on-read', 
   'write.format.default'='orc', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: delete from icepart01 where id=5
 PREHOOK: type: QUERY
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_v3_deletion_vectors.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_v3_deletion_vectors.q.out
index d6246c0d5a1..cd64a41b942 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_v3_deletion_vectors.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_v3_deletion_vectors.q.out
@@ -38,6 +38,7 @@ TBLPROPERTIES (
   'write.delete.mode'='merge-on-read', 
   'write.format.default'='orc', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: insert into ice01 values (1),(2),(3),(4)
 PREHOOK: type: QUERY
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
index 8896cf716ce..83947b35e9a 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_alter_locally_zordered_table.q.out
@@ -46,6 +46,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -210,6 +211,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out
index 391e5b12a02..ed49513445f 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_ordered_table.q.out
@@ -56,6 +56,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -150,6 +151,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
index f5afffc29c2..d5ab8aafda8 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_create_locally_zordered_table.q.out
@@ -56,6 +56,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -301,6 +302,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -426,6 +428,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -560,6 +563,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
index b7494bd383b..b0b02dda8a8 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution.q.out
@@ -313,6 +313,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -905,6 +906,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
index 1d57fc24996..d0195d0d475 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution2.q.out
@@ -178,6 +178,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -261,6 +262,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
index a4d270f0aa2..4b5738e6b7a 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out
@@ -173,6 +173,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -318,6 +319,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out
index e5ae9faca15..1f6be0ebf87 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out
@@ -268,6 +268,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -373,6 +374,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out
index 206e7b223a9..96f9e6fe34c 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out
@@ -220,6 +220,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -327,6 +328,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
index d4c81a69862..ef881a53ba3 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partitioned.q.out
@@ -213,6 +213,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -312,6 +313,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -544,6 +546,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -647,6 +650,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_query_metadata.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_query_metadata.q.out
index 72794fdc0e6..26d7eca677b 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_query_metadata.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_query_metadata.q.out
@@ -116,6 +116,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out
index 8ccaabaab9f..9e59529a9e2 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_schema_evolution.q.out
@@ -249,6 +249,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -349,6 +350,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
index 711a0a3ada8..ba16b014e3c 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition.q.out
@@ -221,6 +221,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -327,6 +328,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -439,6 +441,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
index 6ddbc10a80e..23ff2c3bafa 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution.q.out
@@ -203,6 +203,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -313,6 +314,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -423,6 +425,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -532,6 +535,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -641,6 +645,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
index abeeaa5dadc..f3a5e2964b5 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_single_partition_with_evolution2.q.out
@@ -148,6 +148,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -237,6 +238,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -340,6 +342,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -443,6 +446,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
index 0d7ed2d6113..7bf7f084572 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned.q.out
@@ -205,6 +205,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -317,6 +318,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
index 81ccad496e3..97892820926 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_ordered.q.out
@@ -102,6 +102,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -215,6 +216,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_w_filter.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_w_filter.q.out
index 74b976ec101..ce004c9547e 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_w_filter.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_unpartitioned_w_filter.q.out
@@ -205,6 +205,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -296,6 +297,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
index 137d435d86b..2a1860b2b3d 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
@@ -106,6 +106,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -219,6 +220,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_partition_evolution.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_partition_evolution.q.out
index 2687fd3fb2a..52ef7768046 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_partition_evolution.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_partition_evolution.q.out
@@ -134,6 +134,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -214,6 +215,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -318,6 +320,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_unpartitioned.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_unpartitioned.q.out
index 84d93ccf927..fabbe1a82c7 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_unpartitioned.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_unpartitioned.q.out
@@ -113,6 +113,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -196,6 +197,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -293,6 +295,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
index 64a9d65f878..8bba659e8fd 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
@@ -98,6 +98,7 @@ TBLPROPERTIES (
   'write.delete.mode'='merge-on-read', 
   'write.format.default'='orc', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: insert into ice_orc2 partition (company_id=100) 
 VALUES ('fn1','ln1', 1, 10), ('fn2','ln2', 2, 20), ('fn3','ln3', 3, 30)
@@ -157,6 +158,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
index a814fe8f149..409eb484480 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
@@ -98,6 +98,7 @@ TBLPROPERTIES (
   'write.delete.mode'='merge-on-read', 
   'write.format.default'='orc', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: insert into ice_orc2 partition (company_id=100) 
 VALUES ('fn1','ln1', 1, 10), ('fn2','ln2', 2, 20), ('fn3','ln3', 3, 30)
@@ -157,6 +158,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc.q.out
index 7c274a85a94..0bb8f5837f7 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc.q.out
@@ -84,6 +84,7 @@ Table Parameters:
 #### A masked pattern was here ####
        uuid                    #Masked#
        write.format.default    orc                 
+       write.metadata.delete-after-commit.enabled      true                
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
@@ -173,6 +174,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc2.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc2.q.out
index 94b707fe73f..d08fb3ec109 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc2.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_partitioned_orc2.q.out
@@ -85,6 +85,7 @@ Table Parameters:
 #### A masked pattern was here ####
        uuid                    #Masked#
        write.format.default    orc                 
+       write.metadata.delete-after-commit.enabled      true                
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
@@ -175,6 +176,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git a/iceberg/iceberg-handler/src/test/results/positive/row_count.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/row_count.q.out
index 6e0458eb69e..1c4d2e05cdb 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/row_count.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/row_count.q.out
@@ -124,6 +124,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -210,6 +211,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/show_create_iceberg_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/show_create_iceberg_table.q.out
index 4393179f405..3ecbc531d27 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/show_create_iceberg_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/show_create_iceberg_table.q.out
@@ -44,6 +44,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: DROP TABLE IF EXISTS ice_tv1
 PREHOOK: type: DROPTABLE
@@ -88,7 +89,8 @@ TBLPROPERTIES (
   'snapshot-count'='0', 
   'table_type'='ICEBERG', 
 #### A masked pattern was here ####
-  'uuid'='#Masked#')
+  'uuid'='#Masked#', 
+  'write.metadata.delete-after-commit.enabled'='true')
 PREHOOK: query: DROP TABLE IF EXISTS ice_tv2
 PREHOOK: type: DROPTABLE
 PREHOOK: Output: database:default
@@ -135,6 +137,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: DROP TABLE IF EXISTS ice_t_transform
 PREHOOK: type: DROPTABLE
@@ -194,6 +197,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: DROP TABLE IF EXISTS ice_t_transform_prop
 PREHOOK: type: DROPTABLE
@@ -254,6 +258,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: DROP TABLE IF EXISTS ice_t_identity_part
 PREHOOK: type: DROPTABLE
@@ -302,6 +307,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: DROP TABLE IF EXISTS ice_data
 PREHOOK: type: DROPTABLE
@@ -359,6 +365,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: DROP TABLE IF EXISTS ice_noHive
 PREHOOK: type: DROPTABLE
@@ -404,6 +411,7 @@ TBLPROPERTIES (
   'uuid'='#Masked#', 
   'write.delete.mode'='merge-on-read', 
   'write.merge.mode'='merge-on-read', 
+  'write.metadata.delete-after-commit.enabled'='true', 
   'write.update.mode'='merge-on-read')
 PREHOOK: query: INSERT INTO ice_noHive VALUES (1, 'ABC'),(2, 'CCC'),(3, 'DBD')
 PREHOOK: type: QUERY
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/show_iceberg_materialized_views.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/show_iceberg_materialized_views.q.out
index dd9c7561b7f..1dc569be87c 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/show_iceberg_materialized_views.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/show_iceberg_materialized_views.q.out
@@ -379,6 +379,7 @@ Table Parameters:
 #### A masked pattern was here ####
        uuid                    #Masked#
        write.format.default    orc                 
+       write.metadata.delete-after-commit.enabled      true                
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
@@ -430,6 +431,7 @@ Table Parameters:
 #### A masked pattern was here ####
        uuid                    #Masked#
        write.format.default    orc                 
+       write.metadata.delete-after-commit.enabled      true                
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
@@ -482,6 +484,7 @@ Table Parameters:
 #### A masked pattern was here ####
        uuid                    #Masked#
        write.format.default    orc                 
+       write.metadata.delete-after-commit.enabled      true                
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
@@ -587,6 +590,7 @@ Table Parameters:
 #### A masked pattern was here ####
        uuid                    #Masked#
        write.format.default    orc                 
+       write.metadata.delete-after-commit.enabled      true                
                 
 # Storage Information           
 SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/truncate_force_iceberg_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/truncate_force_iceberg_table.q.out
index 81e8ad5a2ba..44d2c059a19 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/truncate_force_iceberg_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/truncate_force_iceberg_table.q.out
@@ -114,6 +114,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -189,6 +190,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    parquet             
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/truncate_iceberg_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/truncate_iceberg_table.q.out
index 4b1521349c1..6c5831383ba 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/truncate_iceberg_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/truncate_iceberg_table.q.out
@@ -114,6 +114,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -189,6 +190,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -262,6 +264,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -337,6 +340,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -428,6 +432,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    orc                 
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/truncate_partitioned_iceberg_table.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/truncate_partitioned_iceberg_table.q.out
index d32cd0480d6..0d6f6a02a62 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/truncate_partitioned_iceberg_table.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/truncate_partitioned_iceberg_table.q.out
@@ -126,6 +126,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -229,6 +230,7 @@ Table Parameters:
        write.delete.mode       merge-on-read       
        write.format.default    avro                
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/use_basic_stats_from_iceberg.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/use_basic_stats_from_iceberg.q.out
index bb4d4b662d0..a0ba02dbf03 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/use_basic_stats_from_iceberg.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/use_basic_stats_from_iceberg.q.out
@@ -176,6 +176,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
@@ -220,6 +221,7 @@ Table Parameters:
        uuid                    #Masked#
        write.delete.mode       merge-on-read       
        write.merge.mode        merge-on-read       
+       write.metadata.delete-after-commit.enabled      true                
        write.update.mode       merge-on-read       
                 
 # Storage Information           
diff --git 
a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
 
b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
index 788850c7b24..fd30223c993 100644
--- 
a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
+++ 
b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
@@ -43,7 +43,7 @@
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
 import org.junit.jupiter.api.AfterEach;
@@ -70,7 +70,8 @@ public abstract class TestHiveRESTCatalogClientITBase {
   static final String TABLE_NAME = "ice_tbl";
   static final String CATALOG_NAME = "ice01";
   static final String HIVE_ICEBERG_STORAGE_HANDLER = 
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
-  static final String REST_CATALOG_PREFIX = String.format("%s%s.", 
CatalogUtils.CATALOG_CONFIG_PREFIX, CATALOG_NAME);
+  static final String REST_CATALOG_PREFIX = String.format("%s%s.", 
IcebergCatalogProperties.CATALOG_CONFIG_PREFIX, 
+      CATALOG_NAME);
 
   HiveConf hiveConf;
   Configuration conf;
@@ -154,7 +155,7 @@ public void testIceberg() throws Exception {
 
     // --- Create Table --- with an invalid catalog name in table parameters 
(should fail)
     Map<String, String> tableParameters = new java.util.HashMap<>();
-    tableParameters.put(CatalogUtils.CATALOG_NAME, "some_missing_catalog");
+    tableParameters.put(IcebergCatalogProperties.CATALOG_NAME, 
"some_missing_catalog");
     assertThrows(IllegalArgumentException.class, () -> 
         createPartitionedTable(msClient, CATALOG_NAME, DB_NAME, TABLE_NAME + 
"_2", tableParameters));
 
diff --git 
a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java
 
b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java
index 07a49d3a271..89545f83a07 100644
--- 
a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java
+++ 
b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogGravitinoLlapLocalCliDriver.java
@@ -28,7 +28,7 @@
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.hive.client.HiveRESTCatalogClient;
 import org.apache.iceberg.rest.extension.OAuth2AuthorizationServer;
 import org.junit.After;
@@ -121,7 +121,7 @@ public void setup() throws IOException {
 
     String host = gravitinoContainer.getHost();
     Integer port = gravitinoContainer.getMappedPort(GRAVITINO_HTTP_PORT);
-    String restCatalogPrefix = String.format("%s%s.", 
CatalogUtils.CATALOG_CONFIG_PREFIX, CATALOG_NAME);
+    String restCatalogPrefix = String.format("%s%s.", 
IcebergCatalogProperties.CATALOG_CONFIG_PREFIX, CATALOG_NAME);
 
     // Suppress IntelliJ warning about using HTTP since this is a local test 
container connection
     @SuppressWarnings("HttpUrlsUsage")
diff --git 
a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogHMSLlapLocalCliDriver.java
 
b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogHMSLlapLocalCliDriver.java
index 2f5031601de..429661e390f 100644
--- 
a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogHMSLlapLocalCliDriver.java
+++ 
b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestIcebergRESTCatalogHMSLlapLocalCliDriver.java
@@ -27,7 +27,7 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.ITestsSchemaInfo;
 import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.hive.CatalogUtils;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -76,7 +76,7 @@ public TestIcebergRESTCatalogHMSLlapLocalCliDriver(String 
name, File qfile) {
 
   @Before
   public void setupHiveConfig() {
-    String restCatalogPrefix = String.format("%s%s.", 
CatalogUtils.CATALOG_CONFIG_PREFIX, CATALOG_NAME);
+    String restCatalogPrefix = String.format("%s%s.", 
IcebergCatalogProperties.CATALOG_CONFIG_PREFIX, CATALOG_NAME);
 
     Configuration conf = SessionState.get().getConf();
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_IMPL,


Reply via email to