This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 279996b0858 HIVE-29028: Iceberg: Implement auto compaction (#5886)
279996b0858 is described below

commit 279996b0858896a5b953745ec793c89d82378ea6
Author: Dmitriy Fingerman <dmitriy.finger...@gmail.com>
AuthorDate: Wed Jul 2 23:12:10 2025 -0400

    HIVE-29028: Iceberg: Implement auto compaction (#5886)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  22 +-
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  10 +
 .../mr/hive/compaction/IcebergTableOptimizer.java  | 191 +++++++++++++++++
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |  11 +
 .../txn/compactor/TestIcebergCompactorOnTez.java   | 117 +++++++++--
 .../hive/ql/txn/compactor/AcidTableOptimizer.java  | 111 ++++++++++
 .../hive/ql/txn/compactor/CompactionException.java |   6 +
 .../hive/ql/txn/compactor/CompactorThread.java     |  32 ---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 226 +++++----------------
 .../ql/txn/compactor/MetaStoreCompactorThread.java |  31 ---
 .../ql/txn/compactor/RemoteCompactorThread.java    |  28 ---
 .../hive/ql/txn/compactor/TableOptimizer.java      | 219 ++++++++++++++++++++
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   2 +
 .../hive/ql/txn/compactor/CompactorTest.java       |   2 +
 .../hive/ql/txn/compactor/TestInitiator.java       |   4 +-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   5 +
 16 files changed, 712 insertions(+), 305 deletions(-)
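
The change makes the Initiator's compaction-candidate discovery pluggable: Initiator.init() now reads a list of class names from MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS and instantiates each TableOptimizer reflectively through a (HiveConf, TxnStore, MetadataCache) constructor. AcidTableOptimizer carries the eligibility checks that used to live in Initiator, while the new IcebergTableOptimizer scans the Metastore for Iceberg tables and partitions that need compaction. As a minimal, non-authoritative sketch of the extension point under those assumptions (the package and class name below are hypothetical and not part of this patch):

    package org.example.compaction; // hypothetical package, for illustration only

    import java.util.Collections;
    import java.util.Set;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
    import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
    import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;

    public class NoOpTableOptimizer extends TableOptimizer {

      // Constructor shape that Initiator.instantiateTableOptimizer() looks up reflectively in this patch.
      public NoOpTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
        super(conf, txnHandler, metadataCache);
      }

      @Override
      public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
          Set<String> skipDBs, Set<String> skipTables) throws MetaException {
        // A real optimizer would scan its table format and return candidates, typically filtering each
        // one through isEligibleForCompaction(); this sketch contributes none.
        return Collections.emptySet();
      }
    }

Such a class would then be added to the optimizer list that the Initiator is configured with, alongside AcidTableOptimizer and IcebergTableOptimizer.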

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index dc5ac290462..b4d9f0bea22 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -140,7 +140,6 @@
 import org.apache.iceberg.FindFiles;
 import org.apache.iceberg.GenericBlobMetadata;
 import org.apache.iceberg.GenericStatisticsFile;
-import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionField;
@@ -459,7 +458,7 @@ public boolean supportsAppendData(org.apache.hadoop.hive.metastore.api.Table tab
       return true;
     }
     // If it is a table which has undergone partition evolution, return false;
-    if (hasUndergonePartitionEvolution(icebergTbl)) {
+    if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTbl)) {
       if (withPartClause) {
         throw new SemanticException("Can not Load into an iceberg table, which 
has undergone partition evolution " +
             "using the PARTITION clause");
@@ -1405,7 +1404,7 @@ public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
       if (IcebergTableUtil.isBucketed(table)) {
         throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
       }
-      if (hasUndergonePartitionEvolution(table)) {
+      if (IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
         throw new SemanticException(
             "Cannot perform insert overwrite query on Iceberg table where 
partition evolution happened. In order " +
             "to successfully carry out any insert overwrite operation on this 
table, the data has to be rewritten " +
@@ -2061,7 +2060,7 @@ public void 
validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, List<PartitionField> partitionFields) throws SemanticException {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    if (hmsTable.getSnapshotRef() != null && hasUndergonePartitionEvolution(table)) {
+    if (hmsTable.getSnapshotRef() != null && IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
       // for this case we rewrite the query as delete query, so validations would be done as part of delete.
       return;
     }
@@ -2108,7 +2107,7 @@ private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTa
   public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
       throws SemanticException {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    if (MapUtils.isEmpty(partitionSpec) || !hasUndergonePartitionEvolution(table)) {
+    if (MapUtils.isEmpty(partitionSpec) || !IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
       return true;
     } else if (hmsTable.getSnapshotRef() != null) {
       return false;
@@ -2130,15 +2129,6 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
     return result;
   }
 
-  private boolean hasUndergonePartitionEvolution(Table table) {
-    // The current spec is not necessary the latest which can happen when partition spec was changed to one of
-    // table's past specs.
-    return table.currentSnapshot() != null &&
-        table.currentSnapshot().allManifests(table.io()).parallelStream()
-        .map(ManifestFile::partitionSpecId)
-        .anyMatch(id -> id != table.spec().specId());
-  }
-
   @Override
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
@@ -2159,7 +2149,7 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
     Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
 
     boolean readsNonCurrentSnapshot = snapshot != null && !snapshot.equals(table.currentSnapshot());
-    if (readsNonCurrentSnapshot && hasUndergonePartitionEvolution(table)) {
+    if (readsNonCurrentSnapshot && IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
       return false;
     }
     return table.spec().isPartitioned();
@@ -2368,7 +2358,7 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) {
   @Override
   public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return hasUndergonePartitionEvolution(table);
+    return IcebergTableUtil.hasUndergonePartitionEvolution(table);
   }
 
   private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 54b283527a1..2e4e5f0a094 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -59,6 +59,7 @@
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionData;
@@ -562,4 +563,13 @@ public static ExecutorService newDeleteThreadPool(String completeName, int numTh
       return thread;
     });
   }
+
+  public static boolean hasUndergonePartitionEvolution(Table table) {
+  // The current spec is not necessary the latest which can happen when partition spec was changed to one of
+    // table's past specs.
+    return table.currentSnapshot() != null &&
+        table.currentSnapshot().allManifests(table.io()).parallelStream()
+            .map(ManifestFile::partitionSpecId)
+            .anyMatch(id -> id != table.spec().specId());
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
new file mode 100644
index 00000000000..ab785844461
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
+import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.mr.hive.IcebergTableUtil;
+import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.thrift.TException;
+
+public class IcebergTableOptimizer extends TableOptimizer {
+  private HiveMetaStoreClient client;
+  private Map<String, Long> snapshotIdCache;
+
+  public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) throws MetaException {
+    super(conf, txnHandler, metadataCache);
+    init();
+  }
+
+  /**
+   * Scans all databases and tables in the Hive Metastore to identify Iceberg tables
+   * that are potential candidates for compaction.
+   * <p>
+   * The method filters tables based on provided databases and tables skip lists and a snapshot ID cache to avoid
+   * re-processing tables that haven't changed. For eligible Iceberg tables, it determines
+   * the appropriate compaction targets (table or specific partitions) and adds them to
+   * the {@link CompactionInfo} set.
+   * </p>
+   * @param lastChecked A timestamp of previous auto compaction's invocation.
+   * @param currentCompactions A {@link ShowCompactResponse} containing currently active or pending
+   * compaction requests, used to avoid duplicates.
+   * @param skipDBs A {@link Set} of database names to explicitly skip during the scan.
+   * @param skipTables A {@link Set} of fully qualified table names to explicitly skip during the scan.
+   * @return A {@link Set} of {@link CompactionInfo} objects representing tables and/or partitions
+   * identified as eligible for compaction.
+   * @throws MetaException If an unrecoverable error occurs during Metastore communication or
+   * during {@link SessionState} initialization.
+   */
+  @Override
+  public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) throws MetaException {
+    Set<CompactionInfo> compactionTargets = Sets.newHashSet();
+    try {
+      SessionState sessionState = SessionState.get();
+      if (sessionState == null) {
+        sessionState = new SessionState(conf);
+        SessionState.start(sessionState);
+      }
+    } catch (Exception e) {
+      throw new MetaException(String.format("Error while finding compaction targets for Iceberg tables: %s",
+          e.getMessage()));
+    }
+
+    getDatabases().stream()
+        .filter(dbName -> !skipDBs.contains(dbName))
+        .flatMap(dbName -> getTables(dbName).stream()
+            .map(tableName -> TableName.getDbTable(dbName, tableName)))
+        .filter(qualifiedTableName -> !skipTables.contains(qualifiedTableName))
+        .map(qualifiedTableName -> Pair.of(qualifiedTableName, resolveMetastoreTable(qualifiedTableName)))
+        .filter(tablePair -> MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters()))
+        .filter(tablePair -> {
+          long currentSnapshotId = Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id"));
+          Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey());
+          return cachedSnapshotId == null || cachedSnapshotId != currentSnapshotId;
+        })
+        .forEach(tablePair -> {
+          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, tablePair.getValue());
+
+          if (icebergTable.spec().isPartitioned()) {
+            List<String> partitions = getPartitions(icebergTable);
+
+            partitions.forEach(partition -> addCompactionTargetIfEligible(tablePair.getValue(), icebergTable,
+                partition, compactionTargets, currentCompactions, skipDBs, skipTables));
+          }
+
+          if (icebergTable.spec().isUnpartitioned() || IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable)) {
+            addCompactionTargetIfEligible(tablePair.getValue(), icebergTable, null, compactionTargets,
+                currentCompactions, skipDBs, skipTables);
+          }
+
+          snapshotIdCache.put(tablePair.getKey(), icebergTable.currentSnapshot().snapshotId());
+        });
+
+    return compactionTargets;
+  }
+
+  private List<String> getDatabases() {
+    try {
+      return client.getAllDatabases();
+    } catch (TException e) {
+      throw new RuntimeMetaException(e, "Error getting database names");
+    }
+  }
+
+  private List<String> getTables(String dbName) {
+    try {
+      return client.getAllTables(dbName);
+    } catch (TException e) {
+      throw new RuntimeMetaException(e, "Error getting table names of %s 
database", dbName);
+    }
+  }
+
+  private List<String> getPartitions(org.apache.iceberg.Table icebergTable) {
+    try {
+      return IcebergTableUtil.getPartitionNames(icebergTable, Maps.newHashMap(), true);
+    } catch (SemanticException e) {
+      throw new RuntimeMetaException(e, "Error getting partition names for Iceberg table %s", icebergTable.name());
+    }
+  }
+
+  private Table resolveMetastoreTable(String qualifiedTableName) {
+    String[] dbTableName = TxnUtils.getDbTableName(qualifiedTableName);
+    try {
+      return metadataCache.computeIfAbsent(qualifiedTableName,
+          () -> CompactorUtil.resolveTable(conf, dbTableName[0], dbTableName[1]));
+    } catch (Exception e) {
+      throw new RuntimeMetaException(e, "Error resolving table %s", qualifiedTableName);
+    }
+  }
+
+  public void init() throws MetaException {
+    client = new HiveMetaStoreClient(conf);
+    snapshotIdCache = Maps.newConcurrentMap();
+  }
+
+  private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table icebergTable, String partitionName,
+      Set<CompactionInfo> compactions, ShowCompactResponse currentCompactions, Set<String> skipDBs,
+      Set<String> skipTables) {
+
+    CompactionInfo ci = new CompactionInfo(table.getDbName(), table.getTableName(), partitionName,
+        CompactionType.SMART_OPTIMIZE);
+
+    // Common Hive compaction eligibility checks
+    if (!isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) {
+      return;
+    }
+
+    // Iceberg specific compaction checks: determine if compaction is needed and which type is needed
+    CompactionEvaluator compactionEvaluator;
+    try {
+      compactionEvaluator = new CompactionEvaluator(icebergTable, ci, table.getParameters());
+    } catch (IOException e) {
+      throw new RuntimeMetaException(e, "Error construction compaction evaluator for table %s", table.getTableName());
+    }
+
+    if (!compactionEvaluator.isEligibleForCompaction()) {
+      return;
+    }
+
+    ci.type = compactionEvaluator.determineCompactionType();
+    compactions.add(ci);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index fdbb454de41..57a7fc442e5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -52,6 +52,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -69,6 +70,7 @@ public abstract class CompactorOnTezTest {
   protected IMetaStoreClient msClient;
   protected IDriver driver;
   protected boolean mmCompaction = false;
+  private final AtomicBoolean stop = new AtomicBoolean();
 
   @ClassRule
   public static TemporaryFolder folder = new TemporaryFolder();
@@ -541,4 +543,13 @@ protected void dropTable(String tblName) throws Exception {
       executeStatementOnDriver("drop table " + tblName, driver);
     }
   }
+
+  protected void runSingleInitiatorCycle() throws Exception {
+    TestTxnDbUtil.setConfValues(conf);
+    CompactorThread t = new Initiator();
+    t.setConf(conf);
+    stop.set(true);
+    t.init(stop);
+    t.run();
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
index 9ca0bd9bbe4..f6cc76572f1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
@@ -21,66 +21,139 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionState;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
 
 public class TestIcebergCompactorOnTez extends CompactorOnTezTest {
+  
+  private static final String DB_NAME = "default";
+  private static final String TABLE_NAME = "ice_orc";
+  private static final String QUALIFIED_TABLE_NAME = TxnUtils.getFullTableName(DB_NAME, TABLE_NAME);
+
+  @Override
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    executeStatementOnDriver("drop table if exists " + QUALIFIED_TABLE_NAME, 
driver);
+  }
 
   @Test
   public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
     conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE);
     msClient = new HiveMetaStoreClient(conf);
 
-    String dbName = "default";
-    String tableName = "ice_orc";
-    String qualifiedTableName = dbName + "." + tableName;
-
-    executeStatementOnDriver("drop table if exists " + qualifiedTableName, 
driver);
     executeStatementOnDriver(String.format("create table %s " +
         "(id int, a string, b int, c bigint, d float, e double, f decimal(4, 
2), g boolean, h date, i date, j date, k timestamp) " +
         "partitioned by spec(a, truncate(3, a), bucket(4, a), b, c, d, e, f, 
g, h, year(h), month(i), day(j), k, hour(k)) stored by iceberg stored as orc " +
-        "tblproperties ('compactor.threshold.min.input.files'='1')", 
qualifiedTableName), driver);
+        "tblproperties ('compactor.threshold.min.input.files'='1')", 
QUALIFIED_TABLE_NAME), driver);
 
     // 6 records, one records per file --> 3 partitions, 2 files per partition
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 
'aaa111', 1, 100, 1.0, 2.0, 4.00, true,  DATE '2024-05-01', DATE '2024-05-01', 
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), 
driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 
'aaa111', 1, 100, 1.0, 2.0, 4.00, true,  DATE '2024-05-01', DATE '2024-05-01', 
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), 
driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', 
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), 
driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', 
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), 
driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, 
null, null, null, null, null, null, null, null, null, null)", 
qualifiedTableName), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, 
null, null, null, null, null, null, null, null, null, null)", 
qualifiedTableName), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 
'aaa111', 1, 100, 1.0, 2.0, 4.00, true,  DATE '2024-05-01', DATE '2024-05-01', 
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 
'aaa111', 1, 100, 1.0, 2.0, 4.00, true,  DATE '2024-05-01', DATE '2024-05-01', 
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', 
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', 
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, 
null, null, null, null, null, null, null, null, null, null)", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, 
null, null, null, null, null, null, null, null, null, null)", 
QUALIFIED_TABLE_NAME), driver);
 
-    Assert.assertEquals(6, getFilesCount(qualifiedTableName));
-    List<String> recordsBefore = getAllRecords(qualifiedTableName);
+    Assert.assertEquals(6, getFilesCount());
+    List<String> recordsBefore = getAllRecords();
 
-    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, false,
+    CompactorTestUtil.runCompaction(conf, DB_NAME, TABLE_NAME, CompactionType.MINOR, false,
         "a=aaa111/a_trunc=aaa/a_bucket=0/b=1/c=100/d=1.0/e=2.0/f=4.00/g=true/h=2024-05-01/h_year=2024/i_month=2024-05/j_day=2024-05-01/k=2024-05-02T10%3A00%3A00/k_hour=2024-05-02-10",
         "a=bbb222/a_trunc=bbb/a_bucket=3/b=2/c=200/d=2.0/e=3.0/f=8.00/g=false/h=2024-05-03/h_year=2024/i_month=2024-05/j_day=2024-05-03/k=2024-05-04T13%3A00%3A00/k_hour=2024-05-04-13",
         "a=null/a_trunc=null/a_bucket=null/b=null/c=null/d=null/e=null/f=null/g=null/h=null/h_year=null/i_month=null/j_day=null/k=null/k_hour=null"
     );
     
-    Assert.assertEquals(3, getFilesCount(qualifiedTableName));
+    Assert.assertEquals(3, getFilesCount());
     verifySuccessfulCompaction(3);
-    List<String> recordsAfter = getAllRecords(qualifiedTableName);
+    List<String> recordsAfter = getAllRecords();
     
     Assert.assertEquals(recordsBefore, recordsAfter);
   }
-  
-  private int getFilesCount(String qualifiedTableName) throws Exception {
-    driver.run(String.format("select count(*) from %s.files", 
qualifiedTableName));
+
+  @Test
+  public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
+    executeStatementOnDriver(String.format("create table %s " +
+        "(id int, a string) " +
+        "partitioned by spec(id) stored by iceberg stored as orc " +
+        "tblproperties ('compactor.threshold.min.input.files'='1')", 
QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'a')", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'b')", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'c')", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'd')", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, 'e')", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, 'd')", 
QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("alter table %s set partition 
spec(truncate(3, a))", QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", 
QUALIFIED_TABLE_NAME), driver);
+
+    runSingleInitiatorCycle();
+    ShowCompactResponse rsp = msClient.showCompactions();
+    Assert.assertEquals(4, rsp.getCompactsSize());
+
+    // Compaction should be initiated for each partition from the latest spec
+    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=aaa", 
CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=bbb", 
CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=null", 
CompactionType.MINOR, CompactionState.INITIATED));
+
+    // Additional compaction should be initiated for all partitions from past 
partition specs
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, 
CompactionState.INITIATED));
+  }
+
+  @Test
+  public void testIcebergAutoCompactionUnpartitioned() throws Exception {
+    executeStatementOnDriver(String.format("create table %s " +
+        "(id int, a string) " +
+        "stored by iceberg stored as orc " +
+        "tblproperties ('compactor.threshold.min.input.files'='1')", 
QUALIFIED_TABLE_NAME), driver);
+
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", 
QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", 
QUALIFIED_TABLE_NAME), driver);
+
+    runSingleInitiatorCycle();
+    ShowCompactResponse rsp = msClient.showCompactions();
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
+  }
+
+  private int getFilesCount() throws Exception {
+    driver.run(String.format("select count(*) from %s.files", 
QUALIFIED_TABLE_NAME));
     List<String> res = new ArrayList<>();
     driver.getFetchTask().fetch(res);
     return Integer.parseInt(res.get(0));
   }
 
-  private List<String> getAllRecords(String qualifiedTableName) throws Exception {
-    driver.run(String.format("select * from %s order by id", qualifiedTableName));
+  private List<String> getAllRecords() throws Exception {
+    driver.run(String.format("select * from %s order by id", QUALIFIED_TABLE_NAME));
     List<String> res = new ArrayList<>();
     driver.getFetchTask().fetch(res);
     return res;
   }
+  
+  private boolean isCompactExist(ShowCompactResponse rsp, String partName, CompactionType type, CompactionState state) {
+    return rsp.getCompacts().stream().anyMatch(c ->
+        c.getDbname().equals(DB_NAME) && c.getTablename().equals(TABLE_NAME) &&
+            Objects.equals(c.getPartitionname(), partName) && c.getType().equals(type) &&
+            c.getState().equals(state.name().toLowerCase()));
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java
new file mode 100644
index 00000000000..424f50a2763
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class AcidTableOptimizer extends TableOptimizer {
+  private static final String CLASS_NAME = AcidTableOptimizer.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  public AcidTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
+    super(conf, txnHandler, metadataCache);
+  }
+
+  @Override
+  public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) throws MetaException {
+
+    int abortedThreshold = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf
+        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+            TimeUnit.MILLISECONDS);
+
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, lastChecked)
+            .parallelStream()
+            .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables))
+            .collect(Collectors.toSet());
+
+    if (!potentials.isEmpty()) {
+      ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
+          txnHandler.getOpenTxns(), 0);
+      conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    }
+    
+    return potentials;
+  }
+
+  @Override
+  protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) {
+    try {
+      if (!super.isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) {
+        return false;
+      }
+      String qualifiedTableName = ci.getFullTableName();
+      Table t = metadataCache.computeIfAbsent(qualifiedTableName, () ->
+          CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName));
+      if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf
+          .getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+        skipTables.add(ci.getFullTableName());
+        LOG.info("Table {} is insert only and {}=false so we will not compact 
it.", qualifiedTableName,
+            HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname);
+        return false;
+      }
+      if (isDynPartIngest(t, ci)) {
+        return false;
+      }
+
+    } catch (Exception e) {
+      LOG.error("Caught exception while checking compaction eligibility.", e);
+      try {
+        ci.errorMessage = e.getMessage();
+        txnHandler.markFailed(ci);
+      } catch (MetaException ex) {
+        LOG.error("Caught exception while marking compaction as failed.", e);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts.
+  protected static boolean isDynPartIngest(Table t, CompactionInfo ci) {
+    if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty() &&
+        ci.partName  == null && !ci.hasOldAbort) {
+      LOG.info("Skipping entry for {} as it is from dynamic partitioning", 
ci.getFullTableName());
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
index 302345f0284..498b3e55819 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
@@ -91,4 +91,10 @@ public ErrorMsg getCanonicalErrorMsg() {
   }
 
   public String getRemoteErrorMsg() { return remoteErrorMsg; }
+
+  public CompactionException(Throwable throwable, String message, Object... args) {
+    super(String.format(message, args), throwable);
+    canonicalErrorMsg = ErrorMsg.GENERIC_ERROR;
+    remoteErrorMsg = null;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index ec0619f7997..b3f5d2a7eeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -25,17 +25,13 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -89,25 +85,6 @@ public void init(AtomicBoolean stop) throws Exception {
    */
   abstract Table resolveTable(CompactionInfo ci) throws MetaException;
 
-  abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException;
-
-  /**
-   * Get list of partitions by name.
-   * @param ci compaction info.
-   * @return list of partitions
-   * @throws MetaException if an error occurs.
-   */
-  abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException;
-
-  /**
-   * Get the partition being compacted.
-   * @param ci compaction info returned from the compaction queue
-   * @return metastore partition, or null if there is not partition in this compaction info
-   * @throws MetaException if underlying calls throw, or if the partition name resolves to more than
-   * one partition.
-   */
-  abstract protected Partition resolvePartition(CompactionInfo ci) throws MetaException;
-
   protected String tableName(Table t) {
     return Warehouse.getQualifiedName(t);
   }
@@ -123,15 +100,6 @@ public static void initializeAndStartThread(CompactorThread thread, Configuratio
     thread.start();
   }
 
-  protected boolean replIsCompactionDisabledForTable(Table tbl) {
-    // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
-    if (isCompactDisabled) {
-      LOG.info("Compaction is disabled for table " + tbl.getTableName());
-    }
-    return isCompactDisabled;
-  }
-
   @VisibleForTesting
   protected String getRuntimeVersion() {
     return this.getClass().getPackage().getImplementationVersion();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index f166d677e40..b72eea8ce43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -20,7 +20,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -28,21 +27,20 @@
 import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
 import org.apache.hadoop.hive.metastore.txn.NoMutex;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NAME_FORMAT;
 
@@ -51,13 +49,14 @@
  * It's critical that there exactly 1 of these in a given warehouse.
  */
 public class Initiator extends MetaStoreCompactorThread {
-  static final private String CLASS_NAME = Initiator.class.getName();
-  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final String CLASS_NAME = Initiator.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   private ExecutorService compactionExecutor;
 
   private boolean metricsEnabled;
   private boolean shouldUseMutex = true;
+  private List<TableOptimizer> optimizers;
 
   @Override
   public void run() {
@@ -66,12 +65,6 @@ public void run() {
     // so wrap it in a big catch Throwable statement.
     try {
       recoverFailedCompactions(false);
-
-      int abortedThreshold = HiveConf.getIntVar(conf,
-          HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
-      long abortedTimeThreshold = HiveConf
-          .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
-              TimeUnit.MILLISECONDS);
       TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
 
       // Make sure we run through the loop once before checking to stop as this makes testing
@@ -113,11 +106,11 @@ public void run() {
           Set<String> skipDBs = Sets.newConcurrentHashSet();
           Set<String> skipTables = Sets.newConcurrentHashSet();
 
-          Set<CompactionInfo> potentials = compactionExecutor.submit(() ->
-            txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, prevStart)
-              .parallelStream()
-              .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables))
-              .collect(Collectors.toSet())).get();
+          Set<CompactionInfo> potentials = Sets.newHashSet();
+          for (TableOptimizer optimizer : optimizers) {
+            potentials.addAll(compactionExecutor.submit(() ->
+                optimizer.findPotentialCompactions(prevStart, currentCompactions, skipDBs, skipTables)).get());
+          }
           LOG.debug("Found {} potential compactions, checking to see if we should compact any of them", potentials.size());
 
           CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -125,12 +118,6 @@ public void run() {
           Map<String, String> tblNameOwners = new HashMap<>();
           List<CompletableFuture<Void>> compactionList = new ArrayList<>();
 
-          if (!potentials.isEmpty()) {
-            ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
-                txnHandler.getOpenTxns(), 0);
-            conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-          }
-
           for (CompactionInfo ci : potentials) {
             try {
               //Check for interruption before scheduling each compactionInfo and return if necessary
@@ -142,8 +129,8 @@ public void run() {
               ci.poolName = CompactorUtil.getPoolName(conf, t, metadataCache);
               Partition p = resolvePartition(ci);
               if (p == null && ci.partName != null) {
-                LOG.info("Can't find partition " + ci.getFullPartitionName() +
-                    ", assuming it has been dropped and moving on.");
+                LOG.info("Can't find partition {}, assuming it has been 
dropped and moving on.",
+                    ci.getFullPartitionName());
                 continue;
               }
               String runAs = resolveUserToRunAs(tblNameOwners, t, p);
@@ -208,10 +195,6 @@ protected boolean isCacheEnabled() {
             MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
   }
 
-  private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException {
-    return CompactorUtil.resolveDatabase(conf, ci.dbname);
-  }
-
   @VisibleForTesting
   protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p)
       throws IOException, InterruptedException {
@@ -236,158 +219,32 @@ public void init(AtomicBoolean stop) throws Exception {
             COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
     metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
         MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+    optimizers = Arrays.stream(MetastoreConf.getTrimmedStringsVar(conf,
+            MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS))
+        .map(this::instantiateTableOptimizer).toList();
   }
+  
+  private TableOptimizer instantiateTableOptimizer(String className) {
+    try {
+      Class<? extends TableOptimizer> icebergInitiatorClazz = (Class<? extends TableOptimizer>)
+          Class.forName(className, true,
+              Utilities.getSessionSpecifiedClassLoader());
 
-  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
-    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
-    txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
-  }
-
-  private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException {
-    if (compactions.getCompacts() == null) {
-      return false;
-    }
-
-    //In case of an aborted Dynamic partition insert, the created entry in the compaction queue does not contain
-    //a partition name even for partitioned tables. As a result it can happen that the ShowCompactResponse contains
-    //an element without partition name for partitioned tables. Therefore, it is necessary to null check the partition
-    //name of the ShowCompactResponseElement even if the CompactionInfo.partName is not null. These special compaction
-    //requests are skipped by the worker, and only cleaner will pick them up, so we should allow to schedule a 'normal'
-    //compaction for partitions of those tables which has special (DP abort) entry with undefined partition name.
-    List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream()
-      .filter(e -> e.getDbname().equals(ci.dbname)
-        && e.getTablename().equals(ci.tableName)
-        && (e.getPartitionname() == null && ci.partName == null ||
-              (Objects.equals(e.getPartitionname(),ci.partName))))
-      .collect(Collectors.toList());
-
-    // Figure out if there are any currently running compactions on the same table or partition.
-    if (filteredElements.stream().anyMatch(
-        e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) {
-
-      LOG.info("Found currently initiated or working compaction for " +
-        ci.getFullPartitionName() + " so we will not initiate another compaction");
-      return true;
-    }
-
-    // Check if there is already sufficient number of consecutive failures for this table/partition
-    // so that no new automatic compactions needs to be scheduled.
-    int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-
-    LongSummaryStatistics failedStats = filteredElements.stream()
-      .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState()))
-      .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed())
-      .limit(failedThreshold)
-
-      .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState()))
-      .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime));
-
-    // If the last attempt was too long ago, ignore the failed threshold and try compaction again
-    long retryTime = MetastoreConf.getTimeVar(conf,
-      MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS);
-
-    boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis());
-    if (failedStats.getCount() == failedThreshold && !needsRetry) {
-      LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() 
+ " since last " +
-        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " 
attempts to compact it failed.");
-
-      ci.errorMessage = "Compaction is not initiated since last " +
-        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed)";
+      Class<?>[] constructorParameterTypes = {HiveConf.class, TxnStore.class, MetadataCache.class};
+      Constructor<?> constructor = icebergInitiatorClazz.getConstructor(constructorParameterTypes);
 
-      txnHandler.markFailed(ci);
-      return true;
+      Object[] constructorArgs = new Object[] {conf, txnHandler, metadataCache};
+      return (TableOptimizer) constructor.newInstance(constructorArgs);
     }
-    return false;
-  }
-
-  // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts.
-  private static boolean isDynPartIngest(Table t, CompactionInfo ci){
-    if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
-            ci.partName  == null && !ci.hasOldAbort) {
-      LOG.info("Skipping entry for " + ci.getFullTableName() + " as it is from 
dynamic" +
-              " partitioning");
-      return  true;
+    catch (Exception e) {
+      throw new CompactionException(e, "Failed instantiating and calling table optimizer %s", className);
     }
-    return false;
   }
 
-  private boolean isEligibleForCompaction(CompactionInfo ci,
-      ShowCompactResponse currentCompactions, Set<String> skipDBs, Set<String> skipTables) {
-    try {
-      if (skipDBs.contains(ci.dbname)) {
-        LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, 
skipDBs.size());
-        return false;
-      } else {
-        if (replIsCompactionDisabledForDatabase(ci.dbname)) {
-          skipDBs.add(ci.dbname);
-          LOG.info("Skipping {} as compaction is disabled due to repl; 
skipDBs::size:{}",
-              ci.dbname, skipDBs.size());
-          return false;
-        }
-      }
-
-      if (skipTables.contains(ci.getFullTableName())) {
-        return false;
-      }
-
-      LOG.info("Checking to see if we should compact " + 
ci.getFullPartitionName());
-
-      // Check if we have already initiated or are working on a compaction for this table/partition.
-      // Also make sure we haven't exceeded configured number of consecutive failures.
-      // If any of the above applies, skip it.
-      // Note: if we are just waiting on cleaning we can still check, as it may be time to compact again even though we haven't cleaned.
-      if (foundCurrentOrFailedCompactions(currentCompactions, ci)) {
-        return false;
-      }
-
-      Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-      if (t == null) {
-        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming 
it's a temp " +
-            "table or has been dropped and moving on.");
-        return false;
-      }
-
-      if (replIsCompactionDisabledForTable(t)) {
-        skipTables.add(ci.getFullTableName());
-        return false;
-      }
-
-      Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
-      if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) {
-        if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) {
-          skipDBs.add(ci.dbname);
-          LOG.info("DB " + ci.dbname + " marked " + 
hive_metastoreConstants.NO_AUTO_COMPACT +
-              "=true so we will not compact it.");
-        } else {
-          skipTables.add(ci.getFullTableName());
-          LOG.info("Table " + tableName(t) + " marked " + 
hive_metastoreConstants.NO_AUTO_COMPACT +
-              "=true so we will not compact it.");
-        }
-        return false;
-      }
-      if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf
-          .getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
-        skipTables.add(ci.getFullTableName());
-        LOG.info("Table " + tableName(t) + " is insert only and " + 
HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname
-            + "=false so we will not compact it.");
-        return false;
-      }
-      if (isDynPartIngest(t, ci)) {
-        return false;
-      }
-
-    } catch (Throwable e) {
-      LOG.error("Caught exception while checking compaction eligibility.", e);
-      try {
-        ci.errorMessage = e.getMessage();
-        txnHandler.markFailed(ci);
-      } catch (MetaException ex) {
-        LOG.error("Caught exception while marking compaction as failed.", e);
-      }
-      return false;
-    }
-    return true;
+  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
+    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
+    txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
   }
 
   private static class InitiatorCycleUpdater implements Runnable {
@@ -428,4 +285,25 @@ public void run() {
   public void enforceMutex(boolean enableMutex) {
     this.shouldUseMutex = enableMutex;
   }
+
+  @VisibleForTesting
+  protected Partition resolvePartition(CompactionInfo ci) throws Exception {
+    Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () ->
+        CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName));
+
+    if (!MetaStoreUtils.isIcebergTable(table.getParameters())) {
+      return CompactorUtil.resolvePartition(conf, null, ci.dbname, ci.tableName, ci.partName,
+          CompactorUtil.METADATA_FETCH_MODE.LOCAL);
+    } else {
+      if (ci.partName == null) {
+        return null;
+      }
+
+      org.apache.hadoop.hive.metastore.api.Partition partition = new org.apache.hadoop.hive.metastore.api.Partition();
+      partition.setSd(table.getSd().deepCopy());
+      partition.setParameters(com.google.common.collect.Maps.newHashMap());
+
+      return partition;
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 0878330cc3e..12662e3c6e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -20,19 +20,13 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TException;
 
-import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -40,7 +34,6 @@
 
 import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL;
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
 /**
  * Compactor threads that runs in the metastore. It uses a {@link TxnStore}
@@ -69,30 +62,6 @@ public void init(AtomicBoolean stop) throws Exception {
     return CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName);
   }
 
-  @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
-    try {
-      Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName);
-      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
-      if (isReplCompactDisabled) {
-        LOG.info("Compaction is disabled for database " + dbName);
-      }
-      return isReplCompactDisabled;
-    } catch (NoSuchObjectException e) {
-      LOG.info("Unable to find database " + dbName);
-      return true;
-    }
-  }
-
-  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
-    return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName);
-  }
-
-  protected Partition resolvePartition(CompactionInfo ci) throws MetaException {
-    return CompactorUtil.resolvePartition(conf, null, ci.dbname, ci.tableName, ci.partName,
-        CompactorUtil.METADATA_FETCH_MODE.LOCAL);
-  }
-
   protected abstract boolean isCacheEnabled();
 
   protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index f95834ac23d..38493939b03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -19,20 +19,12 @@
 
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TException;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
-
 /**
  * Compactor thread that can run outside the metastore. It will
  * use the metastore thrift API which will default to a remote connection
@@ -60,24 +52,4 @@ public void init(AtomicBoolean stop) throws Exception {
   @Override Table resolveTable(CompactionInfo ci) throws MetaException {
     return RemoteCompactorUtil.resolveTable(conf, msc, ci);
   }
-
-  @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
-    try {
-      Database database = msc.getDatabase(getDefaultCatalog(conf), dbName);
-      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-      return ReplUtils.isFirstIncPending(database.getParameters());
-    } catch (NoSuchObjectException e) {
-      LOG.info("Unable to find database " + dbName);
-      return true;
-    }
-  }
-
-  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
-    return RemoteCompactorUtil.getPartitionsByNames(msc, ci.dbname, ci.tableName, ci.tableName);
-  }
-
-  protected Partition resolvePartition(CompactionInfo ci) throws MetaException {
-    return CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName, ci.partName,
-        CompactorUtil.METADATA_FETCH_MODE.REMOTE);
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java
new file mode 100644
index 00000000000..3211939ea4d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
+
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+public abstract class TableOptimizer {
+  private static final String CLASS_NAME = TableOptimizer.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  public abstract Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
+    Set<String> skipDBs, Set<String> skipTables) throws MetaException;
+
+  protected final HiveConf conf;
+  protected final TxnStore txnHandler;
+  protected final MetadataCache metadataCache;
+
+  protected TableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
+    this.conf = conf;
+    this.txnHandler = txnHandler;
+    this.metadataCache = metadataCache;
+  }
+  
+  protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) {
+    try {
+      if (skipDBs.contains(ci.dbname)) {
+        LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, 
skipDBs.size());
+        return false;
+      } else {
+        if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+          skipDBs.add(ci.dbname);
+          LOG.info("Skipping {} as compaction is disabled due to repl; 
skipDBs::size:{}",
+              ci.dbname, skipDBs.size());
+          return false;
+        }
+      }
+
+      String qualifiedTableName = ci.getFullTableName();
+      if (skipTables.contains(qualifiedTableName)) {
+        return false;
+      }
+
+      LOG.info("Checking to see if we should compact {}", 
ci.getFullPartitionName());
+
+      // Check if we have already initiated or are working on a compaction for this table/partition.
+      // Also make sure we haven't exceeded the configured number of consecutive failures.
+      // If any of the above applies, skip it.
+      // Note: if we are just waiting on cleaning we can still check, as it may be time to compact again even though we haven't cleaned.
+      if (foundCurrentOrFailedCompactions(currentCompactions, ci)) {
+        return false;
+      }
+
+      Table t = metadataCache.computeIfAbsent(qualifiedTableName, () -> 
+          CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName));
+      if (t == null) {
+        LOG.info("Can't find table {}, assuming it's a temp table or has been 
dropped and moving on.",
+            qualifiedTableName);
+        return false;
+      }
+
+      if (replIsCompactionDisabledForTable(t)) {
+        skipTables.add(qualifiedTableName);
+        return false;
+      }
+
+      Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
+      if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) {
+        if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) {
+          skipDBs.add(ci.dbname);
+          LOG.info("DB {} marked {}=true so we will not compact it.", 
hive_metastoreConstants.NO_AUTO_COMPACT, ci.dbname);
+        } else {
+          skipTables.add(qualifiedTableName);
+          LOG.info("Table {} marked {}=true so we will not compact it.", 
hive_metastoreConstants.NO_AUTO_COMPACT,
+              qualifiedTableName);
+        }
+        return false;
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception while checking compaction eligibility.", e);
+      try {
+        ci.errorMessage = e.getMessage();
+        txnHandler.markFailed(ci);
+      } catch (MetaException ex) {
+        LOG.error("Caught exception while marking compaction as failed.", e);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  private boolean replIsCompactionDisabledForTable(Table tbl) {
+    // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
+    if (isCompactDisabled) {
+      LOG.info("Compaction is disabled for table {}", tbl.getTableName());
+    }
+    return isCompactDisabled;
+  }
+
+  private boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
+    try {
+      Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName);
+      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
+      if (isReplCompactDisabled) {
+        LOG.info("Compaction is disabled for database {}", dbName);
+      }
+      return isReplCompactDisabled;
+    } catch (NoSuchObjectException e) {
+      LOG.info("Unable to find database {}", dbName);
+      return true;
+    }
+  }
+
+  protected boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException {
+    if (compactions.getCompacts() == null) {
+      return false;
+    }
+
+    // In case of an aborted dynamic partition insert, the entry created in the compaction queue does not contain
+    // a partition name even for partitioned tables. As a result, the ShowCompactResponse may contain
+    // an element without a partition name for partitioned tables. Therefore, it is necessary to null-check the partition
+    // name of the ShowCompactResponseElement even if CompactionInfo.partName is not null. These special compaction
+    // requests are skipped by the Worker and only picked up by the Cleaner, so we should still allow scheduling a 'normal'
+    // compaction for partitions of tables that have a special (DP abort) entry with an undefined partition name.
+    List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream()
+        .filter(e -> e.getDbname().equals(ci.dbname)
+            && e.getTablename().equals(ci.tableName)
+            && (e.getPartitionname() == null && ci.partName == null ||
+            (Objects.equals(e.getPartitionname(), ci.partName))))
+        .toList();
+
+    // Figure out if there are any currently running compactions on the same table or partition.
+    if (filteredElements.stream().anyMatch(
+        e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) {
+
+      LOG.info("Found currently initiated or working compaction for {} so we will not initiate another compaction",
+          ci.getFullPartitionName());
+      return true;
+    }
+
+    // Check if there is already a sufficient number of consecutive failures for this table/partition
+    // so that no new automatic compaction needs to be scheduled.
+    int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+
+    LongSummaryStatistics failedStats = filteredElements.stream()
+        .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState()))
+        .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed())
+        .limit(failedThreshold)
+        .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState()))
+        .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime));
+
+    // If the last attempt was too long ago, ignore the failed threshold and try compaction again
+    long retryTime = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS);
+
+    boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis());
+    if (failedStats.getCount() == failedThreshold && !needsRetry) {
+      LOG.warn("Will not initiate compaction for {} since last {} attempts to compact it failed.",
+          ci.getFullPartitionName(), MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+
+      ci.errorMessage = "Compaction is not initiated since last " +
+          MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed";
+
+      txnHandler.markFailed(ci);
+      return true;
+    }
+    return false;
+  }
+
+  protected Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException {
+    return CompactorUtil.resolveDatabase(conf, ci.dbname);
+  }
+}
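
    (To illustrate the extension point that TableOptimizer introduces, here is a minimal, hypothetical subclass, not part of this commit; the AcidTableOptimizer and IcebergTableOptimizer added by this change are the real implementations. The class name NoOpTableOptimizer and its behavior are assumptions, while the constructor and findPotentialCompactions signatures match the abstract class above.)

    package org.apache.hadoop.hive.ql.txn.compactor;

    import java.util.Collections;
    import java.util.Set;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;

    // Minimal hypothetical optimizer: never proposes any compaction.
    public class NoOpTableOptimizer extends TableOptimizer {

      public NoOpTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
        super(conf, txnHandler, metadataCache);
      }

      @Override
      public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
          Set<String> skipDBs, Set<String> skipTables) throws MetaException {
        // A real optimizer would list its tables/partitions, build CompactionInfo objects, and keep
        // only those passing isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables).
        return Collections.emptySet();
      }
    }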
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 21f7ddfadf3..fe10a762bf9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -123,6 +123,8 @@ void initHiveConf() {
     hiveConf.set("tez.grouping.max-size", "10");
     hiveConf.set("tez.grouping.min-size", "1");
     databaseProduct = determineDatabaseProduct(DatabaseProduct.DERBY_NAME, hiveConf);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
+        "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer");
   }
 
   void setUpInternal() throws Exception {
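
    (The test hunk above pins the optimizer list to the ACID implementation only. As a hypothetical usage sketch, a deployment that wanted only Iceberg auto compaction could presumably override the same property before the compaction threads start; the snippet below reuses the MetastoreConf.setVar pattern shown in this diff, and where the override actually lives, hive-site.xml or code, is deployment specific.)

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    // Hypothetical override: run only the Iceberg table optimizer during auto compaction.
    HiveConf conf = new HiveConf();
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
        "org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer");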
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 38484534b77..ec2f5dacd75 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -142,6 +142,8 @@ protected final void setup(HiveConf conf) throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, useMinHistoryWriteId());
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
+        "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer");
     // Set this config to true in the base class, there are extended test classes which set this config to false.
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
     TestTxnDbUtil.setConfValues(conf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index c98b310ca25..636550aeaca 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -1001,7 +1001,7 @@ public void resolveUserToRunAs() throws Exception {
     initiator.setConf(conf);
     initiator.init(new AtomicBoolean(true));
     doThrow(new RuntimeException("This was thrown on purpose by testInitiatorFailure"))
-        .when(initiator).resolveTable(any());
+        .when(initiator).resolvePartition(any());
     initiator.run();
 
     // verify status of table compaction
@@ -1134,7 +1134,7 @@ public void testMetaCache() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(2, compacts.size());
-    Mockito.verify(initiator, times(1)).resolveTable(Mockito.any());
+    Mockito.verify(initiator, times(2)).resolvePartition(Mockito.any());
   }
 
   private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9cf5361c4f8..3627517b441 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -653,6 +653,11 @@ public enum ConfVars {
     COMPACTOR_INITIATOR_TABLECACHE_ON("metastore.compactor.initiator.tablecache.on",
       "hive.compactor.initiator.tablecache.on", true,
       "Enable table caching in the initiator. Currently the cache is cleaned after each cycle."),
+    COMPACTOR_INITIATOR_TABLE_OPTIMIZERS("compactor.table.optimizers",
+        "hive.compactor.table.optimizers",
+        "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer," +
+            "org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer",
+        "Comma separated list of table optimizers executed by compaction 
Initiator."),
     COMPACTOR_WORKER_THREADS("metastore.compactor.worker.threads",
         "hive.compactor.worker.threads", 0,
         "How many compactor worker threads to run on this metastore instance. 
Set this to a\n" +
